clickhouse_sqlalchemy 版本为 0.0.10
sqlalchemy 版本为 1.3.12


from clickhouse_sqlalchemy import make_session
from sqlalchemy import create_engine
conf = {"user": "root", "password": "123456", "server_host": "xxx.xxx.xxx.xx", "port": "8123", "db": "test"}
connection = 'clickhouse://{user}:{password}@{server_host}:{port}/{db}'.format(**conf)
engine = create_engine(connection, pool_size=100, pool_recycle=3600, pool_timeout=20)
def get_session(engine):
    return make_session(engine)

def execute(sql):
    session = get_session(engine)
    cursor = session.execute(sql)
    try:
        fields = cursor._metadata.keys
        return [dict(zip(fields, item)) for item in cursor.fetchall()]
    finally:
        cursor.close()
        session.close()


↙↙↙阅读原文可查看相关链接,并与作者交流