Instructions for running MaxCompute in DataWorks
Create flow nodes
Flow nodes include the Python on MaxCompute (PyODPS) node, which you can create to run PyODPS code.
Use the ODPS object
The PyODPS node in DataWorks provides a global variable odps (also available as o), which is the ODPS entry object. You do not need to define the ODPS object manually.
print(o.exist_table('pyodps_iris'))
Note
The value of variable o is set before your PyODPS node is executed. Do not set this variable manually unless you have to. Otherwise the MaxCompute entry object may be lost, and you may get privilege errors when the node runs in production because the default account has changed.
Execute SQL statements
For more information, see Execute SQL statements.
Note
Instance Tunnel is not enabled by default in DataWorks, so at most 10,000 records can be fetched. When Instance Tunnel is enabled, reader.count gives the number of records, and the limit must also be disabled if you want to fetch all data by iteration.
To enable Instance Tunnel globally, use the code below.
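from odps import options
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # disable the limit to fetch all data
instance = o.execute_sql('select * from pyodps_iris')
with instance.open_reader() as reader:
    # all data can now be fetched through Instance Tunnel
    for record in reader:
        print(record)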
You can also pass tunnel=True to open_reader to enable Instance Tunnel for that reader only, and limit=False to disable the limit and fetch all data.
with instance.open_reader(tunnel=True, limit=False) as reader:
    # use Instance Tunnel and fetch all data without the limit
    for record in reader:
        print(record)
Note that some projects may restrict downloading all data from tables, so you may get a permission error after configuring these options. In that case, contact your project owner for help, or process the data in MaxCompute instead of downloading it and processing it locally.
DataFrame
Execution
To execute a DataFrame in DataWorks, you need to explicitly call methods that trigger immediate execution, such as execute and head.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # calling execute() runs the filter immediately
    print(record)  # process each record here
To make print trigger execution automatically, set options.interactive to True.
from odps import options
from odps.df import DataFrame
options.interactive = True # configure at the start of code
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum()) # sum() will be executed immediately because we use print here
Print details
To print execution details, set options.verbose. In DataWorks this option is set to True by default, so the Logview and other details are printed while the node runs.
Obtain scheduling parameters
Unlike SQL nodes in DataWorks, PyODPS nodes do NOT automatically replace placeholder strings such as ${param_name} in your code, because rewriting Python code could have unpredictable consequences. Instead, the PyODPS node creates a global dict named args that contains all the scheduling parameters. For instance, if you set ds=${yyyymmdd} in Schedule -> Parameter in DataWorks, you can obtain the value of ds with the following code:
print('ds=' + args['ds'])
The print statement above writes the following string to the DataWorks output pane:
ds=20161116
Similarly, to get the table partition ds=${yyyymmdd}, use the following code:
o.get_table('table_name').get_partition('ds=' + args['ds'])
For more examples of using scheduling parameters, see the DataWorks documentation.
Note
The value of args is set before your PyODPS node is executed. Do not set this variable manually, or the scheduling parameters may be overwritten.
Never use SQL-node-style ${param_name} placeholders in PyODPS nodes, even if they appear to produce correct results in some scenarios.
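As a minimal sketch, the partition lookup above can be wrapped in a helper that fails early when a scheduling parameter is missing or still contains an unreplaced ${...} placeholder. The helper name partition_spec is hypothetical and not part of PyODPS. It takes the parameter dict as an argument, so it can be tested outside DataWorks, where args does not exist.

```python
import re

# Matches an unreplaced DataWorks-style placeholder such as ${yyyymmdd}.
PLACEHOLDER = re.compile(r"\$\{[^}]*\}")


def partition_spec(params, name="ds"):
    """Build a partition spec such as 'ds=20161116' from a scheduling parameter.

    `params` is expected to be the `args` dict created by the PyODPS node.
    Hypothetical helper; not part of PyODPS.
    """
    if name not in params:
        raise KeyError("scheduling parameter {!r} is not configured".format(name))
    value = str(params[name])
    if PLACEHOLDER.search(value):
        raise ValueError("parameter {!r} was not substituted: {!r}".format(name, value))
    return "{}={}".format(name, value)
```

Inside a node it would be used as o.get_table('table_name').get_partition(partition_spec(args)). Using str.format instead of f-strings keeps the sketch usable in Python 2 nodes as well.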
Use third-party libraries
DataWorks PyODPS nodes come with several third-party libraries preinstalled. The installed versions are listed below.
| Package Name | Version under Python 2 Node | Version under Python 3 Node |
|---|---|---|
| requests | 2.11.1 | 2.26.0 |
| numpy | 1.16.6 | 1.18.1 |
| pandas | 0.24.2 | 1.0.5 |
| scipy | 0.19.0 | 1.3.0 |
| scikit_learn | 0.18.1 | 0.22.1 |
| pyarrow | 0.16.0 | 2.0.0 |
| lz4 | 2.1.4 | 3.1.10 |
| zstandard | 0.14.1 | 0.17.0 |
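The versions available in a particular resource group may differ from this table. Below is a minimal sketch that checks, at run time, which version a node actually imports. It assumes each package exposes a __version__ attribute and falls back to "unknown" when one does not. installed_versions is a hypothetical helper. Note that scikit_learn is imported under the name sklearn.

```python
import importlib


def installed_versions(module_names):
    """Map each import name to its __version__, or None if it cannot be imported."""
    versions = {}
    for name in module_names:
        try:
            module = importlib.import_module(name)
        except ImportError:
            versions[name] = None  # package is not installed in this environment
        else:
            versions[name] = getattr(module, "__version__", "unknown")
    return versions
```

For example, print(installed_versions(['numpy', 'pandas', 'sklearn'])) prints the versions seen by the current node.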
If you need packages not listed above and you are using a DataWorks PyODPS Python 3 node of version 0.12.0 or above, you can use the resource_pack comment annotation. After packing your dependencies with pyodps-pack, add a comment containing resource_pack to install the third-party libraries, then import them with ordinary import statements. Details about pyodps-pack can be found here.
Note
If you are creating packages for Python 2 nodes, add the --dwpy27 argument when calling pyodps-pack.
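We recommend using PyODPS package version 0.11.3 or later. Otherwise some generated packages may fail to load. For upgrading the PyODPS package and the node execution component, see this section.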
For instance, the following command creates a package:
pyodps-pack -o ipaddress-bundle.tar.gz ipaddress
After uploading and submitting ipaddress-bundle.tar.gz as a resource, you can use the ipaddress package with the code below. Note that you must keep the comment line in your code.
# -*- resource_pack: ipaddress-bundle.tar.gz
import ipaddress
DataWorks limits the total package size to 100MB. To exclude packages that are already preinstalled, add the --exclude argument when calling pyodps-pack. For instance, the command below excludes numpy and pandas, which already exist in the DataWorks environment.
pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <your_package>
You may add multiple packages with resource_pack by separating them with commas.
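Because of the 100MB limit, it can help to check bundle sizes locally before uploading them. Below is a minimal sketch under two assumptions: 100MB is read as 100 × 1024 × 1024 bytes, and the limit applies to the combined size of all bundles listed in resource_pack. check_bundles and total_bundle_size are hypothetical helpers.

```python
import os

# Assumption: the documented 100MB limit is interpreted as 100 MiB.
LIMIT_BYTES = 100 * 1024 * 1024


def total_bundle_size(paths):
    """Return the combined size in bytes of the given package bundles."""
    return sum(os.path.getsize(path) for path in paths)


def check_bundles(paths, limit=LIMIT_BYTES):
    """Return the total size, or raise ValueError if it exceeds `limit` bytes."""
    total = total_bundle_size(paths)
    if total > limit:
        raise ValueError("bundles total {} bytes, over the {}-byte limit".format(total, limit))
    return total
```

For example, check_bundles(['ipaddress-bundle.tar.gz', 'bundle.tar.gz']) can be run on the machine where pyodps-pack produced the files.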
For DataWorks PyODPS Python 3 nodes of version 0.11.3 or above, you can also pack with pyodps-pack and call the load_resource_package method before importing the third-party packages:
load_resource_package('ipaddress-bundle.tar.gz')
import ipaddress
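Note that if the third-party package you need is already among the preinstalled packages, load_resource_package may not load the version you need. In that case, we recommend using the resource_pack comment instead.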
Use other accounts
In some cases you may want to access MaxCompute with an account other than the one provided by the platform. Since PyODPS 0.11.3, you can call the as_account method of the MaxCompute entry object to create a new entry object that is independent of the variable o provided by the platform. For instance:
import os
# Make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID is set to the user's Access Key ID
# and ALIBABA_CLOUD_ACCESS_KEY_SECRET is set to the user's Access Key Secret.
# Hardcoding the Access Key ID or Access Key Secret in your code is not recommended.
new_odps = o.as_account(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
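The example above passes None to as_account if either variable is unset. Below is a minimal sketch of a hypothetical helper that reads both variables and fails with a clear message instead. It uses only the environment variable names from the example.

```python
import os


def access_key_from_env(id_var="ALIBABA_CLOUD_ACCESS_KEY_ID",
                        secret_var="ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    """Return (access_key_id, access_key_secret) read from environment variables."""
    access_id = os.getenv(id_var)
    secret = os.getenv(secret_var)
    missing = [var for var, value in ((id_var, access_id), (secret_var, secret)) if not value]
    if missing:
        # Fail early rather than creating an entry object with empty credentials.
        raise EnvironmentError("missing environment variables: " + ", ".join(missing))
    return access_id, secret
```

Inside a node this would be used as new_odps = o.as_account(*access_key_from_env()).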
Diagnose problems
If your code hangs without producing any output, add the comment shown below so that the DataWorks PyODPS Python 3 node dumps the stack traces of all threads every 30 seconds.
# -*- dump_traceback: true -*-
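The dump_traceback comment is handled by the PyODPS node execution component. To reproduce a hang outside DataWorks, Python 3's standard faulthandler module can produce a similar periodic dump of all thread stacks. This is a substitute technique and not necessarily how DataWorks implements the comment. periodic_stack_dumps is a hypothetical helper.

```python
import contextlib
import faulthandler
import sys


@contextlib.contextmanager
def periodic_stack_dumps(interval=30, stream=None):
    """Write the stack traces of all threads to `stream` every `interval` seconds."""
    # faulthandler writes to the stream's file descriptor, so `stream` needs fileno().
    target = stream if stream is not None else sys.stderr
    faulthandler.dump_traceback_later(interval, repeat=True, file=target)
    try:
        yield
    finally:
        # Stop the watchdog when the block exits, whether normally or by an exception.
        faulthandler.cancel_dump_traceback_later()
```

Wrapping the suspect code in with periodic_stack_dumps(30): prints where each thread is blocked until the block finishes.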
Feature restriction
DataWorks does not have the matplotlib library. Therefore, the following features may be restricted:
DataFrame plot function
Custom functions in DataFrame must be submitted to MaxCompute before execution. Because of the Python sandbox, third-party libraries written in pure Python or depending only on NumPy can run without uploading auxiliary libraries. Other libraries, including pandas, must be uploaded before use. See support for third-party libraries for more details. Code outside custom functions can use the NumPy and pandas preinstalled in DataWorks. Other third-party libraries containing binary code are currently not supported.
For compatibility reasons, options.tunnel.use_instance_tunnel in DataWorks is set to False by default. To enable Instance Tunnel globally, you need to manually set options.tunnel.use_instance_tunnel to True.
For implementation reasons, the Python atexit module is not supported. Use a try-finally structure to implement equivalent behavior.
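Below is a minimal sketch of replacing an atexit hook with try-finally. It assumes the node creates a local scratch directory that must be removed however the node ends. run_with_scratch_dir is a hypothetical helper, and the node logic goes in the function passed to it.

```python
import shutil
import tempfile


def run_with_scratch_dir(body):
    """Call body(scratch_dir) and always delete scratch_dir afterwards.

    Replaces a pattern such as atexit.register(shutil.rmtree, scratch_dir),
    which is not supported in PyODPS nodes.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        return body(scratch_dir)
    finally:
        # Runs whether body() returns normally or raises.
        shutil.rmtree(scratch_dir, ignore_errors=True)
```

Unlike atexit, the cleanup runs at a well-defined point and still runs when body raises, so an error in the node does not leave the directory behind.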
Usage restrictions
To avoid putting pressure on the DataWorks gateway, the CPU and memory usage of PyODPS running in DataWorks is restricted. DataWorks manages this restriction centrally.
If the system displays Got killed, the process ran out of memory and has been terminated. Therefore, we do not recommend processing data locally.
However, this restriction does not apply to SQL and DataFrame tasks (except to_pandas) initiated by PyODPS.
Upgrade
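In shared resource groups, the DataWorks PyODPS node execution component and the PyODPS package version are maintained by Alibaba Cloud and are updated along with PyODPS releases. In exclusive resource groups, the node execution component and the PyODPS package may be fixed when the resource group is created. If you need features provided by a newer PyODPS package (usually APIs not covered in this document), you can upgrade PyODPS yourself by following this document. Note that the following features are provided by the PyODPS node execution component rather than by the PyODPS package itself, and cannot be obtained by upgrading on your own: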
Scheduling parameters
Capabilities provided by comments, for instance, dump_traceback
load_resource_package
Automatic hints for errors
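For PyODPS node execution components of version 0.11.5 and later, if the component version differs from the PyODPS version, both version numbers are shown in the log during execution. Alibaba Cloud updates the PyODPS node execution component from time to time, with some delay relative to shared resource groups. If you need the node execution component upgraded, you can submit a ticket to Alibaba Cloud for upgrade support.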