Instructions for running MaxCompute in DataWorks

Create flow nodes

Flow nodes include the Python on MaxCompute (PyODPS) node. You can create a PyODPS node to write and run PyODPS code.


Use the ODPS object

The PyODPS node in DataWorks provides a global variable odps or o, which is the MaxCompute (ODPS) entry object. You do not need to define this object manually.

print(o.exist_table('pyodps_iris'))

Note

The value of the variable o is set before your PyODPS node is executed. Do not set this variable manually unless you have to; otherwise the MaxCompute entry object may be lost, and you may receive permission errors when the node runs in production because the default account has changed.

Execute SQL statements

For more information, see Execute SQL statements.
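
For example, a minimal sketch of executing SQL from a PyODPS node looks like the following; pyodps_iris is just the sample table used throughout this document.

# Run a SQL statement with the global entry object `o` and read the result.
instance = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 10')  # blocks until the statement finishes
with instance.open_reader() as reader:
    for record in reader:
        print(record)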

Note

Instance tunnel is not enabled by default in DataWorks, so at most 10,000 records can be fetched. When instance tunnel is enabled, reader.count returns the total number of records, and the limit must be disabled to fetch all data by iteration.

To enable instance tunnel globally, use the following code:

from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # disable the limit to fetch all data

with instance.open_reader() as reader:  # `instance` is an instance returned by o.execute_sql(...)
    for record in reader:  # fetch all data through instance tunnel
        # process every record
        pass

You can also add tunnel=True when calling open_reader to enable instance tunnel for this reader only, and add limit=False to disable the limit and fetch all data.

with instance.open_reader(tunnel=True, limit=False) as reader:
    for record in reader:  # use instance tunnel and fetch all data without the limit
        # process every record
        pass

Note that some projects may prohibit downloading all data from tables, so you may get a permission error after configuring these options. Contact your project owner for help, or process the data in MaxCompute instead of downloading and processing it locally.

DataFrame

Execution

To execute a DataFrame in DataWorks, you need to explicitly call immediately-executed methods such as execute and head.

from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # the filter is executed immediately when execute() is called
    # process every record
    print(record)

To have these methods executed automatically when print is called, set options.interactive to True.

from odps import options
from odps.df import DataFrame

options.interactive = True  # configure at the start of code

iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # sum() will be executed immediately because we use print here

Print details

To print details during execution, set options.verbose. By default, this option is set to True in DataWorks, and the system prints the Logview URL and other details during execution.
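
For example, a minimal sketch of changing this option at the top of your node code:

from odps import options

options.verbose = True    # default in DataWorks: print the Logview URL and other details
# options.verbose = False  # uncomment to suppress these messages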

Obtain scheduling parameters

Unlike SQL nodes in DataWorks, PyODPS nodes do NOT automatically replace placeholder strings like ${param_name}, because doing so would alter your Python code and might lead to unpredictable consequences. Instead, a PyODPS node creates a dict named args in the global variables, which contains all the scheduling parameters. For instance, if you set ds=${yyyymmdd} under Schedule -> Parameter in DataWorks, you can use the following code to obtain the value of ds:

print('ds=' + args['ds'])

The print statement above writes the following string to the output pane of DataWorks:

ds=20161116

Specifically, if you want to get the table partition ds=${yyyymmdd}, the following code can be used:

o.get_table('table_name').get_partition('ds=' + args['ds'])

More examples of using scheduling parameters can be found in the DataWorks documentation.

Note

The value of args is set before your PyODPS node is executed. Do not set this variable manually, or the scheduling parameters will be overwritten.

${param_name}-style parameters used in SQL nodes must never be used in PyODPS nodes, even if they appear to produce correct results in some scenarios.

Use third-party libraries

DataWorks nodes have several third-party libraries preinstalled by default. The installed versions are listed below.

Package Name    Version under Python 2 Node    Version under Python 3 Node
requests        2.11.1                         2.26.0
numpy           1.16.6                         1.18.1
pandas          0.24.2                         1.0.5
scipy           0.19.0                         1.3.0
scikit_learn    0.18.1                         0.22.1
pyarrow         0.16.0                         2.0.0
lz4             2.1.4                          3.1.10
zstandard       0.14.1                         0.17.0
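
If you are unsure which versions your node actually provides, a quick check such as the sketch below can be run directly in a PyODPS node.

# Print the versions of a few preinstalled libraries in the current node.
import numpy
import pandas
import requests

print('numpy:', numpy.__version__)
print('pandas:', pandas.__version__)
print('requests:', requests.__version__)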

If you need packages not listed above, the load_resource_package method can be used. After packing your dependencies with pyodps-pack, call load_resource_package to install the third-party libraries and then import them with an import statement. Details about pyodps-pack can be found here.

Note

If you are creating packages for Python 2 nodes, add the --dwpy27 argument when calling pyodps-pack.

For instance, we use the command below to create a package:

pyodps-pack -o ipaddress-bundle.tar.gz ipaddress

After uploading and submitting ipaddress-bundle.tar.gz as a resource, you can use the ipaddress package with the code below.

load_resource_package("ipaddress-bundle.tar.gz")
import ipaddress

DataWorks limits the total package size to 100 MB. If you want to exclude the preinstalled packages, add the --exclude argument when calling pyodps-pack. For instance, the command below excludes numpy and pandas, which already exist in the DataWorks environment.

pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <your_package>

Use other accounts

Note

The as_account method is supported since PyODPS 0.11.3. The method is not available if the DataWorks environment you are using does not deploy this version or later. If you are using exclusive resource groups, you may consider upgrading PyODPS to the latest version. Details can be found in this document.

In some cases you may want to access MaxCompute with an account other than the one provided by the platform. You can use the as_account method of the ODPS entry object to create a new entry object that is independent of the variable o provided by the platform. For instance:

import os
# Make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID is set to the Access Key ID of the user,
# and ALIBABA_CLOUD_ACCESS_KEY_SECRET is set to the Access Key Secret of the user.
# It is not recommended to hard-code the Access Key ID or Access Key Secret in your code.
new_odps = o.as_account(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)

Diagnose issues

If your code gets stuck during execution and emits no output, you can add the comment shown below to make DataWorks dump the stack traces of all threads every 30 seconds.

# -*- dump_traceback: true -*-

Feature restriction

DataWorks does not have the matplotlib library. Therefore, the following features may be restricted:

  • DataFrame plot function

Custom functions in DataFrame need to be submitted to MaxCompute before execution. Due to the Python sandbox, only third-party libraries that are written in pure Python or that reference nothing but numpy can be executed without uploading auxiliary libraries. Other libraries, including pandas, must be uploaded before use. See support for third-party libraries for more details. Code outside custom functions can use the numpy and pandas preinstalled in DataWorks. Other third-party libraries that contain binary code are not currently supported.
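
As a minimal sketch of such a custom function, the example below maps a column of the sample pyodps_iris table with a function that uses only built-in Python features, so no extra resources need to be uploaded.

from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))

# This function is submitted to MaxCompute when the expression is executed;
# it uses only built-in Python features, so no auxiliary libraries are required.
def width_level(width):
    return 'wide' if width >= 3 else 'narrow'

print(iris.sepal_width.map(width_level, rtype='string').head(5))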

For compatibility reasons, options.tunnel.use_instance_tunnel in DataWorks is set to False by default. To enable Instance Tunnel globally, you need to manually set options.tunnel.use_instance_tunnel to True.

For implementation reasons, the Python atexit package is not supported. You need to use the try-finally structure to implement related features.
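
For instance, a cleanup step that would otherwise be registered with atexit can be written as in the sketch below; the temporary table name tmp_pyodps_result is only an example.

def cleanup():
    # example cleanup logic: drop a temporary table created by this node
    o.delete_table('tmp_pyodps_result', if_exists=True)

try:
    o.execute_sql(
        'CREATE TABLE IF NOT EXISTS tmp_pyodps_result AS '
        'SELECT * FROM pyodps_iris WHERE sepal_width < 3'
    )
    # further processing here
finally:
    cleanup()  # always runs, even if the statements above raise an error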

Usage restrictions

To avoid putting pressure on the DataWorks gateway when PyODPS runs in DataWorks, CPU and memory usage is restricted. DataWorks manages this restriction centrally.

If the system displays Got killed, the process has been terminated because it ran out of memory. Therefore, we do not recommend performing local data operations.

However, the preceding restriction does not apply to SQL and DataFrame tasks (except to_pandas) initiated by PyODPS, because those tasks run in MaxCompute rather than locally.