Python python 操作达梦数据库实例

未闻佳音 · 2022年06月02日 · 最后由 yonghang.zhang 回复于 2022年06月02日 · 5273 次阅读

此贴记录本人为了适应公司开发及实际项目生产环境,学习达梦数据库的相关知识,进而写出的一些应用 demo。
希望和大佬们一起学习交流,若有不对的地方,欢迎大佬们指正。

dmPyhon 应用 demo

#!/usr/bin/python
# -*- coding:utf-8 -*-
# @Author : jilin
import dmPython

r"""
默认开启自动提交(autoCommit)模式
"""


class DmDb:
    def __init__(self, user='SYSDBA', password='SYSDBA', server='localhost', port=5236):
        """
        初始化方法,创建连接对象,游标对象
        :param user: 账号(对应模式名)
        :param password:密码
        :param server: 数据库ip
        :param port: 端口
        """
        try:
            self.conn = dmPython.connect(user=user, password=password, server=server, port=port)
            self.cursor = self.conn.cursor()
            if self.cursor: print('>>>数据库连接成功<<<')
        except Exception as err:
            print(err)

    def __del__(self):
        # 对象被销毁的时候执行 关闭连接对象
        self.conn.close()

    def select_data(self, sql):
        """
        查询数据
        :param sql:select语句
        :return: 查询结果
        """
        try:
            self.cursor.execute(sql)
            return self.cursor.fetchall()
        except Exception as err:
            print(err)

    def execute_one(self, sql):
        """
        插入一条数据
        :param sql:一条sql语句
        :return: 受影响的条数
        """
        try:
            self.cursor.execute(sql)
        except Exception as err:
            print(err)
        return self.cursor.rowcount

    def execute_many_for(self, sql_list: list):
        """
        批量插入数据,多条insert语句装载在列表中
        :param sql_list: 多条insert语句组成的列表
        :return: 每条sql插入成功受影响的条数组成的列表, 执行失败的sql列表
        """
        failed_sql = []
        effect_rows = []
        for sql in sql_list:
            try:
                self.cursor.execute(sql)
                # 受影响的条数计数
                effect_row = self.cursor.rowcount
                effect_rows.append(effect_row)
            except Exception as err:
                print(err, f'执行异常的sql为: {sql}')
                failed_sql.append(sql)
        return effect_rows, failed_sql

    def del_data(self, sql):
        """
        删除数据
        :param sql:删除sql语句
        :return: 受影响的条数
        """
        try:
            self.cursor.execute(sql)
        except Exception as err:
            print(err)
        return self.cursor.rowcount

    def execute_many(self, sql, params: list):
        """
        批量执行,用于批量执行insert语句
        :param sql: sql语句, eg: 'insert into "TEST"."stu" values(?, ?, ?, ?)'
        :param params: 对应sql语句中的参数,按位置顺序传参;参数类型为[(),()]或[[],[]]
        :return: 受影响的记录条数
        """
        try:
            self.cursor.executemany(sql, params)
            return self.cursor.rowcount
        except Exception as err:
            print(f'!!!批量插入失败: {err}, 全部回滚<<<')

    def create_user_schema(self, user_schema, password):
        """
        创建用户和模式,并对用户授权
        :param user_schema: 用户名(模式名)
        :param password: 密码
        :return:
        """
        sql = [
            rf'CREATE USER "{user_schema}" IDENTIFIED BY "{password}" HASH WITH SHA512 NO SALT PASSWORD_POLICY 2 ENCRYPT BY "{password}" LIMIT FAILED_LOGIN_ATTEMPS 3, PASSWORD_LOCK_TIME 1, PASSWORD_GRACE_TIME 10 DEFAULT TABLESPACE "MAIN";',
            rf'grant "PUBLIC","VTI","SOI" to "{user_schema}";',
            rf'grant CREATE SESSION to "{user_schema}";']
        sql2 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null'
        expected_results = (f'{user_schema}', f'{user_schema}')
        try:
            for i in sql:
                self.cursor.execute(i)
            select_results = self.select_data(sql2)
            if expected_results in select_results:
                return True
        except Exception as err:
            print(err)

    def del_user_schema(self, user_and_schema):
        """
        删除用户(模式)
        :param user_and_schema: 用户名(模式名)
        :return:
        """
        sql = f'drop user {user_and_schema} cascade;'
        sql2 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null'
        expected_results = (f'{user_and_schema}', f'{user_and_schema}')
        try:
            if expected_results in self.select_data(sql2):
                self.cursor.execute(sql)
                select_results = self.select_data(sql2)
                if expected_results not in select_results:
                    return True
            else:
                print(f'要删除的目标对象不存在: {user_and_schema}')
        except Exception as err:
            print(err)


