Storage API V2

MaxCompute Storage API V2 is a high-throughput data read/write interface provided by MaxCompute. Compared to the Tunnel-based data channel, Storage API V2 offers finer-grained session management, support for Arrow and Blob format read/write, incremental reading, and table preview, making it suitable for large-scale parallel data read/write scenarios.

Note

Storage API V2 requires server-side support. Make sure the relevant features are enabled on your MaxCompute cluster.

Core concepts

The core concepts of Storage API V2 include:

  • Session: The transactional context for read/write operations. A read session manages data splits, while a write session ensures data atomicity.

  • Split: A read session divides data into multiple splits, by data size (default), parallelism, row offset, or bucket. Each split can be read independently, enabling parallel processing.

  • Stream: A data upload channel within a write session. A write session can create multiple streams to support parallel writes.

  • Compression: Supports UNCOMPRESSED (default), LZ4_FRAME, and ZSTD compression to reduce the amount of data transferred over the network.

  • Route Token: A routing identifier returned by the server, used for session affinity to ensure subsequent requests are routed to the same node.

  • Exactly-Once Mode: Write streams support exactly-once semantics, achieving idempotent writes via access_token and row_offset.

Client initialization

To use Storage API V2, you need to create a StorageApiClient or StorageApiArrowClient instance. Both require an ODPS entry object and a table (or Instance) object.

from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiClient, StorageApiArrowClient

# Initialize the ODPS entry object
odps = ODPS(
    access_id="your_access_id",
    secret_access_key="your_secret_access_key",
    project="your_project",
    endpoint="your_endpoint",
)

# Get the table object
table = odps.get_table("your_table")

# Create the base client (returns raw byte streams)
client = StorageApiClient(odps, table)

# Create the Arrow client (returns Arrow RecordBatches, recommended)
arrow_client = StorageApiArrowClient(odps, table)

StorageApiArrowClient inherits from StorageApiClient and provides additional read_rows_arrow, write_rows_arrow, and preview_table_arrow methods that directly read/write PyArrow RecordBatch objects, making it easy to integrate with pandas and other data processing frameworks. StorageApiArrowClient is recommended for most use cases.

You can also specify quota_name and rest_endpoint parameters to use a specific quota and custom endpoint:

client = StorageApiArrowClient(
    odps, table,
    quota_name="your_quota",
    rest_endpoint="https://your-custom-endpoint",
)

You can also set request tags via the tags parameter:

client = StorageApiArrowClient(
    odps, table,
    tags="tag1,tag2",
)

Reading data

Complete read workflow

Reading data with Storage API V2 involves the following steps:

  1. Create a read session

  2. Read data by split

  3. Let the read session expire (read sessions expire automatically, so no explicit close is needed)

Create a read session

Create a read session with the create_read_session() method. The read session determines how data is split, which columns and partitions are returned, etc.

from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)

# Create a read session with default split options
read_resp = arrow_client.create_read_session()
print(f"Session ID: {read_resp.session_id}")
print(f"Split count: {read_resp.splits_count}")
print(f"Total record count: {read_resp.record_count}")
print(f"Session status: {read_resp.session_status}")
print(f"Expiration time: {read_resp.expiration_time}")

When creating a read session, you can specify columns, partitions, and split options:

from odps.apis.storage_api_v2 import SplitOptions

# Read only specific columns and partitions
read_resp = arrow_client.create_read_session(
    required_data_columns=["id", "name", "value"],
    required_partitions=["pt=20230101"],
)

# Split by parallelism, create 10 splits
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
read_resp = arrow_client.create_read_session(
    split_options=split_opts,
)

# Split by row offset
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 1000000  # 1 million rows per split
read_resp = arrow_client.create_read_session(
    split_options=split_opts,
)

You can also specify partition columns and Bucket IDs when creating a read session:

# Read only specific partition columns
read_resp = arrow_client.create_read_session(
    required_partition_columns=["pt"],
)

# Read specific Buckets (for clustered tables)
read_resp = arrow_client.create_read_session(
    required_bucket_ids=["0", "1"],
)

Reading data in Arrow format

Use the read_rows_arrow method of StorageApiArrowClient to read data directly as Arrow RecordBatches:

import pyarrow as pa

read_resp = arrow_client.create_read_session()

# Iterate over all splits to read data
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # Process the DataFrame
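Every read example in this section repeats the same loop: call read() until it returns None. A small generator can factor that loop out. This is a sketch: iter_batches is a hypothetical helper, not part of odps.apis.storage_api_v2, and it relies only on the read()-returns-None contract shown above.

```python
from typing import Any, Iterator, Optional, Protocol


class BatchReader(Protocol):
    """Anything with a read() method that returns None at end of stream."""

    def read(self) -> Optional[Any]: ...


def iter_batches(reader: BatchReader) -> Iterator[Any]:
    """Yield batches from reader until read() returns None (hypothetical helper)."""
    while True:
        batch = reader.read()
        if batch is None:
            return
        yield batch


if __name__ == "__main__":
    # Stand-in reader for demonstration; a real one would come from read_rows_arrow()
    class FakeReader:
        def __init__(self, batches):
            self._batches = list(batches)

        def read(self):
            return self._batches.pop(0) if self._batches else None

    print(list(iter_batches(FakeReader(["b0", "b1", "b2"]))))  # ['b0', 'b1', 'b2']
```

With a real reader, the loop body becomes: for batch in iter_batches(arrow_client.read_rows_arrow(read_resp.session_id, split_index=split_index)): df = batch.to_pandas().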

Reading data with raw byte streams

Use the read_rows_stream method of StorageApiClient, which returns a StreamReader over the raw byte stream. Wrap it in an ArrowReader to decode Arrow RecordBatches:

from odps.apis.storage_api_v2 import StorageApiClient, ArrowReader

client = StorageApiClient(odps, table)
read_resp = client.create_read_session()

reader = client.read_rows_stream(read_resp.session_id, split_index=0)
arrow_reader = ArrowReader(reader)
while True:
    batch = arrow_reader.read()
    if batch is None:
        break
    df = batch.to_pandas()
    # Process the DataFrame

Reading a specific range of data

You can read a specific range of data using the row_offset and row_count parameters:

# Read 500 rows starting from row 1000
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    row_offset=1000,
    row_count=500,
)
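row_offset and row_count also make it possible to read a large split in fixed-size windows, for example to resume from a known position after a failure. The sketch below only computes the (row_offset, row_count) pairs for a known number of rows; plan_windows is a hypothetical helper, and how you obtain the row count of an individual split is not covered here.

```python
from typing import List, Tuple


def plan_windows(total_rows: int, window: int, start: int = 0) -> List[Tuple[int, int]]:
    """Return (row_offset, row_count) pairs covering rows [start, total_rows)."""
    if window <= 0:
        raise ValueError("window must be positive")
    windows = []
    offset = start
    while offset < total_rows:
        # The last window may be shorter than the others
        count = min(window, total_rows - offset)
        windows.append((offset, count))
        offset += count
    return windows


if __name__ == "__main__":
    # 2,300 rows read 1,000 at a time
    print(plan_windows(2300, 1000))  # [(0, 1000), (1000, 1000), (2000, 300)]
```

Each pair maps onto one call such as read_rows_arrow(session_id, split_index=0, row_offset=offset, row_count=count).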

You can also control the number of rows per batch with max_batch_rows to manage memory usage:

# At most 1024 rows per batch, reducing memory usage
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    max_batch_rows=1024,
)

You can also control the raw byte size of each batch with max_batch_raw_size:

# Raw size per batch no more than 8MB
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    max_batch_raw_size=8 * 1024 * 1024,
)

When reading, you can also skip rows with skip_row_num, specify returned columns with data_columns, and specify the data format with data_format:

# Skip the first 100 rows
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    skip_row_num=100,
)

# Read only specific columns
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    data_columns=["id", "name"],
)

Parallel reading

The split mechanism of Storage API V2 natively supports parallel reading. Each split can be read independently, making it suitable for multi-threaded or multi-process scenarios:

import pyarrow as pa
from concurrent.futures import ThreadPoolExecutor

read_resp = arrow_client.create_read_session()

def read_split(split_index):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    batches = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)
    # Combine the split's batches into one Arrow Table (None if the split is empty)
    return pa.Table.from_batches(batches) if batches else None

# Read all splits in parallel using a thread pool;
# max_workers must be at least 1, even if the session has no splits
with ThreadPoolExecutor(max_workers=max(1, read_resp.splits_count)) as pool:
    futures = [
        pool.submit(read_split, i)
        for i in range(read_resp.splits_count)
    ]
    # result() re-raises any exception raised in a worker thread
    results = [f.result() for f in futures]

Refreshing a read session

Read sessions have an expiration time. If a session expires during a long-running read, you can use the refresh parameter of get_read_session to extend the session lifetime:

from odps.apis.storage_api_v2 import SessionStatus

# Check session status
status = arrow_client.get_read_session(read_resp.session_id)
if status.session_status == SessionStatus.EXPIRED:
    # Refresh the session
    status = arrow_client.get_read_session(read_resp.session_id, refresh=True)
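For long-running jobs it can help to wrap the check-then-refresh step above in one function and call it before reading each split. This sketch takes the status-fetching function as a parameter so it does not depend on client internals; ensure_fresh is a hypothetical helper that only assumes the get_read_session(session_id, refresh=...) call and the session_status attribute shown above.

```python
from typing import Any, Callable


def ensure_fresh(get_session: Callable[..., Any], session_id: str, expired_status: Any) -> Any:
    """Return the current session status, refreshing the session first if it has expired.

    get_session is expected to behave like get_read_session(session_id, refresh=False)
    and to return an object with a session_status attribute.
    """
    status = get_session(session_id)
    if status.session_status == expired_status:
        # Ask the server to extend the session lifetime
        status = get_session(session_id, refresh=True)
    return status
```

Usage: status = ensure_fresh(arrow_client.get_read_session, read_resp.session_id, SessionStatus.EXPIRED).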

Incremental reading

Storage API V2 supports an incremental read mode that captures changes to table data:

from odps.apis.storage_api_v2 import IncrementalReadOptions

incr_opts = IncrementalReadOptions()
incr_opts.start_version = 100
read_resp = arrow_client.create_read_session(
    incremental_read=True,
    incremental_read_options=incr_opts,
)
print(f"Latest version: {read_resp.latest_version}")

You can also perform incremental reading by timestamp range:

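incr_opts = IncrementalReadOptions()  # New options object, so start_version from above is not carried over
incr_opts.start_time_stamp = "2023-01-01 00:00:00"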
incr_opts.end_time_stamp = "2023-01-02 00:00:00"
read_resp = arrow_client.create_read_session(
    incremental_read=True,
    incremental_read_options=incr_opts,
)
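To consume changes in fixed time slices (for example, one incremental read session per day), the timestamp strings can be generated rather than typed by hand. This sketch assumes the "YYYY-MM-DD HH:MM:SS" layout used in the example above; daily_windows is a hypothetical helper, and whether end_time_stamp is treated as inclusive by the server is not specified here.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple

FMT = "%Y-%m-%d %H:%M:%S"  # Same layout as the timestamps in the example above


def daily_windows(start: str, end: str) -> Iterator[Tuple[str, str]]:
    """Yield consecutive (start_time_stamp, end_time_stamp) pairs of at most one day."""
    cur = datetime.strptime(start, FMT)
    stop = datetime.strptime(end, FMT)
    while cur < stop:
        nxt = min(cur + timedelta(days=1), stop)
        yield cur.strftime(FMT), nxt.strftime(FMT)
        cur = nxt
```

For each (s, e) pair, create a fresh IncrementalReadOptions, set start_time_stamp = s and end_time_stamp = e, and pass it to create_read_session(incremental_read=True, incremental_read_options=...).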

Reading data from a SQL Instance

Storage API V2 supports reading data from SQL execution results (Instance). In this case, the client operates in read-only mode:

# Execute SQL to get an Instance
instance = odps.execute_sql("SELECT * FROM your_table LIMIT 1000")

# Create a client with the Instance
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()

# Read the results
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()

Note

Instance-based clients do not support write operations. Calling write methods will raise a ValueError.

Previewing table data

The preview feature provides a lightweight way to browse data without creating a session:

from odps.apis.storage_api_v2 import ArrowReader

# Preview using the Arrow client
reader = arrow_client.preview_table_arrow(limit=10)
batch = reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)

# Preview using the base client
stream_reader = client.preview_table(limit=10)
arrow_reader = ArrowReader(stream_reader)
batch = arrow_reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)

You can specify partitions and columns for preview:

# Preview a specific partition
reader = arrow_client.preview_table_arrow(limit=10, partition="pt=20230101")

# Preview specific columns
reader = arrow_client.preview_table_arrow(limit=10, columns=["id", "name"])

Note

The preview feature only works with tables, not Instance clients. The number of rows returned by preview may not be exact. For production use cases requiring precise row counts, use create_read_session + read_rows_arrow instead.

Writing data

Complete write workflow

Writing data with Storage API V2 involves the following steps:

  1. Create a write session

  2. Create a write stream

  3. Write data

  4. Close the write stream

  5. Commit the write session

import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)

# 1. Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# 2. Create a write stream
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)

# 3. Write data
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64()),
])
batch = pa.record_batch([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Carol"]),
    pa.array([100.0, 200.0, 150.0]),
], schema=schema)

writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=3,
)
writer.write(batch)
commit_msg, success = writer.finish()

# 4. Close the write stream
arrow_client.close_write_stream(session_id, stream_id=0)

# 5. Commit the write session
arrow_client.commit_write_session(session_id)

Writing to partitioned tables

When writing to a partitioned table, specify the partition when creating the write session:

# Write to a specific partition
write_resp = arrow_client.create_write_session(
    partial_partition_spec="pt=20230101"
)

To overwrite existing partition data, use the flags parameter:

# Overwrite the partition
write_resp = arrow_client.create_write_session(
    partial_partition_spec="pt=20230101",
    flags={"overwrite": True},
)

Parallel writing

A write session can create multiple write streams to support parallel writing:

from concurrent.futures import ThreadPoolExecutor

write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

def write_stream(stream_id, data_batch):
    # Create a write stream
    arrow_client.create_write_stream(session_id, stream_id=stream_id)
    # Write data
    writer = arrow_client.write_rows_arrow(
        session_id, stream_id=stream_id, record_count=len(data_batch),
    )
    writer.write(data_batch)
    writer.finish()
    # Close the write stream
    arrow_client.close_write_stream(session_id, stream_id=stream_id)

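# Hypothetical input: one RecordBatch per stream, here the 3-row batch
# from the previous example split into single-row slices
batch_data = [batch.slice(i, 1) for i in range(batch.num_rows)]

# Write in parallel with multiple threads, one stream per batch
with ThreadPoolExecutor(max_workers=len(batch_data)) as pool:
    futures = [
        pool.submit(write_stream, i, batch_data[i])
        for i in range(len(batch_data))
    ]
    # result() re-raises any exception raised in a worker thread
    for f in futures:
        f.result()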

# Commit the write session
arrow_client.commit_write_session(session_id)
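When the input is one large batch, each stream needs its own slice. A simple approach is to split the row range into nearly equal contiguous pieces, one per stream_id. This sketch only computes (offset, length) pairs; split_evenly is a hypothetical helper, and the pairs are intended for pyarrow's RecordBatch.slice(offset, length).

```python
from typing import List, Tuple


def split_evenly(num_rows: int, num_streams: int) -> List[Tuple[int, int]]:
    """Divide num_rows into num_streams contiguous (offset, length) slices.

    The first num_rows % num_streams slices get one extra row.
    """
    if num_streams <= 0:
        raise ValueError("num_streams must be positive")
    base, extra = divmod(num_rows, num_streams)
    slices = []
    offset = 0
    for i in range(num_streams):
        length = base + (1 if i < extra else 0)
        slices.append((offset, length))
        offset += length
    return slices


if __name__ == "__main__":
    print(split_evenly(10, 3))  # [(0, 4), (4, 3), (7, 3)]
```

Usage: batch_data = [batch.slice(off, n) for off, n in split_evenly(batch.num_rows, 3)]. If there are fewer rows than streams, some slices have length 0 and those streams can simply be skipped.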

Writing data with raw byte streams

Use the write_rows_stream method of StorageApiClient to get a StreamWriter, then wrap it with ArrowWriter for writing:

from odps.apis.storage_api_v2 import StorageApiClient, ArrowWriter, Compression

client = StorageApiClient(odps, table)

write_resp = client.create_write_session()
session_id = write_resp.session_id
client.create_write_stream(session_id, stream_id=0)

stream_writer = client.write_rows_stream(
    session_id, stream_id=0, record_count=3,
)
arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
arrow_writer.write(batch)
commit_msg, success = arrow_writer.finish()

client.close_write_stream(session_id, stream_id=0)
client.commit_write_session(session_id)

Exactly-Once write mode

Storage API V2 supports Exactly-Once semantics, which make write operations idempotent. In this mode, the server returns an access_token and a row offset that together ensure retried writes do not produce duplicate data:

# Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# Create a write stream with Exactly-Once mode
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
access_token = stream_resp.access_token

# Pass access_token when writing data
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=3,
    access_token=access_token,
)
writer.write(batch)
commit_msg, success = writer.finish()

# Get the write offset in Exactly-Once mode
write_stream_resp = writer.get_write_stream_response()
if write_stream_resp is not None:
    print(f"Write offset: {write_stream_resp.exactly_once_row_offset}")

# Query write stream status to get access_token and row_offset
stream_status = arrow_client.get_write_stream(
    session_id, stream_id=0, stream_version=0,
)
print(f"Latest schema version: {stream_status.latest_schema_version}")
print(f"Write offset: {stream_status.row_offset}")

Note

In Exactly-Once mode, when retrying a write stream, you must use the row_offset and access_token from the last successful write to ensure idempotency.
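The idempotency idea behind Exactly-Once mode can be illustrated with a toy, in-memory model: the receiver tracks how many rows it has accepted, and a write that starts at an offset it has already passed is acknowledged without being appended again. This is a conceptual sketch only; ToyStream is hypothetical and does not describe how the MaxCompute server is implemented.

```python
from typing import List


class ToyStream:
    """In-memory stand-in for an exactly-once write stream (illustration only)."""

    def __init__(self) -> None:
        self.rows: List[object] = []

    @property
    def row_offset(self) -> int:
        # Next offset the stream expects: the number of rows accepted so far
        return len(self.rows)

    def write(self, start_offset: int, rows: List[object]) -> int:
        """Append rows written at start_offset and return the new row offset."""
        if start_offset + len(rows) <= self.row_offset:
            # Whole batch already applied (a retry): acknowledge, do not re-append
            return self.row_offset
        if start_offset != self.row_offset:
            raise ValueError(
                f"gap or partial overlap: expected offset {self.row_offset}, got {start_offset}"
            )
        self.rows.extend(rows)
        return self.row_offset


if __name__ == "__main__":
    stream = ToyStream()
    stream.write(0, ["a", "b"])  # First attempt
    stream.write(0, ["a", "b"])  # Retry after a lost acknowledgement: ignored
    stream.write(2, ["c"])
    print(stream.rows)  # ['a', 'b', 'c']
```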

Aborting a write session

If an error occurs during writing, you can abort the write session to discard all uploaded data:

write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

try:
    # Create write streams, write data, and close the streams here...
    arrow_client.commit_write_session(session_id)
except Exception:
    # Abort the session on error
    arrow_client.abort_write_session(session_id)
    raise
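The commit-or-abort pattern can be packaged as a context manager so that every write path ends in exactly one of commit_write_session or abort_write_session. This sketch uses only the three session methods shown in this section; write_session is a hypothetical helper, and it assumes create_write_session accepts the keyword arguments you pass through (such as partial_partition_spec).

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def write_session(client: Any, **create_kwargs: Any) -> Iterator[str]:
    """Create a write session, commit it on success, abort it on any exception."""
    session_id = client.create_write_session(**create_kwargs).session_id
    try:
        yield session_id
    except BaseException:
        # Discard everything uploaded in this session, then re-raise
        client.abort_write_session(session_id)
        raise
    else:
        client.commit_write_session(session_id)
```

Usage: with write_session(arrow_client, partial_partition_spec="pt=20230101") as session_id: create, write, and close the streams inside the block; the commit happens automatically when the block exits normally.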

Querying write session status

Use the get_write_session method to check the status of streams in a write session:

status = arrow_client.get_write_session(session_id)
if status.streams:
    for stream_info in status.streams:
        print(f"Stream: {stream_info}")

You can also use the get_write_stream method to query the detailed status of an individual write stream:

stream_status = arrow_client.get_write_stream(
    session_id, stream_id=0, stream_version=0,
)
print(f"Stream status: {stream_status.status}")
print(f"Records written: {stream_status.record_count}")
print(f"Bytes written: {stream_status.byte_size}")

For transactional (Delta) tables, you can specify stream IDs and versions when committing the write session:

arrow_client.commit_write_session(
    session_id,
    stream_ids=["stream-1", "stream-2"],
    stream_versions=[1, 1],
)

Route tokens and session affinity

The server returns a route token when creating sessions and write streams. The client automatically stores and includes this token in subsequent requests, ensuring requests are routed to the same node for better performance:

# Route token is automatically extracted from responses and stored
write_resp = arrow_client.create_write_session()
print(f"Current route token: {arrow_client.route_token}")

# You can also manually specify the route_token parameter to override
arrow_client.commit_write_session(
    session_id, route_token="your-route-token",
)

Blob data read/write

Storage API V2 supports reading and writing Blob data. Blob read/write applies to tables with BLOB columns (table format must be V2 with transactional mode enabled).

Writing Blob data

Use the write_blob_stream method to upload a single Blob in streaming mode:

# Write Blob data (no compression by default, MD5 checksum enabled)
blob_writer = arrow_client.write_blob_stream(
    session_id, stream_id=0, partition_values=["pt=20230101"],
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
print(f"Blob reference: {response.blob_reference}")

You can specify a compression algorithm when writing:

from odps.apis.storage_api_v2 import Compression

# Write Blob with ZSTD compression
blob_writer = arrow_client.write_blob_stream(
    session_id, stream_id=0, partition_values=["pt=20230101"],
    compression=Compression.ZSTD,
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()

Use the write_blob_batch method to upload multiple Blobs in batch:

from odps.apis.storage_api_v2 import BlobWriteItem, ChecksumType

# Create a list of BlobWriteItems
items = [
    BlobWriteItem(
        data=b"blob data 1",
        partition_values=["pt=20230101"],
        column_index=0,
        mime_type="image/png",
        checksum_type=ChecksumType.MD5,
        distribution_key="key1",
    ),
    BlobWriteItem(
        data=b"blob data 2",
        partition_values=["pt=20230101"],
        column_index=0,
    ),
]

# Batch write
response = arrow_client.write_blob_batch(
    items, session_id=session_id, stream_id=0,
)
print(f"Blob references: {response.blob_references}")

BlobWriteItem supports the distribution_key parameter to specify a distribution key. The checksum_type parameter supports the following checksum types:

  • ChecksumType.NONE – No verification (default)

  • ChecksumType.CRC32 – CRC32 checksum

  • ChecksumType.MD5 – MD5 checksum
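If you want to compute the same kind of digest on the client, for example to compare a downloaded Blob against a value recorded at upload time, Python's standard library covers both algorithms. This is a sketch: it does not reproduce how the server encodes or verifies checksums, and local_checksum is a hypothetical helper whose "CRC32"/"MD5" names simply mirror the ChecksumType members above.

```python
import hashlib
import zlib


def local_checksum(data: bytes, kind: str) -> str:
    """Return a hex digest of data for kind 'CRC32' or 'MD5' (client side only)."""
    if kind == "CRC32":
        # Mask to an unsigned 32-bit value and format as 8 hex digits
        return format(zlib.crc32(data) & 0xFFFFFFFF, "08x")
    if kind == "MD5":
        return hashlib.md5(data).hexdigest()
    raise ValueError(f"unsupported checksum kind: {kind}")


if __name__ == "__main__":
    print(local_checksum(b"hello world", "MD5"))  # 5eb63bbbe01eeed093cb22bb8f5acdc3
```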

Reading Blob data

Use the read_blobs method to download data by Blob references. It returns a BlobDataIterator that yields (data_bytes, mime_type) tuples:

# Read Blobs in batch
iterator = arrow_client.read_blobs(blob_references=["ref1", "ref2"])
for data, mime_type in iterator:
    print(f"Blob size: {len(data)}, MIME type: {mime_type}")

You can also pass stream=True to get a BlobStreamReader for streaming reads:

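def process_chunk(chunk):
    # Placeholder: replace with your own handling, e.g. writing to a file
    pass

# Stream Blobs
stream_reader = arrow_client.read_blobs(
    blob_references=["ref1", "ref2"], stream=True,
)
new_blob = True
while stream_reader is not None:
    if new_blob:
        print(f"Current blob MIME type: {stream_reader.mime_type}")
        new_blob = False
    chunk = stream_reader.read(4096)  # Read 4KB at a time
    if not chunk:
        # Current Blob exhausted, move to the next one (None when no Blobs remain)
        stream_reader = stream_reader.next()
        new_blob = True
    else:
        # Process the data chunk of the current Blob
        process_chunk(chunk)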

You can specify a compression algorithm when reading:

# Read Blobs compressed with ZSTD
iterator = arrow_client.read_blobs(
    blob_references=["ref1", "ref2"],
    compression=Compression.ZSTD,
)
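When Blobs are large, the chunked stream=True loop is usually paired with writing each chunk to a file instead of holding the whole Blob in memory. The copy loop itself is plain Python; this sketch uses io.BytesIO as a stand-in for the BlobStreamReader and relies only on a read(size) method that returns empty bytes once the current Blob is exhausted, as in the streaming loop above. copy_chunks is a hypothetical helper.

```python
import io
from typing import BinaryIO, Protocol


class ChunkSource(Protocol):
    def read(self, size: int) -> bytes: ...


def copy_chunks(source: ChunkSource, sink: BinaryIO, chunk_size: int = 4096) -> int:
    """Copy source to sink chunk by chunk until read() returns b''; return bytes copied."""
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            return total
        sink.write(chunk)
        total += len(chunk)


if __name__ == "__main__":
    src = io.BytesIO(b"x" * 10_000)  # Stand-in for one Blob's stream
    dst = io.BytesIO()
    print(copy_chunks(src, dst))  # 10000
```

With a real reader, open one output file per Blob, call copy_chunks(stream_reader, f), then advance with stream_reader = stream_reader.next() until it returns None.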

Complete Blob read/write example

from odps.apis.storage_api_v2 import (
    StorageApiArrowClient, BlobWriteItem, ChecksumType,
)

arrow_client = StorageApiArrowClient(odps, blob_table)

# Create write session and stream
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
arrow_client.create_write_stream(session_id, stream_id=0)

# Write Blob data
items = [
    BlobWriteItem(data=b"hello world", partition_values=["pt=test"]),
    BlobWriteItem(data=b"foo bar", partition_values=["pt=test"]),
]
blob_resp = arrow_client.write_blob_batch(
    items, session_id=session_id, stream_id=0,
)
blob_refs = blob_resp.blob_references

# Close the write stream and commit the session
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)

# Read Blob data
iterator = arrow_client.read_blobs(blob_references=blob_refs)
for data, mime_type in iterator:
    print(f"Data: {data!r}, MIME type: {mime_type}")

Compression options

Storage API V2 supports the following compression algorithms:

  • Compression.UNCOMPRESSED – No compression (default)

  • Compression.ZSTD – ZSTD compression (requires zstandard library)

  • Compression.LZ4_FRAME – LZ4 frame format compression (requires lz4 library)

By default, read/write operations use no compression (Compression.UNCOMPRESSED). You can specify a compression algorithm when reading and writing:

from odps.apis.storage_api_v2 import Compression

# Use ZSTD compression when writing
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=100,
    compression=Compression.ZSTD,
)

# Use LZ4 frame decompression when reading
reader = arrow_client.read_rows_arrow(
    session_id, split_index=0,
    compression=Compression.LZ4_FRAME,
)
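Because ZSTD and LZ4_FRAME depend on optional libraries, a job that may run in different environments can check what is installed before choosing an algorithm. This sketch returns the name of a Compression member rather than the member itself so it runs without PyODPS; pick_compression and its preference order are assumptions, and the module names checked are the zstandard and lz4 libraries listed above.

```python
import importlib.util


def pick_compression(preferred=("ZSTD", "LZ4_FRAME")) -> str:
    """Return the first preferred algorithm whose library is importable, else UNCOMPRESSED."""
    required_module = {"ZSTD": "zstandard", "LZ4_FRAME": "lz4"}
    for name in preferred:
        module = required_module.get(name)
        if module is not None and importlib.util.find_spec(module) is not None:
            return name
    return "UNCOMPRESSED"
```

Usage: compression = getattr(Compression, pick_compression()), then pass compression=compression to read_rows_arrow or write_rows_arrow.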

Data filtering

Storage API V2 supports specifying filter conditions via the filter_predicate parameter when creating a read session, reducing network data transfer:

# Use a filter predicate
read_resp = arrow_client.create_read_session(
    filter_predicate="id > 100 AND name = 'test'",
)

If predicate pushdown fails, you can set filter_predicate_fallback=True to fall back to server-side filtering:

read_resp = arrow_client.create_read_session(
    filter_predicate="id > 100",
    filter_predicate_fallback=True,
)

You can also limit the maximum number of files per split with the split_max_file_num parameter when creating a read session:

read_resp = arrow_client.create_read_session(
    split_max_file_num=1000,
)

Arrow format options

When creating a read session, you can control Arrow format precision via the arrow_options parameter:

from odps.apis.storage_api_v2 import ArrowOptions

# Timestamp columns in microseconds, datetime columns in milliseconds
arrow_opts = ArrowOptions()
arrow_opts.timestamp_unit = ArrowOptions.TimestampUnit.MICRO
arrow_opts.date_time_unit = ArrowOptions.TimestampUnit.MILLI

read_resp = arrow_client.create_read_session(
    arrow_options=arrow_opts,
)

Precision options supported by ArrowOptions:

  • TimestampUnit.SECOND – Seconds

  • TimestampUnit.MILLI – Milliseconds

  • TimestampUnit.MICRO – Microseconds

  • TimestampUnit.NANO – Nanoseconds (default)
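The unit determines how an instant is stored as an integer and how much sub-unit detail survives. The worked example below converts one UTC instant to each unit with the standard library; it illustrates the arithmetic only, not how the server performs the conversion, and to_epoch is a hypothetical helper.

```python
from datetime import datetime, timezone

# Integer ticks per second for each TimestampUnit name
TICKS_PER_SECOND = {"SECOND": 1, "MILLI": 10**3, "MICRO": 10**6, "NANO": 10**9}


def to_epoch(dt: datetime, unit: str) -> int:
    """Convert an aware datetime to integer ticks since the Unix epoch, truncating."""
    delta = dt - datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Exact integer arithmetic on the timedelta fields avoids float rounding
    micros = (delta.days * 86_400 + delta.seconds) * 10**6 + delta.microseconds
    return micros * TICKS_PER_SECOND[unit] // 10**6


if __name__ == "__main__":
    dt = datetime(2023, 1, 1, 0, 0, 0, 123456, tzinfo=timezone.utc)
    for unit in TICKS_PER_SECOND:
        print(unit, to_epoch(dt, unit))
```

SECOND drops the fractional part entirely, and because Python datetime objects only carry microseconds, a NANO value derived from one always ends in 000.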

SplitOptions

When creating a read session, you can control how data is split via the split_options parameter:

from odps.apis.storage_api_v2 import SplitOptions

# Split by size (default), 256MB per split
split_opts = SplitOptions()

# Split by parallelism, create a specified number of splits
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10

# Split by row offset
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 100000  # 100k rows per split

# Split by Bucket ID (for clustered tables)
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.BUCKET

# Whether to split across partitions (default is True)
split_opts = SplitOptions()
split_opts.cross_partition = False

Modes supported by SplitOptions.SplitMode:

  • SIZE – Split by data size (default), split_number specifies the size of each split in bytes

  • PARALLELISM – Split by parallelism, split_number specifies the number of splits

  • ROW_OFFSET – Split by row offset, split_number specifies the number of rows per split

  • BUCKET – Split by Bucket ID
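For capacity planning (for example, sizing a worker pool) it can help to estimate how many splits a mode will produce. The arithmetic below is only an estimate: the server decides the real split boundaries (file layout, split_max_file_num, and cross_partition can all affect them), so always use splits_count from the read-session response when actually reading. estimate_splits is a hypothetical helper; BUCKET mode is omitted because its count depends on the table's bucket layout.

```python
def estimate_splits(mode: str, split_number: int, total_rows: int = 0, total_bytes: int = 0) -> int:
    """Rough split count for SIZE, PARALLELISM, or ROW_OFFSET mode (ceiling division)."""
    if split_number <= 0:
        raise ValueError("split_number must be positive")
    if mode == "PARALLELISM":
        return split_number  # split_number is the number of splits
    if mode == "ROW_OFFSET":
        return -(-total_rows // split_number)  # split_number is rows per split
    if mode == "SIZE":
        return -(-total_bytes // split_number)  # split_number is bytes per split
    raise ValueError(f"unsupported mode: {mode}")


if __name__ == "__main__":
    print(estimate_splits("ROW_OFFSET", 100_000, total_rows=1_050_000))  # 11
    print(estimate_splits("SIZE", 256 * 1024 * 1024, total_bytes=1024**3))  # 4
```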

Common scenarios

Reading an entire table as a pandas DataFrame

import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)
read_resp = arrow_client.create_read_session()

batches = []
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)

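# Table.from_batches is available in all pyarrow versions; concat_batches needs a recent pyarrow
df = pa.Table.from_batches(batches).to_pandas() if batches else None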

Reading a specific partition from a partitioned table

read_resp = arrow_client.create_read_session(
    required_partitions=["pt=20230101"],
)

for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        # Process the batch

Writing a pandas DataFrame to a table

import pandas as pd
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
    "value": [100.0, 200.0, 150.0],
})
batch = pa.RecordBatch.from_pandas(df, preserve_index=False)  # Do not add the index as a column

arrow_client = StorageApiArrowClient(odps, table)

# Create a write session
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# Create a write stream
arrow_client.create_write_stream(session_id, stream_id=0)

# Write data
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=len(df),
)
writer.write(batch)
writer.finish()

# Close the write stream and commit the session
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)

Reading SQL query results

instance = odps.execute_sql("SELECT * FROM your_table WHERE id > 100")
instance_client = StorageApiArrowClient(odps, instance)

read_resp = instance_client.create_read_session()
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # Process query results

Multi-process parallel reading

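import os
from functools import partial
from multiprocessing import Pool

def read_split(session_id, split_index):
    # Each process creates its own ODPS entry object, table, and client
    from odps import ODPS
    from odps.apis.storage_api_v2 import StorageApiArrowClient
    local_odps = ODPS(...)  # Initialize ODPS
    local_table = local_odps.get_table("your_table")
    local_client = StorageApiArrowClient(local_odps, local_table)
    reader = local_client.read_rows_arrow(session_id, split_index=split_index)
    frames = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        frames.append(batch.to_pandas())
    return frames

if __name__ == "__main__":
    # Create the read session once, in the parent process only; with the "spawn"
    # start method, module-level code would otherwise run again in every worker
    read_resp = arrow_client.create_read_session()
    n_splits = read_resp.splits_count
    with Pool(processes=max(1, min(n_splits, os.cpu_count() or 1))) as pool:
        results = pool.map(partial(read_split, read_resp.session_id), range(n_splits))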

Session status reference

Session status (SessionStatus)

SessionStatus is reported for both read and write sessions; COMMITTING and COMMITTED occur only for write sessions.

  • INIT – Session is initializing

  • NORMAL – Session is healthy, data can be read

  • CRITICAL – Session encountered a critical error

  • EXPIRED – Session has expired and needs to be refreshed or recreated

  • COMMITTING – Write session is being committed

  • COMMITTED – Write session has been committed

Stream status (Status)

  • INIT – Stream is initializing

  • OK – Stream operation completed

  • WAIT – Waiting for data

  • RUNNING – Stream is running

Reference

For the complete API reference, see Storage API V2.