SQL
PyODPS支持ODPS SQL的查询,并可以读取执行的结果。 execute_sql
/ execute_sql_interactive
/
run_sql
/ run_sql_interactive
方法的返回值是 运行实例 。
备注
并非所有在 ODPS Console 中可以执行的命令都是 ODPS 可以接受的 SQL 语句。
在调用非 DDL / DML 语句时,请使用其他方法,例如 GRANT / REVOKE 等语句请使用
run_security_query
方法,PAI 命令请使用 run_xflow
或 execute_xflow
方法。
执行 SQL
你可以使用 execute_sql
方法以同步方式执行 SQL。调用时,该方法会阻塞直至 SQL 执行完成,并返回一个
Instance 实例。如果 SQL 执行报错,该方法会抛出以 odps.errors.ODPSError
为基类的错误。
>>> o.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
你也可以使用非阻塞方式异步执行 SQL。调用时,该方法在将 SQL 提交到 MaxCompute 后即返回 Instance
实例。你需要使用 wait_for_success
方法等待该 SQL 执行完成。同样地,如果 instance 出现错误,
wait_for_success
会抛出以 odps.errors.ODPSError
为基类的错误。
>>> instance = o.run_sql('select * from dual') # 异步的方式执行
>>> print(instance.get_logview_address()) # 获取logview地址
>>> instance.wait_for_success() # 阻塞直到完成
关于如何操作 run_sql / execute_sql 返回的 Instance 实例,可以参考 运行实例 。
使用 MCQA 执行 SQL
MCQA 是 MaxCompute 提供的查询加速功能, 支持使用独立资源池对中小规模数据进行加速。PyODPS 从 0.11.4.1 开始支持以下列方式通过 MCQA 执行 SQL ,同时需要 MaxCompute 具备 MCQA 的支持。
你可以使用 execute_sql_interactive
通过 MCQA 执行 SQL 并返回 MCQA Instance。如果
MCQA 无法执行相应的 SQL ,会自动回退到传统模式。此时,函数返回的 Instance 为回退后的 Instance。
>>> o.execute_sql_interactive('select * from dual')
如果不希望回退,可以指定参数 fallback=False
。也可以指定为回退策略(或回退策略的组合,使用逗号分隔的字符串)。
可用的策略名如下。默认策略为 all
(即 generic,unsupported,upgrading,noresource,timeout
)。
generic
:指定时,表示发生未知错误时回退到传统模式。noresource
:指定时,表示发生资源不足问题时回退到传统模式。upgrading
:指定时,表示升级期间回退到传统模式。timeout
:指定时,表示执行超时时回退到传统模式。unsupported
:指定时,表示遇到 MCQA 不支持的场景时回退到传统模式。
例如,下面的代码要求在 MCQA 不支持和资源不足时回退:
>>> o.execute_sql_interactive('select * from dual', fallback="noresource,unsupported")
你也可以使用 run_sql_interactive
通过 MCQA 异步执行 SQL。类似 run_sql
,该方法会在提交任务后即返回
MCQA Instance,你需要自行等待 Instance 完成。需要注意的是,该方法不会自动回退。当执行失败时,你需要自行重试或执行
execute_sql
。
>>> instance = o.run_sql_interactive('select * from dual') # 异步的方式执行
>>> print(instance.get_logview_address()) # 获取logview地址
>>> instance.wait_for_success() # 阻塞直到完成
设置时区
有时我们希望对于查询出来的时间数据显示为特定时区下的时间,可以通过 options.local_timezone
设置客户端的时区。
options.local_timezone
可设置为以下三种类型:
False
:使用 UTC 时间。True
:使用本地时区(默认设置)。时区字符串:使用指定的时区,例如
Asia/Shanghai
。
例如,使用 UTC 时间:
>>> from odps import options
>>> options.local_timezone = False
使用本地时区:
>>> from odps import options
>>> options.local_timezone = True
使用 Asia/Shanghai
:
>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"
备注
设置 options.local_timezone
后,PyODPS 会根据它的值自动设置 odps.sql.timezone
。
两者的值不同可能导致服务端和客户端时间不一致,因此不应再手动设置 odps.sql.timezone
。
设置运行参数
有时,我们在运行时,需要设置运行时参数,我们可以通过设置 hints
参数,参数类型是 dict。该参数对 execute_sql
/
execute_sql_interactive
/ run_sql
/ run_sql_interactive
均有效。
>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
我们可以对于全局配置设置sql.settings后,每次运行时则都会添加相关的运行时参数。
>>> from odps import options
>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}
>>> o.execute_sql('select * from pyodps_iris') # 会根据全局配置添加hints
读取SQL执行结果
运行 SQL 的 instance 能够直接执行 open_reader
的操作,一种情况是SQL返回了结构化的数据。
>>> with o.execute_sql('select * from dual').open_reader() as reader:
>>> for record in reader:
>>> # 处理每一个record
另一种情况是 SQL 可能执行的比如 desc
,这时通过 reader.raw
属性取到原始的SQL执行结果。
>>> with o.execute_sql('desc dual').open_reader() as reader:
>>> print(reader.raw)
如果 options.tunnel.use_instance_tunnel == True,在调用 open_reader 时,PyODPS 会默认调用 Instance Tunnel,
否则会调用旧的 Result 接口。如果你使用了版本较低的 MaxCompute 服务,或者调用 Instance Tunnel 出现了问题,PyODPS
会给出警告并自动降级到旧的 Result 接口,可根据警告信息判断导致降级的原因。如果 Instance Tunnel 的结果不合预期,
请将该选项设为 False。在调用 open_reader 时,也可以使用 tunnel
参数来指定使用何种结果接口,例如
>>> # 使用 Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>> for record in reader:
>>> # 处理每一个record
>>> # 使用 Results 接口
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
>>> for record in reader:
>>> # 处理每一个record
PyODPS 默认不限制能够从 Instance 读取的数据规模,但 Project Owner 可能在 MaxCompute Project 上增加保护设置以限制对 Instance 结果的读取,此时只能使用受限读取模式读取数据,在此模式下可读取的行数受到 Project 配置限制,通常为 10000 行。如果 PyODPS 检测到读取 Instance 数据被限制,且 options.tunnel.limit_instance_tunnel 未设置,会自动启用受限读取模式。 如果你的 Project 被保护,想要手动启用受限读取模式,可以为 open_reader 方法增加 limit=True 选项,或者设置 options.tunnel.limit_instance_tunnel = True 。
在部分环境中,例如 DataWorks,options.tunnel.limit_instance_tunnel 可能默认被置为 True。此时,如果需要读取所有数据,需要为 open_reader 增加参数 tunnel=True, limit=False 。需要注意的是,如果 Project 本身被保护,这两个参数 不能 解除保护,此时应联系 Project Owner 开放相应的读权限。
如果你所使用的 MaxCompute 只能支持旧 Result 接口,同时你需要读取所有数据,可将 SQL 结果写入另一张表后用读表接口读取 (可能受到 Project 安全设置的限制)。
同时,PyODPS 支持直接将运行结果数据读成 pandas DataFrame。
>>> # 直接使用 reader 的 to_pandas 方法
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>> # pd_df 类型为 pandas DataFrame
>>> pd_df = reader.to_pandas()
如果需要使用多核加速读取速度,可以通过 n_process 指定使用进程数:
>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>> # n_process 指定成机器核数
>>> pd_df = reader.to_pandas(n_process=n_process)
备注
目前 Instance 结果暂不支持使用 Arrow 格式读取。
设置alias
有时在运行时,比如某个UDF引用的资源是动态变化的,我们可以alias旧的资源名到新的资源,这样免去了重新删除并重新创建UDF的麻烦。
from odps.models import TableSchema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
TableSchema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 写入一行数据,只包含一个值1
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源alias成内容为2的资源,我们不需要修改UDF或资源
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
3
在交互式环境执行 SQL
在 ipython 和 jupyter 里支持 使用 SQL 插件的方式运行 SQL,且支持 参数化查询, 详情参阅 文档。
设置 biz_id
在少数情形下,可能在提交 SQL 时,需要同时提交 biz_id,否则执行会报错。此时,你可以设置全局 options 里的 biz_id。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')