if __name__ == '__main__':
    # dm = DmDb()
    sql1 = 'select * from "TEST"."stu";'
    sql2 = """insert into "TEST"."stu" values(1,'dmpython','男',20);"""
    sql3 = ["""insert into "TEST"."stu" values(2,'dmpython','女',21);""",
            """insert into "TEST"."stu" values(3,'dmpython','男',22);""",
            """insert into "TEST"."stu" values(3,'dmpython','男',22);"""]
    sql4 = """delete from "TEST"."stu" where "name" = 'dmpython'"""
    sql5 = 'delete from "TEST"."stu"'
    # Seq_params = [(1, 'dmpython', '男', 20), (2, 'dmpython', '女', 21), (3, 'dmpython', '男', 22)]
    Seq_params = [[1, 'dmpython', '男', 20], [2, 'dmpython', '女', 21], [3, 'dmpython', '男', 22]]
    # Seq_params = [[1, 'dmpython', '男', 20], [2, 'dmpython', '女', 21], [2, 'dmpython', '男', 22]]
    sql_many = """insert into "TEST"."stu" values(?, ?, ?, ?)"""
    table = '"TEST"."stu"'
    sql6 = """update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;"""
    sql7 = ["""update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;""",
            """update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 99;"""]
    sql8 = ["""insert into "TEST"."stu" values(1,'dmpython','男',20);""",
            """update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 1;""",
            """update "TEST"."stu" set "name" = 'dm_manager_modified_dmPython' where "id" = 100;"""]
    dm_admin = DmDb(user='SYSDBA', password='SYSDBA')
    sql9 = r'select a.name as username, b.name as schenma from sysobjects a inner join sysobjects b on a.id = b.pid where b.subtype$ is null order by username desc;'
    print(dm_admin.create_user_schema('LSZ_INSIDE', 'Gwxa123456'))
    # print(dm_admin.del_user_schema('LSZ_INSIDE'))
    pass

使用 dexp、dimp 工具实现导入导出 demo

#!/usr/bin/python
# -*- coding:utf-8 -*-
# @Author : jilin

import time
import os
import func_timeout


def open_cmd(commands):
    with os.popen(commands) as fp:
        return fp.read()


@user3t_timeout(2)
def check_file_exists(file_path):
    result = False
    num = 0
    while not result:
        result = os.path.exists(file_path)
        num += 1
        # print(f'第 {num} 次判断, 结果: {result}')
        if result:
            break
    return result


def get_check_file_exists(file_path):
    try:
        # 调用定时内检测文件是否存在方法
        check_result = check_file_exists(file_path=file_path)
        if check_result:
            return file_path
    # 捕获超时异常
    except func_timeout.exceptions.FunctionTimedOut as err:
        print(f'判断超时! {err}')
        return None


@user4t_timeout(2)
def read_log_exp(log_path):
    if os.path.exists(log_path):
        start_time = time.time()
        with open(log_path, 'r') as fp:
            log_content = fp.readlines()
            lines = [line for line in log_content if line]
            while len(lines) < 7:  # 判断内容行数
                log_content = fp.readlines()
                lines = [line for line in log_content if line]
                if len(lines) >= 7:
                    break
        end_time = time.time()
        print(f'读取对象: {log_path}, 读取耗时: {end_time - start_time}, 总行数: {len(lines)}')
        return lines
    else:
        print(f'读取对象: {log_path} 不存在!')


@user5t_timeout(2)
def read_log_imp(log_path):
    if os.path.exists(log_path):
        start_time = time.time()
        with open(log_path, 'r') as fp:
            log_content = fp.readlines()
            lines = [line for line in log_content if line]
            while len(lines) < 3:
                log_content = fp.readlines()
                lines = [line for line in log_content if line]
                if len(lines) >= 3:
                    break
        end_time = time.time()
        print(f'读取对象: {log_path}, 读取耗时: {end_time - start_time}, 总行数: {len(lines)}')
        return lines
    else:
        print(f'读取对象: {log_path} 不存在!')


def get_log_content(exp_or_imp, log_path):
    if exp_or_imp == 'exp':
        try:
            content_lines = read_log_exp(log_path=log_path)
            if content_lines:
                return content_lines
        except func_timeout.exceptions.FunctionTimedOut as err:
            print(f'超时! {err}')
            return None
    elif exp_or_imp == 'imp':
        try:
            content_lines = read_log_imp(log_path=log_path)
            if content_lines:
                return content_lines
        except func_timeout.exceptions.FunctionTimedOut as err:
            print(f'超时! {err}')
            return None
    else:
        print('不支持的类型!')


