SQL
PyODPS supports MaxCompute SQL queries and provides methods to read their results. The execute_sql, execute_sql_interactive, run_sql and run_sql_interactive methods all return Instance objects.
Note
The commands that are executable in the MaxCompute Console may not be executed as SQL statements in MaxCompute. Use other methods to execute non-DDL/DML statements: for example, use the run_security_query method to execute GRANT/REVOKE statements, and the run_xflow or execute_xflow method to execute PAI commands.
Execute SQL statements
You can use execute_sql to run SQL and wait for it to finish. The method blocks until execution completes and returns an Instance object. If execution of the SQL statement fails, the method raises an exception derived from the odps.errors.ODPSError class.
>>> o.execute_sql('select * from dual') # synchronous way, will block till SQL statement finishes execution
You can also use the non-blocking run_sql method. It submits your SQL statement to MaxCompute and returns the corresponding Instance object immediately. You then need to call the wait_for_success method to wait until execution finishes. Like execute_sql, wait_for_success raises an exception derived from odps.errors.ODPSError if anything goes wrong with the SQL instance.
>>> instance = o.run_sql('select * from dual') # asynchronous way
>>> print(instance.get_logview_address()) # obtain LogView address
>>> instance.wait_for_success() # block till the statement finishes
See instances for more information on the Instance objects returned by the run_sql and execute_sql methods.
Execute SQL with MCQA acceleration
MCQA is the query acceleration service provided by MaxCompute, which accelerates queries on small data sets using standalone resource pools. PyODPS has supported the methods below to execute SQL with MCQA since 0.11.4.1. These methods require a MaxCompute service with MCQA enabled.
You can use execute_sql_interactive to execute SQL with MCQA; it returns an MCQA Instance object. If MCQA does not support your SQL statement, the method falls back to traditional mode and returns a traditional Instance object instead.
>>> o.execute_sql_interactive('select * from dual')
If you don’t want the method to fall back automatically, specify fallback=False. You can also specify a fallback policy, or a combination of policies, as a comma-separated string. Available policy names are listed below. The default policy is all, which is an alias for the combination generic,unsupported,upgrading,noresource,timeout.
- generic: fall back to traditional mode when an unknown error happens.
- noresource: fall back to traditional mode when resources are not sufficient.
- upgrading: fall back to traditional mode when the service is upgrading.
- timeout: fall back to traditional mode when execution times out.
- unsupported: fall back to traditional mode when MCQA does not support the SQL statement.
For instance, the code below falls back when resources are not sufficient or when MCQA does not support certain statements:
>>> o.execute_sql_interactive('select * from dual', fallback="noresource,unsupported")
You can also use run_sql_interactive to run SQL with MCQA. The method returns an MCQA Instance once your SQL is submitted to the cluster, and you need to wait until the Instance finishes, as with run_sql. Note that this method does NOT fall back automatically when it fails: you need to retry or call execute_sql yourself.
>>> instance = o.run_sql_interactive('select * from dual') # asynchronous way
>>> print(instance.get_logview_address()) # obtain LogView address
>>> instance.wait_for_success() # block till the statement finishes
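The manual fallback mentioned above can be sketched as a small helper. This is an illustrative sketch, not a PyODPS API: the two callables stand in for the MCQA attempt (run_sql_interactive plus wait_for_success) and the traditional execute_sql call, and real code should catch odps.errors.ODPSError rather than a bare Exception.

```python
def run_with_manual_fallback(run_mcqa, run_offline):
    """Try an MCQA query first; rerun in traditional mode if it fails.

    run_mcqa / run_offline are callables standing in for
    o.run_sql_interactive(sql).wait_for_success() and o.execute_sql(sql).
    """
    try:
        return run_mcqa()
    except Exception:  # in real code, catch odps.errors.ODPSError instead
        return run_offline()


def failing_query():
    raise RuntimeError("MCQA rejected the statement")

# the MCQA attempt succeeds, so its result is used
assert run_with_manual_fallback(lambda: "mcqa", lambda: "offline") == "mcqa"
# the MCQA attempt fails, so the traditional result is used
assert run_with_manual_fallback(failing_query, lambda: "offline") == "offline"
```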
Set timezone
Sometimes we want queried time data displayed with a correct timezone. You can set this via options.local_timezone, which accepts the following three types of values:
- False: use UTC time.
- True: use the local timezone (default).
- Timezone string: use the given timezone, e.g. Asia/Shanghai.
For example, use UTC time:
>>> from odps import options
>>> options.local_timezone = False
Use local timezone:
>>> from odps import options
>>> options.local_timezone = True
Use Asia/Shanghai:
>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"
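The effect of a timezone string can be illustrated with plain Python, independent of PyODPS (zoneinfo has been part of the standard library since Python 3.9):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

# the same instant rendered in UTC and in Asia/Shanghai (UTC+8)
instant = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
local = instant.astimezone(ZoneInfo("Asia/Shanghai"))
print(instant.hour, local.hour)  # 12 20
```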
Note
After options.local_timezone is set, PyODPS sets odps.sql.timezone accordingly and automatically. If the two settings differ, server time and client time may become inconsistent, so setting odps.sql.timezone manually is not recommended.
Set runtime parameters
You can use the hints parameter to set runtime parameters. The parameter is a dict and is supported by execute_sql, execute_sql_interactive, run_sql and run_sql_interactive.
>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
You can also set options.sql.settings to configure runtime parameters globally. They are then automatically attached to every execution.
>>> from odps import options
>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}
>>> o.execute_sql('select * from pyodps_iris') # global hints configured in options.sql.settings will be added
View SQL results
You can call the open_reader method on an instance to retrieve SQL execution results. In the following example, structured data is returned:
>>> with o.execute_sql('select * from dual').open_reader() as reader:
>>>     for record in reader:
>>>         # process every record
When commands such as desc are executed, you can use the reader.raw attribute to get the raw execution results:
>>> with o.execute_sql('desc dual').open_reader() as reader:
>>> print(reader.raw)
If options.tunnel.use_instance_tunnel is set to True when open_reader is executed, PyODPS calls Instance Tunnel by default; otherwise it calls the old Result interface. If you are using an old version of MaxCompute, or an error occurs when calling Instance Tunnel, PyODPS reports a warning and automatically falls back to the old Result interface. If the result from Instance Tunnel does not meet your expectations, set this option to False. You can also use the tunnel parameter of open_reader to choose the result interface for a single call. For example:
>>> # Use Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     for record in reader:
>>>         # process every record
>>> # Use Results interface
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
>>>     for record in reader:
>>>         # process every record
By default, PyODPS does not limit the amount of data that can be read from an instance. However, project owners may add protection configurations to projects to limit reading from instances. In that case you may only be permitted to read in limit mode, which caps the number of downloadable rows according to the project configuration (10000 rows by default). If PyODPS detects a read limit while options.tunnel.limit_instance_tunnel is not set, limit mode is enabled automatically and the number of downloadable records is capped. If your project is protected and you want to enable limit mode manually, pass limit=True to open_reader, or set options.tunnel.limit_instance_tunnel to True.
In some environments, options.tunnel.limit_instance_tunnel might be set to True for compatibility. In those environments, if you want to read all data, you need to pass the arguments tunnel=True, limit=False to the open_reader method. Note that these two arguments do NOT lift the read limitation configured on your project; if you still hit read limits, ask your project owner to grant read privileges to you.
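Putting the arguments above together (this only takes effect when your account already has the required read privileges on the project):

```python
>>> # read full results through Instance Tunnel, disabling the client-side row limit
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True, limit=False) as reader:
>>>     for record in reader:
>>>         # process every record
```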
If the MaxCompute version you are using only supports the old Result interface, and you need to read all data, you can export the SQL results to another table and use these methods to read data. This may be limited by project security settings.
PyODPS also supports reading data as pandas DataFrames.
>>> # use the to_pandas() method of the reader directly
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # type of pd_df is pandas DataFrame
>>>     pd_df = reader.to_pandas()
If you want to accelerate data reading with multiple cores, specify n_process as the number of cores to use:
>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # n_process should be the number of processes to use
>>>     pd_df = reader.to_pandas(n_process=n_process)
Note
Currently the Arrow format is not supported for instance results.
Set alias
Some resources referenced by a UDF change dynamically at runtime. You can create an alias for the old resource and use it as a new resource.
from odps.models import TableSchema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
    'test_table',
    TableSchema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)
data = [[1, ], ]
# write one row with only one value '1'
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
        'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# To replace the resource containing '1' with the resource containing '2',
# just pass the aliases argument; no change to the UDF or resources is needed.
with o.execute_sql(
        'select test_alias_func(size) from test_table',
        aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])
3
Execute SQL statements in an interactive environment
In IPython and Jupyter, you can use SQL plugins to execute SQL statements; parameterized queries are also supported. For details, see Documentation.
Set biz_id
In a few cases, it may be necessary to submit a biz_id when submitting SQL statements; otherwise an error occurs during execution. You can set the biz_id globally in options:
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')