Integration of SQLAlchemy
Note
Supported since PyODPS 0.10.0
PyODPS integrates with SQLAlchemy, so you can use SQLAlchemy to query data in MaxCompute.
Create connections
You can create a MaxCompute connection by specifying access_id, access_key, project and other arguments in a connection string.
import os
from sqlalchemy import create_engine
# Make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID is set to the user's Access Key ID
# and ALIBABA_CLOUD_ACCESS_KEY_SECRET is set to the user's Access Key Secret.
# Hardcoding the Access Key ID or Access Key Secret in your code is not recommended.
conn_string = 'odps://%s:%s@<project>' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
To specify an endpoint, add it to the connection string as a query parameter:
import os
from sqlalchemy import create_engine
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
Replace <project>, <endpoint> and any other placeholders with your real project and account information.
If you already have an ODPS entry object o, call o.to_global() to make its account global. After that, the connection string no longer needs to carry account details.
from sqlalchemy import create_engine
o.to_global() # set ODPS object as global one
engine = create_engine('odps://')
You can then create a connection from the engine:
conn = engine.connect()
If you want to set execution settings for SQL tasks, you can still use the options object provided by PyODPS:
import os
from odps import options
from sqlalchemy import create_engine
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
options.sql.settings = {'odps.sql.hive.compatible': 'true'}
engine = create_engine(conn_string)
Settings can also be configured with connection strings:
import os
from sqlalchemy import create_engine
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>&odps.sql.hive.compatible=true' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
Note that settings passed in a connection string belong to the engine created from it, so different engines can carry different settings.
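Some business intelligence engines (for example, Apache Superset) may list MaxCompute objects such as tables frequently, which can introduce considerable latency. If your analysis does not depend on seeing newly created MaxCompute objects right away, in PyODPS 0.12.0 and later you can add the option cache_names=true to the connection string to enable object name caching, and set the cache timeout with cache_seconds=<timeout in seconds> (the default is 24 * 3600). The following example enables caching and sets the cache timeout to 1200 seconds.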
import os
from sqlalchemy import create_engine
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>&cache_names=true&cache_seconds=1200' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
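The connection strings in this section all share one shape: credentials and project first, then optional query parameters such as endpoint, SQL settings, or the cache options. As a minimal sketch, the helper below assembles such a string from the same environment variables. The name build_odps_conn_string and its parameters are hypothetical and not part of PyODPS; the helper only formats text and fails early when a credential is missing.

```python
import os
from urllib.parse import urlencode


def build_odps_conn_string(project, endpoint=None, options=None):
    """Assemble an 'odps://' connection string (hypothetical helper, not a PyODPS API).

    Credentials are read from the environment so they never appear in code.
    """
    access_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
    access_key = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    if not access_id or not access_key:
        raise RuntimeError(
            'ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET must be set'
        )

    conn_string = 'odps://%s:%s@%s' % (access_id, access_key, project)

    # Collect query parameters in a stable order: endpoint first, then the rest.
    params = {}
    if endpoint is not None:
        params['endpoint'] = endpoint
    params.update(options or {})

    if params:
        # Keep ':' and '/' unescaped so the endpoint reads like the examples above.
        conn_string += '/?' + urlencode(params, safe=':/')
    return conn_string
```

For example, build_odps_conn_string('<project>', '<endpoint>', {'cache_names': 'true', 'cache_seconds': 1200}) produces a string of the same form as the caching example, which can then be passed to create_engine.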
Using SQLAlchemy interfaces
After establishing a connection, you can call SQLAlchemy interfaces as usual. Here are examples of creating tables, writing data and querying.
Creating tables
from sqlalchemy import Table, Column, Integer, String, MetaData
metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer),
    Column('name', String),
    Column('fullname', String),
)
metadata.create_all(engine)
Writing data
ins = users.insert().values(id=1, name='jack', fullname='Jack Jones')
conn.execute(ins)
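To write several rows at once, SQLAlchemy accepts a list of dictionaries alongside a single insert construct. The sketch below wraps this in a hypothetical helper, insert_rows, which is not part of PyODPS. It assumes the ODPS dialect handles a parameter list the same way other SQLAlchemy dialects do, which is not confirmed here, and it says nothing about how the rows are batched on the MaxCompute side.

```python
def insert_rows(conn, table, rows):
    """Insert row dictionaries into `table` and return how many were sent.

    `conn` is a SQLAlchemy connection and `table` a SQLAlchemy Table,
    for example the `conn` and `users` objects from the examples above.
    This is a hypothetical helper, not a PyODPS API.
    """
    rows = list(rows)
    if not rows:
        # Nothing to write; avoid issuing an INSERT without parameters.
        return 0
    # A list of dictionaries makes SQLAlchemy execute the same INSERT
    # with each dictionary as its parameter set.
    conn.execute(table.insert(), rows)
    return len(rows)
```

A call such as insert_rows(conn, users, [{'id': 2, 'name': 'wendy', 'fullname': 'Wendy Williams'}]) would then add one more row to the users table.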
Querying
>>> from sqlalchemy.sql import select
>>> s = select([users])
>>> result = conn.execute(s)
>>> for row in result:
...     print(row)
(1, 'jack', 'Jack Jones')
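Queries are usually filtered rather than run over the whole table. As a minimal sketch, the hypothetical helper below (find_by_name is not part of PyODPS or SQLAlchemy) selects rows from the users table by name. It builds the statement with Table.select() and where(), so it does not depend on how the select() function takes its arguments, and it assumes the ODPS dialect compiles a simple equality filter like any other dialect.

```python
def find_by_name(conn, users, name):
    """Return all rows of `users` whose name column equals `name`, as tuples.

    `conn` is a SQLAlchemy connection and `users` the Table defined above.
    This is a hypothetical helper for illustration only.
    """
    # The value of `name` is sent as a bound parameter, not pasted into the SQL text.
    stmt = users.select().where(users.c.name == name)
    return [tuple(row) for row in conn.execute(stmt)]
```

With only the row written in the example above, find_by_name(conn, users, 'jack') should return a list holding that one row.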