Tunnel

class odps.tunnel.TableTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)

Table tunnel API Entry.

Parameters:
  • odps – ODPS Entry object

  • project (str) – project name

  • endpoint (str) – tunnel endpoint

  • quota_name (str) – name of tunnel quota

create_download_session(table, async_mode=True, partition_spec=None, download_id=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, timeout=None, tags=None, **kw)

Create a download session for table.

Parameters:
  • table (str | odps.models.Table) – table object to read

  • partition_spec (str | odps.types.PartitionSpec) – partition spec to read

  • download_id (str) – existing download id

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • tags (str | list) – tags of the download session

Returns:

TableDownloadSession
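The partition_spec strings in this document take the form 'pt=test'. For tables with more than one partition column, the spec is commonly written as comma-separated key=value pairs. The helper below is a hypothetical sketch (build_partition_spec is not part of PyODPS) that assembles such a string from an ordered mapping, assuming the 'k1=v1,k2=v2' form is accepted by your PyODPS version; an odps.types.PartitionSpec object can also be passed directly.

```python
def build_partition_spec(parts):
    """Join partition column/value pairs into a 'k1=v1,k2=v2' string.

    Hypothetical helper, not part of PyODPS. Dict order is preserved, so pass
    the columns in the same order as the table's partition columns.
    """
    if not parts:
        raise ValueError("at least one partition column is required")
    pieces = []
    for key, value in parts.items():
        text = str(value)
        # reject empty parts and separators that would make the spec ambiguous
        for item in (key, text):
            if not item or "," in item or "=" in item:
                raise ValueError(f"invalid partition component: {item!r}")
        pieces.append(f"{key}={text}")
    return ",".join(pieces)


spec = build_partition_spec({"pt": "test", "region": "hz"})
print(spec)  # pt=test,region=hz

# Usage with a tunnel (requires an ODPS entry object `o`):
# session = TableTunnel(o).create_download_session('my_table', partition_spec=spec)
```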

create_stream_upload_session(table, partition_spec=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, schema_version=None, zorder_columns=None, upload_id=None, tags=None, allow_schema_mismatch=True, create_partition=False, slot_num=0, dynamic_partition=False)

Create a stream upload session for table.

Parameters:
  • table (str | odps.models.Table) – table object to write to

  • partition_spec (str | odps.types.PartitionSpec) – partition spec

  • upload_id (str) – existing upload id

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • schema_version (str) – schema version of the upload

  • zorder_columns (str | list) – zorder columns for clustering

  • tags (str | list) – tags of the upload session

  • allow_schema_mismatch (bool) – whether to allow table schema to be mismatched

  • create_partition (bool) – whether to create the partition if it does not exist

  • slot_num (int) – number of slots for the session

  • dynamic_partition (bool) – whether to enable dynamic partition

Returns:

TableStreamUploadSession

create_upload_session(table, partition_spec=None, upload_id=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, overwrite=False, create_partition=False, tags=None)

Create an upload session for table.

Parameters:
  • table (str | odps.models.Table) – table object to write to

  • partition_spec (str | odps.types.PartitionSpec) – partition spec

  • upload_id (str) – existing upload id

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • overwrite (bool) – whether to overwrite the table

  • create_partition (bool) – whether to create the partition if it does not exist

  • tags (str | list) – tags of the upload session

Returns:

TableUploadSession

create_upsert_session(table, partition_spec=None, slot_num=1, commit_timeout=120, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, upsert_id=None, tags=None, lifecycle=None)

Create an upsert session for table.

Parameters:
  • table (str | odps.models.Table) – table object to write to

  • partition_spec (str | odps.types.PartitionSpec) – partition spec

  • upsert_id (str) – existing upsert id

  • commit_timeout – timeout for commit

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • tags (str | list) – tags of the upsert session

  • lifecycle (int) – session lifecycle in hours, valid range 1-24

Returns:

TableUpsertSession
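The lifecycle parameter is documented above as a number of hours in the range 1-24. A caller-side check such as the hypothetical helper below (not a PyODPS function) can reject out-of-range values before a session is requested; it assumes that leaving lifecycle as None is acceptable, as the signature's default suggests.

```python
def check_upsert_lifecycle(lifecycle):
    """Validate the `lifecycle` argument of create_upsert_session().

    Hypothetical helper: returns the value unchanged when it is None or an
    integer number of hours within the documented 1-24 range.
    """
    if lifecycle is None:
        return None  # keep the signature's default
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(lifecycle, bool) or not isinstance(lifecycle, int):
        raise TypeError("lifecycle must be an integer number of hours")
    if not 1 <= lifecycle <= 24:
        raise ValueError(f"lifecycle must be within 1-24 hours, got {lifecycle}")
    return lifecycle


# Usage (requires an ODPS entry object `o`):
# session = TableTunnel(o).create_upsert_session(
#     'my_table', partition_spec='pt=test', lifecycle=check_upsert_lifecycle(6))
```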

open_preview_reader(table, partition_spec=None, columns=None, limit=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, arrow=True, timeout=None, make_compat=True, read_all=False, tags=None)

Open a preview reader for table to read initial rows.

Parameters:
  • table (str | odps.models.Table) – table object to read

  • partition_spec (str | odps.types.PartitionSpec) – partition spec to read

  • columns – columns to read

  • limit (int) – number of rows to read, 10000 by default

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • arrow (bool) – if True, return an Arrow reader, otherwise return a record reader

  • tags (str | list) – tags of the preview request

class odps.tunnel.TableDownloadSession(client, table, partition_spec, download_id=None, compress_option=None, async_mode=True, timeout=None, quota_name=None, tags=None, **kw)

Tunnel session for downloading data from tables. Instances of this class should be created by TableTunnel.create_download_session().

id

ID of the download session which can be passed to create_download_session() for session reuse.

count

Count of records in the table, or in the specified partition when partition_spec is given.

schema

Schema of the table.
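Because id can be passed back as download_id, a long-running job can store it and resume with the same session after a restart. The sketch below keeps the ID in a small JSON file; the file layout and the helper names save_download_id and load_download_id are hypothetical, and it assumes the earlier session is still valid on the server when it is reused.

```python
import json
import os
import tempfile


def save_download_id(path, table, download_id):
    """Record the download session ID for `table` in a JSON file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"table": table, "download_id": download_id}, f)


def load_download_id(path, table):
    """Return a saved ID for `table`, or None when nothing usable was saved."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return data.get("download_id") if data.get("table") == table else None


with tempfile.TemporaryDirectory() as tmp:
    state_file = os.path.join(tmp, "download_state.json")
    print(load_download_id(state_file, "my_table"))  # None (nothing saved yet)
    save_download_id(state_file, "my_table", "example-session-id")
    print(load_download_id(state_file, "my_table"))  # example-session-id

# Usage with a tunnel (requires an ODPS entry object `o`):
# session = TableTunnel(o).create_download_session(
#     'my_table', download_id=load_download_id(state_file, 'my_table'))
# save_download_id(state_file, 'my_table', session.id)
```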

open_arrow_reader(start, count, compress=False, columns=None, append_partitions=False, on_exception=None, buffered=False)

Open a reader to read data in Arrow format from the tunnel.

Parameters:
  • start (int) – start row index

  • count (int) – number of rows to read

  • compress (bool) – whether to compress data

  • columns – list of column names to read

  • append_partitions – whether to append partition values as columns

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

an Arrow reader

Return type:

TunnelArrowReader
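The on_exception hook above decides between raising and retrying. A minimal sketch of such a callback, assuming the reader invokes it once per failed attempt, caps the number of retries and raises immediately for error types that retrying cannot fix. make_on_exception and the choice of fatal exception types are illustrative, not part of PyODPS.

```python
def make_on_exception(max_retries=3, fatal_types=(TypeError, ValueError)):
    """Build an on_exception callback: True means raise, False means retry."""
    failures = 0

    def on_exception(exc):
        nonlocal failures
        if isinstance(exc, fatal_types):
            return True  # raise at once; a retry would fail the same way
        failures += 1
        # allow up to max_retries retries, then let the error propagate
        return failures > max_retries

    return on_exception


handler = make_on_exception(max_retries=2)
print([handler(ConnectionError("reset")) for _ in range(3)])  # [False, False, True]

# Usage (requires a download session):
# reader = download_session.open_arrow_reader(
#     0, download_session.count, on_exception=make_on_exception(5))
```

Creating a fresh callback for each reader keeps the retry counter from being shared between unrelated reads.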

open_record_reader(start, count, compress=False, columns=None, append_partitions=True, buffered=False, buffer_size=None, row_batch_size=None, on_exception=None)

Open a reader to read data as records from the tunnel.

Parameters:
  • start (int) – start row index

  • count (int) – number of rows to read

  • compress (bool) – whether to compress data

  • columns – list of column names to read

  • append_partitions – whether to append partition values as columns

  • buffered (bool) – whether to use buffered reader

  • buffer_size (int) – download buffer size in bytes. The number of rows read in each batch is limited by both this parameter and row_batch_size.

  • row_batch_size (int) – number of rows to read per batch. The number of rows read in each batch is limited by both this parameter and buffer_size.

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

a record reader

Return type:

TunnelRecordReader
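Since start and count select a row range, a large table can be read in several smaller ranges, for example to keep each reader short-lived or to spread ranges across workers that reuse the same session id. The hypothetical split_ranges helper below computes the (start, count) pairs; the chunk size is an arbitrary choice.

```python
def split_ranges(total, chunk_size):
    """Split `total` rows into consecutive (start, count) pairs."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        (start, min(chunk_size, total - start))
        for start in range(0, total, chunk_size)
    ]


print(split_ranges(10, 4))  # [(0, 4), (4, 4), (8, 2)]

# Usage (requires a download session):
# for start, count in split_ranges(download_session.count, 100000):
#     with download_session.open_record_reader(start, count) as reader:
#         for record in reader:
#             handle(record)  # hypothetical per-record callback
```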

class odps.tunnel.TableStreamUploadSession(client, table, partition_spec, compress_option=None, quota_name=None, create_partition=False, zorder_columns=None, schema_version=None, allow_schema_mismatch=True, upload_id=None, tags=None, schema_version_reloader=None, slot_num=0, dynamic_partition=False)

Tunnel session for uploading data to tables in streaming mode. Instances of this class should be created by TableTunnel.create_stream_upload_session().

id

ID of the stream upload session which can be passed to create_stream_upload_session() for session reuse.

abort()

Abort the upload session.

new_record(values=None)

Generate a record of the current upload session.

Parameters:

values (list) – the values of this record

Returns:

record

Return type:

odps.models.Record

Example:

>>> session = TableTunnel(o).create_stream_upload_session('test_table')
>>> record = session.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = session.new_record(['my_name', 'my_id'])

open_record_writer(compress=False)

Open a writer to write records to the tunnel.

Parameters:

compress (bool) – whether to compress data

Returns:

a record writer

Return type:

RecordWriter

class odps.tunnel.TableUploadSession(client, table, partition_spec, upload_id=None, compress_option=None, create_partition=None, overwrite=False, quota_name=None, tags=None)

Tunnel session for uploading data to tables. Instances of this class should be created by TableTunnel.create_upload_session().

id

ID of the upload session which can be passed to create_upload_session() for session reuse.

commit(blocks)

Commit written blocks to the tunnel. Can be called only once on a single session.

Parameters:

blocks (list) – list of block ids to commit
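Because commit() takes a list of block ids and can be called only once, data written through several block-level writers should use distinct block ids and be committed together at the end. The sketch below splits rows into fixed-size chunks keyed by block id; assign_blocks is a hypothetical helper and the chunk size is arbitrary.

```python
def assign_blocks(rows, rows_per_block, first_block_id=0):
    """Map consecutive block ids to chunks of at most `rows_per_block` rows."""
    if rows_per_block <= 0:
        raise ValueError("rows_per_block must be positive")
    blocks = {}
    for offset in range(0, len(rows), rows_per_block):
        block_id = first_block_id + offset // rows_per_block
        blocks[block_id] = rows[offset:offset + rows_per_block]
    return blocks


rows = [['test1', 'id1'], ['test2', 'id2'], ['test3', 'id3']]
blocks = assign_blocks(rows, 2)
print(sorted(blocks))  # [0, 1]

# Usage (requires an upload session): one writer per block, one commit at the end.
# for block_id, chunk in blocks.items():
#     with upload_session.open_record_writer(block_id) as writer:
#         for values in chunk:
#             writer.write(upload_session.new_record(values))
# upload_session.commit(sorted(blocks))
```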

new_record(values=None)

Generate a record of the current upload session.

Parameters:

values (list) – the values of this record

Returns:

record

Return type:

odps.models.Record

Example:

>>> session = TableTunnel(o).create_upload_session('test_table')
>>> record = session.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = session.new_record(['my_name', 'my_id'])

open_arrow_writer(block_id=None, compress=False, buffer_size=None, on_exception=None, initial_block_id=None, block_id_gen=None)

Open a writer to write data in Arrow format to the tunnel.

Parameters:
  • block_id (int) – id of the block to write to. If not specified, a BufferedArrowWriter will be created.

  • buffer_size (int) – size of the buffer to use for buffered writers.

  • compress (bool) – whether to compress data

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

an Arrow writer

Return type:

ArrowWriter or BufferedArrowWriter

open_record_writer(block_id=None, compress=False, buffer_size=None, on_exception=None, initial_block_id=None, block_id_gen=None)

Open a writer to write records to the tunnel.

Parameters:
  • block_id (int) – id of the block to write to. If not specified, a BufferedRecordWriter will be created.

  • buffer_size (int) – size of the buffer to use for buffered writers.

  • compress (bool) – whether to compress data

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

a record writer

Return type:

RecordWriter or BufferedRecordWriter

class odps.tunnel.TableUpsertSession(client, table, partition_spec, compress_option=None, slot_num=1, commit_timeout=120, lifecycle=None, quota_name=None, upsert_id=None, tags=None)

Tunnel session for inserting or updating data in upsert tables. Instances of this class should be created by TableTunnel.create_upsert_session().

id

ID of the upsert session which can be passed to create_upsert_session() for session reuse.

abort()

Abort the current session.

commit(async_=False)

Commit the current session. Can be called only once on a single session.

new_record(values=None)

Generate a record of the current upsert session.

Parameters:

values (list) – the values of this record

Returns:

record

Return type:

odps.models.Record

Example:

>>> session = TableTunnel(o).create_upsert_session('test_table')
>>> record = session.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = session.new_record(['my_name', 'my_id'])

open_upsert_stream(compress=False)

Open an upsert stream to insert or update records through the tunnel.

Parameters:

compress (bool) – whether to compress data

Returns:

an upsert stream

Return type:

Upsert

class odps.tunnel.InstanceTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)

Instance tunnel API Entry.

Parameters:
  • odps – ODPS Entry object

  • project (str) – project name

  • endpoint (str) – tunnel endpoint

  • quota_name (str) – name of tunnel quota

create_download_session(instance, download_id=None, limit=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, timeout=None, tags=None, **kw)

Create a download session for instance results.

Parameters:
  • instance (str | odps.models.Instance) – instance object to read

  • download_id (str) – existing download id

  • limit (int) – record limit of the download session

  • compress_option (odps.tunnel.CompressOption) – compress option

  • compress_algo (str) – compress algorithm

  • compress_level (int) – compress level

  • schema (str) – name of schema of the table

  • tags (str | list) – tags of the download session

Returns:

InstanceDownloadSession

class odps.tunnel.InstanceDownloadSession(client, instance, download_id=None, limit=None, compress_option=None, quota_name=None, timeout=None, tags=None, **kw)

Tunnel session for downloading data from instance results. Instances of this class should be created by InstanceTunnel.create_download_session().

The session ID is available through the id attribute of the session object and can be used to reuse the session.

id

ID of the download session which can be passed to create_download_session() for session reuse.

count

Count of records in the instance result.

schema

Schema of the instance result.

open_arrow_reader(start, count, compress=False, columns=None, on_exception=None, **_)

Open a reader to read data in Arrow format from the tunnel.

Parameters:
  • start (int) – start row index

  • count (int) – number of rows to read

  • compress (bool) – whether to compress data

  • columns – list of column names to read

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

an Arrow reader

Return type:

TunnelArrowReader

open_record_reader(start, count, compress=False, columns=None, buffered=False, buffer_size=None, row_batch_size=None, on_exception=None, **_)

Open a reader to read data as records from the tunnel.

Parameters:
  • start (int) – start row index

  • count (int) – number of rows to read

  • compress (bool) – whether to compress data

  • columns – list of column names to read

  • buffered (bool) – whether to use buffered reader

  • buffer_size (int) – download buffer size in bytes. The number of rows read in each batch is limited by both this parameter and row_batch_size.

  • row_batch_size (int) – number of rows to read per batch. The number of rows read in each batch is limited by both this parameter and buffer_size.

  • on_exception – custom error handler that accepts an Exception instance. If it returns True, the error is raised; otherwise, retrying continues.

Returns:

a record reader

Return type:

TunnelRecordReader
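Since buffer_size and row_batch_size both cap the size of a batch, the tighter of the two limits applies. The function below is only a back-of-the-envelope estimate under a stated assumption (every row occupies roughly approx_row_bytes in the download buffer); the actual reader works with the bytes it receives, and estimate_rows_per_batch is a hypothetical helper.

```python
def estimate_rows_per_batch(buffer_size, row_batch_size, approx_row_bytes):
    """Rough rows-per-batch estimate when both limits may apply.

    Assumes a fixed average row size; returns None when neither limit is set.
    """
    if approx_row_bytes <= 0:
        raise ValueError("approx_row_bytes must be positive")
    limits = []
    if buffer_size is not None:
        # count at least one row even if a single row is larger than the buffer
        limits.append(max(1, buffer_size // approx_row_bytes))
    if row_batch_size is not None:
        limits.append(row_batch_size)
    return min(limits) if limits else None


# 4 MiB buffer, 10000-row cap, ~1 KiB rows: the buffer is the tighter limit
print(estimate_rows_per_batch(4 * 1024 * 1024, 10000, 1024))  # 4096
```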

class odps.tunnel.ArrowWriter(schema, request_callback, compress_option=None, chunk_size=None)

Writer object to write data to ODPS using Arrow format. Should be created with TableUploadSession.open_arrow_writer() with block_id specified.

Example:

Here we show an example of writing a pandas DataFrame to ODPS.

import pandas as pd
from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

# creates an ArrowWriter instance for block 0
with upload_session.open_arrow_writer(0) as writer:
    df = pd.DataFrame({'col1': ['test1', 'test2'], 'col2': ['id1', 'id2']})
    writer.write(df)

# commit block 0
upload_session.commit([0])

Note:

ArrowWriter holds a long-lived HTTP connection, which the server may close once it has been open for more than 3 minutes. Avoid keeping an ArrowWriter open for a long period.

close()

Close the writer and flush all data to the server.

write(data)

Write an Arrow RecordBatch, an Arrow Table or a pandas DataFrame.
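Given the 3-minute connection limit in the note above, one way to write a long stream with block-level writers is to close the current writer periodically and continue under a new block id, committing all ids at the end. The BlockRotator class below is a hypothetical sketch of that bookkeeping; the 120-second budget is an arbitrary margin below 3 minutes, and the clock is injectable so the logic can be checked without waiting. The buffered writer returned by open_arrow_writer() without block_id, which tracks its own blocks, is the other option this document describes.

```python
import time


class BlockRotator:
    """Track block ids and signal when the current block has been open too long."""

    def __init__(self, max_seconds=120.0, clock=time.monotonic, first_block_id=0):
        self.max_seconds = max_seconds
        self.clock = clock
        self.current = first_block_id
        self.used = [first_block_id]
        self.opened_at = clock()

    def should_rotate(self):
        """True once the current block has been open for max_seconds or more."""
        return self.clock() - self.opened_at >= self.max_seconds

    def rotate(self):
        """Start a new block id and reset the timer."""
        self.current += 1
        self.used.append(self.current)
        self.opened_at = self.clock()
        return self.current


# Usage sketch (requires an upload session and an iterable of DataFrames `frames`):
# rotator = BlockRotator()
# writer = upload_session.open_arrow_writer(rotator.current)
# for df in frames:
#     if rotator.should_rotate():
#         writer.close()
#         writer = upload_session.open_arrow_writer(rotator.rotate())
#     writer.write(df)
# writer.close()
# upload_session.commit(rotator.used)
```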

class odps.tunnel.RecordWriter(schema, request_callback, compress_option=None, encoding='utf-8')

Writer object to write data to ODPS with records. Should be created with TableUploadSession.open_record_writer() with block_id specified.

Example:

Here we show an example of writing data to ODPS with two records created in different ways.

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

# creates a RecordWriter instance for block 0
with upload_session.open_record_writer(0) as writer:
    record = upload_session.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = upload_session.new_record(['test2', 'id2'])
    writer.write(record)

# commit block 0
upload_session.commit([0])

Note:

RecordWriter holds a long-lived HTTP connection, which the server may close once it has been open for more than 3 minutes. Avoid keeping a RecordWriter open for a long period.

close()

Close the writer and flush all data to the server.

write(record)

Write a record to the tunnel.

Parameters:

record (odps.models.Record) – record to write

class odps.tunnel.BufferedArrowWriter(schema, request_callback, compress_option=None, buffer_size=None, chunk_size=None, block_id=None, block_id_gen=None)

Writer object to write data to ODPS using Arrow format. Should be created with TableUploadSession.open_arrow_writer() without block_id. To submit the results, pass the return value of get_blocks_written() to TableUploadSession.commit().

Example:

Here we show an example of writing a pandas DataFrame to ODPS.

import pandas as pd
from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

# creates a BufferedArrowWriter instance
with upload_session.open_arrow_writer() as writer:
    df = pd.DataFrame({'col1': ['test1', 'test2'], 'col2': ['id1', 'id2']})
    writer.write(df)

# commit blocks
upload_session.commit(writer.get_blocks_written())

close()

Close the writer and flush all data to the server.

get_blocks_written()

Get block ids created during writing. Should be provided as the argument to TableUploadSession.commit().

write(data)

Write an Arrow RecordBatch, an Arrow Table or a pandas DataFrame.

class odps.tunnel.BufferedRecordWriter(schema, request_callback, compress_option=None, encoding='utf-8', buffer_size=None, block_id=None, block_id_gen=None)

Writer object to write data to ODPS with records. Should be created with TableUploadSession.open_record_writer() without block_id. To submit the results, pass the return value of get_blocks_written() to TableUploadSession.commit().

Example:

Here we show an example of writing data to ODPS with two records created in different ways.

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

# creates a BufferedRecordWriter instance
with upload_session.open_record_writer() as writer:
    record = upload_session.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = upload_session.new_record(['test2', 'id2'])
    writer.write(record)

# commit blocks
upload_session.commit(writer.get_blocks_written())

close()

Close the writer and flush all data to the server.

get_blocks_written()

Get block ids created during writing. Should be provided as the argument to TableUploadSession.commit().

write(record)

Write a record to the tunnel.

Parameters:

record (odps.models.Record) – record to write

class odps.tunnel.TunnelArrowReader(schema, stream_creator, columns=None, partition_spec=None, append_partitions=False, use_ipc_stream=False, timestamp_as_struct=False, on_exception=None)

Reader object to read data from ODPS in Arrow format. Should be created with TableDownloadSession.open_arrow_reader().

Example:

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# create a TunnelArrowReader
with download_session.open_arrow_reader(0, download_session.count) as reader:
    for batch in reader:
        print(batch.to_pandas())

read()

Read all data from the tunnel and form an Arrow Table.

Returns:

Arrow Table

read_next_batch()

Read the next Arrow RecordBatch from the tunnel.

Returns:

Arrow RecordBatch

to_pandas()

Read all data from the tunnel and convert it to a pandas DataFrame.

class odps.tunnel.TunnelRecordReader(schema, stream_creator, columns=None, partition_spec=None, append_partitions=False, on_exception=None)

Reader object to read data from ODPS in records. Should be created with TableDownloadSession.open_record_reader().

Example:

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# create a TunnelRecordReader
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        print(record.values)

read()

Read the next record.

Returns:

A record object

Return type:

Record

class odps.tunnel.Upsert(schema, request_callback, session, compress_option=None, encoding='utf-8', max_buffer_size=None, slot_buffer_size=None)

Object to insert or update data into an ODPS upsert table with records. Should be created with TableUpsertSession.open_upsert_stream().

Example:

Here we show an example of inserting, updating and deleting data in an upsert table.

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
upsert_session = tunnel.create_upsert_session('my_table', partition_spec='pt=test')

# opens an Upsert stream
stream = upsert_session.open_upsert_stream(compress=True)
rec = upsert_session.new_record(["0", "v1"])
stream.upsert(rec)
rec = upsert_session.new_record(["0", "v2"])
stream.upsert(rec)
rec = upsert_session.new_record(["1", "v1"])
stream.upsert(rec)
rec = upsert_session.new_record(["2", "v1"])
stream.upsert(rec)
stream.delete(rec)
stream.flush()
stream.close()

upsert_session.commit()

close()

Close the stream and write all data to the server.

delete(record)

Delete a record.

Parameters:

record (odps.models.Record) – record to delete

flush(flush_all=True)

Flush all data in buffer to server.

upsert(record)

Insert or update a record.

Parameters:

record (odps.models.Record) – record to write
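In the Upsert example above, key "0" is written twice and key "2" is deleted right after being inserted. The sketch below replays the same operations against a plain dict to make those insert-or-update and delete semantics concrete. It assumes the first column acts as the primary key, which the example appears to imply; apply_operations is a local model for illustration, not how the tunnel stores data.

```python
def apply_operations(operations, key_index=0):
    """Replay ("upsert" | "delete", values) pairs against a dict keyed by primary key."""
    table = {}
    for op, values in operations:
        key = values[key_index]
        if op == "upsert":
            table[key] = list(values)  # insert a new key or replace an existing row
        elif op == "delete":
            table.pop(key, None)  # deleting a missing key is a no-op in this model
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return table


ops = [
    ("upsert", ["0", "v1"]),
    ("upsert", ["0", "v2"]),
    ("upsert", ["1", "v1"]),
    ("upsert", ["2", "v1"]),
    ("delete", ["2", "v1"]),
]
print(apply_operations(ops))  # {'0': ['0', 'v2'], '1': ['1', 'v1']}
```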

class odps.tunnel.VolumeTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)

Volume tunnel API Entry.

Parameters:
  • odps – ODPS Entry object

  • project (str) – project name

  • endpoint (str) – tunnel endpoint

  • quota_name (str) – name of tunnel quota

class odps.tunnel.VolumeDownloadSession(client, volume, partition_spec, file_name=None, download_id=None, compress_option=None, quota_name=None, tags=None)

class odps.tunnel.VolumeUploadSession(client, volume, partition_spec, upload_id=None, compress_option=None, quota_name=None, tags=None)