Integration of SQLAlchemy

Note

Supported since PyODPS 0.10.0

PyODPS supports integration of SQLAlchemy and can use it to query data in MaxCompute.

Create connections

You can create MaxCompute connection by specifying access_id, access_key, project and other arguments in a connection string.

import os
from sqlalchemy import create_engine

# Make sure environment variable ALIBABA_CLOUD_ACCESS_KEY_ID already set to Access Key ID of user
# while environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET set to Access Key Secret of user.
# Not recommended to hardcode Access Key ID or Access Key Secret in your code.
conn_string = 'odps://%s:%s@<project>' % (
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)

You can use methods below to specify endpoint in connection strings:

import os
from sqlalchemy import create_engine

# Make sure environment variable ALIBABA_CLOUD_ACCESS_KEY_ID already set to Access Key ID of user
# while environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET set to Access Key Secret of user.
# Not recommended to hardcode Access Key ID or Access Key Secret in your code.
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)

Replace <access_id> and other placeholders with real account information.

For existing ODPS entries, after calling o.to_global() to make accounts global, there is no need to specify connection strings in detail again.

from sqlalchemy import create_engine
o.to_global()  # set ODPS object as global one
engine = create_engine('odps://')

Then connections can be created.

conn = engine.connect()

If you want to set execution settings for SQL tasks, you may still use options object provided by PyODPS:

import os
from odps import options
from sqlalchemy import create_engine

# Make sure environment variable ALIBABA_CLOUD_ACCESS_KEY_ID already set to Access Key ID of user
# while environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET set to Access Key Secret of user.
# Not recommended to hardcode Access Key ID or Access Key Secret in your code.
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
options.sql.settings = {'odps.sql.hive.compatible': 'true'}
engine = create_engine(conn_string)

Settings can also be configured with connection strings:

import os
from sqlalchemy import create_engine

# Make sure environment variable ALIBABA_CLOUD_ACCESS_KEY_ID already set to Access Key ID of user
# while environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET set to Access Key Secret of user.
# Not recommended to hardcode Access Key ID or Access Key Secret in your code.
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>&odps.sql.hive.compatible=true' % (
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
   os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)

Note that when configuring with connection strings, different engines may have different settings.

Using SQLAlchemy interfaces

After establishing connections, you can call SQLAlchemy interfaces as usual. Here are examples for creating, writing data and querying.

Creating tables

from sqlalchemy import Table, Column, Integer, String, MetaData
metadata = MetaData()

users = Table('users', metadata,
    Column('id', Integer),
    Column('name', String),
    Column('fullname', String),
)

metadata.create_all(engine)

Writing data

ins = users.insert().values(id=1, name='jack', fullname='Jack Jones')
conn.execute(ins)

Querying

>>> from sqlalchemy.sql import select
>>> s = select([users])
>>> result = conn.execute(s)
>>> for row in result:
>>>     print(row)
(1, 'jack', 'Jack Jones')