Storage API V2
MaxCompute Storage API V2 is a high-throughput data read/write interface provided by MaxCompute. Compared to the Tunnel-based data channel, Storage API V2 offers finer-grained session management, support for Arrow and Blob format read/write, incremental reading, and table preview, making it suitable for large-scale parallel data read/write scenarios.
Note
Storage API V2 requires server-side support. Make sure the relevant features are enabled on your MaxCompute cluster.
Core concepts
The core concepts of Storage API V2 include:
Session: The transactional context for read/write operations. A read session manages data splits, while a write session ensures data atomicity.
Split: A read session divides data into multiple splits by size, parallelism, etc. Each split can be read independently, enabling parallel processing.
Stream: A data upload channel within a write session. A write session can create multiple streams to support parallel writes.
Compression: Supports UNCOMPRESSED (default), LZ4 (frame format, exposed as Compression.LZ4_FRAME), and ZSTD compression algorithms to reduce network data transfer.
Route Token: A routing identifier returned by the server, used for session affinity to ensure subsequent requests are routed to the same node.
Exactly-Once Mode: Write streams support exactly-once semantics, achieving idempotent writes via access_token and row_offset.
Client initialization
To use Storage API V2, you need to create a StorageApiClient or StorageApiArrowClient instance. Both require an ODPS entry object and a table (or Instance) object.
from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiClient, StorageApiArrowClient
# Initialize the ODPS entry object
odps = ODPS(
access_id="your_access_id",
secret_access_key="your_secret_access_key",
project="your_project",
endpoint="your_endpoint",
)
# Get the table object
table = odps.get_table("your_table")
# Create the base client (returns raw byte streams)
client = StorageApiClient(odps, table)
# Create the Arrow client (returns Arrow RecordBatches, recommended)
arrow_client = StorageApiArrowClient(odps, table)
StorageApiArrowClient inherits from StorageApiClient and provides additional read_rows_arrow, write_rows_arrow, and preview_table_arrow methods that directly read/write PyArrow RecordBatch objects, making it easy to integrate with pandas and other data processing frameworks. StorageApiArrowClient is recommended for most use cases.
You can also specify quota_name and rest_endpoint parameters to use a specific quota and custom endpoint:
client = StorageApiArrowClient(
odps, table,
quota_name="your_quota",
rest_endpoint="https://your-custom-endpoint",
)
You can also set request tags via the tags parameter:
client = StorageApiArrowClient(
odps, table,
tags="tag1,tag2",
)
Reading data
Complete read workflow
Reading data with Storage API V2 involves the following steps:
Create a read session
Read data by split
Close the read session (sessions expire automatically, no manual close needed)
Create a read session
Create a read session with the create_read_session() method. The read session determines how data is split, which columns and partitions are returned, etc.
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
# Create a read session with default split options
read_resp = arrow_client.create_read_session()
print(f"Session ID: {read_resp.session_id}")
print(f"Split count: {read_resp.splits_count}")
print(f"Total record count: {read_resp.record_count}")
print(f"Session status: {read_resp.session_status}")
print(f"Expiration time: {read_resp.expiration_time}")
When creating a read session, you can specify columns, partitions, and split options:
from odps.apis.storage_api_v2 import SplitOptions
# Read only specific columns and partitions
read_resp = arrow_client.create_read_session(
required_data_columns=["id", "name", "value"],
required_partitions=["pt=20230101"],
)
# Split by parallelism, create 10 splits
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
read_resp = arrow_client.create_read_session(
split_options=split_opts,
)
# Split by row offset
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 1000000 # 1 million rows per split
read_resp = arrow_client.create_read_session(
split_options=split_opts,
)
You can also specify partition columns and Bucket IDs when creating a read session:
# Read only specific partition columns
read_resp = arrow_client.create_read_session(
required_partition_columns=["pt"],
)
# Read specific Buckets (for clustered tables)
read_resp = arrow_client.create_read_session(
required_bucket_ids=["0", "1"],
)
Reading data in Arrow format
Use the read_rows_arrow method of StorageApiArrowClient to read data directly as Arrow RecordBatches:
import pyarrow as pa
read_resp = arrow_client.create_read_session()
# Iterate over all splits to read data
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # Process the DataFrame
Reading data with raw byte streams
Use the read_rows_stream method of StorageApiClient, which returns a StreamReader for raw byte streams. Wrap it in an ArrowReader to decode the bytes into RecordBatches:
from odps.apis.storage_api_v2 import StorageApiClient, ArrowReader
client = StorageApiClient(odps, table)
read_resp = client.create_read_session()
reader = client.read_rows_stream(read_resp.session_id, split_index=0)
arrow_reader = ArrowReader(reader)
while True:
    batch = arrow_reader.read()
    if batch is None:
        break
    df = batch.to_pandas()
    # Process the DataFrame
Reading a specific range of data
You can read a specific range of data using the row_offset and row_count parameters:
# Read 500 rows starting from row 1000
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
row_offset=1000,
row_count=500,
)
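To page through a large split in fixed-size windows, the row_offset and row_count pairs can be generated up front. The sketch below is plain Python and does not call the service; `row_ranges` is a hypothetical helper name, and it assumes you already know how many rows the split holds.

```python
def row_ranges(total_rows, chunk_rows):
    """Yield (row_offset, row_count) pairs that cover total_rows without overlap."""
    if chunk_rows <= 0:
        raise ValueError("chunk_rows must be positive")
    for offset in range(0, total_rows, chunk_rows):
        # The last window may be shorter than chunk_rows.
        yield offset, min(chunk_rows, total_rows - offset)


ranges = list(row_ranges(total_rows=2500, chunk_rows=1000))
```

Each pair could then be passed as row_offset and row_count to read_rows_arrow.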
You can also control the number of rows per batch with max_batch_rows to manage memory usage:
# At most 1024 rows per batch, reducing memory usage
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
max_batch_rows=1024,
)
You can also control the raw byte size of each batch with max_batch_raw_size:
# Raw size per batch no more than 8MB
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
max_batch_raw_size=8 * 1024 * 1024,
)
When reading, you can also skip rows with skip_row_num, specify returned columns with data_columns, and specify the data format with data_format:
# Skip the first 100 rows
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
skip_row_num=100,
)
# Read only specific columns
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
data_columns=["id", "name"],
)
Parallel reading
The split mechanism of Storage API V2 natively supports parallel reading. Each split can be read independently, making it suitable for multi-threaded or multi-process scenarios:
from concurrent.futures import ThreadPoolExecutor
import pyarrow as pa
read_resp = arrow_client.create_read_session()
def read_split(split_index):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    batches = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)
    # Merge the batches of this split into one RecordBatch
    return pa.concat_batches(batches) if batches else None
# Read all splits in parallel using a thread pool
with ThreadPoolExecutor(max_workers=read_resp.splits_count) as pool:
    futures = [
        pool.submit(read_split, i)
        for i in range(read_resp.splits_count)
    ]
    results = [f.result() for f in futures]
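Creating one thread per split, as above, fails for a session with zero splits (ThreadPoolExecutor rejects max_workers=0) and can oversubscribe the machine when there are many splits. A minimal sketch of a bounded variant follows. `read_all_splits` and the default cap of 8 workers are illustrative assumptions, and `read_split` stands for any callable that takes a split index.

```python
from concurrent.futures import ThreadPoolExecutor


def read_all_splits(read_split, splits_count, max_workers=8):
    """Run read_split(i) for every split index and return results in split order."""
    if splits_count <= 0:
        return []  # nothing to read; avoids ThreadPoolExecutor(max_workers=0)
    workers = min(max_workers, splits_count)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves input order and re-raises worker exceptions.
        return list(pool.map(read_split, range(splits_count)))


# Stand-in reader so the sketch runs without a MaxCompute connection.
demo = read_all_splits(lambda i: f"split-{i}", splits_count=3)
```

With a real session, the call might look like `read_all_splits(read_split, read_resp.splits_count)`.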
Refreshing a read session
Read sessions have an expiration time. If a session expires during a long-running read, you can use the refresh parameter of get_read_session to extend the session lifetime:
from odps.apis.storage_api_v2 import SessionStatus
# Check session status
status = arrow_client.get_read_session(read_resp.session_id)
if status.session_status == SessionStatus.EXPIRED:
    # Refresh the session
    status = arrow_client.get_read_session(read_resp.session_id, refresh=True)
Incremental reading
Storage API V2 supports incremental reading mode to capture table data changes:
from odps.apis.storage_api_v2 import IncrementalReadOptions
incr_opts = IncrementalReadOptions()
incr_opts.start_version = 100
read_resp = arrow_client.create_read_session(
incremental_read=True,
incremental_read_options=incr_opts,
)
print(f"Latest version: {read_resp.latest_version}")
You can also perform incremental reading by timestamp range:
incr_opts.start_time_stamp = "2023-01-01 00:00:00"
incr_opts.end_time_stamp = "2023-01-02 00:00:00"
read_resp = arrow_client.create_read_session(
incremental_read=True,
incremental_read_options=incr_opts,
)
Reading data from a SQL Instance
Storage API V2 supports reading data from SQL execution results (Instance). In this case, the client operates in read-only mode:
# Execute SQL to get an Instance
instance = odps.execute_sql("SELECT * FROM your_table LIMIT 1000")
# Create a client with the Instance
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()
# Read the results
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
Note
Instance-based clients do not support write operations. Calling write methods will raise a ValueError.
Previewing table data
The preview feature provides a lightweight way to browse data without creating a session:
from odps.apis.storage_api_v2 import ArrowReader
# Preview using the Arrow client
reader = arrow_client.preview_table_arrow(limit=10)
batch = reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)
# Preview using the base client
stream_reader = client.preview_table(limit=10)
arrow_reader = ArrowReader(stream_reader)
batch = arrow_reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)
You can specify partitions and columns for preview:
# Preview a specific partition
reader = arrow_client.preview_table_arrow(limit=10, partition="pt=20230101")
# Preview specific columns
reader = arrow_client.preview_table_arrow(limit=10, columns=["id", "name"])
Note
The preview feature only works with tables, not Instance clients. The number of rows returned by preview may not be exact. For production use cases requiring precise row counts, use create_read_session + read_rows_arrow instead.
Writing data
Complete write workflow
Writing data with Storage API V2 involves the following steps:
Create a write session
Create a write stream
Write data
Close the write stream
Commit the write session
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
# 1. Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# 2. Create a write stream
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
# 3. Write data
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("value", pa.float64()),
])
batch = pa.record_batch([
pa.array([1, 2, 3]),
pa.array(["Alice", "Bob", "Carol"]),
pa.array([100.0, 200.0, 150.0]),
], schema=schema)
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=3,
)
writer.write(batch)
commit_msg, success = writer.finish()
# 4. Close the write stream
arrow_client.close_write_stream(session_id, stream_id=0)
# 5. Commit the write session
arrow_client.commit_write_session(session_id)
Writing to partitioned tables
When writing to a partitioned table, specify the partition when creating the write session:
# Write to a specific partition
write_resp = arrow_client.create_write_session(
partial_partition_spec="pt=20230101"
)
To overwrite existing partition data, use the flags parameter:
# Overwrite the partition
write_resp = arrow_client.create_write_session(
partial_partition_spec="pt=20230101",
flags={"overwrite": True},
)
Parallel writing
A write session can create multiple write streams to support parallel writing:
from concurrent.futures import ThreadPoolExecutor
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
def write_stream(stream_id, data_batch):
    # Create a write stream
    arrow_client.create_write_stream(session_id, stream_id=stream_id)
    # Write data
    writer = arrow_client.write_rows_arrow(
        session_id, stream_id=stream_id, record_count=len(data_batch),
    )
    writer.write(data_batch)
    writer.finish()
    # Close the write stream
    arrow_client.close_write_stream(session_id, stream_id=stream_id)
# Write in parallel with multiple threads
# batch_data is assumed to be a list of three RecordBatches prepared by the caller
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [
        pool.submit(write_stream, i, batch_data[i])
        for i in range(3)
    ]
    # Wait for every stream and surface any exception
    for f in futures:
        f.result()
# Commit the write session
arrow_client.commit_write_session(session_id)
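The per-stream batches used above have to come from somewhere. One option is to cut a single large batch into near-equal slices, one per stream. The sketch below only computes the slice boundaries in plain Python; `even_slices` is a hypothetical helper name.

```python
def even_slices(num_rows, parts):
    """Return `parts` (offset, length) pairs covering num_rows as evenly as possible."""
    if parts <= 0:
        raise ValueError("parts must be positive")
    base, extra = divmod(num_rows, parts)
    slices, offset = [], 0
    for i in range(parts):
        length = base + (1 if i < extra else 0)  # first `extra` parts get one more row
        slices.append((offset, length))
        offset += length
    return slices


slices = even_slices(num_rows=10, parts=3)
```

With PyArrow, each pair could be applied as `batch.slice(offset, length)` to obtain the batch for one stream.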
Writing data with raw byte streams
Use the write_rows_stream method of StorageApiClient to get a StreamWriter, then wrap it with ArrowWriter for writing:
from odps.apis.storage_api_v2 import StorageApiClient, ArrowWriter, Compression
client = StorageApiClient(odps, table)
write_resp = client.create_write_session()
session_id = write_resp.session_id
client.create_write_stream(session_id, stream_id=0)
stream_writer = client.write_rows_stream(
session_id, stream_id=0, record_count=3,
)
arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
arrow_writer.write(batch)
commit_msg, success = arrow_writer.finish()
client.close_write_stream(session_id, stream_id=0)
client.commit_write_session(session_id)
Exactly-Once write mode
Storage API V2 supports Exactly-Once semantics, ensuring idempotent write operations. When enabled, the server returns an access_token and row_offset to guarantee no duplicate data on retries:
# Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# Create a write stream with Exactly-Once mode
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
access_token = stream_resp.access_token
# Pass access_token when writing data
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=3,
access_token=access_token,
)
writer.write(batch)
commit_msg, success = writer.finish()
# Get the write offset in Exactly-Once mode
write_stream_resp = writer.get_write_stream_response()
if write_stream_resp is not None:
    print(f"Write offset: {write_stream_resp.exactly_once_row_offset}")
# Query write stream status to get access_token and row_offset
stream_status = arrow_client.get_write_stream(
session_id, stream_id=0, stream_version=0,
)
print(f"Latest schema version: {stream_status.latest_schema_version}")
print(f"Write offset: {stream_status.row_offset}")
Note
In Exactly-Once mode, when retrying a write stream, you must use the row_offset and access_token from the last successful write to ensure idempotency.
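The idea behind offset-based idempotency can be illustrated with a toy in-memory model. This is not the MaxCompute implementation, and how the server actually validates row_offset and access_token is not described here. `ToyStream` is a hypothetical class that only shows why resending a request with the same starting offset does not duplicate rows.

```python
class ToyStream:
    """In-memory stand-in for a write stream; NOT the MaxCompute implementation."""

    def __init__(self):
        self.rows = []

    def write(self, row_offset, rows):
        """Append rows that start at row_offset; ignore any part already stored."""
        committed = len(self.rows)
        if row_offset > committed:
            raise ValueError("gap: offset is ahead of committed rows")
        already_stored = committed - row_offset
        self.rows.extend(rows[already_stored:])
        return len(self.rows)  # next offset the writer should use


stream = ToyStream()
next_offset = stream.write(0, ["a", "b", "c"])
# A retry of the same request (same offset) must not duplicate rows.
retry_offset = stream.write(0, ["a", "b", "c"])
final_offset = stream.write(next_offset, ["d"])
```

The point of the model is that the writer always resumes from the offset confirmed by the last successful write, which is what the note above asks for.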
Aborting a write session
If an error occurs during writing, you can abort the write session to discard all uploaded data:
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
try:
    # Write data...
    pass
except Exception:
    # Abort the session on error
    arrow_client.abort_write_session(session_id)
    raise
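The commit-or-abort pattern can also be wrapped in a context manager so that callers cannot forget either branch. The following is a sketch under assumptions: `write_session` is a hypothetical helper, it relies only on the create_write_session, commit_write_session, and abort_write_session methods shown in this document, and `_FakeClient` is a stub so the example runs without a MaxCompute connection.

```python
from contextlib import contextmanager


@contextmanager
def write_session(client, **session_kwargs):
    """Yield a session id; commit on normal exit, abort if the body raises."""
    session_id = client.create_write_session(**session_kwargs).session_id
    try:
        yield session_id
    except Exception:
        client.abort_write_session(session_id)
        raise
    else:
        client.commit_write_session(session_id)


class _FakeClient:
    """Stub with the three method names used above, for demonstration only."""

    def __init__(self):
        self.calls = []

    def create_write_session(self, **kwargs):
        self.calls.append("create")
        return type("Resp", (), {"session_id": "demo-session"})()

    def commit_write_session(self, session_id):
        self.calls.append("commit")

    def abort_write_session(self, session_id):
        self.calls.append("abort")


ok_client, bad_client = _FakeClient(), _FakeClient()
with write_session(ok_client) as sid:
    pass  # create streams and write data here

try:
    with write_session(bad_client):
        raise RuntimeError("simulated write failure")
except RuntimeError:
    pass
```

A failure inside the `with` body triggers an abort and re-raises the original exception, while a failure during the commit itself is left for the caller to handle.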
Querying write session status
Use the get_write_session method to check the status of streams in a write session:
status = arrow_client.get_write_session(session_id)
if status.streams:
    for stream_info in status.streams:
        print(f"Stream: {stream_info}")
You can also use the get_write_stream method to query the detailed status of an individual write stream:
stream_status = arrow_client.get_write_stream(
session_id, stream_id=0, stream_version=0,
)
print(f"Stream status: {stream_status.status}")
print(f"Records written: {stream_status.record_count}")
print(f"Bytes written: {stream_status.byte_size}")
For transactional (Delta) tables, you can specify stream IDs and versions when committing the write session:
arrow_client.commit_write_session(
session_id,
stream_ids=["stream-1", "stream-2"],
stream_versions=[1, 1],
)
Route tokens and session affinity
The server returns a route token when creating sessions and write streams. The client automatically stores and includes this token in subsequent requests, ensuring requests are routed to the same node for better performance:
# Route token is automatically extracted from responses and stored
write_resp = arrow_client.create_write_session()
print(f"Current route token: {arrow_client.route_token}")
# You can also manually specify the route_token parameter to override
arrow_client.commit_write_session(
session_id, route_token="your-route-token",
)
Blob data read/write
Storage API V2 supports reading and writing Blob data. Blob read/write applies to tables with BLOB columns (table format must be V2 with transactional mode enabled).
Writing Blob data
Use the write_blob_stream method to upload a single Blob in streaming mode:
# Write Blob data (no compression by default, MD5 checksum enabled)
blob_writer = arrow_client.write_blob_stream(
session_id, stream_id=0, partition_values=["pt=20230101"],
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
print(f"Blob reference: {response.blob_reference}")
You can specify a compression algorithm when writing:
from odps.apis.storage_api_v2 import Compression
# Write Blob with ZSTD compression
blob_writer = arrow_client.write_blob_stream(
session_id, stream_id=0, partition_values=["pt=20230101"],
compression=Compression.ZSTD,
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
Use the write_blob_batch method to upload multiple Blobs in batch:
from odps.apis.storage_api_v2 import BlobWriteItem, ChecksumType
# Create a list of BlobWriteItems
items = [
BlobWriteItem(
data=b"blob data 1",
partition_values=["pt=20230101"],
column_index=0,
mime_type="image/png",
checksum_type=ChecksumType.MD5,
distribution_key="key1",
),
BlobWriteItem(
data=b"blob data 2",
partition_values=["pt=20230101"],
column_index=0,
),
]
# Batch write
response = arrow_client.write_blob_batch(
items, session_id=session_id, stream_id=0,
)
print(f"Blob references: {response.blob_references}")
BlobWriteItem supports the distribution_key parameter to specify a distribution key. The checksum_type parameter supports the following checksum types:
ChecksumType.NONE: No verification (default)
ChecksumType.CRC32: CRC32 checksum
ChecksumType.MD5: MD5 checksum
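If you want to keep your own record of payload integrity in addition to the verification selected by checksum_type, the same two digests can be computed locally with the standard library. This is only a sketch: how the service encodes or compares checksums is not specified here, and `local_checksums` is a hypothetical helper.

```python
import hashlib
import zlib


def local_checksums(data):
    """Return CRC32 (unsigned int) and MD5 (hex string) of a bytes payload."""
    return {
        "crc32": zlib.crc32(data) & 0xFFFFFFFF,
        "md5": hashlib.md5(data).hexdigest(),
    }


sums = local_checksums(b"hello world")
```

Storing these values next to the returned Blob reference would let you re-check the bytes after a later read.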
Reading Blob data
Use the read_blobs method to download data by Blob references. It returns a BlobDataIterator that yields (data_bytes, mime_type) tuples:
# Read Blobs in batch
iterator = arrow_client.read_blobs(blob_references=["ref1", "ref2"])
for data, mime_type in iterator:
    print(f"Blob size: {len(data)}, MIME type: {mime_type}")
You can also pass stream=True to get a BlobStreamReader for streaming reads:
# Stream Blobs
stream_reader = arrow_client.read_blobs(
blob_references=["ref1", "ref2"], stream=True,
)
while stream_reader is not None:
    chunk = stream_reader.read(4096)  # Read 4KB at a time
    if not chunk:
        # Current Blob exhausted, move to the next one
        stream_reader = stream_reader.next()
    else:
        # Process the data chunk
        process_chunk(chunk)
        print(f"Current blob MIME type: {stream_reader.mime_type}")
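The read-then-advance loop above can be packaged as a generator so that callers simply iterate over chunks. The sketch below assumes only the behaviour described in this section: read(n) returns an empty value when the current Blob is exhausted, and next() returns the following reader or None. `iter_blob_chunks` and `_FakeBlobReader` are hypothetical names, and the fake reader exists only so the example runs offline.

```python
def iter_blob_chunks(stream_reader, chunk_size=4096):
    """Yield (blob_index, chunk) pairs from a reader exposing read(n) and next()."""
    index = 0
    while stream_reader is not None:
        chunk = stream_reader.read(chunk_size)
        if chunk:
            yield index, chunk
        else:
            # Current blob exhausted; next() is assumed to return None at the end.
            stream_reader = stream_reader.next()
            index += 1


class _FakeBlobReader:
    """Stand-in that mimics the read()/next() behaviour described above."""

    def __init__(self, blobs):
        self._data, self._rest = blobs[0], blobs[1:]

    def read(self, n):
        chunk, self._data = self._data[:n], self._data[n:]
        return chunk

    def next(self):
        return _FakeBlobReader(self._rest) if self._rest else None


chunks = list(iter_blob_chunks(_FakeBlobReader([b"abcdef", b"xy"]), chunk_size=4))
```

The blob index in each pair lets the caller tell where one Blob ends and the next begins.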
You can specify a compression algorithm when reading:
# Read Blobs compressed with ZSTD
iterator = arrow_client.read_blobs(
blob_references=["ref1", "ref2"],
compression=Compression.ZSTD,
)
Complete Blob read/write example
from odps.apis.storage_api_v2 import (
StorageApiArrowClient, BlobWriteItem, ChecksumType,
)
arrow_client = StorageApiArrowClient(odps, blob_table)
# Create write session and stream
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
arrow_client.create_write_stream(session_id, stream_id=0)
# Write Blob data
items = [
BlobWriteItem(data=b"hello world", partition_values=["pt=test"]),
BlobWriteItem(data=b"foo bar", partition_values=["pt=test"]),
]
write_resp = arrow_client.write_blob_batch(
items, session_id=session_id, stream_id=0,
)
blob_refs = write_resp.blob_references
# Close the write stream and commit the session
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)
# Read Blob data
iterator = arrow_client.read_blobs(blob_references=blob_refs)
for data, mime_type in iterator:
    print(f"Data: {data!r}, MIME type: {mime_type}")
Compression options
Storage API V2 supports the following compression algorithms:
Compression.UNCOMPRESSED: No compression (default)
Compression.ZSTD: ZSTD compression (requires the zstandard library)
Compression.LZ4_FRAME: LZ4 frame format compression (requires the lz4 library)
By default, read/write operations use no compression (Compression.UNCOMPRESSED). You can specify a compression algorithm when reading and writing:
from odps.apis.storage_api_v2 import Compression
# Use ZSTD compression when writing
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=100,
compression=Compression.ZSTD,
)
# Use LZ4 frame decompression when reading
reader = arrow_client.read_rows_arrow(
session_id, split_index=0,
compression=Compression.LZ4_FRAME,
)
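Because ZSTD and LZ4 depend on optional packages, it can help to check what is importable before choosing a compression. The sketch below uses only the standard library and returns plain names rather than Compression members, so it runs without PyODPS installed. `available_compressions` is a hypothetical helper.

```python
import importlib.util


def available_compressions():
    """List compression names whose Python dependency can be imported."""
    names = ["UNCOMPRESSED"]  # needs no extra package
    if importlib.util.find_spec("zstandard") is not None:
        names.append("ZSTD")
    if importlib.util.find_spec("lz4") is not None:
        names.append("LZ4_FRAME")
    return names


codecs = available_compressions()
```

A caller might fall back to UNCOMPRESSED when the preferred name is missing from the list.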
Data filtering
Storage API V2 supports specifying filter conditions via the filter_predicate parameter when creating a read session, reducing network data transfer:
# Use a filter predicate
read_resp = arrow_client.create_read_session(
filter_predicate="id > 100 AND name = 'test'",
)
If predicate pushdown fails, you can set filter_predicate_fallback=True to fall back to server-side filtering:
read_resp = arrow_client.create_read_session(
filter_predicate="id > 100",
filter_predicate_fallback=True,
)
You can also limit the maximum number of files per split with the split_max_file_num parameter when creating a read session:
read_resp = arrow_client.create_read_session(
split_max_file_num=1000,
)
Arrow format options
When creating a read session, you can control Arrow format precision via the arrow_options parameter:
from odps.apis.storage_api_v2 import ArrowOptions
# Set timestamp precision to microseconds
arrow_opts = ArrowOptions()
arrow_opts.timestamp_unit = ArrowOptions.TimestampUnit.MICRO
arrow_opts.date_time_unit = ArrowOptions.TimestampUnit.MILLI
read_resp = arrow_client.create_read_session(
arrow_options=arrow_opts,
)
Precision options supported by ArrowOptions:
TimestampUnit.SECOND: Seconds
TimestampUnit.MILLI: Milliseconds
TimestampUnit.MICRO: Microseconds
TimestampUnit.NANO: Nanoseconds (default)
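The unit determines how an integer timestamp value is interpreted. As a rough illustration, independent of PyArrow and of the service, the sketch below converts an epoch offset in each unit to a Python datetime. `epoch_to_datetime` is a hypothetical helper, and the unit names are plain strings chosen to mirror the list above.

```python
from datetime import datetime, timedelta, timezone

_UNITS_PER_SECOND = {"SECOND": 1, "MILLI": 10**3, "MICRO": 10**6, "NANO": 10**9}
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_to_datetime(value, unit):
    """Convert an integer epoch offset in the given unit to a UTC datetime."""
    per_second = _UNITS_PER_SECOND[unit]
    seconds, remainder = divmod(value, per_second)
    # datetime stores microseconds only, so NANO values lose their last 3 digits.
    micros = remainder * 10**6 // per_second
    return _EPOCH + timedelta(seconds=seconds, microseconds=micros)


as_milli = epoch_to_datetime(1_672_531_200_000, "MILLI")
as_nano = epoch_to_datetime(1_672_531_200_000_000_123, "NANO")
```

The truncation in the NANO case is one reason to pick MICRO when the data will end up in Python datetime objects.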
SplitOptions
When creating a read session, you can control how data is split via the split_options parameter:
from odps.apis.storage_api_v2 import SplitOptions
# Split by size (default), 256MB per split
split_opts = SplitOptions()
# Split by parallelism, create a specified number of splits
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
# Split by row offset
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 100000 # 100k rows per split
# Split by Bucket ID (for clustered tables)
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.BUCKET
# Whether to split across partitions (default is True)
split_opts = SplitOptions()
split_opts.cross_partition = False
Modes supported by SplitOptions.SplitMode:
SIZE: Split by data size (default); split_number specifies the size of each split in bytes
PARALLELISM: Split by parallelism; split_number specifies the number of splits
ROW_OFFSET: Split by row offset; split_number specifies the number of rows per split
BUCKET: Split by Bucket ID
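To get a feel for how split_number interacts with each mode, the expected number of splits can be estimated with simple ceiling division. This is a back-of-the-envelope sketch, not the server's algorithm: the actual count is whatever splits_count reports, and `estimate_split_count` is a hypothetical helper that takes mode names as plain strings.

```python
def estimate_split_count(mode, split_number, total_bytes=0, total_rows=0):
    """Rough split-count estimate; the server's actual splitting may differ."""
    if split_number <= 0:
        raise ValueError("split_number must be positive")
    if mode == "SIZE":
        return -(-total_bytes // split_number)  # ceiling division
    if mode == "ROW_OFFSET":
        return -(-total_rows // split_number)
    if mode == "PARALLELISM":
        return split_number
    raise ValueError(f"no estimate for mode {mode!r}")


by_size = estimate_split_count("SIZE", 256 * 1024 * 1024, total_bytes=10 * 1024**3)
by_rows = estimate_split_count("ROW_OFFSET", 100_000, total_rows=1_050_000)
```

For a 10 GiB table, 256 MiB splits give about 40 splits, which is a useful sanity check before sizing a worker pool.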
Common scenarios
Reading an entire table as a pandas DataFrame
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
read_resp = arrow_client.create_read_session()
batches = []
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)
df = pa.concat_batches(batches).to_pandas() if batches else None
Reading a specific partition from a partitioned table
read_resp = arrow_client.create_read_session(
required_partitions=["pt=20230101"],
)
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        # Process the batch
Writing a pandas DataFrame to a table
import pandas as pd
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
"value": [100.0, 200.0, 150.0],
})
batch = pa.RecordBatch.from_pandas(df)
arrow_client = StorageApiArrowClient(odps, table)
# Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# Create a write stream
arrow_client.create_write_stream(session_id, stream_id=0)
# Write data
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=len(df),
)
writer.write(batch)
writer.finish()
# Close the write stream and commit the session
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)
Reading SQL query results
instance = odps.execute_sql("SELECT * FROM your_table WHERE id > 100")
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # Process query results
Multi-process parallel reading
from functools import partial
from multiprocessing import Pool
def read_split(session_id, split_index):
    # Each process creates its own client
    from odps import ODPS
    from odps.apis.storage_api_v2 import StorageApiArrowClient
    local_odps = ODPS(...)  # Initialize ODPS
    local_table = local_odps.get_table("your_table")
    local_client = StorageApiArrowClient(local_odps, local_table)
    reader = local_client.read_rows_arrow(
        session_id, split_index=split_index
    )
    batches = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch.to_pandas())
    return batches
if __name__ == "__main__":
    # Create the session once in the parent process so that spawned
    # workers do not each create a session when they import this module
    read_resp = arrow_client.create_read_session()
    with Pool(processes=read_resp.splits_count) as pool:
        results = pool.map(
            partial(read_split, read_resp.session_id),
            range(read_resp.splits_count),
        )
Session status reference
Session status (SessionStatus)
| Status | Description |
|---|---|
| INIT | Session is initializing |
| NORMAL | Session is healthy, data can be read |
| CRITICAL | Session encountered a critical error |
| EXPIRED | Session has expired, needs refresh or recreation |
| COMMITTING | Write session is being committed |
| COMMITTED | Write session has been committed |
Stream status (Status)
| Status | Description |
|---|---|
| INIT | Stream is initializing |
| OK | Stream operation completed |
| WAIT | Waiting for data |
| RUNNING | Stream is running |
Reference
For the complete API reference, see Storage API V2.