SQL

PyODPS支持ODPS SQL的查询,并可以读取执行的结果。 execute_sql / execute_sql_interactive / run_sql / run_sql_interactive 方法的返回值是 运行实例

备注

并非所有在 ODPS Console 中可以执行的命令都是 ODPS 可以接受的 SQL 语句。 在调用非 DDL / DML 语句时,请使用其他方法,例如 GRANT / REVOKE 等语句请使用 run_security_query 方法,PAI 命令请使用 run_xflowexecute_xflow 方法。

执行 SQL

你可以使用 execute_sql 方法以同步方式执行 SQL。调用时,该方法会阻塞直至 SQL 执行完成,并返回一个 Instance 实例。如果 SQL 执行报错,该方法会抛出以 odps.errors.ODPSError 为基类的错误。

>>> o.execute_sql('select * from dual')  # 同步的方式执行,会阻塞直到SQL执行完成

你也可以使用非阻塞方式异步执行 SQL。调用时,该方法在将 SQL 提交到 MaxCompute 后即返回 Instance 实例。你需要使用 wait_for_success 方法等待该 SQL 执行完成。同样地,如果 instance 出现错误, wait_for_success 会抛出以 odps.errors.ODPSError 为基类的错误。

>>> instance = o.run_sql('select * from dual')  # 异步的方式执行
>>> print(instance.get_logview_address())  # 获取logview地址
>>> instance.wait_for_success()  # 阻塞直到完成

关于如何操作 run_sql / execute_sql 返回的 Instance 实例,可以参考 运行实例

使用 MCQA 执行 SQL

MCQA 是 MaxCompute 提供的查询加速功能, 支持使用独立资源池对中小规模数据进行加速。PyODPS 从 0.11.4.1 开始支持以下列方式通过 MCQA 执行 SQL ,同时需要 MaxCompute 具备 MCQA 的支持。

你可以使用 execute_sql_interactive 通过 MCQA 执行 SQL 并返回 MCQA Instance。如果 MCQA 无法执行相应的 SQL ,会自动回退到传统模式。此时,函数返回的 Instance 为回退后的 Instance。

>>> o.execute_sql_interactive('select * from dual', fallback='all')

如果不希望回退,可以指定参数 fallback=False。也可以指定为回退策略(或回退策略的组合,使用逗号分隔的字符串)。 可用的策略名如下。默认策略为 unsupported,upgrading,noresource,timeout 。为使回退一直生效,建议设置为 all ,即 generic,unsupported,upgrading,noresource,timeout 的组合。

  • generic :指定时,表示发生未知错误时回退到传统模式。

  • noresource :指定时,表示发生资源不足问题时回退到传统模式。

  • upgrading :指定时,表示升级期间回退到传统模式。

  • timeout :指定时,表示执行超时时回退到传统模式。

  • unsupported :指定时,表示遇到 MCQA 不支持的场景时回退到传统模式。

例如,下面的代码要求在 MCQA 不支持和资源不足时回退:

>>> o.execute_sql_interactive('select * from dual', fallback="noresource,unsupported")

你也可以使用 run_sql_interactive 通过 MCQA 异步执行 SQL。类似 run_sql,该方法会在提交任务后即返回 MCQA Instance,你需要自行等待 Instance 完成。需要注意的是,该方法不会自动回退。当执行失败时,你需要自行重试或执行 execute_sql

>>> instance = o.run_sql_interactive('select * from dual')  # 异步的方式执行
>>> print(instance.get_logview_address())  # 获取logview地址
>>> instance.wait_for_success()  # 阻塞直到完成

设置时区

有时我们希望对于查询出来的时间数据显示为特定时区下的时间,可以通过 options.local_timezone 设置客户端的时区。

options.local_timezone 可设置为以下三种类型:

  • False:使用 UTC 时间(默认设置)。

  • True:使用本地时区。

  • 时区字符串:使用指定的时区,例如 Asia/Shanghai

例如,使用 UTC 时间:

>>> from odps import options
>>> options.local_timezone = False

使用本地时区:

>>> from odps import options
>>> options.local_timezone = True

使用 Asia/Shanghai

>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"

备注

设置 options.local_timezone 后,PyODPS 会根据它的值自动设置 odps.sql.timezone。 两者的值不同可能导致服务端和客户端时间不一致,因此不应再手动设置 odps.sql.timezone

