Tables
Tables are the data storage unit in MaxCompute.
Basic operations
Note
Code in this document is only guaranteed to work with PyODPS 0.11.3 and later. For PyODPS versions earlier than 0.11.3, use the class odps.models.Schema in place of odps.models.TableSchema, and the schema property in place of table_schema.
Use the list_tables() method of the ODPS object to list all tables in a project.
for table in o.list_tables():
    print(table.name)
To list only the tables whose names start with a given prefix, pass the prefix argument.
for table in o.list_tables(prefix="table_prefix"):
    print(table.name)
Table objects obtained with the code above do not load properties other than names. Accessing properties such as table_schema or creation_time triggers an extra remote call, which may cause additional delay. If you need these properties while listing tables, you can pass extended=True to list_tables() in PyODPS 0.11.5 or later.
for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)
You can list tables of a given type by specifying the type argument. Examples for the different table types are shown below.
managed_tables = list(o.list_tables(type="managed_table")) # iterate over MaxCompute-managed tables
external_tables = list(o.list_tables(type="external_table")) # iterate over external tables
virtual_views = list(o.list_tables(type="virtual_view")) # iterate over non-materialized views
materialized_views = list(o.list_tables(type="materialized_view")) # iterate over materialized views
Use exist_table() to check whether the specified table exists.
o.exist_table('dual')
Use get_table() to obtain the specified table.
>>> t = o.get_table('dual')
>>> t.table_schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
>>> t.lifecycle
-1
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
1408
>>> t.comment
'Dual Table Comment'
>>> t.table_schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
>>> t.table_schema['c_int_a']
<column c_int_a, type bigint>
>>> t.table_schema['c_int_a'].comment
'Comment of column c_int_a'
You can also provide the project parameter to obtain the specified table from another project.
>>> t = o.get_table('dual', project='other_project')
Create tables
You can use create_table() with the table schema to create a table in the following way:
>>> from odps.models import TableSchema, Column, Partition
>>>
>>> schema = TableSchema.from_lists(
>>>     ['num', 'num2', 'arr'], ['bigint', 'double', 'array<int>'], ['pt'], ['string']
>>> )
>>> table = o.create_table('my_new_table', schema)
>>> table = o.create_table('my_new_table', schema, if_not_exists=True) # create table only when the table does not exist
>>> table = o.create_table('my_new_table', schema, lifecycle=7) # configure lifecycle of the table (in days)
An easier way is to use a string in the structure of “field name field type” to create the table, as shown in the following code:
>>> table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
>>> # a tuple like (column list, partition list) can be passed to create a partitioned table
>>> table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
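When column definitions already live in Python data structures, the "field name field type" strings shown above can be assembled instead of typed by hand. The following is a minimal sketch; the helper name columns_to_schema_string is hypothetical and not part of PyODPS, and the create_table call is left as a comment because it assumes an existing ODPS entry object o.

```python
def columns_to_schema_string(columns):
    """Join (name, type) pairs into a string such as 'num bigint, num2 double'."""
    return ", ".join("%s %s" % (name, type_name) for name, type_name in columns)


columns = [("num", "bigint"), ("num2", "double")]
partitions = [("pt", "string")]

schema_str = columns_to_schema_string(columns)
partition_str = columns_to_schema_string(partitions)
print(schema_str)
print(partition_str)

# hypothetical usage, assuming `o` is an existing ODPS entry object:
# table = o.create_table('my_new_table', (schema_str, partition_str), if_not_exists=True)
```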
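By default, only the bigint, double, decimal, string, datetime, boolean, map and array types are allowed when creating tables. If you use the service on public cloud, or your service supports new types such as tinyint and struct, you can set options.sql.use_odps2_extension = True to enable these types, as shown below: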
>>> from odps import options
>>> options.sql.use_odps2_extension = True
>>> table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')
Other parameters of create_table() can be used to set table properties or transaction properties. For instance, the code below creates an ACID 2.0 table and specifies key as the primary key. The column key must be declared NOT NULL.
>>> table = o.create_table('my_trans_table', 'key string not null, value string',
>>>                        primary_key=['key'], transactional=True)
Synchronize table updates
If a table has been updated by another program and has changes in the schema, you can use reload() to synchronize the update.
>>> table.reload()
Read and write data
Obtain table data
You can obtain table data in different ways. First, you can use head() to retrieve the first 10,000 or fewer data items in each table.
>>> t = o.get_table('dual')
>>> for record in t.head(3):
>>>     print(record)  # process every Record object
Then, use open_reader() of the table object to open a reader and read the data. To read data from a partitioned table, add a partition argument to specify the partition to read.
Open the reader using a with statement, as shown in the following code. The with statement ensures that the reader is closed once the with block is exited.
>>> with t.open_reader(partition='pt=test,pt2=test2') as reader:
>>>     count = reader.count
>>>     for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>         print(record)  # process one record
Open the reader without using a with statement, as shown in the following code:
>>> reader = t.open_reader(partition='pt=test,pt2=test2')
>>> count = reader.count
>>> for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>     print(record)  # process one record
>>> reader.close()
An easier way is to use read_table() of the ODPS object, as shown in the following code:
>>> for record in o.read_table('test_table', partition='pt=test,pt2=test2'):
>>>     print(record)  # process one record
Since 0.11.2, PyODPS supports reading and writing data in Arrow format, which can be converted to and from formats such as pandas more efficiently. After installing pyarrow, add the arrow=True argument when calling open_reader to read table content as Arrow RecordBatch objects.
>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     count = reader.count
>>>     for batch in reader:  # This line can be executed many times until all record batches are visited.
>>>         # process one RecordBatch, for instance, convert to Pandas
>>>         print(batch.to_pandas())
You can also call the to_pandas method on a reader to obtain a pandas DataFrame. A start row index (starting from 0) and a row count can be specified. If they are not specified, all data will be read.
>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     # specify start row index and row count
>>>     pd_df = reader.to_pandas(start=10, count=20)
>>>     # if not specified, all data will be read
>>>     pd_df = reader.to_pandas()
You can read data directly into pandas DataFrames with multiple processes.
>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     pd_df = reader.to_pandas(n_process=n_process)
To make reading data as pandas easier, the to_pandas method is available on table and partition objects since PyODPS 0.12.0.
>>> # read table as pandas dataframe
>>> pd_df = table.to_pandas(start=10, count=20)
>>> # read all data with 2 processes
>>> pd_df = table.to_pandas(n_process=2)
>>> # read partition as pandas
>>> pd_df = partitioned_table.to_pandas(partition="pt=test", start=10, count=20)
Since PyODPS 0.12.0, you can also use the iter_pandas method to read a table or partition as multiple batches of pandas DataFrames. The number of rows in each DataFrame can be set with the batch_size argument. Its default is taken from options.tunnel.read_row_batch_size, which is 1024 by default.
>>> # iterate all data with default batch_size
>>> for batch in table.iter_pandas():
>>>     print(batch)
>>> # iterate first 1000 rows with batch_size==100
>>> for batch in table.iter_pandas(batch_size=100, start=0, count=1000):
>>>     print(batch)
Note
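The open_reader, read_table and to_pandas methods can only read a single partition. To read multiple partitions, for instance all partitions satisfying dt>20230119, use the iterate_partitions method. See the Iterate through partitions section for details.
Whether exported data contain values of partition columns depends on the output format. Record format includes partition column values by default, while Arrow format does not. Since PyODPS 0.12.0, you can specify append_partitions=True to explicitly include partition column values, or append_partitions=False to exclude them from the result.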
Write data to tables
Similar to open_reader(), you can use open_writer() of the table object to open a writer and write data to the table. If the table is partitioned, add a partition argument to specify the partition to write into.
Open the writer using a with statement, as shown in the following code. The with statement ensures that the writer is closed once the with block is exited and that all written data are committed.
>>> with t.open_writer(partition='pt=test') as writer:
>>>     records = [[111, 'aaa', True],  # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects
>>>
>>>     records = [t.new_record([111, 'aaa', True]),  # a list with records can also be used
>>>                t.new_record([222, 'bbb', False]),
>>>                t.new_record([333, 'ccc', True]),
>>>                t.new_record([444, '中文', False])]
>>>     writer.write(records)
If the specified partition does not exist, use the create_partition parameter to create it, as shown in the following code:
>>> with t.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     records = [[111, 'aaa', True],  # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects
An easier way is to use write_table() of the ODPS object to write data, as shown in the following code:
>>> records = [[111, 'aaa', True],  # a list can be used here
>>>            [222, 'bbb', False],
>>>            [333, 'ccc', True],
>>>            [444, '中文', False]]
>>> o.write_table('test_table', records, partition='pt=test', create_partition=True)
Note
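Every call to write_table() makes MaxCompute generate a file on the server side. This operation is time-consuming, and too many files reduce the efficiency of subsequent queries. We therefore recommend writing multiple records in a single write_table() call, or passing a generator object.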
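To illustrate the generator recommendation in the note above, the sketch below produces rows lazily so that a single write_table() call can consume all of them. The function generate_records and its row layout (bigint, string, boolean) are made up for illustration, and the write_table call is left as a comment because it assumes an ODPS entry object o and an existing table.

```python
def generate_records(n_rows):
    """Yield n_rows rows lazily, each shaped as [int, str, bool]."""
    for i in range(n_rows):
        yield [i, "value_%d" % i, i % 2 == 0]


records = generate_records(1000)

# hypothetical usage: one call, hence one server-side file, for all 1000 rows
# o.write_table('test_table', records, partition='pt=test', create_partition=True)
```

Because the generator is consumed only once, create a fresh one for every write_table() call.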
By default, write_table() appends new data to existing data. To overwrite existing data instead, pass overwrite=True to write_table() if you are using PyODPS later than 0.11.1, or call Table.truncate() on the table or partition first.
You can write data with multiple threads. Since PyODPS 0.11.6, you can simply pass the Writer object created by open_writer to the threads. Take care not to close the writer while threads are still writing. Close it only after all data have been written.
import random
# for Python 2.7 please import ThreadPoolExecutor from
# third-party library `futures`
from concurrent.futures import ThreadPoolExecutor
def write_records(writer):
    for i in range(5):
        # generate data and write to passed writers
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)
N_THREADS = 3
# creation of MaxCompute entry object o is omitted here
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
with table.open_writer() as writer:
    pool = ThreadPoolExecutor(N_THREADS)
    futures = []
    for i in range(N_THREADS):
        futures.append(pool.submit(write_records, writer))
    # wait for threaded calls to finish
    [f.result() for f in futures]
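To try the control flow above without a MaxCompute connection, the sketch below swaps the real writer for an in-memory stand-in. ListWriter is a hypothetical class written for this example and is not part of PyODPS. It only demonstrates the ordering requirement: every thread finishes writing before the shared writer is closed.

```python
import random
import threading
from concurrent.futures import ThreadPoolExecutor


class ListWriter:
    """Hypothetical in-memory stand-in for a table writer; not a PyODPS class."""

    def __init__(self):
        self.rows = []
        self.closed = False
        self._lock = threading.Lock()

    def write(self, row):
        with self._lock:
            if self.closed:
                raise ValueError("writer already closed")
            self.rows.append(row)

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()


def write_records(writer, n_rows=5):
    for _ in range(n_rows):
        writer.write([random.randint(1, 100), random.random()])


N_THREADS = 3
with ListWriter() as writer:
    with ThreadPoolExecutor(N_THREADS) as pool:
        futures = [pool.submit(write_records, writer) for _ in range(N_THREADS)]
        # wait for every thread before leaving the writer's with block
        for future in futures:
            future.result()

print(len(writer.rows))
```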
You can also write data with multiple processes to avoid the performance loss caused by the Python GIL. Since PyODPS 0.11.6, you only need to pass the Writer object created by open_writer to the writing subprocesses through the multiprocessing standard library. Note that, unlike the multithreaded case, you should close the writer in every subprocess once its writing is done, and close the writer in the main process (or leave the with block) only after all writing subprocesses have exited, so that all data are committed.
import random
from multiprocessing import Pool
def write_records(writer):
    for i in range(5):
        # generate data and write to passed writers
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)
    # need to close writers in every subprocess once writing is done
    writer.close()
# need to judge if current code is executed as main module to make sure
# the code is not executed by multiprocessing repeatedly
if __name__ == '__main__':
    N_WORKERS = 3
    # creation of MaxCompute entry object o is omitted here
    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    with table.open_writer() as writer:
        pool = Pool(processes=N_WORKERS)
        futures = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (writer,)))
        # wait for subprocesses to finish
        [f.get() for f in futures]
Since 0.11.2, PyODPS supports reading and writing table data with Arrow format, which can be converted from and to pandas or other formats with high efficiency. After installing pyarrow, you can write data into tables with Arrow RecordBatch format by adding arrow=True argument when calling open_writer method. PyODPS also supports writing tables with pandas DataFrames, which will be converted into Arrow RecordBatch directly.
>>> import pandas as pd
>>> import pyarrow as pa
>>>
>>> with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
>>>     records = [[111, 'aaa', True],
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
>>>     # write a RecordBatch
>>>     batch = pa.RecordBatch.from_pandas(df)
>>>     writer.write(batch)
>>>     # Pandas DataFrame can also be used directly
>>>     writer.write(df)
To make writing pandas DataFrames easier, since 0.12.0 PyODPS supports writing a pandas DataFrame directly with the write_table method. If the target table does not exist before writing, add the create_table=True argument to create it automatically.
>>> import pandas as pd
>>> df = pd.DataFrame([
>>>     [111, 'aaa', True],
>>>     [222, 'bbb', False],
>>>     [333, 'ccc', True],
>>>     [444, '中文', False]
>>> ], columns=['num_col', 'str_col', 'bool_col'])
>>> # if table test_table does not exist, it will be created automatically
>>> o.write_table('test_table', df, partition='pt=test', create_table=True, create_partition=True)
Since PyODPS 0.12.0, the write_table method also supports dynamic partitioning. Pass the names of the columns to be used as partitions through the partition_cols argument and specify create_partition=True, and the corresponding partitions will be created automatically.
>>> import pandas as pd
>>> df = pd.DataFrame([
>>>     [111, 'aaa', True, 'p1'],
>>>     [222, 'bbb', False, 'p1'],
>>>     [333, 'ccc', True, 'p2'],
>>>     [444, '中文', False, 'p2']
>>> ], columns=['num_col', 'str_col', 'bool_col', 'pt'])
>>> # if partition pt=p1 or pt=p2 does not exist, they will be created automatically
>>> o.write_table('test_part_table', df, partition_cols=['pt'], create_partition=True)
Note
partition_cols is supported since PyODPS 0.12.3. Please use partitions argument instead when using PyODPS 0.12.2 or earlier.
Compression options
You can specify compression options to accelerate data upload or download. To do so, create a CompressOption instance and specify the compression algorithm and level in it. Currently zlib and ZSTD are supported, and you need to install the zstandard package to enable ZSTD support.
from odps.tunnel import CompressOption
compress_option = CompressOption(
    compress_algo="zlib",  # algorithm name
    level=0,  # (optional) compression level
    strategy=0,  # (optional) compression strategy, only for zlib
)
Then you may pass the compression option when calling open_reader or open_writer. For instance,
with table.open_writer(compress_option=compress_option) as writer:
    pass  # replace this line with actual data writing code
If you only need to specify the name of the compression algorithm, you can pass it with the compress_algo argument directly.
with table.open_writer(compress_algo="zlib") as writer:
    pass  # replace this line with actual data writing code
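As a rough, library-independent illustration of what a compression level trades off, the sketch below uses Python's standard zlib module rather than PyODPS. The sample payload is made up, and actual ratios depend entirely on your data. It is not a statement about how PyODPS applies the level internally.

```python
import zlib

# repetitive sample payload standing in for serialized table rows (made up)
raw = b"111,aaa,true\n222,bbb,false\n" * 1000

fast = zlib.compress(raw, 1)   # level 1: favors speed over size
small = zlib.compress(raw, 9)  # level 9: favors size over speed

# compare sizes of the raw payload and both compressed forms
print(len(raw), len(fast), len(small))
```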
Delete tables
>>> o.delete_table('my_table_name', if_exists=True) # delete only when the table exists
>>> t.drop() # call drop method of the Table object to delete directly
Create a DataFrame
PyODPS provides a DataFrame framework for querying and manipulating MaxCompute data. You can use to_df to convert a table to a DataFrame object.
>>> table = o.get_table('my_table_name')
>>> df = table.to_df()
Table partitions
Basic operations
Check if a table is partitioned:
>>> if table.table_schema.partitions:
>>>     print('Table %s is partitioned.' % table.name)
Check whether the specified partition exists with the exist_partition() method. Values of all partition fields must be provided:
>>> table.exist_partition('pt=test,sub=2015')
Check whether partitions satisfying provided prefix exist:
>>> # the order of partitions fields of table is pt, sub
>>> table.exist_partitions('pt=test')
Obtain information about one specified partition with get_partition() method:
>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size
0
Note
The word partition here refers to a partition specification that gives values for all partition columns and thus uniquely identifies a sub-table, not to the partition columns themselves. If some partition columns are left unspecified, the specification may correspond to multiple sub-tables, which get_partition() does not support. In that case, use iterate_partitions() to process each partition separately. Likewise, when a partition column is specified multiple times, or when non-deterministic logic expressions such as pt>20210302 are used, get_partition() cannot be used to obtain partition information, and iterate_partitions() can be used to iterate over all matching partitions.
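The distinction in the note above can be checked on the client side before choosing between get_partition() and iterate_partitions(). The following is a minimal sketch under the assumption that a specification is a comma-separated list of name=value pairs. The helper is_full_partition_spec is hypothetical, is not part of PyODPS, and does not reproduce PyODPS's own parsing.

```python
def is_full_partition_spec(spec, partition_fields):
    """Return True if spec gives exactly one '=' value for every partition field."""
    names = []
    for part in spec.split(","):
        name, sep, value = part.partition("=")
        name = name.strip()
        # reject parts without '=', with an empty value, or with operators like '>=' or '!='
        if not sep or not value.strip() or any(op in name for op in "<>!"):
            return False
        names.append(name)
    return sorted(names) == sorted(partition_fields)


fields = ["pt", "sub"]
print(is_full_partition_spec("pt=test,sub=2015", fields))  # identifies one sub-table
print(is_full_partition_spec("pt=test", fields))           # sub is left unspecified
print(is_full_partition_spec("pt>20210302", ["pt"]))       # logic expression
```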
Create partitions
The code below creates a partition with create_partition(). An error will be raised if the partition already exists.
>>> t.create_partition('pt=test')
Code below will create a partition or do nothing if the partition already exists.
>>> t.create_partition('pt=test', if_not_exists=True)
Iterate through partitions
The code below iterates through all the partitions in a table.
>>> for partition in table.partitions:
>>>     print(partition.name)
If you need to iterate through partitions with the values of certain partition fields fixed, use the iterate_partitions() method.
>>> for partition in table.iterate_partitions(spec='pt=test'):
>>>     print(partition.name)
Since PyODPS 0.11.3, iterate_partitions supports simple logic expressions, as well as compound expressions joined by commas in which every sub-expression must be satisfied. The OR operator is not supported yet.
>>> for partition in table.iterate_partitions(spec='dt>20230119'):
>>>     print(partition.name)
Note
Before 0.11.3, iterate_partitions only supported specifying partition values for the leading partition fields. For instance, when a table has 3 partition fields, pt1, pt2 and pt3, the spec argument of iterate_partitions could only accept values like pt1=xxx or pt1=xxx,pt2=yyy. Since 0.11.3, iterate_partitions supports more flexible forms of the spec argument. However, it is still recommended to fix the values of the leading partition fields to improve iteration speed.
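Spec strings of the pt1=xxx,pt2=yyy form mentioned above can be built from ordered (field, value) pairs, which keeps the leading partition fields first. This is a minimal sketch; build_partition_spec is a hypothetical helper and not part of PyODPS, and the iterate_partitions call is left as a comment because it assumes an existing table object.

```python
def build_partition_spec(pairs):
    """Join ordered (field, value) pairs into a spec such as 'pt1=xxx,pt2=yyy'."""
    return ",".join("%s=%s" % (name, value) for name, value in pairs)


spec = build_partition_spec([("pt1", "xxx"), ("pt2", "yyy")])
print(spec)

# hypothetical usage, assuming `table` is a partitioned Table object:
# for partition in table.iterate_partitions(spec=spec):
#     print(partition.name)
```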
Delete partitions
Code below will delete a partition with delete_partition() method.
>>> t.delete_partition('pt=test', if_exists=True) # delete only when the partition exists
>>> partition.drop() # delete directly via the drop method of the partition object
Obtain the partition with maximal value
Sometimes you want to get the partition with the maximal value. For instance, when dates are used as partition values, you may want the latest partition that contains data. PyODPS supports this since 0.11.3.
Create a partitioned table and write some data.
t = o.create_table("test_multi_pt_table", ("col string", "pt1 string, pt2 string"))
for pt1, pt2 in (("a", "a"), ("a", "b"), ("b", "c"), ("b", "d")):
    o.write_table("test_multi_pt_table", [["value"]], partition="pt1=%s,pt2=%s" % (pt1, pt2))
If you want to get the partition with maximal value, you can use code below:
>>> part = t.get_max_partition()
>>> part
<Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>
>>> part.partition_spec["pt1"] # get value of certain partition field
b
If you want the latest partition regardless of whether it contains data, you may use
>>> t.get_max_partition(skip_empty=False)
<Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>
For tables with multiple partition fields, you may specify the parent partition specification to get the child partition with the maximal value, for instance,
>>> t.get_max_partition("pt1=a")
<Partition cupid_test_release.`test_multi_pt_table`(pt1='a',pt2='b')>
Data upload and download tunnels
Note
If you only need simple table reads and writes, for instance uploading a small amount of data, we do not recommend using the table tunnel directly, as there are more convenient read and write methods that wrap table tunnel invocations. Use the tunnel interfaces directly when writing tables in a distributed manner or in other complicated scenarios.
ODPS Tunnel is the data channel of MaxCompute. Users can upload data to or download data from MaxCompute through Tunnel. For a detailed explanation of ODPS Tunnel, see https://help.aliyun.com/zh/maxcompute/user-guide/overview-of-dts.
Upload
Block upload interface
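When uploading with the Tunnel block interface directly, first call create_upload_session() with the table name and partition to create an Upload Session, then create Writers from that session. You can call open_record_writer() multiple times on one Upload Session to create multiple Writers, each of which owns a block_id corresponding to one data block. Data are written as Record objects. After all writes are complete, call commit() on the Upload Session with the list of data blocks to commit. If a block_id has data written but is not included in the argument of commit, that data block will not appear in the final table.
When writing data, commit can be called only once. After commit is called, the upload session is closed and no more data can be written through the same session.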
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(o)
# create an upload session for my_table and partition pt=test
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# create a record writer with block_id as 0
with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)
    record = table.new_record(['test2', 'id2'])
    writer.write(record)
# commit block 0 which was just written. if multiple blocks were written,
# all of them need to be specified in the same commit.
# need to commit outside the with-block, otherwise data will not have been
# written before commit, an error will be raised and all data are lost.
# for every upload_session, commit can be called only once.
upload_session.commit([0])
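If you need to use the same Upload Session in multiple processes or even nodes, create the Upload Session first and obtain its id property. Then, when calling create_upload_session in other processes, pass this value as the upload_id argument. After the upload finishes in every process, collect the block_id values used by each process and call commit in one of the processes.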
from odps.tunnel import TableTunnel
##############
# main process
##############
table = o.get_table('my_table')
tunnel = TableTunnel(o)
# create upload session for table and partition pt=test
upload_session_main = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# obtain session id
session_id = upload_session_main.id
# distribute session id to processes, need to be implemented by yourself
##############
# sub-process
##############
# create (or reuse) upload session with distributed session id
upload_session_sub = tunnel.create_upload_session(table.name, partition_spec='pt=test', upload_id=session_id)
# create writer and write data. block_id must differ between processes
with upload_session_sub.open_record_writer(local_block_id) as writer:
    # ... generate data ...
    writer.write(record)
# send back block_id used in current process, need to be implemented by yourself
##############
# main process
##############
# collect all block_id in subprocesses, need to be implemented by yourself
# submit all collected block_id
upload_session_main.commit(collected_block_ids)
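The example above leaves the distribution of block ids to you. One simple scheme, sketched below, gives every worker its own contiguous range so that ids never collide, and derives the list to commit from the same assignment. The helper assign_block_ids and the worker numbering are assumptions made for this illustration and are not provided by PyODPS.

```python
def assign_block_ids(n_workers, blocks_per_worker):
    """Give every worker its own contiguous, non-overlapping range of block ids."""
    return {
        worker: list(range(worker * blocks_per_worker, (worker + 1) * blocks_per_worker))
        for worker in range(n_workers)
    }


assignment = assign_block_ids(n_workers=3, blocks_per_worker=2)

# ids to pass to commit() once every worker has reported back
collected_block_ids = sorted(b for ids in assignment.values() for b in ids)
print(assignment)
print(collected_block_ids)
```

In practice a worker may write fewer blocks than it was assigned, so commit only the ids that workers report as actually written.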
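Note that a Writer created with a block id uses a long-lived connection. If nothing is written for a long time (typically 5 minutes), the connection is closed and writing fails. If your writes are far apart, we recommend generating a batch of data first and then creating a Writer with open_record_writer to write it as needed. If you only want to write through Tunnel with a single Writer, consider calling open_record_writer without a block id. A Writer created this way first caches data locally and only sends it when the Writer is closed or when the cached data exceed a certain size (20MB by default, configurable with options.tunnel.block_buffer_size). After writing, call get_blocks_written() on the Writer to obtain the list of written blocks, and then commit them.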
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(o)
# create an upload session for my_table and partition pt=test
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# create writer without block id to create a record writer with buffers
with upload_session.open_record_writer() as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)
    record = table.new_record(['test2', 'id2'])
    writer.write(record)
# need to commit outside the with-block, otherwise data will not have been
# written before commit, and an error will be raised.
# obtain block ids from the writer and then commit
upload_session.commit(writer.get_blocks_written())
Note
When using buffered Writers, do not open multiple buffered Writers on the same Upload Session, otherwise conflicts may occur and cause data loss.
If you need to upload with arrow format instead of record format, you may replace open_record_writer() with open_arrow_writer() and write arrow RecordBatches, Tables or pandas DataFrames.
import pandas as pd
import pyarrow as pa
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# use open_arrow_writer instead of open_record_writer
with upload_session.open_arrow_writer(0) as writer:
    df = pd.DataFrame({"name": ["test1", "test2"], "id": ["id1", "id2"]})
    batch = pa.RecordBatch.from_pandas(df)
    writer.write(batch)
# need to commit outside the with-block, otherwise data will not have been
# written before commit, and an error will be raised.
upload_session.commit([0])
None of the writers described in this chapter is thread safe. You need to create a separate writer for every thread.
Stream upload interface
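MaxCompute provides a stream upload interface to reduce the cost of developing distributed services. You can call create_stream_upload_session() to create a dedicated Upload Session. With this kind of session, you do not need to provide a block id to open_record_writer.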
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(o)
upload_session = tunnel.create_stream_upload_session(table.name, partition_spec='pt=test')
with upload_session.open_record_writer() as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)
    record = table.new_record(['test2', 'id2'])
    writer.write(record)
Download
When using the download interface of MaxCompute tunnel, you need to create a download session and then create readers on the session. You may call open_record_reader() multiple times to create multiple readers. Every reader must be given an initial row number and a row count. Row numbers begin at 0, and the row count can be the count property of the session, which gives the total number of rows in the table or partition. Data read from the reader are Record objects.
from odps.tunnel import TableTunnel
tunnel = TableTunnel(o)
# create a download session for my_table and partition pt=test
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
# create a record reader and specify row range to read
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        pass  # process every record
If you need to reuse your download session in multiple processes or nodes, you may create your download session first, obtain its id property and then call create_download_session() in other processes with the value as download_id argument.
from odps.tunnel import TableTunnel
##############
# main process
##############
table = o.get_table('my_table')
tunnel = TableTunnel(o)
# create download session for table and partition pt=test
download_session_main = tunnel.create_download_session(table.name, partition_spec='pt=test')
# obtain session id
session_id = download_session_main.id
# distribute session id to processes, need to be implemented by yourself
##############
# sub-process
##############
# create (or reuse) download session with distributed session id
download_session_sub = tunnel.create_download_session(table.name, partition_spec='pt=test', download_id=session_id)
# create reader and read data. note that you may specify a different start / count for every process
with download_session_sub.open_record_reader(start, count) as reader:
    for record in reader:
        pass  # handle data records
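The start and count values used by each process above have to cover the whole table without overlap. A minimal way to derive them from the session's total row count is sketched below. The helper split_row_ranges is hypothetical and not part of PyODPS, and in real code total_rows would come from the count property of the download session.

```python
def split_row_ranges(total_rows, n_workers):
    """Split rows [0, total_rows) into at most n_workers (start, count) pairs."""
    base, extra = divmod(total_rows, n_workers)
    ranges, start = [], 0
    for i in range(n_workers):
        # the first `extra` workers take one additional row each
        count = base + (1 if i < extra else 0)
        if count:
            ranges.append((start, count))
        start += count
    return ranges


ranges = split_row_ranges(10, 3)
print(ranges)
```

Each (start, count) pair can then be sent to a sub-process together with the session id.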
You can download data with arrow format instead of record format by calling open_arrow_reader() instead of open_record_reader().
from odps.tunnel import TableTunnel
tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
with download_session.open_arrow_reader(0, download_session.count) as reader:
    for batch in reader:
        pass  # process every Arrow RecordBatch
Compression options
You can specify compression options to accelerate data upload or download. To do so, create a CompressOption instance and specify the compression algorithm and level in it. Currently zlib and ZSTD are supported, and you need to install the zstandard package to enable ZSTD support.
from odps.tunnel import CompressOption
compress_option = CompressOption(
    compress_algo="zlib",  # algorithm name
    level=0,  # (optional) compression level
    strategy=0,  # (optional) compression strategy, only for zlib
)
After that, when creating upload or download sessions, you may specify compress_option argument and use compress=True in open_xxx_reader or open_xxx_writer methods to enable compression.
from odps.tunnel import TableTunnel
tunnel = TableTunnel(o)
# create a download session for my_table and partition pt=test with the compression option
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test', compress_option=compress_option)
# create a record reader with compression enabled and specify row range to read
with download_session.open_record_reader(0, download_session.count, compress=True) as reader:
    for record in reader:
        pass  # process every record
Since PyODPS 0.12.3, you may specify compress options used in current Python process by setting global options as follows.
from odps import options
# enable compression (default to zlib / deflate)
options.tunnel.compress.enabled = True
# set compression algorithm
options.tunnel.compress.algo = "zstd"
After that, compression will be enabled for all subsequent data read or write operations. For more details, please refer to Options.
Improve upload and download performance
Performance of tunnel upload and download can be affected by various factors. First, you may consider optimizing your local code, which includes:
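Reduce the number of Upload Sessions or Download Sessions you create and reuse them whenever possible. Creating a Tunnel session is expensive, so unless necessary, create only one session for each read or write.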
Increase the amount of data read or written by readers or writers.
Compress data to reduce the amount of data to transfer.
If the data source or target is pandas, since Record type itself has time overhead in Python interpreter, it is recommended to use Arrow interface for pandas data if possible.
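If possible, use multithreading or multiprocessing for reads and writes. Note that Python uses a GIL, so if the preprocessing before reading or writing involves a lot of pure Python code, multithreading may not improve performance.
In addition, factors such as network conditions during reads and writes can also affect upload and download speed. For instance, shared Tunnel service resources may be exhausted, or the network link from the client to the Tunnel service may be unstable. In these cases, consider purchasing exclusive Tunnel resources or using the Alibaba Cloud internal network. See the Tunnel documentation for details.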