Tunnel
- class odps.tunnel.TableTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)
Table tunnel API Entry.
- Parameters:
odps – ODPS Entry object
project (str) – project name
endpoint (str) – tunnel endpoint
quota_name (str) – name of tunnel quota
- create_download_session(table, async_mode=True, partition_spec=None, download_id=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, timeout=None, tags=None, **kw)
Create a download session for a table.
- Parameters:
table (str | odps.models.Table) – table name or table object to read
partition_spec (str | odps.types.PartitionSpec) – partition spec to read
download_id (str) – existing download id
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
tags (str | list) – tags of the download session
- Returns:
TableDownloadSession
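The partition_spec argument identifies one value per partition column. The helper below is a hypothetical convenience, not part of PyODPS, that assembles a spec string from a dict. It assumes the `col=value` form used in the examples on this page (`pt=test`) is joined with commas for multi-level partitions; the character check is a conservative guard of this sketch, not a library rule.

```python
def build_partition_spec(parts):
    """Join partition column/value pairs into a spec string such as 'pt=test,region=hz'.

    Dict insertion order is kept, so list the columns in the same order as the
    table's partition columns.
    """
    for name, value in parts.items():
        text = str(value)
        # Conservative guard (an assumption of this sketch): reject values that
        # would make the joined string ambiguous.
        if "," in text or "=" in text:
            raise ValueError(f"unsupported character in value of {name!r}: {text!r}")
    return ",".join(f"{name}={value}" for name, value in parts.items())


spec = build_partition_spec({"pt": "test", "region": "hz"})
print(spec)  # pt=test,region=hz
```

With an ODPS entry object `o`, the result could then be passed as `TableTunnel(o).create_download_session('my_table', partition_spec=spec)`.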
- create_stream_upload_session(table, partition_spec=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, schema_version=None, zorder_columns=None, upload_id=None, tags=None, allow_schema_mismatch=True, create_partition=False)
Create a stream upload session for a table.
- Parameters:
table (str | odps.models.Table) – table name or table object to write to
partition_spec (str | odps.types.PartitionSpec) – partition spec
upload_id (str) – existing upload id
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
schema_version (str) – schema version of the upload
tags (str | list) – tags of the upload session
allow_schema_mismatch (bool) – whether to allow the table schema to be mismatched
create_partition (bool) – whether to create the partition if it does not exist
- Returns:
TableStreamUploadSession
- create_upload_session(table, partition_spec=None, upload_id=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, overwrite=False, create_partition=False, tags=None)
Create an upload session for a table.
- Parameters:
table (str | odps.models.Table) – table name or table object to write to
partition_spec (str | odps.types.PartitionSpec) – partition spec
upload_id (str) – existing upload id
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
overwrite (bool) – whether to overwrite the table
create_partition (bool) – whether to create the partition if it does not exist
tags (str | list) – tags of the upload session
- Returns:
TableUploadSession
- create_upsert_session(table, partition_spec=None, slot_num=1, commit_timeout=120, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, schema=None, upsert_id=None, tags=None)
Create an upsert session for a table.
- Parameters:
table (str | odps.models.Table) – table name or table object to write to
partition_spec (str | odps.types.PartitionSpec) – partition spec
upsert_id (str) – existing upsert id
commit_timeout – timeout for commit
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
tags (str | list) – tags of the upsert session
- Returns:
TableUpsertSession
- open_preview_reader(table, partition_spec=None, columns=None, limit=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, arrow=True, timeout=None, make_compat=True, read_all=False, tags=None)
Open a preview reader for a table to read its initial rows.
- Parameters:
table (str | odps.models.Table) – table name or table object to read
partition_spec (str | odps.types.PartitionSpec) – partition spec to read
columns – columns to read
limit (int) – number of rows to read, 10000 by default
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
arrow (bool) – if True, return an Arrow reader; otherwise, return a record reader
tags (str | list) – tags of the preview request
- class odps.tunnel.TableDownloadSession(client, table, partition_spec, download_id=None, compress_option=None, async_mode=True, timeout=None, quota_name=None, tags=None, **kw)
Tunnel session for downloading data from tables. Instances of this class should be created by TableTunnel.create_download_session().
- id
ID of the download session, which can be passed to create_download_session() for session reuse.
- count
Count of records in the table.
- schema
Schema of the table.
- open_arrow_reader(start, count, compress=False, columns=None, append_partitions=False, on_exception=None, buffered=False)
Open a reader to read data as Arrow format from the tunnel.
- Parameters:
start (int) – start row index
count (int) – number of rows to read
compress (bool) – whether to compress data
columns – list of column names to read
append_partitions – whether to append partition values as columns
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
an Arrow reader
- Return type:
TunnelArrowReader
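The on_exception contract above (return True to raise, anything else to keep retrying) can be wrapped in a small factory. This is a sketch, not a PyODPS helper: the retry budget and the choice of ValueError as a "do not retry" type are assumptions for illustration.

```python
def make_retry_handler(max_retries=3, fatal_types=(ValueError,)):
    """Build a callable suitable for the ``on_exception`` argument.

    Returning True tells the reader to raise the error; returning False lets
    the retry continue.
    """
    failures = 0

    def on_exception(exc):
        nonlocal failures
        # Assumption: these error types will not go away on retry, so raise at once.
        if isinstance(exc, fatal_types):
            return True
        failures += 1
        # Keep retrying until the budget is exhausted, then raise.
        return failures > max_retries

    return on_exception
```

It could be passed as `download_session.open_arrow_reader(0, download_session.count, on_exception=make_retry_handler(5))`. Because the counter lives in the closure, create a fresh handler for each reader.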
- open_record_reader(start, count, compress=False, columns=None, append_partitions=True, buffered=False, buffer_size=None, row_batch_size=None, on_exception=None)
Open a reader to read data as records from the tunnel.
- Parameters:
start (int) – start row index
count (int) – number of rows to read
compress (bool) – whether to compress data
columns – list of column names to read
append_partitions – whether to append partition values as columns
buffered (bool) – whether to use buffered reader
buffer_size (int) – download buffer size in bytes. The number of rows read in each batch is limited by both this parameter and row_batch_size.
row_batch_size (int) – number of rows to read per batch. The number of rows read in each batch is limited by both this parameter and buffer_size.
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
a record reader
- Return type:
TunnelRecordReader
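Because every reader takes a start row index and a row count, a session's `count` can be divided into contiguous ranges, one per reader. The function below is a hypothetical helper (not part of PyODPS) that produces near-equal (start, count) pairs.

```python
def split_ranges(total, parts):
    """Split ``total`` rows into at most ``parts`` contiguous (start, count) ranges.

    The first ``total % parts`` ranges get one extra row, so sizes differ by at most one.
    """
    if total <= 0 or parts <= 0:
        return []
    base, extra = divmod(total, parts)
    ranges = []
    start = 0
    for i in range(parts):
        count = base + (1 if i < extra else 0)
        if count == 0:
            break  # fewer rows than parts: stop instead of emitting empty ranges
        ranges.append((start, count))
        start += count
    return ranges
```

Each pair could then be handed to `download_session.open_record_reader(start, count)`. Whether readers over disjoint ranges of one session may run concurrently in your setup is an assumption worth verifying before parallelizing.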
- class odps.tunnel.TableStreamUploadSession(client, table, partition_spec, compress_option=None, quota_name=None, create_partition=False, zorder_columns=None, schema_version=None, allow_schema_mismatch=True, upload_id=None, tags=None, schema_version_reloader=None)
Tunnel session for uploading data to tables in streaming mode. Instances of this class should be created by TableTunnel.create_stream_upload_session().
- id
ID of the stream upload session, which can be passed to create_stream_upload_session() for session reuse.
- new_record(values=None)
Generate a record of the current upload session.
- Parameters:
values (list) – the values of the record
- Returns:
record
- Return type:
odps.models.Record
- Example:
    >>> session = TableTunnel(o).create_stream_upload_session('test_table')
    >>> record = session.new_record()
    >>> record[0] = 'my_name'
    >>> record[1] = 'my_id'
    >>> record = session.new_record(['my_name', 'my_id'])
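new_record(values) takes a positional list, so rows held as dicts need to be put into column order first. This hypothetical helper does that; the column names are assumed to be supplied by the caller (for example, read from the table schema) in the order the record expects.

```python
def values_in_column_order(column_names, row):
    """Return dict ``row`` as a list ordered like ``column_names``.

    Missing columns become None; unknown keys are rejected so typos surface early.
    """
    unknown = set(row) - set(column_names)
    if unknown:
        raise KeyError(f"unknown columns: {sorted(unknown)}")
    return [row.get(name) for name in column_names]


print(values_in_column_order(["name", "id"], {"id": "my_id", "name": "my_name"}))
# ['my_name', 'my_id']
```

The resulting list matches the `session.new_record(['my_name', 'my_id'])` form shown in the example.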
- class odps.tunnel.TableUploadSession(client, table, partition_spec, upload_id=None, compress_option=None, create_partition=None, overwrite=False, quota_name=None, tags=None)
Tunnel session for uploading data to tables. Instances of this class should be created by TableTunnel.create_upload_session().
- id
ID of the upload session, which can be passed to create_upload_session() for session reuse.
- commit(blocks)
Commit written blocks to the tunnel. Can be called only once on a single session.
- Parameters:
blocks (list) – list of block ids to commit
- new_record(values=None)
Generate a record of the current upload session.
- Parameters:
values (list) – the values of the record
- Returns:
record
- Return type:
odps.models.Record
- Example:
    >>> session = TableTunnel(o).create_upload_session('test_table')
    >>> record = session.new_record()
    >>> record[0] = 'my_name'
    >>> record[1] = 'my_id'
    >>> record = session.new_record(['my_name', 'my_id'])
- open_arrow_writer(block_id=None, compress=False, buffer_size=None, on_exception=None, initial_block_id=None, block_id_gen=None)
Open a writer to write data in Arrow format to the tunnel.
- Parameters:
block_id (int) – id of the block to write to. If not specified, a BufferedArrowWriter will be created.
buffer_size (int) – size of the buffer to use for buffered writers.
compress (bool) – whether to compress data
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
an Arrow writer
- Return type:
ArrowWriter if block_id is specified, otherwise BufferedArrowWriter
- open_record_writer(block_id=None, compress=False, buffer_size=None, on_exception=None, initial_block_id=None, block_id_gen=None)
Open a writer to write data as records to the tunnel.
- Parameters:
block_id (int) – id of the block to write to. If not specified, a BufferedRecordWriter will be created.
buffer_size (int) – size of the buffer to use for buffered writers.
compress (bool) – whether to compress data
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
a record writer
- Return type:
RecordWriter if block_id is specified, otherwise BufferedRecordWriter
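Since commit(blocks) is called once with a list of block ids, rows can be written to several blocks, each with its own writer, before a single commit. The sketch below does this with a thread pool. It assumes that writers on distinct block ids of one session may run in parallel. `upload_chunks_in_parallel` and `FakeUploadSession` are hypothetical names; the fake is an in-memory stand-in so the sketch runs without an ODPS service, not a model of the real protocol.

```python
from concurrent.futures import ThreadPoolExecutor


def upload_chunks_in_parallel(session, chunks, max_workers=4):
    """Write each chunk of rows to its own block id, then commit all blocks once.

    ``session`` must offer open_record_writer(block_id), new_record(values)
    and commit(blocks), as TableUploadSession does.
    """
    def write_block(block_id, rows):
        # Each worker owns a distinct block id, so no two writers share a block.
        with session.open_record_writer(block_id) as writer:
            for values in rows:
                writer.write(session.new_record(values))
        return block_id

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        block_ids = list(pool.map(write_block, range(len(chunks)), chunks))
    session.commit(block_ids)  # commit may be called only once per session
    return block_ids


class FakeUploadSession:
    """Hypothetical in-memory stand-in used only to run this sketch offline."""

    def __init__(self):
        self.blocks = {}
        self.committed = None

    def new_record(self, values):
        return list(values)

    def open_record_writer(self, block_id):
        session = self

        class _Writer:
            def __enter__(self):
                session.blocks[block_id] = []
                return self

            def __exit__(self, *exc_info):
                return False

            def write(self, record):
                session.blocks[block_id].append(record)

        return _Writer()

    def commit(self, blocks):
        self.committed = list(blocks)


fake = FakeUploadSession()
ids = upload_chunks_in_parallel(fake, [[["a", "1"]], [["b", "2"], ["c", "3"]]])
```

With a real session, `fake` would be replaced by something like `TableTunnel(o).create_upload_session('my_table', partition_spec='pt=test')`.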
- class odps.tunnel.TableUpsertSession(client, table, partition_spec, compress_option=None, slot_num=1, commit_timeout=120, lifecycle=None, quota_name=None, upsert_id=None, tags=None)
Tunnel session for inserting or updating data in upsert tables. Instances of this class should be created by TableTunnel.create_upsert_session().
- id
ID of the upsert session, which can be passed to create_upsert_session() for session reuse.
- commit(async_=False)
Commit the current session. Can be called only once on a single session.
- new_record(values=None)
Generate a record for the current upsert session.
- Parameters:
values (list) – the values of the record
- Returns:
record
- Return type:
odps.models.Record
- Example:
    >>> session = TableTunnel(o).create_upsert_session('test_table')
    >>> record = session.new_record()
    >>> record[0] = 'my_name'
    >>> record[1] = 'my_id'
    >>> record = session.new_record(['my_name', 'my_id'])
- class odps.tunnel.InstanceTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)
Instance tunnel API Entry.
- Parameters:
odps – ODPS Entry object
project (str) – project name
endpoint (str) – tunnel endpoint
quota_name (str) – name of tunnel quota
- create_download_session(instance, download_id=None, limit=None, compress_option=None, compress_algo=None, compress_level=None, compress_strategy=None, timeout=None, tags=None, **kw)
Create a download session for instance results.
- Parameters:
instance (str | odps.models.Instance) – instance ID or instance object to read
download_id (str) – existing download id
limit (int) – record limit of the download session
compress_option (odps.tunnel.CompressOption) – compress option
compress_algo (str) – compress algorithm
compress_level (int) – compress level
schema (str) – name of schema of the table
tags (str | list) – tags of the download session
- Returns:
InstanceDownloadSession
- class odps.tunnel.InstanceDownloadSession(client, instance, download_id=None, limit=None, compress_option=None, quota_name=None, timeout=None, tags=None, **kw)
Tunnel session for downloading data from instance results. Instances of this class should be created by InstanceTunnel.create_download_session(). You may get the ID of the session for reuse from the id attribute of the session object.
- id
ID of the download session, which can be passed to create_download_session() for session reuse.
- count
Count of records in the instance result.
- schema
Schema of the instance result.
- open_arrow_reader(start, count, compress=False, columns=None, on_exception=None, **_)
Open a reader to read data in Arrow format from the tunnel.
- Parameters:
start (int) – start row index
count (int) – number of rows to read
compress (bool) – whether to compress data
columns – list of column names to read
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
an Arrow reader
- open_record_reader(start, count, compress=False, columns=None, buffered=False, buffer_size=None, row_batch_size=None, on_exception=None, **_)
Open a reader to read data as records from the tunnel.
- Parameters:
start (int) – start row index
count (int) – number of rows to read
compress (bool) – whether to compress data
columns – list of column names to read
buffered (bool) – whether to use buffered reader
buffer_size (int) – download buffer size in bytes. The number of rows read in each batch is limited by both this parameter and row_batch_size.
row_batch_size (int) – number of rows to read per batch. The number of rows read in each batch is limited by both this parameter and buffer_size.
on_exception – custom error handling function accepting an Exception instance as input. If return value is True, error will be raised. Otherwise retry will continue.
- Returns:
a record reader
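Large instance results can be read in fixed-size pages instead of one reader spanning `count` rows, which keeps each reader short-lived. The generator below is a hypothetical helper that yields the (start, count) pairs for such paging; the page size is the caller's choice.

```python
def iter_pages(total, page_size):
    """Yield (start, count) pairs that cover ``total`` rows in pages of ``page_size``."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    for start in range(0, total, page_size):
        # The last page may be shorter than page_size.
        yield start, min(page_size, total - start)


print(list(iter_pages(25, 10)))  # [(0, 10), (10, 10), (20, 5)]
```

A loop such as `for start, count in iter_pages(session.count, 10000):` could then open `session.open_record_reader(start, count)` for each page.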
- class odps.tunnel.ArrowWriter(schema, request_callback, compress_option=None, chunk_size=None)
Writer object to write data to ODPS using Arrow format. Should be created with TableUploadSession.open_arrow_writer() with block_id specified.
- Example:
Here we show an example of writing a pandas DataFrame to ODPS.
    import pandas as pd
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

    # creates an ArrowWriter instance for block 0
    with upload_session.open_arrow_writer(0) as writer:
        df = pd.DataFrame({'col1': ['test1', 'test2'], 'col2': ['id1', 'id2']})
        writer.write(df)

    # commit block 0
    upload_session.commit([0])
- Note:
ArrowWriter holds a long-lived HTTP connection, which might be closed by the server once it has been open for more than 3 minutes. Avoid keeping an ArrowWriter open for a long period.
- close()
Closes the writer and flushes all data to the server.
- write(data)
Write an Arrow RecordBatch, an Arrow Table or a pandas DataFrame.
- class odps.tunnel.RecordWriter(schema, request_callback, compress_option=None, encoding='utf-8')
Writer object to write data to ODPS with records. Should be created with TableUploadSession.open_record_writer() with block_id specified.
- Example:
Here we show an example of writing data to ODPS with two records created in different ways.
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

    # creates a RecordWriter instance for block 0
    with upload_session.open_record_writer(0) as writer:
        record = upload_session.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)

        record = upload_session.new_record(['test2', 'id2'])
        writer.write(record)

    # commit block 0
    upload_session.commit([0])
- Note:
RecordWriter holds a long-lived HTTP connection, which might be closed by the server once it has been open for more than 3 minutes. Avoid keeping a RecordWriter open for a long period.
- write(record)
Write a record to the tunnel.
- Parameters:
record (
odps.models.Record) – record to write
- class odps.tunnel.BufferedArrowWriter(schema, request_callback, compress_option=None, buffer_size=None, chunk_size=None, block_id=None, block_id_gen=None)
Writer object to write data to ODPS using Arrow format. Should be created with TableUploadSession.open_arrow_writer() without block_id. Results should be submitted by passing the return value of get_blocks_written() to TableUploadSession.commit().
- Example:
Here we show an example of writing a pandas DataFrame to ODPS.
    import pandas as pd
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

    # creates a BufferedArrowWriter instance
    with upload_session.open_arrow_writer() as writer:
        df = pd.DataFrame({'col1': ['test1', 'test2'], 'col2': ['id1', 'id2']})
        writer.write(df)

    # commit blocks
    upload_session.commit(writer.get_blocks_written())
- get_blocks_written()
Get block ids created during writing. Should be provided as the argument to
TableUploadSession.commit().
- class odps.tunnel.BufferedRecordWriter(schema, request_callback, compress_option=None, encoding='utf-8', buffer_size=None, block_id=None, block_id_gen=None)
Writer object to write data to ODPS with records. Should be created with TableUploadSession.open_record_writer() without block_id. Results should be submitted by passing the return value of get_blocks_written() to TableUploadSession.commit().
- Example:
Here we show an example of writing data to ODPS with two records created in different ways.
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session('my_table', partition_spec='pt=test')

    # creates a BufferedRecordWriter instance
    with upload_session.open_record_writer() as writer:
        record = upload_session.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)

        record = upload_session.new_record(['test2', 'id2'])
        writer.write(record)

    # commit blocks
    upload_session.commit(writer.get_blocks_written())
- get_blocks_written()
Get block ids created during writing. Should be provided as the argument to
TableUploadSession.commit().
- write(record)
Write a record to the tunnel.
- Parameters:
record (
odps.models.Record) – record to write
- class odps.tunnel.TunnelArrowReader(schema, stream_creator, columns=None, partition_spec=None, append_partitions=False, use_ipc_stream=False, on_exception=None)
Reader object to read data from ODPS in Arrow format. Should be created with TableDownloadSession.open_arrow_reader().
- Example:
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

    # create a TunnelArrowReader
    with download_session.open_arrow_reader(0, download_session.count) as reader:
        for batch in reader:
            print(batch.to_pandas())
- class odps.tunnel.TunnelRecordReader(schema, stream_creator, columns=None, partition_spec=None, append_partitions=False, on_exception=None)
Reader object to read data from ODPS in records. Should be created with TableDownloadSession.open_record_reader().
- Example:
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

    # create a TunnelRecordReader
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record.values)
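The example prints `record.values`, a plain list in column order. To work with named fields, those lists can be zipped with the column names. This hypothetical helper does so; the names are assumed to be supplied by the caller in the same order as the values (for example, taken from `download_session.schema`, remembering that partition columns may be appended when append_partitions is enabled).

```python
def rows_to_dicts(column_names, rows):
    """Pair each row's values (e.g. ``record.values``) with the column names."""
    result = []
    for values in rows:
        if len(values) != len(column_names):
            raise ValueError("row length does not match the number of columns")
        result.append(dict(zip(column_names, values)))
    return result


print(rows_to_dicts(["col1", "col2"], [["test1", "id1"], ["test2", "id2"]]))
# [{'col1': 'test1', 'col2': 'id1'}, {'col1': 'test2', 'col2': 'id2'}]
```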
- class odps.tunnel.Upsert(schema, request_callback, session, compress_option=None, encoding='utf-8', max_buffer_size=None, slot_buffer_size=None)
Object to insert or update data in an ODPS upsert table with records. Should be created with TableUpsertSession.open_upsert_stream().
- Example:
Here we show an example of inserting, updating and deleting data in an upsert table.
    from odps.tunnel import TableTunnel

    tunnel = TableTunnel(o)
    upsert_session = tunnel.create_upsert_session('my_table', partition_spec='pt=test')

    # opens an upsert stream
    stream = upsert_session.open_upsert_stream(compress=True)
    rec = upsert_session.new_record(["0", "v1"])
    stream.upsert(rec)
    rec = upsert_session.new_record(["0", "v2"])
    stream.upsert(rec)
    rec = upsert_session.new_record(["1", "v1"])
    stream.upsert(rec)
    rec = upsert_session.new_record(["2", "v1"])
    stream.upsert(rec)
    stream.delete(rec)
    stream.flush()
    stream.close()

    upsert_session.commit()
- delete(record)
Delete a record.
- Parameters:
record (odps.models.Record) – record to delete
- upsert(record)
Insert or update a record.
- Parameters:
record (odps.models.Record) – record to insert or update
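To reason about what the upsert example should leave in the table, its operations can be replayed against an in-memory dict. This is a local model, not how the service applies upserts. It assumes, as the example implies, that the first column is the primary key: a later upsert with the same key replaces the row, and delete removes it.

```python
def apply_upsert_ops(ops, key_index=0):
    """Model ("upsert" | "delete", values) operations on a table keyed by one column."""
    table = {}
    for op, values in ops:
        key = values[key_index]
        if op == "upsert":
            table[key] = list(values)  # insert, or replace the row with this key
        elif op == "delete":
            table.pop(key, None)  # deleting a missing key is a no-op in this model
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return table


# The same sequence as the example above.
ops = [
    ("upsert", ["0", "v1"]),
    ("upsert", ["0", "v2"]),
    ("upsert", ["1", "v1"]),
    ("upsert", ["2", "v1"]),
    ("delete", ["2", "v1"]),
]
print(apply_upsert_ops(ops))  # {'0': ['0', 'v2'], '1': ['1', 'v1']}
```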
- class odps.tunnel.VolumeTunnel(odps=None, client=None, project=None, endpoint=None, quota_name=None, namespace=None)
Volume tunnel API Entry.
- Parameters:
odps – ODPS Entry object
project (str) – project name
endpoint (str) – tunnel endpoint
quota_name (str) – name of tunnel quota