设置运行参数

有时,我们在运行时,需要设置运行时参数,我们可以通过设置 hints 参数,参数类型是 dict。该参数对 execute_sql / execute_sql_interactive / run_sql / run_sql_interactive 均有效。

>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

我们可以对于全局配置设置sql.settings后,每次运行时则都会添加相关的运行时参数。

>>> from odps import options
>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}
>>> o.execute_sql('select * from pyodps_iris')  # 会根据全局配置添加hints

读取SQL执行结果

运行 SQL 的 instance 能够直接执行 open_reader 的操作,一种情况是SQL返回了结构化的数据。

>>> with o.execute_sql('select * from dual').open_reader() as reader:
>>>     for record in reader:
>>>         # 处理每一个record

另一种情况是 SQL 可能执行的比如 desc,这时通过 reader.raw 属性取到原始的SQL执行结果。

>>> with o.execute_sql('desc dual').open_reader() as reader:
>>>     print(reader.raw)

如果 options.tunnel.use_instance_tunnel == True,在调用 open_reader 时,PyODPS 会默认调用 Instance Tunnel, 否则会调用旧的 Result 接口。如果你使用了版本较低的 MaxCompute 服务,或者调用 Instance Tunnel 出现了问题,PyODPS 会给出警告并自动降级到旧的 Result 接口,可根据警告信息判断导致降级的原因。如果 Instance Tunnel 的结果不合预期, 请将该选项设为 False。在调用 open_reader 时,也可以使用 tunnel 参数来指定使用何种结果接口,例如

>>> # 使用 Instance Tunnel
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     for record in reader:
>>>         # 处理每一个record
>>> # 使用 Results 接口
>>> with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
>>>     for record in reader:
>>>         # 处理每一个record

PyODPS 默认不限制能够从 Instance 读取的数据规模,但 Project Owner 可能在 MaxCompute Project 上增加保护设置以限制对 Instance 结果的读取,此时只能使用受限读取模式读取数据,在此模式下可读取的行数受到 Project 配置限制,通常为 10000 行。如果 PyODPS 检测到读取 Instance 数据被限制,且 options.tunnel.limit_instance_tunnel 未设置,会自动启用受限读取模式。 如果你的 Project 被保护,想要手动启用受限读取模式,可以为 open_reader 方法增加 limit=True 选项,或者设置 options.tunnel.limit_instance_tunnel = True

在部分环境中,例如 DataWorks,options.tunnel.limit_instance_tunnel 可能默认被置为 True。此时,如果需要读取所有数据,需要为 open_reader 增加参数 tunnel=True, limit=False 。需要注意的是,如果 Project 本身被保护,这两个参数 不能 解除保护,此时应联系 Project Owner 开放相应的读权限。

如果你所使用的 MaxCompute 只能支持旧 Result 接口,同时你需要读取所有数据,可将 SQL 结果写入另一张表后用读表接口读取 (可能受到 Project 安全设置的限制)。

同时,PyODPS 支持直接将运行结果数据读成 pandas DataFrame。

>>> # 直接使用 reader 的 to_pandas 方法
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # pd_df 类型为 pandas DataFrame
>>>     pd_df = reader.to_pandas()

如果需要使用多核加速读取速度,可以通过 n_process 指定使用进程数:

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
>>>     # n_process 指定成机器核数
>>>     pd_df = reader.to_pandas(n_process=n_process)

备注

目前 Instance 结果暂不支持使用 Arrow 格式读取。

设置alias

有时在运行时,比如某个UDF引用的资源是动态变化的,我们可以alias旧的资源名到新的资源,这样免去了重新删除并重新创建UDF的麻烦。

from odps.models import TableSchema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    TableSchema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 写入一行数据,只包含一个值1
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源alias成内容为2的资源,我们不需要修改UDF或资源
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])
3

在交互式环境执行 SQL

在 ipython 和 jupyter 里支持 使用 SQL 插件的方式运行 SQL,且支持 参数化查询, 详情参阅 文档

设置 biz_id

在少数情形下,可能在提交 SQL 时,需要同时提交 biz_id,否则执行会报错。此时,你可以设置全局 options 里的 biz_id。

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')