Tables

Tables are the data storage unit in MaxCompute.

Basic operations

Note

Code in this document is only guaranteed to work under PyODPS 0.11.3 and later versions. For PyODPS earlier than 0.11.3, please replace the class odps.models.TableSchema with odps.models.Schema and the table_schema property with schema.
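
If your code needs to run on both older and newer PyODPS versions, a minimal compatibility sketch like the one below may help; it simply falls back to the old class name when TableSchema is unavailable.

try:
    from odps.models import TableSchema  # PyODPS 0.11.3 and later
except ImportError:
    # earlier PyODPS versions only provide odps.models.Schema
    from odps.models import Schema as TableSchema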

Use the list_tables method of the ODPS object to list all tables in a project.

for table in o.list_tables():
    print(table.name)

You can list all tables with a given prefix by using the prefix argument.

for table in o.list_tables(prefix="table_prefix"):
    print(table.name)

Table objects obtained with the code above do not load properties other than their names. If you access properties like table_schema or creation_time, an extra remote call is performed, which may introduce extra delay. If you need these properties at the same time as listing tables, you can add the extended=True argument to list_tables in PyODPS 0.11.5 or later.

for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)

You can list tables of a given type by specifying the type argument. Examples of listing different types of tables are shown below.

managed_tables = list(o.list_tables(type="managed_table"))  # iterate over MaxCompute-managed tables
external_tables = list(o.list_tables(type="external_table"))  # iterate over external tables
virtual_views = list(o.list_tables(type="virtual_view"))  # iterate over non-materialized views
materialized_views = list(o.list_tables(type="materialized_view"))  # iterate over materialized views

Use exist_table to check whether the specified table exists.

o.exist_table('dual')

Use get_table to obtain the specified table.

>>> t = o.get_table('dual')
>>> t.table_schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
>>> t.lifecycle
-1
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
1408
>>> t.comment
'Dual Table Comment'
>>> t.table_schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]
>>> t.table_schema['c_int_a']
<column c_int_a, type bigint>
>>> t.table_schema['c_int_a'].comment
'Comment of column c_int_a'

You can also provide the project parameter to obtain the specified table from another project.

>>> t = o.get_table('dual', project='other_project')

Create the table schema

You can initialize a table schema in two ways. First, you can use columns, or a combination of columns and partition columns, to initialize the schema.

>>> from odps.models import TableSchema, Column, Partition
>>> columns = [Column(name='num', type='bigint', comment='the column'),
>>>            Column(name='num2', type='double', comment='the column2')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = TableSchema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]
>>> schema.partitions
[<partition pt, type string>]
>>> schema.names  # get names of non-partition columns
['num', 'num2']
>>> schema.types  # get types of non-partition columns
[bigint, double]

Second, you can use TableSchema.from_lists to initialize the schema. This method is easier, but you cannot directly set comments for the columns and the partitions.

>>> schema = TableSchema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]

Create tables

You can use the table schema to create a table in the following way:

>>> table = o.create_table('my_new_table', schema)
>>> table = o.create_table('my_new_table', schema, if_not_exists=True)  # create table only when the table does not exist
>>> table = o.create_table('my_new_table', schema, lifecycle=7)  # configure lifecycle of the table (in days)

An easier way is to use a string in the form of “field name field type” to create the table, as shown in the following code:

>>> table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
>>> # a tuple like (column list, partition list) can be passed to create a partitioned table
>>> table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

By default, you can only use the bigint, double, decimal, string, datetime, boolean, map and array types to create a table. If you use public cloud services, you can set options.sql.use_odps2_extension = True to enable more types such as tinyint and struct, as shown in the following code:

>>> from odps import options
>>> options.sql.use_odps2_extension = True
>>> table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')

Synchronize table updates

If a table has been updated by another program and its schema has changed, you can use the reload method to synchronize the update.

>>> table.reload()

Record

A record is a row in a table. You can use the new_record method of a table object to create a new record.

>>> t = o.get_table('mytable')
>>> r = t.new_record(['val0', 'val1'])  # the number of values must be the same as the number of columns in the schema
>>> r2 = t.new_record()  # initializing without values is also acceptable
>>> r2[0] = 'val0' # values can be set via column indices
>>> r2['field1'] = 'val1'  # values can also be set via column names
>>> r2.field1 = 'val1'  # values can also be set via attributes
>>>
>>> # reading values works the same way; suppose `record` here is a record
>>> # of the `dual` table shown above
>>> print(record[0])  # get the value of the first column
>>> print(record['c_double_a'])  # get a value via column name
>>> print(record.c_double_a)  # get a value via attribute
>>> print(record[0: 3])  # slice over columns
>>> print(record[0, 2, 3])  # get multiple values via indices
>>> print(record['c_int_a', 'c_double_a'])  # get multiple values via column names

Obtain table data

You can obtain table data in different ways. First, you can use head to retrieve at most the first 10,000 records of a table.

>>> t = o.get_table('dual')
>>> for record in t.head(3):
>>>     # process every Record object

Then, use the open_reader method of the table object to open a reader and read the data. If you need to read data from a partitioned table, you need to add a partition argument to specify the partition to read.

Open the reader using a with statement, as shown in the following code. The with statement ensures that the reader is closed once the with block is exited.

>>> with t.open_reader(partition='pt=test,pt2=test2') as reader:
>>>     count = reader.count
>>>     for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>         # process one record

Open the reader without using a with statement, as shown in the following code:

>>> reader = t.open_reader(partition='pt=test,pt2=test2')
>>> count = reader.count
>>> for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>     # process one record
>>> reader.close()

An easier way is to use the read_table method of the ODPS object, as shown in the following code:

>>> for record in o.read_table('test_table', partition='pt=test,pt2=test2'):
>>>     # process one record

Read directly into Pandas DataFrames:

>>> with t.open_reader(partition='pt=test,pt2=test2') as reader:
>>>     pd_df = reader.to_pandas()

Accelerate data read using multiple processes:

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with t.open_reader(partition='pt=test,pt2=test2') as reader:
>>>     pd_df = reader.to_pandas(n_process=n_process)

Note

open_reader and read_table only support reading from one single partition. If you need to read from multiple partitions, for instance, partitions specified by the inequality dt>20230119, you need to use the iterate_partitions method. For more details, please take a look at the section on iterating over table partitions.
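
For instance, a hedged sketch that combines iterate_partitions with open_reader, assuming a partitioned table test_table whose partition field is named dt, and collecting all matching data into one Pandas DataFrame:

>>> import pandas as pd
>>>
>>> t = o.get_table('test_table')
>>> dfs = []
>>> for partition in t.iterate_partitions(spec='dt>20230119'):
>>>     # open a reader for every matching partition and collect its data
>>>     with t.open_reader(partition=str(partition.partition_spec)) as reader:
>>>         dfs.append(reader.to_pandas())
>>> pd_df = pd.concat(dfs, ignore_index=True)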

Write data to tables

Similar to open_reader, you can use the open_writer method of the table object to open a writer and write data to the table. If the table is partitioned, you need to add a partition argument to specify the partition to write into.

Open the writer using a with statement, as shown in the following code. The with statement ensures that the writer is closed once the with block is exited and all written data are committed.

>>> with t.open_writer(partition='pt=test') as writer:
>>>     records = [[111, 'aaa', True],                 # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects
>>>
>>>     records = [t.new_record([111, 'aaa', True]),   # a list with records can also be used
>>>                t.new_record([222, 'bbb', False]),
>>>                t.new_record([333, 'ccc', True]),
>>>                t.new_record([444, '中文', False])]
>>>     writer.write(records)
>>>

If the specified partition does not exist, use the create_partition parameter to create a partition, as shown in the following code:

>>> with t.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     records = [[111, 'aaa', True],                 # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects

An easier way is to use write_table as the ODPS object to write data, as shown in the following code:

>>> records = [[111, 'aaa', True],                 # a list can be used here
>>>            [222, 'bbb', False],
>>>            [333, 'ccc', True],
>>>            [444, '中文', False]]
>>> o.write_table('test_table', records, partition='pt=test', create_partition=True)

Note

Every time write_table is invoked, MaxCompute generates a new file on the server side, which is an expensive operation that drastically reduces throughput. Moreover, too many files may increase query time on that table. Hence we recommend writing multiple records at a time, or passing a Python generator object, when calling write_table.
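
For instance, a hedged sketch that passes a generator, assuming the same three-column table as above, so that a single write_table call writes all rows:

>>> def gen_records():
>>>     for i in range(100):
>>>         # every generated row must match the table schema
>>>         yield [i, 'name_%d' % i, i % 2 == 0]
>>>
>>> o.write_table('test_table', gen_records(), partition='pt=test', create_partition=True)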

When calling write_table, new data is appended to existing data. If you need to overwrite existing data, add the argument overwrite=True to the write_table call when you are using PyODPS later than 0.11.1, or call truncate on tables or partitions.
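
A hedged sketch of both approaches, assuming the same table and records as above:

>>> # overwrite the partition instead of appending (PyODPS later than 0.11.1)
>>> o.write_table('test_table', records, partition='pt=test', overwrite=True)
>>>
>>> # or clear all existing data in the table explicitly before writing again
>>> o.get_table('test_table').truncate()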

You can write data with multiple threads. Since PyODPS 0.11.6, you can simply pass a writer object created with the open_writer method into different threads and write data in those threads. Note that you shall not close the writer until all data are written.

import random
# for Python 2.7 please import ThreadPoolExecutor from
# third-party library `futures`
from concurrent.futures import ThreadPoolExecutor

def write_records(writer):
    for i in range(5):
        # generate data and write to passed writers
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)

N_THREADS = 3

# creation of MaxCompute entry object o is omitted here
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)

with table.open_writer() as writer:
    pool = ThreadPoolExecutor(N_THREADS)
    futures = []
    for i in range(N_THREADS):
        futures.append(pool.submit(write_records, writer))
    # wait for threaded calls to finish
    [f.result() for f in futures]

You can also write data with the multiprocessing module in Python to avoid the performance loss caused by the GIL. Since PyODPS 0.11.6, you can simply pass a writer object created with the open_writer method into subprocess functions via multiprocessing APIs. Note that, different from the multi-threading case, you need to close the writer in every subprocess once writing is finished, and close the writer in the main process once writing in all subprocesses is done, to make sure all written data are committed.

import random
from multiprocessing import Pool

def write_records(writer):
    for i in range(5):
        # generate data and write to passed writers
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)
    # need to close writers in every subprocess once writing is done
    writer.close()

# need to judge if current code is executed as main module to make sure
# the code is not executed by multiprocessing repeatedly
if __name__ == '__main__':
    N_WORKERS = 3

    # creation of MaxCompute entry object o is omitted here
    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)

    with table.open_writer() as writer:
        pool = Pool(processes=N_WORKERS)
        futures = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (writer,)))
        # wait for subprocesses to finish
        [f.get() for f in futures]

Use Arrow format to read and write data

Apache Arrow is a language-neutral format supporting data exchange between different platforms. MaxCompute has supported reading and writing table data in Arrow format since 2021, and PyODPS added experimental support in 0.11.2. After installing pyarrow in your Python environment, you can enable reading and writing in Arrow format by adding the arrow=True argument to open_reader or open_writer calls to handle RecordBatches (https://arrow.apache.org/docs/python/data.html#record-batches) instead of single records.

Read table content by record batches

>>> reader = t.open_reader(partition='pt=test', arrow=True)
>>> count = reader.count
>>> for batch in reader:  # This line can be executed many times until all record batches are visited.
>>>     # process one RecordBatch, for instance, convert to Pandas
>>>     print(batch.to_pandas())

Write record batches

>>> import pandas as pd
>>> import pyarrow as pa
>>>
>>> with t.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     records = [[111, 'aaa', True],
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
>>>     # write a RecordBatch
>>>     batch = pa.RecordBatch.from_pandas(df)
>>>     writer.write(batch)
>>>     # Pandas DataFrame can also be used directly
>>>     writer.write(df)

Delete tables

>>> o.delete_table('my_table_name', if_exists=True)  #  delete only when the table exists
>>> t.drop()  # call drop method of the Table object to delete directly

Create a DataFrame

PyODPS provides a DataFrame framework to easily search and operate on MaxCompute data. You can use to_df to convert a table to a DataFrame object.

>>> table = o.get_table('my_table_name')
>>> df = table.to_df()
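
A hedged sketch of working with the resulting DataFrame, assuming the table contains a bigint column named num:

>>> df.head(5)  # fetch the first 5 rows
>>> df[df.num > 10].count().execute()  # filter rows, then count them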

Table partitions

Basic operations

Check if a table is partitioned:

>>> if table.table_schema.partitions:
>>>     print('Table %s is partitioned.' % table.name)

Check whether the specified partition exists; values of all partition fields must be provided:

>>> table.exist_partition('pt=test,sub=2015')

Check whether partitions matching the provided prefix exist:

>>> # the order of partitions fields of table is pt, sub
>>> table.exist_partitions('pt=test')

Obtain information about one specified partition:

>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size
0

Note

The word partition here refers to a partition specification, which specifies values of all partition columns and thus uniquely identifies a sub-table, not to the partition columns themselves. If one partition column is specified with multiple values, the specification may refer to multiple sub-tables, or multiple partitions. Meanwhile, the method get_partition can only obtain information of one sub-table. Thus,

  1. When some values of partition columns are absent, the specification could represent multiple sub-tables, and calling get_partition with such a specification is not supported in PyODPS. You need to use iterate_partitions to handle every partition respectively.

  2. When some partition column is specified multiple times, or non-deterministic logic expressions like pt>20210302 are used, get_partition cannot be used to obtain partition information. In this case, iterate_partitions can be used to iterate over all matching partitions.

Create partitions

Code below will create a partition or raise an error if the partition already exists.

>>> t.create_partition('pt=test')

Code below will create a partition or do nothing if the partition already exists.

>>> t.create_partition('pt=test', if_not_exists=True)

Iterate through partitions

Code below iterates through all the partitions in a table.

>>> for partition in table.partitions:
>>>     print(partition.name)

If you need to iterate through partitions with certain values of partition fields fixed, you can use iterate_partitions method.

>>> for partition in table.iterate_partitions(spec='pt=test'):
>>>     print(partition.name)

Since PyODPS 0.11.3, iterate_partitions supports simple logic expressions, as well as multiple expressions connected with commas that represent combined conditions. The OR operator is not supported currently.

>>> for partition in table.iterate_partitions(spec='dt>20230119'):
>>>     print(partition.name)
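
A combined condition connected with commas might look like the hedged sketch below, assuming partition fields named pt and dt:

>>> for partition in table.iterate_partitions(spec='pt=test,dt>20230119'):
>>>     print(partition.name)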

Note

Before 0.11.3, iterate_partitions only supports specifying values for the leading partition fields. For instance, when a table has 3 partition fields pt1, pt2 and pt3, the spec argument of iterate_partitions can only accept values like pt1=xxx or pt1=xxx,pt2=yyy. Since 0.11.3, iterate_partitions supports more flexible forms of the spec argument. However, it is still recommended to fix the values of the leading partition fields to improve iteration speed.

Delete partitions

Code below will delete a partition.

>>> t.delete_partition('pt=test', if_exists=True)  # delete only when the partition exists
>>> partition.drop()  # delete directly via the drop method of the partition object

Obtain the partition with maximal value

Sometimes you want to get the partition with the maximal value; for instance, when dates are used as partition values, you may want to get the non-empty partition with the latest date. PyODPS has supported this since 0.11.3.

Create a partitioned table and write some data.

t = o.create_table("test_multi_pt_table", ("col string", "pt1 string, pt2 string"))
for pt1, pt2 in (("a", "a"), ("a", "b"), ("b", "c"), ("b", "d")):
    o.write_table("test_multi_pt_table", [["value"]], partition="pt1=%s,pt2=%s" % (pt1, pt2))

If you want to get the partition with maximal value, you can use code below:

>>> part = t.get_max_partition()
>>> part
<Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>
>>> part.partition_spec["pt1"]  # get value of certain partition field
b

If you want to get the latest partition regardless of whether it contains data, you may use

>>> t.get_max_partition(skip_empty=False)
<Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>

For tables with multiple levels of partitions, you may specify a parent partition specification to get the child partition with the maximal value, for instance,

>>> t.get_max_partition("pt1=a")
<Partition cupid_test_release.`test_multi_pt_table`(pt1='a',pt2='b')>

Data upload and download channels

Note

If you just need to upload a small amount of data, we do not recommend using the table tunnel directly for simple table reading and writing, as there are more convenient read and write methods that wrap table tunnel invocations. You might use tunnel interfaces directly when writing tables distributedly or in complicated scenarios.

MaxCompute Tunnel is the data channel of MaxCompute. You can use this to upload data to or download data from MaxCompute.

Upload

Block upload interface

When using the block upload interface of the MaxCompute tunnel, you need to create an upload session and then create writers on the session. You may call open_record_writer multiple times to create multiple writers. Every writer needs to be specified with a unique block_id. After writing, you need to call the commit method of the upload session and pass the ids of the written blocks as a list.

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
# create an upload session for my_table and partition pt=test
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

# create a record writer with block_id as 0
with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# submit block 0 which is just written. multiple blocks need to be specified
#  on commit.
# need to commit outside with-block, or data will not be written before
#  commit, and an error will be raised.
upload_session.commit([0])

Note that writers created with open_record_writer establish long connections that are closed if no data are written for a long period, often 5 minutes, after which writing fails. If you write data with long gaps, it is recommended to cache data locally, create writers with open_record_writer on demand and write the cached data immediately. For simplicity, if you only want to upload data with one single writer through the tunnel interface, you can call open_record_writer without a block id and a buffered writer will be created. The buffered writer caches data locally and submits a block when the writer is closed or when the buffer exceeds a certain size, which is 20MB by default and can be configured with options.tunnel.block_buffer_size. After writing all data, you need to obtain all written blocks with the get_blocks_written method of the writer.

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
# create an upload session for my_table and partition pt=test
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

# create writer without block id to create a record writer with buffers
with upload_session.open_record_writer() as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# need to commit outside with-block, or data will not be written before
#  commit, and an error will be raised.
# obtain block ids from the writer and then commit
upload_session.commit(writer.get_blocks_written())

Note

When using buffered writers, you need to avoid opening multiple writers on a single upload session, or there might be collisions and data might be lost.

If you need to upload in Arrow format instead of record format, you may replace open_record_writer with open_arrow_writer and write Arrow RecordBatches, Tables or pandas DataFrames.

import pandas as pd
import pyarrow as pa
from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

# use open_arrow_writer instead of open_record_writer
with upload_session.open_arrow_writer(0) as writer:
    df = pd.DataFrame({"name": ["test1", "test2"], "id": ["id1", "id2"]})
    batch = pa.RecordBatch.from_pandas(df)
    writer.write(batch)

# need to commit outside with-block, or data will not be written before
# commit, and an error will be raised.
upload_session.commit([0])

None of the writers described in this chapter are thread safe. You need to create a separate writer for every thread.
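
As a hedged sketch of that rule under the block upload interface, each thread below opens its own writer with a distinct block id on a shared upload session; the table and partition names are assumptions matching the earlier examples.

from concurrent.futures import ThreadPoolExecutor
from odps.tunnel import TableTunnel

table = o.get_table('my_table')
tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

def write_block(block_id):
    # every thread uses its own writer and its own block id
    with upload_session.open_record_writer(block_id) as writer:
        writer.write(table.new_record(['test%d' % block_id, 'id%d' % block_id]))

n_threads = 3
with ThreadPoolExecutor(n_threads) as pool:
    # wait for all threads to finish writing
    list(pool.map(write_block, range(n_threads)))

# commit all blocks written by the threads
upload_session.commit(list(range(n_threads)))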

Stream upload interface

MaxCompute provides a stream upload interface to reduce the development cost of distributed services. You may use create_stream_upload_session to create this special type of upload session. Block ids are not needed when calling open_record_writer on this session type.

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_stream_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer() as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

Download

When using the download interface of the MaxCompute tunnel, you need to create a download session and then create readers on the session. You may call open_record_reader multiple times to create multiple readers. Every reader needs to be specified with the row to start from and the number of rows to read.

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
# create a download session for my_table and partition pt=test
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# create a record reader and specify row range to read
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        # process every record

You can download data with arrow format instead of record format by calling open_arrow_reader instead of open_record_reader.

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

with download_session.open_arrow_reader(0, download_session.count) as reader:
    for batch in reader:
        # process every Arrow RecordBatch