SQL

PyODPS supports MaxCompute SQL queries and provides methods to read SQL results. The execute_sql, execute_sql_interactive, run_sql and run_sql_interactive methods all return Instance objects.

Note

Commands that are executable in the MaxCompute Console cannot always be executed as SQL statements in MaxCompute. Use other methods to execute non-DDL/DML statements. For example, use the run_security_query method to execute GRANT/REVOKE statements, and use the run_xflow or execute_xflow methods to execute PAI commands.
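For example, a GRANT statement can be submitted via run_security_query (a minimal sketch; the user name below is hypothetical):

>>> # grant privileges via a security query rather than execute_sql
>>> # (the user name test_user is hypothetical)
>>> o.run_security_query('grant select on table dual to user test_user')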

Execute SQL statements

You can use execute_sql to run SQL and wait for it to finish. The method blocks until execution is finished and returns an Instance object. If execution of the SQL statement fails, the method raises an error based on the odps.errors.ODPSError class.

>>> o.execute_sql('select * from dual')  # synchronous way, will block till SQL statement finishes execution

You can also use the non-blocking method run_sql to run SQL. It submits your SQL statement to MaxCompute and returns the corresponding Instance object. You then need to call the wait_for_success method to wait until execution finishes. Similar to execute_sql, wait_for_success also raises errors based on odps.errors.ODPSError if anything goes wrong with the SQL instance.

>>> instance = o.run_sql('select * from dual')  # asynchronous way
>>> print(instance.get_logview_address())  # obtain LogView address
>>> instance.wait_for_success()  # block till the statement finishes

Take a look at instances for more information on the Instance objects returned by run_sql or execute_sql.

Execute SQL with MCQA acceleration

MCQA is the query acceleration service of MaxCompute, which accelerates queries on small data sets using standalone resource pools. PyODPS has supported the methods below for executing SQL with MCQA since 0.11.4.1. You need to use these methods with a MaxCompute service that has MCQA enabled.

You can use execute_sql_interactive to execute SQL with MCQA; the method returns an MCQA Instance object. If MCQA does not support your SQL statement, the method falls back to traditional mode automatically and returns a traditional Instance object instead.

>>> o.execute_sql_interactive('select * from dual')

If you don't want the method to fall back automatically, specify fallback=False. You can also specify a fallback policy, or a combination of fallback policies, as a comma-separated string. Available policy names are listed below, and an example follows the list. The default policy is all, which is an alias of the combination generic,unsupported,upgrading,noresource,timeout.

  • generic : once specified, falls back to traditional mode when an unknown error happens.

  • noresource : once specified, falls back to traditional mode when resources are insufficient.

  • upgrading : once specified, falls back to traditional mode when the service is upgrading.

  • timeout : once specified, falls back to traditional mode when execution times out.

  • unsupported : once specified, falls back to traditional mode when MCQA does not support the given SQL statement.

For instance, the code below falls back when resources are insufficient or when MCQA does not support the statement:

>>> o.execute_sql_interactive('select * from dual', fallback="noresource,unsupported")
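If you do not want any automatic fallback, pass fallback=False and the underlying error is raised instead:

>>> o.execute_sql_interactive('select * from dual', fallback=False)  # raise errors instead of falling back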

You can also use run_sql_interactive to run SQL with MCQA. The method returns an MCQA Instance once your SQL is submitted to the cluster, and you need to wait until the Instance finishes, as with run_sql. Note that this method will NOT fall back automatically when it fails: you need to retry or call execute_sql yourself, as in the sketch after the example below.

>>> instance = o.run_sql_interactive('select * from dual')  # asynchronous way
>>> print(instance.get_logview_address())  # obtain LogView address
>>> instance.wait_for_success()  # block till the statement finishes
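A minimal sketch of such a manual fallback, assuming the failure surfaces as an odps.errors.ODPSError raised by wait_for_success:

>>> from odps.errors import ODPSError
>>>
>>> instance = o.run_sql_interactive('select * from dual')
>>> try:
>>>     instance.wait_for_success()
>>> except ODPSError:
>>>     # run_sql_interactive does not fall back automatically,
>>>     # so resubmit the statement in traditional mode ourselves
>>>     instance = o.run_sql('select * from dual')
>>>     instance.wait_for_success()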

Set timezone

Sometimes you want queried time data to be displayed with the correct time zone. You can set it via options.local_timezone.

options.local_timezone accepts the following three types of values:

  • False: Use the UTC time.

  • True: Use the local timezone (default).

  • Timezone string: Use the passed timezone, e.g. Asia/Shanghai.

For example, use UTC time:

>>> from odps import options
>>> options.local_timezone = False

Use local timezone:

>>> from odps import options
>>> options.local_timezone = True

Use Asia/Shanghai:

>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"

Note

After you set options.local_timezone, PyODPS sets odps.sql.timezone accordingly and automatically. A mismatch between the two settings may cause inconsistency between server time and client time, so setting odps.sql.timezone manually is not recommended.

Set runtime parameters

You can use the hints parameter to set runtime parameters. The parameter is a dict and is supported by execute_sql, execute_sql_interactive, run_sql and run_sql_interactive.

>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

You can also set options.sql.settings to configure runtime parameters globally. The relevant parameters are then automatically added to every execution.

>>> from odps import options
>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}
>>> o.execute_sql('select * from pyodps_iris')  # global hints configured in options.sql.settings will be added

View SQL results

You can call the open_reader method to retrieve SQL execution results. In the following example, structured data is returned.

>>> with o.execute_sql('select * from dual').open_reader() as reader:
>>>     for record in reader:
>>>         # process every record

When commands such as desc are executed, you can use the reader.raw attribute to get the original execution results.

>>> with o.execute_sql('desc dual').open_reader() as reader:
>>>     print(reader.raw)

If options.tunnel.use_instance_tunnel is set to True when open_reader is called, PyODPS calls Instance Tunnel by default; otherwise, PyODPS calls the old Result interface. If you are using an old version of MaxCompute, or an error occurs when calling Instance Tunnel, PyODPS reports a warning and automatically falls back to the old Result interface. If the result of Instance Tunnel does not meet your expectations, set this option to False. When calling open_reader, you can also use the tunnel parameter to specify which result interface to use. For example:

>>> # Use Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     for record in reader:
>>>         # process every record
>>> # Use Results interface
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
>>>     for record in reader:
>>>         # process every record

By default, PyODPS does not limit the size of data that can be read from an Instance. However, project owners may add protection configurations on projects to restrict reading from instances. In that case, you may only be permitted to read in limit mode, which caps the number of rows read at the value configured in the project, 10000 rows by default. If PyODPS detects a read limit while options.tunnel.limit_instance_tunnel is not set, limit mode is enabled automatically and the number of downloadable records is capped. If your project is protected and you want to enable limit mode manually, add limit=True to open_reader, or set options.tunnel.limit_instance_tunnel to True.
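For example, to read results under limit mode on a protected project:

>>> with o.execute_sql('select * from dual').open_reader(limit=True) as reader:
>>>     for record in reader:
>>>         # process every record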

In some environments, options.tunnel.limit_instance_tunnel might be set to True for compatibility. In those environments, if you want to read all data, you need to pass the arguments tunnel=True, limit=False to the open_reader method. Note that these two arguments will NOT lift the read limitation on your project; if you still hit read limits, ask your project owner to grant you the required read privileges.
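For example, to request Instance Tunnel explicitly and disable the client-side row limit:

>>> with o.execute_sql('select * from dual').open_reader(tunnel=True, limit=False) as reader:
>>>     for record in reader:
>>>         # process every record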

If the MaxCompute version you are using only supports the old Result interface, and you need to read all data, you can export the SQL results to another table and use these methods to read data. This may be limited by project security settings.
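A minimal sketch of this workaround follows; the table name pyodps_sql_result is hypothetical:

>>> # materialize the query result into a table, then read that table
>>> o.execute_sql('create table pyodps_sql_result as select * from dual')
>>> with o.get_table('pyodps_sql_result').open_reader() as reader:
>>>     for record in reader:
>>>         # process every record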

PyODPS also supports reading data as pandas DataFrames.

>>> # use to_pandas() method of the reader directly
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # type of pd_df is pandas DataFrame
>>>     pd_df = reader.to_pandas()

If you want to accelerate data reading with multiple cores, specify n_process as the number of cores to use:

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # n_process should be number of processes to use
>>>     pd_df = reader.to_pandas(n_process=n_process)

Note

Arrow format is not yet supported for instance results.

Set alias

Some resources referenced by a UDF change dynamically at runtime. You can create an alias that makes the old resource name point to a new resource, so that neither the UDF nor the resource needs to be modified.

from odps.models import TableSchema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    TableSchema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# write one row with only one value '1'
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
# prints 2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# To replace the resource holding value '1' with the resource holding value '2',
# we only need to pass the aliases argument. No UDF or resource needs to be modified.
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])
# prints 3

Execute SQL statements in an interactive environment

In IPython and Jupyter, you can use SQL plugins to execute SQL statements. Parameterized queries are also supported. For details, see Documentation.
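A minimal sketch of the workflow in a Jupyter notebook, assuming the PyODPS IPython extension is installed (the %%sql cell magic must start a cell of its own):

%load_ext odps

%%sql
select * from dual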

Set biz_id

In a few cases, you may need to submit a biz_id when submitting SQL statements; otherwise an error occurs during execution. You can set the biz_id globally in options.

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')