SQL

PyODPS supports MaxCompute SQL queries and provides methods to read SQL results. Both the execute_sql and run_sql methods return instance objects.

Note

Not every command that can be run in the MaxCompute Console is accepted as an SQL statement by MaxCompute. Use other methods to execute statements that are not DDL or DML. For example, use the run_security_query method to execute GRANT/REVOKE statements, and use the run_xflow or execute_xflow method to execute PAI commands.
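
The routing described in this note can be sketched as a small dispatcher. This is only an illustration, not part of PyODPS: run_statement is a hypothetical helper, o is assumed to be an existing ODPS entry object, and the check is deliberately naive because it only looks at the first word of the statement.

```python
def run_statement(o, statement):
    """Send a statement to the entry-object method suited to it.

    `o` is assumed to be an existing ODPS entry object. Only GRANT and
    REVOKE are recognised here; everything else goes to execute_sql.
    """
    words = statement.strip().split(None, 1)
    keyword = words[0].upper() if words else ''
    if keyword in ('GRANT', 'REVOKE'):
        # Security statements are not accepted by the SQL interface.
        return o.run_security_query(statement)
    return o.execute_sql(statement)
```

PAI commands are left out of the sketch because run_xflow and execute_xflow take XFlow-specific arguments rather than a single statement string.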

Execute SQL statements

>>> o.execute_sql('select * from dual')  # synchronous: blocks until the SQL statement finishes
>>>
>>> instance = o.run_sql('select * from dual')  # asynchronous: returns immediately
>>> print(instance.get_logview_address())  # obtain the LogView address
>>> instance.wait_for_success()  # block until the statement finishes
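
One reason to prefer the asynchronous form is that several statements can be submitted before waiting for any of them. The following is a minimal sketch under that assumption. run_all is a hypothetical helper, and o is assumed to be an existing ODPS entry object; only run_sql and wait_for_success from the example above are used.

```python
def run_all(o, statements):
    """Submit every statement first, then wait for each one in turn."""
    # run_sql returns as soon as the statement has been submitted.
    instances = [o.run_sql(sql) for sql in statements]
    for instance in instances:
        # Blocks until this instance finishes.
        instance.wait_for_success()
    return instances
```

The returned instances can then be read with open_reader in the same way as the result of execute_sql.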

Set runtime parameters

You can use the hints parameter to set runtime parameters. The parameter takes a dict.

>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

You can also set options.sql.settings globally. The runtime parameters it contains are then added automatically to every execution.

>>> from odps import options
>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}
>>> o.execute_sql('select * from pyodps_iris')  # global hints configured in options.sql.settings will be added
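
If you would rather not change global options, a shared set of hints can also be combined with per-call hints before the call. The sketch below does the merge in plain Python and passes the result through the hints parameter. execute_with_hints, base_hints and extra_hints are hypothetical names, and o is assumed to be an existing ODPS entry object.

```python
def execute_with_hints(o, sql, base_hints=None, extra_hints=None):
    """Run `sql` with shared hints plus optional per-call hints."""
    hints = dict(base_hints or {})   # copy so the shared dict is not mutated
    hints.update(extra_hints or {})  # per-call values replace shared ones
    return o.execute_sql(sql, hints=hints)
```

For example, execute_with_hints(o, 'select * from pyodps_iris', base_hints={'odps.sql.mapper.split.size': 16}) passes the same hint as the first example in this section.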

View SQL results

Call the open_reader method on the instance to retrieve SQL execution results. In the following example, the results are returned as structured records.

>>> with o.execute_sql('select * from dual').open_reader() as reader:
>>>     for record in reader:
>>>         print(record)  # process every record

For commands such as desc, use the reader.raw attribute to get the raw execution results.

>>> with o.execute_sql('desc dual').open_reader() as reader:
>>>     print(reader.raw)
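
The two ways of reading a result can be wrapped in one function. This is a minimal sketch rather than a PyODPS API: fetch_result is a hypothetical helper, o is assumed to be an existing ODPS entry object, and loading all records into a list is only sensible for small results.

```python
def fetch_result(o, sql, raw=False):
    """Return the result of `sql` as a list of records, or as raw text."""
    with o.execute_sql(sql).open_reader() as reader:
        if raw:
            # Unstructured output, for commands such as desc.
            return reader.raw
        # Structured output: iterating the reader yields one record at a time.
        return [record for record in reader]
```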

If options.tunnel.use_instance_tunnel is set to True when open_reader is called, PyODPS uses Instance Tunnel by default; otherwise, it uses the old Result interface. If you are using an old version of MaxCompute, or an error occurs when calling Instance Tunnel, PyODPS issues a warning and automatically falls back to the old Result interface. If the results returned by Instance Tunnel do not meet your expectations, set this option to False. When calling open_reader, you can also use the tunnel parameter to specify which result interface to use. For example:

>>> # Use Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     for record in reader:
>>>         print(record)  # process every record
>>> # Use the old Result interface
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
>>>     for record in reader:
>>>         print(record)  # process every record

By default, PyODPS does not limit the amount of data that can be read from an instance. For protected projects, however, downloads through the Tunnel are restricted: if options.tunnel.limit_instance_tunnel is not set, a cap on the number of records is enabled automatically. The maximum number of downloadable records is defined in your project configuration and is usually 10,000. To enable the cap explicitly, pass the limit option to open_reader, or set options.tunnel.limit_instance_tunnel to True.
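
The per-call choices described above can be collected in one place. The sketch below is an illustration only: reader_options and count_records are hypothetical helpers, o is assumed to be an existing ODPS entry object, and passing limit as a boolean keyword to open_reader is an assumption based on the description above.

```python
def reader_options(use_tunnel=None, limit=None):
    """Build keyword arguments for open_reader.

    None means "do not pass the argument", which leaves the decision
    to the corresponding options.tunnel.* setting.
    """
    kwargs = {}
    if use_tunnel is not None:
        kwargs['tunnel'] = bool(use_tunnel)
    if limit is not None:
        # Assumption: open_reader accepts limit as a boolean keyword.
        kwargs['limit'] = bool(limit)
    return kwargs


def count_records(o, sql, **choices):
    """Count the records that can be read with the chosen options."""
    with o.execute_sql(sql).open_reader(**reader_options(**choices)) as reader:
        return sum(1 for _ in reader)
```

A count that equals the project cap exactly, for example 10,000, may mean that the result was truncated.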

If the MaxCompute version you are using supports only the old Result interface and you need to read all of the data, you can export the SQL results to another table and read the data from that table instead. This may be restricted by project security settings.

PyODPS also supports reading the execution results directly into a pandas DataFrame.

>>> # Use the to_pandas method of the reader directly
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # pd_df is a pandas DataFrame
>>>     pd_df = reader.to_pandas()

To speed up reading by using multiple cores, specify the number of processes with n_process:

Note

Multi-process acceleration is currently not available on Windows.

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # set n_process to the number of CPU cores on the machine
>>>     pd_df = reader.to_pandas(n_process=n_process)
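
Because of the Windows restriction, code that runs on several platforms may want to choose the process count conditionally. The following is a minimal sketch under that assumption. pick_n_process and result_to_pandas are hypothetical helpers, o is assumed to be an existing ODPS entry object, and falling back to a single process on Windows is a design choice of this sketch, not documented PyODPS behavior.

```python
import multiprocessing
import sys


def pick_n_process(platform=None):
    """Return 1 on Windows, otherwise the number of CPU cores."""
    platform = sys.platform if platform is None else platform
    if platform.startswith('win'):
        # Multi-process reading is not available on Windows.
        return 1
    return multiprocessing.cpu_count()


def result_to_pandas(o, sql):
    """Read the result of `sql` into a pandas DataFrame."""
    with o.execute_sql(sql).open_reader(tunnel=True) as reader:
        return reader.to_pandas(n_process=pick_n_process())
```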

Set alias

Some resources referenced by a UDF change dynamically at runtime. Instead of modifying the UDF, you can map the name of the old resource to a new resource with an alias, so that the UDF reads the new resource under the old name.

from odps.models import Schema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# write one row with only one value '1'
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
# output: 2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# To replace the resource holding the value '1' with the resource holding the value '2',
# pass the aliases argument. Neither the UDF nor the resources need to be modified.
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])
# output: 3

Execute SQL statements in an interactive environment

In IPython and Jupyter, you can use SQL plugins to execute SQL statements. Parameterized queries are also supported. For details, see Documentation.

Set biz_id

In a few cases, you may need to submit a biz_id together with an SQL statement; otherwise, an error occurs during execution. You can set biz_id globally in options.

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')