Tables

Tables are the data storage unit in MaxCompute.

Basic operations

Use the list_tables method of the ODPS object to list all tables in a project.

for table in o.list_tables():
    print(table.name)  # handle every table

Use exist_table to check whether the specified table exists.
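
For example, the check simply returns a boolean (here using the dual table from the examples below):

>>> o.exist_table('dual')
True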

Use get_table to obtain the specified table.

>>> t = o.get_table('dual')
>>> t.schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
>>> t.lifecycle
-1
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
1408
>>> t.comment
'Dual Table Comment'
>>> t.schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]
>>> t.schema['c_int_a']
<column c_int_a, type bigint>
>>> t.schema['c_int_a'].comment
'Comment of column c_int_a'

You can also provide the project parameter to obtain the specified table from another project.

>>> t = o.get_table('dual', project='other_project')

Create the table schema

You can initialize a table schema in two ways. First, you can use columns, or a combination of columns and partition columns, to initialize the schema.

>>> from odps.models import Schema, Column, Partition
>>> columns = [Column(name='num', type='bigint', comment='the column'),
>>>            Column(name='num2', type='double', comment='the column2')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = Schema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]
>>> schema.partitions
[<partition pt, type string>]
>>> schema.names  # get names of non-partition columns
['num', 'num2']
>>> schema.types  # get types of non-partition columns
[bigint, double]

Second, you can use Schema.from_lists to initialize the schema. This method is easier, but you cannot directly set the comments of the columns and the partitions.

>>> schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]

Create tables

You can use the table schema to create a table in the following way:

>>> table = o.create_table('my_new_table', schema)
>>> table = o.create_table('my_new_table', schema, if_not_exists=True)  # create table only when the table does not exist
>>> table = o.create_table('my_new_table', schema, lifecycle=7)  # configure lifecycle of the table (in days)

An easier way is to use a string of comma-separated “field name field type” pairs to create the table, as shown in the following code:

>>> table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
>>> # a tuple like (column list, partition list) can be passed to create a partitioned table
>>> table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

By default, you can only use the bigint, double, decimal, string, datetime, boolean, map and array types to create a table. If you use public cloud services, you can set options.sql.use_odps2_extension = True to enable more types such as tinyint and struct, as shown in the following code:

>>> from odps import options
>>> options.sql.use_odps2_extension = True
>>> table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')

Synchronize table updates

If a table has been updated by another program and has changes in the schema, you can use reload to synchronize the update.

>>> table.reload()

Record

A record represents a single row in a table. You can use the new_record method of a table object to create a new record.

>>> t = o.get_table('mytable')
>>> r = t.new_record(['val0', 'val1'])  # the number of values must be the same as the number of columns in the schema
>>> r2 = t.new_record()  # initializing without values is also acceptable
>>> r2[0] = 'val0'  # values can be set via column indices
>>> r2['field1'] = 'val1'  # values can also be set via column names
>>> r2.field1 = 'val1'  # values can also be set via attributes
>>>
>>> # values of an existing record (here, one read from the dual table shown above)
>>> # can be retrieved in similar ways:
>>> print(record[0])  # get the value of the first column
>>> print(record['c_double_a'])  # get a value via its column name
>>> print(record.c_double_a)  # get a value via attribute access
>>> print(record[0: 3])  # slice over the columns
>>> print(record[0, 2, 3])  # get multiple values via indices
>>> print(record['c_int_a', 'c_double_a'])  # get multiple values via column names

Obtain table data

You can obtain table data in different ways. First, you can use head to retrieve at most the first 10,000 records of a table.

>>> t = o.get_table('dual')
>>> for record in t.head(3):
>>>     # process every Record object

Then, you can use the open_reader method of the table object to open a reader and read the data.

Open the reader within a with statement, as shown in the following code:

>>> with t.open_reader(partition='pt=test') as reader:
>>>     count = reader.count
>>>     for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>         # process one record

Open the reader without using a with statement, as shown in the following code:

>>> reader = t.open_reader(partition='pt=test')
>>> count = reader.count
>>> for record in reader[5:10]:  # This line can be executed many times until all records are visited. Parallelism can also be introduced.
>>>     # process one record

Read directly into Pandas DataFrames:

>>> with t.open_reader(partition='pt=test') as reader:
>>>     pd_df = reader.to_pandas()

Accelerate data read using multiple processes:

Note

Currently acceleration using multiple processes is not available under Windows.

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with t.open_reader(partition='pt=test') as reader:
>>>     pd_df = reader.to_pandas(n_process=n_process)

An easier way is to use the read_table method of the ODPS object, as shown in the following code:

>>> for record in o.read_table('test_table', partition='pt=test'):
>>>     # process one record

Write data to tables

Similar to open_reader, you can use the open_writer method of the table object to open a writer and write data to the table.

Open the writer within a with statement, as shown in the following code:

>>> with t.open_writer(partition='pt=test') as writer:
>>>     records = [[111, 'aaa', True],                 # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects
>>>
>>>     records = [t.new_record([111, 'aaa', True]),   # a list with records can also be used
>>>                t.new_record([222, 'bbb', False]),
>>>                t.new_record([333, 'ccc', True]),
>>>                t.new_record([444, '中文', False])]
>>>     writer.write(records)

If the specified partition does not exist, use the create_partition parameter to create a partition, as shown in the following code:

>>> with t.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     records = [[111, 'aaa', True],                 # a list can be used here
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # records can also be iterable objects

An easier way is to use the write_table method of the ODPS object to write data, as shown in the following code:

>>> records = [[111, 'aaa', True],                 # a list can be used here
>>>            [222, 'bbb', False],
>>>            [333, 'ccc', True],
>>>            [444, '中文', False]]
>>> o.write_table('test_table', records, partition='pt=test', create_partition=True)

Note

Every time write_table is invoked, MaxCompute generates a new file on the server side, which is an expensive operation that drastically reduces throughput. What’s more, too many files may increase query time on that table. Hence, we recommend writing multiple records at once, or passing a Python generator object, when calling write_table.
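
Below is a hedged sketch of passing a generator instead of materializing all records in memory; the generator and its values are hypothetical, and test_table is assumed to have the three columns used in the example above:

>>> def gen_records():
>>>     # yield records one by one instead of building a large list in memory
>>>     for i in range(10000):
>>>         yield [i, 'name_%d' % i, i % 2 == 0]
>>>
>>> o.write_table('test_table', gen_records(), partition='pt=test', create_partition=True)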

When calling write_table, new data will be appended to existing data. PyODPS does not provide options to overwrite existing data. When overwriting is needed, you need to explicitly remove existing data by calling table.truncate() on unpartitioned tables, or by removing partitions of partitioned tables.
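
Below is a minimal sketch of both overwrite patterns, assuming the records list from the example above matches the schema of each table:

>>> # unpartitioned table: clear the existing data first, then write again
>>> t = o.get_table('my_table')
>>> t.truncate()
>>> o.write_table('my_table', records)
>>>
>>> # partitioned table: drop the partition, then write it again
>>> t = o.get_table('test_table')
>>> t.delete_partition('pt=test', if_exists=True)
>>> o.write_table('test_table', records, partition='pt=test', create_partition=True)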

Use multiple processes to write records:

All processes share a single session_id but use different block_id values; each block_id corresponds to a separate file on the server side. After all writing is done, the main process commits the blocks and the upload is finished.

import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(session_id, block_id):
    # create sessions with given id from main process
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # specify block_id when creating writers
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            # generate data and write to corresponding blocks
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3

    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)

    # all processes share one single session_id
    session_id = upload_session.id

    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]

    # finally we call commit with all block_ids
    upload_session.commit(block_ids)

Delete tables

>>> o.delete_table('my_table_name', if_exists=True)  #  delete only when the table exists
>>> t.drop()  # call drop method of the Table object to delete directly

Create a DataFrame

PyODPS provides a DataFrame framework to easily query and operate on MaxCompute data. You can use to_df to convert a table to a DataFrame object.

>>> table = o.get_table('my_table_name')
>>> df = table.to_df()
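
As a quick sketch of what you can do with the resulting DataFrame (the column name num is only an assumption about the table schema):

>>> df.head(5)                  # retrieve the first five rows
>>> df[df.num > 100].count()    # filter on a column and count the matching rows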

Table partitions

Basic operations

Check if a table is partitioned:

>>> if table.schema.partitions:
>>>     print('Table %s is partitioned.' % table.name)

Iterate through all the partitions in a table:

>>> for partition in table.partitions:
>>>     print(partition.name)
>>> for partition in table.iterate_partitions(spec='pt=test'):
>>>     # iterate over secondary partitions

Check whether the specified partition exists; values for all partition fields must be provided:

>>> table.exist_partition('pt=test,sub=2015')

Check whether partitions matching the provided prefix exist:

>>> # the order of partition fields of the table is pt, sub
>>> table.exist_partitions('pt=test')

Obtain the specified partition:

>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size
0

Create partitions

>>> t.create_partition('pt=test', if_not_exists=True)  # create only when the partition does not exist

Delete partitions

>>> t.delete_partition('pt=test', if_exists=True)  # delete only when the partition exists
>>> partition.drop()  # delete directly via the drop method of the partition object

Data upload and download channels

Note

If you just need to upload a small amount of data, we do not recommend using the table tunnel directly, as there are more convenient read and write methods that wrap table tunnel invocations. However, when the amount of data is large and throughput is critical, you may try using table tunnel methods directly.

MaxCompute Tunnel is the data channel of MaxCompute. You can use this to upload data to or download data from MaxCompute.

Note

In a Cython environment, PyODPS compiles C extensions during installation to speed up Tunnel uploads and downloads when required.

Upload

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

upload_session.commit([0])

You can also use stream-like upload interface:

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_stream_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer() as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

Download

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        # process every record