此贴记录本人为了适应公司开发及实际项目生产环境,学习达梦数据库的相关知识,进而写出的一些应用 demo。
希望和大佬们一起学习交流,若有不对的地方,欢迎大佬们指正。
#!/usr/bin/python
# -*- coding:utf-8 -*-
# @Author : jilin
import dmPython
r"""
默认开启自动提交(autoCommit)模式
"""
class DmDb:
def __init__(self, user='SYSDBA', password='SYSDBA', server='localhost', port=5236):
"""
初始化方法,创建连接对象,游标对象
:param user: 账号(对应模式名)
:param password:密码
:param server: 数据库ip
:param port: 端口
"""
try:
self.conn = dmPython.connect(user=user, password=password, server=server, port=port)
self.cursor = self.conn.cursor()
if self.cursor: print('>>>数据库连接成功<<<')
except Exception as err:
print(err)
def __del__(self):
# 对象被销毁的时候执行 关闭连接对象
self.conn.close()
def select_data(self, sql):
"""
查询数据
:param sql:select语句
:return: 查询结果
"""
try:
self.cursor.execute(sql)
return self.cursor.fetchall()
except Exception as err:
print(err)
def execute_one(self, sql):
"""
插入一条数据
:param sql:一条sql语句
:return: 受影响的条数
"""
try:
self.cursor.execute(sql)
except Exception as err:
print(err)
return self.cursor.rowcount
def execute_many_for(self, sql_list: list):
"""
批量插入数据,多条insert语句装载在列表中
:param sql_list: 多条insert语句组成的列表
:return: 每条sql插入成功受影响的条数组成的列表, 执行失败的sql列表
"""
failed_sql = []
effect_rows = []
for sql in sql_list:
try:
self.cursor.execute(sql)
# 受影响的条数计数
effect_row = self.cursor.rowcount
effect_rows.append(effect_row)
except Exception as err:
print(err, f'执行异常的sql为: {sql}')
failed_sql.append(sql)
return effect_rows, failed_sql
def del_data(self, sql):
"""
删除数据
:param sql:删除sql语句
:return: 受影响的条数
"""
try:
self.cursor.execute(sql)
except Exception as err:
print(err)
return self.cursor.rowcount
def execute_many(self, sql, params: list):
"""
批量执行,用于批量执行insert语句
:param sql: sql语句, eg: 'insert into "TEST"."stu" values(?, ?, ?, ?)'
:param params: 对应sql语句中的参数,按位置顺序传参;参数类型为[(),()]或[[],[]]
:return: 受影响的记录条数
"""
try:
self.cursor.executemany(sql, params)
return self.cursor.rowcount
except Exception as err:
print(f'!!!批量插入失败: {err}, 全部回滚<<<')
def create_user_schema(self, user_schema, password):
"""
创建用户和模式,并对用户授权
:param user_schema: 用户名(模式名)
:param password: 密码
:return:
"""
sql = [
rf'CREATE USER "{user_schema}" IDENTIFIED BY "{password}" HASH WITH SHA512 NO SALT PASSWORD_POLICY 2 ENCRYPT BY "{password}" LIMIT FAILED_LOGIN_ATTEMPS 3, PASSWORD_LOCK_TIME 1, PASSWORD_GRACE_TIME 10 DEFAULT TABLESPACE "MAIN";',
rf'grant "PUBLIC","VTI","SOI" to "{user_schema}";',
rf'grant CREATE SESSION to "{user_schema}";']
sql2 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null'
expected_results = (f'{user_schema}', f'{user_schema}')
try:
for i in sql:
self.cursor.execute(i)
select_results = self.select_data(sql2)
if expected_results in select_results:
return True
except Exception as err:
print(err)
def del_user_schema(self, user_and_schema):
"""
删除用户(模式)
:param user_and_schema: 用户名(模式名)
:return:
"""
sql = f'drop user {user_and_schema} cascade;'
sql2 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null'
expected_results = (f'{user_and_schema}', f'{user_and_schema}')
try:
if expected_results in self.select_data(sql2):
self.cursor.execute(sql)
select_results = self.select_data(sql2)
if expected_results not in select_results:
return True
else:
print(f'要删除的目标对象不存在: {user_and_schema}')
except Exception as err:
print(err)
if __name__ == '__main__':
# dm = DmDb()
sql1 = 'select * from "TEST"."stu";'
sql2 = """insert into "TEST"."stu" values(1,'dmpython','男',20);"""
sql3 = ["""insert into "TEST"."stu" values(2,'dmpython','女',21);""",
"""insert into "TEST"."stu" values(3,'dmpython','男',22);""",
"""insert into "TEST"."stu" values(3,'dmpython','男',22);"""]
sql4 = """delete from "TEST"."stu" where "name" = 'dmpython'"""
sql5 = 'delete from "TEST"."stu"'
# Seq_params = [(1, 'dmpython', '男', 20), (2, 'dmpython', '女', 21), (3, 'dmpython', '男', 22)]
Seq_params = [[1, 'dmpython', '男', 20], [2, 'dmpython', '女', 21], [3, 'dmpython', '男', 22]]
# Seq_params = [[1, 'dmpython', '男', 20], [2, 'dmpython', '女', 21], [2, 'dmpython', '男', 22]]
sql_many = """insert into "TEST"."stu" values(?, ?, ?, ?)"""
table = '"TEST"."stu"'
sql6 = """update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;"""
sql7 = ["""update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;""",
"""update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 99;"""]
sql8 = ["""insert into "TEST"."stu" values(1,'dmpython','男',20);""",
"""update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 1;""",
"""update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;"""]
dm_admin = DmDb(user='SYSDBA', password='SYSDBA')
sql9 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null order by username desc;'
print(dm_admin.create_user_schema('LSZ_INSIDE', 'Gwxa123456'))
# print(dm_admin.del_user_schema('LSZ_INSIDE'))
pass
#!/usr/bin/python
# -*- coding:utf-8 -*-
# @Author : jilin
import time
import os
import func_timeout
def open_cmd(commands):
with os.popen(commands) as fp:
return fp.read()
@user3t_timeout(2)
def check_file_exists(file_path):
result = False
num = 0
while not result:
result = os.path.exists(file_path)
num += 1
# print(f'第 {num} 次判断, 结果: {result}')
if result:
break
return result
def get_check_file_exists(file_path):
try:
# 调用定时内检测文件是否存在方法
check_result = check_file_exists(file_path=file_path)
if check_result:
return file_path
# 捕获超时异常
except func_timeout.exceptions.FunctionTimedOut as err:
print(f'判断超时! {err}')
return None
@user4t_timeout(2)
def read_log_exp(log_path):
if os.path.exists(log_path):
start_time = time.time()
with open(log_path, 'r') as fp:
log_content = fp.readlines()
lines = [line for line in log_content if line]
while len(lines) < 7: # 判断内容行数
log_content = fp.readlines()
lines = [line for line in log_content if line]
if len(lines) >= 7:
break
end_time = time.time()
print(f'读取对象: {log_path}, 读取耗时: {end_time - start_time}, 总行数: {len(lines)}')
return lines
else:
print(f'读取对象: {log_path} 不存在!')
@user5t_timeout(2)
def read_log_imp(log_path):
if os.path.exists(log_path):
start_time = time.time()
with open(log_path, 'r') as fp:
log_content = fp.readlines()
lines = [line for line in log_content if line]
while len(lines) < 3:
log_content = fp.readlines()
lines = [line for line in log_content if line]
if len(lines) >= 3:
break
end_time = time.time()
print(f'读取对象: {log_path}, 读取耗时: {end_time - start_time}, 总行数: {len(lines)}')
return lines
else:
print(f'读取对象: {log_path} 不存在!')
def get_log_content(exp_or_imp, log_path):
if exp_or_imp == 'exp':
try:
content_lines = read_log_exp(log_path=log_path)
if content_lines:
return content_lines
except func_timeout.exceptions.FunctionTimedOut as err:
print(f'超时! {err}')
return None
elif exp_or_imp == 'imp':
try:
content_lines = read_log_imp(log_path=log_path)
if content_lines:
return content_lines
except func_timeout.exceptions.FunctionTimedOut as err:
print(f'超时! {err}')
return None
else:
print('不支持的类型!')
def export_dmp(drive_letter, dm_bin_path, sysdba, sysdba_password, host, port, dmp_dir, owner_and_schema):
"""
适用于Windows环境,导出指定的模式(用户级别),使用Dexp进行用户级逻辑导出
:param drive_letter:达梦数据库的安装盘符, eg:d
:param dm_bin_path:达梦数据库的安装路径下的bin目录下, eg:D:/dmdbms/bin/
:param sysdba:数据库管理员账号, eg:SYSDBA
:param sysdba_password:数据库管理员账号密码, eg:SYSDBA
:param host:数据库所在ip
:param port:数据库端口
:param dmp_dir:存放导出的dmp文件的目录,eg:D:/test/dm_sql/
:param owner_and_schema:指定的用户名(模式名)
:return:导出成功的dmp文件的完整路径
"""
t = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
file_name = t + '-' + owner_and_schema + '-exp'
commands_dm = rf'''chcp 65001 && {drive_letter}: && cd {dm_bin_path} && dexp "{sysdba}"/"{sysdba_password}"@{host}:{port} DIRECTORY={dmp_dir} FILE={file_name}.dmp owner={owner_and_schema} TABLESPACE=Y DROP=N LOG={file_name}.log LOG_WRITE=N'''
print(f'目标命令为: {commands_dm}')
try:
open_cmd(commands_dm)
except Exception as err:
print(err)
dmp_file_path = os.path.join(dmp_dir, f'{file_name}.dmp') # eg: D:/test/dm_sql/20220526112545-LSZ_INSIDE-exp.dmp
dmp_log_path = os.path.join(dmp_dir, f'{file_name}.log') # eg: D:/test/dm_sql/20220526112545-LSZ_INSIDE-exp.log
if get_check_file_exists(os.path.realpath(dmp_file_path)): # 坑,无论是否真的导出成功,均会生成dmp文件;故还得进一步判断导出真实性
log_info = get_log_content(exp_or_imp='exp', log_path=os.path.realpath(dmp_log_path))
if len(log_info) > 7:
exp_summary = log_info[-7] # 日志中的导出总结,eg: 共导出 0 个SCHEMA
if int(exp_summary[4]) > 0:
return dmp_file_path, os.path.realpath(dmp_file_path), dmp_log_path
else:
print(f'导出异常: {exp_summary}')
return None
else:
print(f'导出日志异常! {os.path.realpath(dmp_log_path)}')
else:
print('导出异常! 生成dmp文件异常!')
def import_dmp(drive_letter, dm_bin_path, sysdba, sysdba_password, host, port, dmp_dir, dmp_file, owner_and_schema):
"""
适用于Windows环境, 使用Dimp进行用户级逻辑导入
:param drive_letter:达梦数据库的安装盘符, eg:d
:param dm_bin_path:达梦数据库的安装路径下的bin目录下, eg:D:/dmdbms/bin
:param sysdba:数据库管理员账号, eg:SYSDBA
:param sysdba_password:数据库管理员账号密码, eg:SYSDBA
:param host:数据库所在ip
:param port:数据库端口
:param dmp_dir:存放导出的dmp文件的目录
:param dmp_file:dmp文件名
:param owner_and_schema:指定的用户名(模式名)
:return:
"""
t = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
log_name = t + '-' + owner_and_schema + '-imp'
commands_dm = rf'chcp 65001 && {drive_letter}: && cd {dm_bin_path} && dimp "{sysdba}"/"{sysdba_password}"@{host}:{port} DIRECTORY={dmp_dir} FILE={dmp_file} OWNER={owner_and_schema} LOG={log_name}.log LOG_WRITE=N'
print(f'目标命令为: {commands_dm}')
try:
open_cmd(commands_dm)
except Exception as err:
print(err)
log_path = os.path.join(dmp_dir, f'{log_name}.log')
if get_check_file_exists(os.path.realpath(log_path)):
print(f'导入日志生成成功: {log_path}')
log_content = get_log_content(exp_or_imp='imp', log_path=os.path.realpath(log_path))
if str(log_content[-1]) == '成功终止导入, 没有出现警告\n':
return True
else:
print(f'导入异常: {log_content[-1]}详情参见: {log_path}')
return None
else:
print(f'导入异常, 请检查参数!')
return None
if __name__ == '__main__':
drive_letter = 'd'
dm_bin_path = 'D:/dmdbms/bin'
sysdba = 'SYSDBA'
sysdba_password = 'SYSDBA'
# sysdba_password = 'gwxa123456'
host = '127.0.0.1'
port = '5236'
dmp_dir = 'D:/test/dm_sql/'
dmp_name = 'Python'
owner_and_schema = 'LSZ_INSIDE'
dmp_file = '20220602132541-LSZ_INSIDE-exp.dmp'
# owner_and_schema = 'TEST'
start_time = time.time()
dmp_file_path = export_dmp(drive_letter=drive_letter, dm_bin_path=dm_bin_path, sysdba=sysdba,
sysdba_password=sysdba_password, host=host, port=port, dmp_dir=dmp_dir,
owner_and_schema=owner_and_schema)
end_time = time.time()
print(f'导出耗时: {end_time - start_time}')
print(dmp_file_path)
start_time = time.time()
imp_result = import_dmp(drive_letter=drive_letter, dm_bin_path=dm_bin_path, sysdba=sysdba,
sysdba_password=sysdba_password, host=host, port=port, dmp_dir=dmp_dir,
dmp_file=dmp_file, owner_and_schema=owner_and_schema)
end_time = time.time()
print(f'导入耗时: {end_time - start_time}')
print(imp_result)
pass
学习踩坑不易,若有学习转载,还请注明出处。