SQL
PyODPS supports MaxCompute SQL queries and provides methods to read SQL results. The execute_sql(), execute_sql_interactive(), run_sql() and run_sql_interactive() methods all return Instance objects.
Note
Not every command that can be executed in the MaxCompute Console can be executed as an SQL statement through these methods. Use other methods to execute non-DDL/DML statements. For example, use run_security_query() to execute GRANT/REVOKE statements, and use run_xflow() or execute_xflow() to execute PAI commands.
Execute SQL statements
You can use execute_sql() to run SQL synchronously. The method blocks until execution finishes and then returns an Instance object. If the SQL statement fails, the method raises an error based on the odps.errors.ODPSError class.
>>> o.execute_sql('select * from dual') # synchronous way, will block till SQL statement finishes execution
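You can also use run_sql() to execute SQL asynchronously without blocking. The method returns an Instance object as soon as the SQL statement has been submitted to MaxCompute. Call wait_for_success() to wait for the statement to finish. As with execute_sql(), if the instance fails, wait_for_success() raises an error based on the odps.errors.ODPSError class.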
>>> instance = o.run_sql('select * from dual') # asynchronous way
>>> print(instance.get_logview_address()) # obtain LogView address
>>> instance.wait_for_success() # block till the statement finishes
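Because run_sql() returns right after submission, several statements can be submitted first and waited on afterwards, so they run concurrently on MaxCompute. The sketch below shows this pattern. run_many is a hypothetical helper, not part of PyODPS, and `o` is assumed to be an ODPS entry object as in the examples above. It uses only run_sql(), get_logview_address() and wait_for_success(), which appear in this section.

```python
from typing import Iterable, List


def run_many(o, sqls: Iterable[str]) -> List[object]:
    """Submit every statement with run_sql(), then wait for all of them.

    Hypothetical helper; `o` is assumed to be an ODPS entry object.
    """
    # run_sql() returns once a statement is submitted, so all statements
    # are submitted before any waiting happens.
    instances = [o.run_sql(sql) for sql in sqls]
    for inst in instances:
        # Print each LogView address so the jobs can be tracked while running.
        print(inst.get_logview_address())
    for inst in instances:
        # Raises an odps.errors.ODPSError-based error if the instance failed.
        inst.wait_for_success()
    return instances
```

Waiting in submission order is sufficient here, because every instance has to finish before the helper returns anyway.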
See instances for more information on the Instance objects returned by run_sql() and execute_sql().
Execute SQL with MCQA acceleration
MCQA is the query acceleration service provided by MaxCompute. It uses dedicated resource pools to accelerate queries on small data sets. Since version 0.11.4.1, PyODPS provides the methods below to execute SQL with MCQA. These methods require a MaxCompute service with MCQA enabled.
You can use execute_sql_interactive() to execute SQL with MCQA. It returns an MCQA Instance object. If MCQA does not support your SQL statement, the method falls back to traditional mode and returns a traditional Instance object instead.
>>> o.execute_sql_interactive('select * from dual')
If you do not want the method to fall back automatically, specify fallback=False. You can also specify a single fallback policy, or a combination of policies as a comma-separated string. The available policy names are listed below. The default policy is all, which is an alias for the combination generic,unsupported,upgrading,noresource,timeout.
generic: fall back to traditional mode when an unknown error occurs.
noresource: fall back to traditional mode when resources are insufficient.
upgrading: fall back to traditional mode when the service is being upgraded.
timeout: fall back to traditional mode when execution times out.
unsupported: fall back to traditional mode when MCQA does not support the SQL statement.
For example, the code below falls back only when resources are insufficient or when MCQA does not support the statement:
>>> o.execute_sql_interactive('select * from dual', fallback="noresource,unsupported")
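A misspelled policy name inside the comma-separated string is easy to miss. The sketch below builds the fallback value from the policy names listed above and rejects anything else before the string reaches PyODPS. fallback_string is a hypothetical helper. It does not reflect how PyODPS itself validates the argument.

```python
# Policy names listed in this section; "all" is an alias for the other five.
FALLBACK_POLICIES = {"generic", "noresource", "upgrading", "timeout", "unsupported"}


def fallback_string(*policies: str) -> str:
    """Build a comma-separated fallback value, rejecting unknown names.

    Hypothetical helper; PyODPS only receives the resulting string.
    """
    allowed = FALLBACK_POLICIES | {"all"}
    unknown = [p for p in policies if p not in allowed]
    if unknown:
        raise ValueError("unknown fallback policies: %s" % ", ".join(unknown))
    # dict.fromkeys drops duplicates while keeping the caller's order.
    return ",".join(dict.fromkeys(policies))
```

Usage: `o.execute_sql_interactive('select * from dual', fallback=fallback_string('noresource', 'unsupported'))` passes the same value as the example above.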
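You can also use run_sql_interactive() to execute SQL asynchronously with MCQA. Like run_sql(), this method returns an MCQA Instance as soon as the job is submitted, and you need to wait for the Instance to finish yourself. Note that this method does not fall back automatically. If execution fails, you need to retry or call execute_sql() yourself.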
>>> instance = o.run_sql_interactive('select * from dual') # asynchronous way
>>> print(instance.get_logview_address()) # obtain LogView address
>>> instance.wait_for_success() # block till the statement finishes
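Since run_sql_interactive() does not fall back by itself, the retry has to be written by hand. The sketch below tries MCQA first and, if that fails, reruns the statement with execute_sql(). run_interactive_or_fallback is a hypothetical helper. In real code, pass error_type=odps.errors.ODPSError so that only MaxCompute errors trigger the fallback. The default of Exception is used only to keep the sketch self-contained.

```python
def run_interactive_or_fallback(o, sql, error_type=Exception):
    """Try MCQA asynchronously, then fall back to execute_sql() on failure.

    Hypothetical helper; in real code pass error_type=odps.errors.ODPSError.
    """
    try:
        instance = o.run_sql_interactive(sql)
        # Blocks until the MCQA instance finishes; raises if it failed.
        instance.wait_for_success()
        return instance
    except error_type:
        # run_sql_interactive() does not fall back automatically, so rerun
        # the statement in traditional (non-MCQA) mode.
        return o.execute_sql(sql)
```

Because the statement is simply run again, this pattern suits read-only queries. For statements with side effects, check what the failed attempt did before retrying.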
Set timezone
Sometimes you want queried time data to be displayed in a specific timezone. You can set the timezone with options.local_timezone.
options.local_timezone accepts the following three kinds of values:
False: use UTC time.
True: use the local timezone (default).
A timezone string: use the given timezone, for example Asia/Shanghai.
For example, use UTC time:
>>> from odps import options
>>> options.local_timezone = False
Use local timezone:
>>> from odps import options
>>> options.local_timezone = True
Use Asia/Shanghai:
>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"
Note
After options.local_timezone is set, PyODPS automatically sets odps.sql.timezone to match it. If the two settings differ, server time and client time may become inconsistent, so setting odps.sql.timezone manually is not recommended.
Set runtime parameters
You can use the hints parameter to set runtime parameters. The parameter is a dict and is supported by execute_sql(), execute_sql_interactive(), run_sql() and run_sql_interactive().
>>> hints = {'odps.stage.mapper.split.size': 16, 'odps.sql.reducer.instances': 1024}
>>> o.execute_sql('select * from pyodps_iris', hints=hints)
You can also set runtime parameters globally with options.sql.settings. These parameters are then added automatically to every execution.
>>> from odps import options
>>> options.sql.settings = {
...     'odps.stage.mapper.split.size': 16,
...     'odps.sql.reducer.instances': 1024,
... }
>>> o.execute_sql('select * from pyodps_iris') # global hints configured in options.sql.settings will be added
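If you keep a shared baseline of settings in your own code and want a few queries to override individual values, you can merge the two dicts yourself and pass the result as hints. The sketch below does this. merged_hints is a hypothetical helper. It makes the precedence explicit in your code instead of relying on how PyODPS combines hints with options.sql.settings.

```python
def merged_hints(base: dict, overrides: dict) -> dict:
    """Return a new hints dict in which per-query values replace shared ones.

    Hypothetical helper, not part of PyODPS.
    """
    hints = dict(base)       # copy so the shared baseline is never mutated
    hints.update(overrides)  # per-query values take precedence
    return hints


BASE_SETTINGS = {'odps.stage.mapper.split.size': 16, 'odps.sql.reducer.instances': 1024}
hints = merged_hints(BASE_SETTINGS, {'odps.sql.reducer.instances': 256})
# o.execute_sql('select * from pyodps_iris', hints=hints)
```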
View SQL results
You can call the open_reader() method to read SQL execution results. In the following example, structured data is returned.
>>> with o.execute_sql('select * from dual').open_reader() as reader:
...     for record in reader:
...         print(record)  # process every record
When commands such as desc are executed, you can use the reader.raw attribute to get the original execution results.
>>> with o.execute_sql('desc dual').open_reader() as reader:
...     print(reader.raw)
If options.tunnel.use_instance_tunnel is set to True when open_reader() is called, PyODPS uses Instance Tunnel by default. Otherwise, it uses the old Result interface. If you are using an old version of MaxCompute, or if an error occurs when calling Instance Tunnel, PyODPS reports a warning and automatically falls back to the old Result interface. If the result returned by Instance Tunnel does not meet your expectations, set this option to False. You can also use the tunnel parameter of open_reader() to specify which result interface to use for a single call. For example:
>>> # Use Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
...     for record in reader:
...         print(record)  # process every record
>>> # Use the Result interface
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
...     for record in reader:
...         print(record)  # process every record
By default, PyODPS does not limit the amount of data that can be read from an Instance. However, project owners may configure project protection to restrict reading from instances. In that case you may only be allowed to read in limit mode, in which the number of rows that can be read is capped by the project configuration at 10000 rows. If PyODPS detects such a read limit and options.tunnel.limit_instance_tunnel is not set, limit mode is enabled automatically and the number of downloadable records is limited. If your project is protected and you want to enable limit mode manually, add limit=True to open_reader(), or set options.tunnel.limit_instance_tunnel = True.
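In some environments, such as DataWorks, options.tunnel.limit_instance_tunnel may be set to True by default. In that case, to read all data, pass tunnel=True, limit=False to open_reader(). Note that if the project itself is protected, these two arguments cannot lift the protection, and MaxCompute provides no way to bypass this permission restriction to read more data. In that case, contact the Project Owner to grant the corresponding read permission.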
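The sketch below reads all records from an instance. When limit mode is on by default, it passes the tunnel=True, limit=False arguments described above. read_all_rows is a hypothetical helper, and `instance` is assumed to be an Instance returned by execute_sql(). As noted above, these arguments cannot bypass project-level protection. The helper also keeps every record in memory, so it only suits result sets that fit.

```python
def read_all_rows(instance, unlimited=False):
    """Collect every record of an instance's results into a list.

    Hypothetical helper. With unlimited=True it passes tunnel=True,
    limit=False to open_reader(), for environments such as DataWorks
    where limit mode may be enabled by default.
    """
    kwargs = {'tunnel': True, 'limit': False} if unlimited else {}
    with instance.open_reader(**kwargs) as reader:
        # Materializes all records in memory.
        return list(reader)
```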
If your MaxCompute version only supports the old Result interface and you need to read all data, you can write the SQL results into another table and then read that table with the table reading methods. This may also be limited by project security settings.
PyODPS also supports reading data as pandas DataFrames.
>>> # use to_pandas() method of the reader directly
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
...     # type of pd_df is pandas DataFrame
...     pd_df = reader.to_pandas()
To speed up reading with multiple cores, set n_process to the number of processes you want to use:
>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
...     # n_process is the number of processes to use
...     pd_df = reader.to_pandas(n_process=n_process)
Note
MaxCompute is expected to support, starting in late 2024, reading the results of offline SQL instances in Arrow format with Instance.open_reader(), in the same way as tables. MCQA instances do not support this feature yet. Until then, calling Instance.open_reader(arrow=True) will raise an error.
Since PyODPS 0.12.0, you can call the to_pandas() method of an Instance to read its results as a pandas DataFrame. You can specify the start row and the number of rows to read. If you omit them, all data is read. The limit argument is also supported, with the same meaning as in open_reader(). This method uses Arrow format when available and converts the result to pandas. If the service does not support Arrow format, the method falls back to record format.
>>> inst = o.execute_sql('select * from dual')
>>> pd_df = inst.to_pandas(start=10, count=20)
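Because to_pandas() accepts a start row and a row count, separate workers can each read their own disjoint slice of the results. The sketch below computes the (start, count) windows. iter_windows is a hypothetical helper. It assumes the total row count is already known, for example from a separate count query.

```python
from typing import Iterator, Tuple


def iter_windows(total: int, page_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (start, count) pairs covering rows 0..total-1 without overlap.

    Hypothetical helper, not part of PyODPS.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    for start in range(0, total, page_size):
        # The last window may be shorter than page_size.
        yield start, min(page_size, total - start)


# Each worker could then read only its own slice, e.g.
#     inst.to_pandas(start=start, count=count)
windows = list(iter_windows(45, 20))
print(windows)  # [(0, 20), (20, 20), (40, 5)]
```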
As with tables, since PyODPS 0.12.0 you can use the iter_pandas() method of an Instance to read pandas DataFrames in multiple batches. The method accepts arguments similar to those of Table.iter_pandas.
>>> inst = o.execute_sql('select * from dual')
>>> for batch in inst.iter_pandas(start=0, count=1000, batch_size=100):
...     print(batch)
Set alias
Some resources referenced by a UDF may change at runtime. Instead of modifying the UDF or its resources, you can create an alias that maps the old resource name to a new resource when executing SQL.
from odps.models import TableSchema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())
    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
    'test_table',
    TableSchema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)
data = [[1, ], ]
# write one row with only one value '1'
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
        'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])  # 2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# To replace the resource holding '1' with the resource holding '2', only the
# aliases argument is needed. Neither the UDF nor the resources have to be modified.
with o.execute_sql(
        'select test_alias_func(size) from test_table',
        aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])  # 3
Execute SQL statements in an interactive environment
In IPython and Jupyter, you can use the SQL plugins to execute SQL statements. Parameterized queries are also supported. For details, see Documentation.
Set biz_id
In a few cases, a biz_id must be submitted along with SQL statements. Otherwise, an error occurs during execution. You can set biz_id globally in options.
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')