clickhouse_sqlalchemy 版本为 0.0.10
sqlalchemy 版本为 1.3.12
from clickhouse_sqlalchemy import make_session
from sqlalchemy import create_engine
conf = {"user": "root", "password": "123456", "server_host": "xxx.xxx.xxx.xx", "port": "8123", "db": "test"}
connection = 'clickhouse://{user}:{password}@{server_host}:{port}/{db}'.format(**conf)
engine = create_engine(connection, pool_size=100, pool_recycle=3600, pool_timeout=20)
def get_session(engine):
return make_session(engine)
def execute(sql):
session = get_session(engine)
cursor = session.execute(sql)
try:
fields = cursor._metadata.keys
return [dict(zip(fields, item)) for item in cursor.fetchall()]
finally:
cursor.close()
session.close()