def export_dmp(drive_letter, dm_bin_path, sysdba, sysdba_password, host, port, dmp_dir, owner_and_schema):
    """
    适用于Windows环境,导出指定的模式(用户级别),使用Dexp进行用户级逻辑导出
    :param drive_letter:达梦数据库的安装盘符, eg:d
    :param dm_bin_path:达梦数据库的安装路径下的bin目录下, eg:D:/dmdbms/bin/
    :param sysdba:数据库管理员账号, eg:SYSDBA
    :param sysdba_password:数据库管理员账号密码, eg:SYSDBA
    :param host:数据库所在ip
    :param port:数据库端口
    :param dmp_dir:存放导出的dmp文件的目录,eg:D:/test/dm_sql/
    :param owner_and_schema:指定的用户名(模式名)
    :return:导出成功的dmp文件的完整路径
    """
    t = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    file_name = t + '-' + owner_and_schema + '-exp'
    commands_dm = rf'''chcp 65001 && {drive_letter}: && cd {dm_bin_path} && dexp "{sysdba}"/"{sysdba_password}"@{host}:{port} DIRECTORY={dmp_dir} FILE={file_name}.dmp owner={owner_and_schema} TABLESPACE=Y DROP=N LOG={file_name}.log LOG_WRITE=N'''
    print(f'目标命令为: {commands_dm}')
    try:
        open_cmd(commands_dm)
    except Exception as err:
        print(err)
    dmp_file_path = os.path.join(dmp_dir, f'{file_name}.dmp')  # eg: D:/test/dm_sql/20220526112545-LSZ_INSIDE-exp.dmp
    dmp_log_path = os.path.join(dmp_dir, f'{file_name}.log')  # eg: D:/test/dm_sql/20220526112545-LSZ_INSIDE-exp.log
    if get_check_file_exists(os.path.realpath(dmp_file_path)):  # 坑,无论是否真的导出成功,均会生成dmp文件;故还得进一步判断导出真实性
        log_info = get_log_content(exp_or_imp='exp', log_path=os.path.realpath(dmp_log_path))
        if len(log_info) > 7:
            exp_summary = log_info[-7]  # 日志中的导出总结,eg: 共导出 0 个SCHEMA
            if int(exp_summary[4]) > 0:
                return dmp_file_path, os.path.realpath(dmp_file_path), dmp_log_path
            else:
                print(f'导出异常: {exp_summary}')
                return None
        else:
            print(f'导出日志异常! {os.path.realpath(dmp_log_path)}')
    else:
        print('导出异常! 生成dmp文件异常!')


def import_dmp(drive_letter, dm_bin_path, sysdba, sysdba_password, host, port, dmp_dir, dmp_file, owner_and_schema):
    """
    适用于Windows环境, 使用Dimp进行用户级逻辑导入
    :param drive_letter:达梦数据库的安装盘符, eg:d
    :param dm_bin_path:达梦数据库的安装路径下的bin目录下, eg:D:/dmdbms/bin
    :param sysdba:数据库管理员账号, eg:SYSDBA
    :param sysdba_password:数据库管理员账号密码, eg:SYSDBA
    :param host:数据库所在ip
    :param port:数据库端口
    :param dmp_dir:存放导出的dmp文件的目录
    :param dmp_file:dmp文件名
    :param owner_and_schema:指定的用户名(模式名)
    :return:
    """
    t = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    log_name = t + '-' + owner_and_schema + '-imp'
    commands_dm = rf'chcp 65001 && {drive_letter}: && cd {dm_bin_path} && dimp "{sysdba}"/"{sysdba_password}"@{host}:{port} DIRECTORY={dmp_dir} FILE={dmp_file} OWNER={owner_and_schema} LOG={log_name}.log LOG_WRITE=N'
    print(f'目标命令为: {commands_dm}')
    try:
        open_cmd(commands_dm)
    except Exception as err:
        print(err)
    log_path = os.path.join(dmp_dir, f'{log_name}.log')
    if get_check_file_exists(os.path.realpath(log_path)):
        print(f'导入日志生成成功: {log_path}')
        log_content = get_log_content(exp_or_imp='imp', log_path=os.path.realpath(log_path))
        if str(log_content[-1]) == '成功终止导入, 没有出现警告\n':
            return True
        else:
            print(f'导入异常: {log_content[-1]}详情参见: {log_path}')
            return None
    else:
        print(f'导入异常, 请检查参数!')
        return None


if __name__ == '__main__':
    drive_letter = 'd'
    dm_bin_path = 'D:/dmdbms/bin'
    sysdba = 'SYSDBA'
    sysdba_password = 'SYSDBA'
    # sysdba_password = 'gwxa123456'
    host = '127.0.0.1'
    port = '5236'
    dmp_dir = 'D:/test/dm_sql/'
    dmp_name = 'Python'
    owner_and_schema = 'LSZ_INSIDE'
    dmp_file = '20220602132541-LSZ_INSIDE-exp.dmp'
    # owner_and_schema = 'TEST'
    start_time = time.time()
    dmp_file_path = export_dmp(drive_letter=drive_letter, dm_bin_path=dm_bin_path, sysdba=sysdba,
                               sysdba_password=sysdba_password, host=host, port=port, dmp_dir=dmp_dir,
                               owner_and_schema=owner_and_schema)
    end_time = time.time()
    print(f'导出耗时: {end_time - start_time}')
    print(dmp_file_path)
    start_time = time.time()
    imp_result = import_dmp(drive_letter=drive_letter, dm_bin_path=dm_bin_path, sysdba=sysdba,
                            sysdba_password=sysdba_password, host=host, port=port, dmp_dir=dmp_dir,
                            dmp_file=dmp_file, owner_and_schema=owner_and_schema)
    end_time = time.time()
    print(f'导入耗时: {end_time - start_time}')
    print(imp_result)
    pass

学习踩坑不易,若有学习转载,还请注明出处。

共收到 2 条回复 时间 点赞
# @func_timeout.func_set_timeout(2)    这个装饰器在帖子里被识别成Markdown语法了
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册