Storage API V2

class odps.apis.storage_api_v2.StorageApiClient(odps: ODPS, table_or_instance=None, rest_endpoint: str = None, quota_name: str = None, tags: None | str | List[str] = None)[source]

Client for the MaxCompute Storage API V2.

read_blobs(blob_references=None, compression=None, stream=False)[source]

Download binary blobs by their blob references.

This method retrieves blob data uploaded via write_blob_stream or write_blob_batch by providing the blob references returned during upload. The data is returned as an iterator that yields tuples of (data_bytes, mime_type), handling protocol framing and CRC stripping automatically.

Parameters:
  • blob_references (list of str or bytes, optional) – List of blob references obtained from previous upload operations (write_blob_stream or write_blob_batch). References may be UTF-8 strings or raw bytes and will be decoded automatically if needed.

  • compression (Compression, default None) – Compression algorithm for the response data. None means Compression.UNCOMPRESSED.

  • stream (bool, optional) – If True, return a BlobStreamReader instead of BlobDataIterator. The reader provides file-like read() access per blob, a mime_type property, and a next() method to advance to the next blob. Default is False.

Returns:

When stream is False (default), returns an iterator yielding (data_bytes, mime_type) tuples per blob. When stream is True, returns a BlobStreamReader for incremental, file-like reading of each blob.

Return type:

BlobDataIterator or BlobStreamReader

See also

write_blob_batch

Upload blobs and get references.

write_blob_stream

Upload single blob and get reference.

BlobWriteItem

Helper for batch uploads.

Notes

The download stream uses a multi-layer protocol: 1. CRC32C checksums are embedded every 4096 bytes (stripped automatically) 2. Protocol framing wraps each blob (parsed automatically)

For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Read blobs using references obtained from previous uploads. First upload blobs, then read them back using the references:

>>> # Upload blobs (from earlier write_blob_batch example)
>>> blob_refs = ['blob_ref_001', 'blob_ref_002', 'blob_ref_003']
>>> # Read the blobs back using references
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     print(f"Blob size: {len(data)} bytes, MIME type: {mime_type}")
...     # Save blob to file
...     filename = f"blob_{blob_refs[i]}"
...     with open(filename, 'wb') as f:
...         f.write(data)
Blob size: 1024 bytes, MIME type: image/jpeg
Blob size: 2048 bytes, MIME type: image/png
Blob size: 512 bytes, MIME type: application/json

Download a single blob by passing a single-element reference list. The iterator yields one (data, mime_type) tuple:

>>> single_ref = ['blob_ref_abc123']
>>> blob_iterator = client.read_blobs(blob_references=single_ref)
>>> data, mime_type = next(blob_iterator)
>>> print(f"Downloaded {len(data)} bytes")
Downloaded 524288 bytes
>>> # Process the blob data
>>> if mime_type == 'image/jpeg':
...     # Process as JPEG image
...     process_image(data)

Download all blobs from a batch upload by storing references during upload and using them to read later:

>>> # Earlier: upload batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> all_refs = batch_resp.blob_references  # Save these
>>> # Later: read all blobs from the batch
>>> blob_iterator = client.read_blobs(blob_references=all_refs)
>>> downloaded_blobs = []
>>> for data, mime_type in blob_iterator:
...     downloaded_blobs.append({
...         'data': data,
...         'mime_type': mime_type,
...         'size': len(data)
...     })
>>> print(f"Downloaded {len(downloaded_blobs)} blobs")
Downloaded 10 blobs

Process blobs incrementally without loading all into memory by iterating and processing one at a time:

>>> blob_iterator = client.read_blobs(blob_references=large_blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     # Process each blob and discard data after processing
...     result = analyze_blob(data)
...     print(f"Blob {i}: {result}")
...     # data is freed after this iteration
Blob 0: Analysis complete
Blob 1: Analysis complete

Check MIME type to determine how to handle blob content when the type was set during upload:

>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     if mime_type == 'application/json':
...         # Parse as JSON
...         import json
...         json_obj = json.loads(data.decode('utf-8'))
...         process_json(json_obj)
...     elif mime_type == 'image/jpeg':
...         # Process as image
...         display_image(data)
...     elif mime_type is None:
...         # No type hint, use generic binary handling
...         process_binary(data)
...     else:
...         print(f"Unknown MIME type: {mime_type}")

Convert blob iterator to list for multiple iterations or indexing, though this loads all data into memory:

>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> all_blobs = list(blob_iterator)  # [(data1, mime1), (data2, mime2), ...]
>>> # Now can access by index
>>> first_blob_data = all_blobs[0][0]
>>> first_blob_mime = all_blobs[0][1]
>>> # Can iterate multiple times
>>> for data, mime_type in all_blobs:
...     process_blob(data)
>>> for data, mime_type in all_blobs:
...     validate_blob(data)

Handle empty blob reference list by checking iterator length or attempting to iterate:

>>> empty_refs = []
>>> blob_iterator = client.read_blobs(blob_references=empty_refs)
>>> count = 0
>>> for _ in blob_iterator:
...     count += 1
>>> print(f"Downloaded {count} blobs")
Downloaded 0 blobs

Use blob references from a table query to download specific blobs referenced in table rows:

>>> # Query table to get blob references
>>> instance = odps.execute_sql("SELECT blob_ref FROM your_table WHERE id=1")
>>> with instance.open_reader() as reader:
...     for record in reader:
...         blob_ref = record[0]  # blob_ref column
...         # Download the referenced blob
...         blob_iterator = client.read_blobs(blob_references=[blob_ref])
...         data, mime_type = next(blob_iterator)
...         print(f"Downloaded blob {blob_ref}: {len(data)} bytes")

Stream blobs incrementally using file-like read interface with stream=True:

>>> reader = client.read_blobs(blob_references=refs, stream=True)
>>> while reader is not None:
...     print(f"MIME: {reader.mime_type}")
...     chunk = reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = reader.read(4096)
...     reader = reader.next()
abort_write_session(session_id: str, route_token=None)[source]

Abort a write session to discard all uploaded data.

This method cancels an active write session and discards all data uploaded through its streams. Use abort when you encounter errors during the upload process or when you want to cancel the operation without making the data visible in the table. Aborting releases all resources associated with the session.

Parameters:

session_id (str) – The unique identifier of the write session to abort. This can be any active or expired session.

Returns:

Method returns None after aborting the session. All uploaded data is discarded and session resources are released.

Return type:

None

Raises:

ValueError – If session_id is None or empty, or if called on an instance-based client.

See also

create_write_session

Create a write session.

commit_write_session

Commit the session instead of aborting.

get_write_session

Check session status before aborting.

Notes

After aborting, the session cannot be committed. All data uploaded is permanently discarded. Call abort as soon as you detect errors to release resources quickly. It’s safe to abort a session multiple times if the first abort attempt fails.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Abort a session when an error occurs during data upload to prevent partial data from being committed:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Create stream and write data
...     stream_resp = client.create_write_stream(...)
...     # Write data that fails due to schema mismatch
...     writer = client.write_rows_stream(...)
...     writer.write(invalid_data)
...     writer.finish()
... except Exception as e:
...     print(f"Upload failed: {e}")
...     # Abort to discard all uploaded data
...     client.abort_write_session(session_id)
...     print("Session aborted, no data committed to table")
Upload failed: Schema mismatch error
Session aborted, no data committed to table

Use abort in cleanup code to ensure resources are released even if commit never happens:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Upload data...
...     # Commit if everything succeeds
...     client.commit_write_session(session_id)
... finally:
...     # If commit didn't happen (exception or early return),
...     # ensure session is aborted to release resources
...     try:
...         status = client.get_write_session(session_id)
...         if status.streams is None or len(status.streams) == 0:
...             client.abort_write_session(session_id)
...     except:
...         # Session might already be committed or aborted
...         pass

Abort a session after detecting validation errors in the data before attempting to commit:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Write data to streams
>>> writer = client.write_rows_stream(...)
>>> # Validate the data locally
>>> if data_has_errors:
...     print("Data validation failed, aborting session")
...     client.abort_write_session(session_id)
... else:
...     # Data is valid, proceed with commit
...     client.close_write_stream(...)
...     client.commit_write_session(session_id)
close_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) CloseWriteStreamResponse[source]

Close a write stream to finalize the data upload for that stream.

After finishing the write operation via the stream writer, call this method to formally close the stream. This signals to the server that no more data will be uploaded to this stream and marks the stream as ready for the session commit. All streams must be closed before the session can be committed.

Parameters:
  • session_id (str) – The write session identifier.

  • stream_id (str or int) – The stream identifier to close.

  • stream_version (int, default 0) – Version of the stream (should match create_write_stream).

Returns:

Response with warning_message and request_id. See CloseWriteStreamResponse.

Return type:

CloseWriteStreamResponse

Raises:

ValueError – If called on an instance-based client.

See also

write_rows_stream

Finish writing before closing the stream.

create_write_stream

Create the stream before writing.

commit_write_session

Commit session after all streams closed.

get_write_stream

Verify stream is closed.

Notes

The stream writer’s finish() method and close_write_stream serve different purposes: finish() completes the data upload, while close_write_stream formally closes the stream on the server side. Both must be called in sequence: finish the writer, then close the stream. After closing, the stream cannot accept more data.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow showing the relationship between writer finish() and stream close. First finish the writer, then close the stream:

>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Write Arrow data
>>> batch = pa.record_batch([...], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> # 3. Finish the writer to complete data upload
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Writer finished: {success}")
Writer finished: True
>>> # 4. Close the stream to mark it ready for commit
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Warning: {close_resp.warning_message}")
>>> # 5. Commit the session to make data visible
>>> client.commit_write_session(session_id)

Close multiple parallel streams after each finishes uploading. All streams must be closed before committing the session:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams
>>> for stream_id in range(3):
...     # Create stream, write data, finish writer
...     client.create_write_stream(session_id, stream_id=stream_id)
...     writer = ArrowWriter(
...         client.write_rows_stream(session_id, stream_id=stream_id),
...         Compression.UNCOMPRESSED
...     )
...     writer.write(batch)
...     writer.finish()
>>> # Close all 3 streams
>>> for stream_id in range(3):
...     close_resp = client.close_write_stream(session_id, stream_id=stream_id)
...     print(f"Stream {stream_id} closed")
Stream 0 closed
Stream 1 closed
Stream 2 closed
>>> # Now commit the session
>>> client.commit_write_session(session_id)

Check for warnings when closing streams to detect potential issues that may affect the commit:

>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Stream close warning: {close_resp.warning_message}")
...     # Decide whether to proceed with commit or abort
...     if "data_incomplete" in close_resp.warning_message:
...         client.abort_write_session(session_id)
...     else:
...         # Warning is informational, proceed with commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)

Use get_write_stream to verify a stream is properly closed before attempting to commit the session:

>>> # Close stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # Verify it's closed
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'CLOSED':
...     print("Stream properly closed, safe to commit")
...     client.commit_write_session(session_id)
... else:
...     print(f"Unexpected stream status: {status.status}")
Stream properly closed, safe to commit

Handle the case where close fails due to stream errors by checking the warning message and potentially aborting:

>>> try:
...     close_resp = client.close_write_stream(session_id, stream_id=0)
...     if close_resp.warning_message and "error" in close_resp.warning_message.lower():
...         print(f"Stream had errors: {close_resp.warning_message}")
...         # Abort session instead of committing incomplete data
...         client.abort_write_session(session_id)
... except Exception as e:
...     print(f"Failed to close stream: {e}")
...     client.abort_write_session(session_id)
commit_write_session(session_id: str, stream_ids=None, stream_versions=None, route_token=None)[source]

Commit a write session to finalize all uploaded data.

This is the final step in the write workflow. Calling commit atomically makes all data uploaded through the session’s write streams visible in the table. The commit operation ensures that either all writes succeed (transaction committed) or none of them appear in the table (transaction aborted).

Parameters:
  • session_id (str) – The unique identifier of the write session to commit. All write streams in this session must be closed before committing.

  • stream_ids (list of str, optional) – List of stream identifiers to commit. Used for transactional/delta tables to specify which streams to include in the commit. When provided, stream_versions must also be provided with matching length.

  • stream_versions (list of int, optional) – List of stream version numbers corresponding to stream_ids. Must be provided together with stream_ids and must have the same length.

Returns:

Method returns None on successful commit. The commit operation makes all uploaded data visible in the table atomically.

Return type:

None

Raises:
  • ValueError – If session_id is None or empty, or if called on an instance-based client.

  • errors.ODPSError – If any write streams are still open or if the session has expired.

See also

create_write_session

Create a write session.

close_write_stream

Close all streams before committing.

abort_write_session

Abort instead of commit to discard writes.

get_write_session

Verify all streams are closed.

Notes

Before committing, ensure all write streams are closed. Use get_write_session to verify stream states. If commit fails, call abort_write_session to clean up resources.

Commit is a synchronous operation. Once it returns successfully, the data is immediately visible in the table and cannot be rolled back.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete write workflow: create session, create stream, write Arrow data, close stream, and commit:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write data using Arrow writer
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
... ], schema=schema)
>>> writer = ArrowWriter(
...     client.write_rows_stream(session_id, stream_id=0, record_count=3),
...     Compression.UNCOMPRESSED
... )
>>> writer.write(batch)
>>> commit_msg, success = writer.finish()
>>> # 4. Close the write stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # 5. Commit the session to finalize all writes
>>> client.commit_write_session(session_id)
>>> print("Data successfully uploaded to table")
Data successfully uploaded to table

For transactional/delta tables, specify stream_ids and stream_versions when committing:

>>> client.commit_write_session(
...     session_id,
...     stream_ids=["stream-1"],
...     stream_versions=[1],
... )

For multi-stream uploads, ensure all streams are closed before committing. Use get_write_session to verify:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create multiple streams and write data to each
>>> for stream_id in range(3):
...     # Create stream, write data, close stream...
...     pass
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> all_closed = all(
...     s.get('Status') == 'CLOSED'
...     for s in (status.streams or [])
... )
>>> if all_closed:
...     # Commit session to make data visible
...     client.commit_write_session(session_id)
...     print("Session committed successfully")
Session committed successfully

If commit fails due to errors, abort the session to clean up:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Write data...
...     client.commit_write_session(session_id)
... except Exception as e:
...     print(f"Commit failed: {e}")
...     # Abort to discard all writes and release resources
...     client.abort_write_session(session_id)
Commit failed: [error message]
create_read_session(required_data_columns=None, required_partition_columns=None, required_partitions=None, required_bucket_ids=None, split_options=None, arrow_options=None, filter_predicate=None, filter_predicate_fallback=None, split_max_file_num=None, incremental_read=None, incremental_read_options=None) CreateReadSessionResponse[source]

Create a read session for table or instance data retrieval.

A read session is a prerequisite for reading data from a MaxCompute table or SQL instance result. The session determines how data will be split into readable chunks and what data schema will be returned. Sessions have an expiration time and must be refreshed if they expire during long-running read operations.

Parameters:
  • required_data_columns (list of str, optional) – List of column names to read. If empty, all columns are returned.

  • required_partition_columns (list of str, optional) – Partition columns to include in the result.

  • required_partitions (list of str, optional) – Specific partition values to read (e.g., [‘pt=20230101’]).

  • required_bucket_ids (list of str, optional) – Bucket IDs to read for bucket-based tables.

  • split_options (SplitOptions, optional) – Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.

  • arrow_options (ArrowOptions, optional) – Arrow format settings like timestamp precision.

  • filter_predicate (str, optional) – SQL-like filter condition to apply during reading.

  • filter_predicate_fallback (bool, optional) – Whether to fallback to server-side filtering if predicate pushdown fails.

  • split_max_file_num (int, optional) – Maximum number of files per split for file-based splitting.

  • incremental_read (bool, optional) – Enable incremental reading mode for capturing table changes.

  • incremental_read_options (IncrementalReadOptions, optional) – Options for incremental read mode (version range, timestamp range).

Returns:

Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.

Return type:

CreateReadSessionResponse

Raises:

ValueError – If the table or instance is not properly configured.

See also

get_read_session

Get current read session status.

read_rows_stream

Read data from a specific split.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, SplitOptions
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a basic read session that reads all columns with default splitting:

>>> response = client.create_read_session()
>>> print(f"Session ID: {response.session_id}")
Session ID: session_12345
>>> print(f"Available splits: {response.splits_count}")
Available splits: 5

Specify which columns to read and how to split the data. Use split_options to control parallelism by setting the split number instead of split size:

>>> split_opts = SplitOptions()
>>> split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
>>> split_opts.split_number = 10  # Create 10 splits
>>> response = client.create_read_session(
...     required_data_columns=["id", "name", "value"],
...     split_options=split_opts,
... )
>>> print(f"Created {response.splits_count} splits")
Created 10 splits

Read from a specific partition by providing partition values in the required_partitions parameter:

>>> response = client.create_read_session(
...     required_partitions=["pt=20230101", "region=us-west"]
... )
>>> print(f"Records in partition: {response.record_count}")
Records in partition: 1000

For incremental data capture, enable incremental_read to read changes since a specific version or timestamp:

>>> from odps.apis.storage_api_v2 import IncrementalReadOptions
>>> incr_opts = IncrementalReadOptions()
>>> incr_opts.start_version = 100
>>> response = client.create_read_session(
...     incremental_read=True,
...     incremental_read_options=incr_opts,
... )
>>> print(f"Latest version: {response.latest_version}")
Latest version: 150
create_write_session(partial_partition_spec=None, flags=None) CreateWriteSessionResponse[source]

Create a write session for uploading data to a table.

A write session is the first step in the data upload workflow. It establishes a transactional context for writing data to a MaxCompute table, ensuring atomicity and consistency. After creating a session, you must create one or more write streams within it, upload data to those streams, close the streams, and finally commit the session.

Parameters:
  • partial_partition_spec (str, optional) – Partition specification for writing to a specific partition. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’. If empty, writes to the table’s default location.

  • flags (dict, optional) – Additional flags for session configuration. Common flags include ‘overwrite’ to replace existing partition data.

Returns:

Response with session_id, warning_message, and request_id. See CreateWriteSessionResponse.

Return type:

CreateWriteSessionResponse

Raises:

ValueError – If called on an instance-based client (writes only support tables).

See also

create_write_stream

Create a stream within the session.

commit_write_session

Commit the session to finalize writes.

abort_write_session

Abort the session to discard all writes.

Notes

Write sessions are transactional. All writes within a session are committed atomically when commit_write_session is called. If the session is aborted, all uploaded data is discarded. Sessions have a limited lifetime and must be committed before expiration.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a basic write session for uploading data to the table’s default location:

>>> response = client.create_write_session()
>>> session_id = response.session_id
>>> print(f"Write session ID: {session_id}")
Write session ID: write_session_12345

Write to a specific partition by specifying the partition spec. This is required for partitioned tables unless writing to the default partition:

>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101"
... )
>>> session_id = response.session_id
>>> print(f"Writing to partition pt=20230101")

Use the ‘overwrite’ flag to replace existing data in a partition instead of appending:

>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101",
...     flags={"overwrite": True}
... )
>>> session_id = response.session_id
>>> # After committing, existing partition data will be replaced

Complete write workflow: create session, create stream, write data, close stream, and commit:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create write stream (see create_write_stream example)
>>> # Write data (see write_rows_stream example)
>>> # Close stream (see close_write_stream example)
>>> # Commit session to finalize
>>> client.commit_write_session(session_id)

If an error occurs during the write process, abort the session to discard all uploaded data and release resources:

>>> session_resp = client.create_write_session()
>>> try:
...     # Write data...
...     # If error occurs:
...     client.abort_write_session(session_resp.session_id)
... except Exception as e:
...     client.abort_write_session(session_resp.session_id)
...     raise
create_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) CreateWriteStreamResponse[source]

Create a write stream within an active write session.

A write stream is a data upload channel within a write session. You can create multiple streams in parallel to upload data concurrently, improving throughput for large datasets. Each stream must be closed individually after writing data, and the session must be committed after all streams are closed.

Parameters:
  • session_id (str) – The write session identifier from create_write_session.

  • stream_id (str or int) – Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.

  • stream_version (int, default 0) – Version number for the stream. Increment if retrying after a failed upload to the same stream_id.

Returns:

Response with data_schema, table_id, schema_version, and route_token. See CreateWriteStreamResponse.

Return type:

CreateWriteStreamResponse

Raises:

ValueError – If called on an instance-based client.

See also

create_write_session

Create the parent write session first.

write_rows_stream

Write data to the created stream.

close_write_stream

Close the stream after writing.

get_write_stream

Get stream status.

Notes

Each stream can be written to independently, enabling parallel uploads from multiple workers. The stream_version allows retrying failed uploads without creating a new session. After a stream is closed, it cannot be reopened with the same stream_id and version.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

First create a write session, then create a single stream for sequential data upload:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> print(f"Stream created, schema: {stream_resp.data_schema}")
Stream created, schema: {'Columns': [...]}

For parallel upload from multiple workers, create multiple streams with different stream_ids. Each worker processes one stream:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Worker 1 creates stream 0
>>> stream_resp0 = client.create_write_stream(session_id, stream_id=0)
>>> # Worker 2 creates stream 1
>>> stream_resp1 = client.create_write_stream(session_id, stream_id=1)
>>> # Worker 3 creates stream 2
>>> stream_resp2 = client.create_write_stream(session_id, stream_id=2)
>>> print(f"Created 3 parallel streams")

Use stream_version to retry failed uploads. If a stream fails, increment the version to create a new stream with the same ID:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0, stream_version=0)
>>> # Attempt to write data (fails)
>>> try:
...     writer = client.write_rows_stream(...)
...     writer.write(data)
... except Exception:
...     # Retry with incremented version
...     retry_resp = client.create_write_stream(
...         session_id, stream_id=0, stream_version=1
...     )
...     writer = client.write_rows_stream(...)
...     writer.write(data)

Check the data_schema before writing to ensure your data matches the expected table schema:

>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> schema = stream_resp.data_schema
>>> if schema:
...     columns = schema.get('Columns', [])
...     for col in columns:
...         print(f"Column: {col.get('Name')}, Type: {col.get('Type')}")
Column: id, Type: bigint
Column: name, Type: string
get_read_session(session_id: str, refresh: bool = False) CreateReadSessionResponse[source]

Get current status and metadata of an existing read session.

This method retrieves the current state of a read session, including its status, split count, schema, and expiration time. Use this to check if a session is still valid before attempting to read data, or to refresh a session that is approaching expiration.

Parameters:
  • session_id (str) – The unique identifier of the read session to retrieve. This is obtained from create_read_session response.

  • refresh (bool, default False) – Whether to refresh the session expiration time. Set to True if the session is about to expire or has expired. When True, the server extends the session lifetime, allowing continued reading operations.

Returns:

Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.

Return type:

CreateReadSessionResponse

Raises:

ValueError – If session_id is None or empty.

See also

create_read_session

Create a new read session.

read_rows_stream

Read data from a specific split.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

First create a read session, then check its status:

>>> create_response = client.create_read_session()
>>> session_id = create_response.session_id
>>> status_response = client.get_read_session(session_id)
>>> print(f"Session status: {status_response.session_status}")
Session status: NORMAL
>>> print(f"Expiration: {status_response.expiration_time}")
Expiration: 2023-01-01T12:00:00Z

When a session expires during a long-running read operation, use the refresh parameter to extend its lifetime:

>>> # Check if session expired
>>> response = client.get_read_session(session_id)
>>> if response.session_status == SessionStatus.EXPIRED:
...     # Refresh the session to continue reading
...     refreshed_response = client.get_read_session(session_id, refresh=True)
...     print(f"New expiration: {refreshed_response.expiration_time}")
New expiration: 2023-01-01T14:00:00Z

Monitor session status during a parallel read operation to ensure all splits are processed before expiration:

>>> create_response = client.create_read_session()
>>> for split_index in range(create_response.splits_count):
...     # Check session health before each split
...     status = client.get_read_session(create_response.session_id)
...     if status.session_status == SessionStatus.EXPIRED:
...         client.get_read_session(create_response.session_id, refresh=True)
...     # Read the split data...
...     reader = client.read_rows_stream(...)
get_write_session(session_id: str) GetWriteSessionResponse[source]

Get the current status and stream information of a write session.

This method retrieves metadata about a write session, including the list of active write streams and their states. Use this to monitor the progress of a multi-stream upload operation or to verify that all streams have been properly closed before committing the session.

Parameters:

session_id (str) – The unique identifier of the write session to query. This is obtained from create_write_session response.

Returns:

Response with streams (list of stream info dicts), warning_message, and request_id. See GetWriteSessionResponse.

Return type:

GetWriteSessionResponse

Raises:

ValueError – If session_id is None or empty, or if called on an instance-based client.

See also

create_write_session

Create a write session.

commit_write_session

Commit the session after all streams closed.

create_write_stream

Create a new stream in the session.

Notes

Before calling commit_write_session, ensure all write streams are closed. Use get_write_session to verify that no streams are still in the RUNNING or OPEN state.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a write session and check its status before creating streams:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> status_resp = client.get_write_session(session_id)
>>> print(f"Active streams: {status_resp.streams}")
Active streams: None

After creating multiple write streams for parallel upload, use get_write_session to track which streams are active:

>>> # Create multiple streams (see create_write_stream examples)
>>> stream_resp1 = client.create_write_stream(...)
>>> stream_resp2 = client.create_write_stream(...)
>>> status = client.get_write_session(session_id)
>>> print(f"Streams: {len(status.streams) if status.streams else 0}")
Streams: 2

Before committing the session, verify all streams are closed to ensure data upload is complete:

>>> # Close all streams after writing data
>>> client.close_write_stream(...)
>>> client.close_write_stream(...)
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> if status.streams:
...     # Check if any stream is still open
...     open_streams = [s for s in status.streams if s.get('Status') == 'OPEN']
...     if open_streams:
...         print(f"Warning: {len(open_streams)} streams still open")
...     else:
...         # All streams closed, safe to commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)

Monitor the progress of a long-running parallel upload by periodically checking stream status:

>>> import time
>>> while True:
...     status = client.get_write_session(session_id)
...     if status.streams:
...         closed_count = sum(
...             1 for s in status.streams
...             if s.get('Status') == 'CLOSED'
...         )
...         print(f"Progress: {closed_count}/{len(status.streams)} streams closed")
...         if closed_count == len(status.streams):
...             break
...     time.sleep(5)
>>> # All streams closed, commit the session
>>> client.commit_write_session(session_id)
get_write_stream(session_id: str, stream_id: str, stream_version: int, route_token=None, exactly_once_mode=False) GetWriteStreamResponse[source]

Get the current status and metadata of a write stream.

This method retrieves information about a specific write stream, including its current state, statistics, and any error information. Use this to monitor stream health during uploads or to verify stream state before closing.

Parameters:
  • session_id (str) – The unique identifier of the write session.

  • stream_id (str) – The identifier of the stream to query (typically 0, 1, 2, …).

  • stream_version (int) – The version number of the stream.

Returns:

Response with status, record_count, byte_size, error_code, and error_message. See GetWriteStreamResponse.

Return type:

GetWriteStreamResponse

Raises:

ValueError – If any parameter is None or empty, or if called on an instance-based client.

See also

create_write_stream

Create a stream before getting its status.

close_write_stream

Close the stream after verifying status.

get_write_session

Get all streams in a session.

Notes

Use get_write_stream to check if a stream has encountered errors before attempting to close it. If a stream is in ERROR state, it may need to be recreated with an incremented stream_version.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a write stream and check its status before writing data:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Stream status: {status.status}")
Stream status: OPEN

After writing data, check the stream statistics to verify upload progress before closing:

>>> # Write data to the stream
>>> writer = client.write_rows_stream(...)
>>> writer.write(batch)
>>> writer.finish()
>>> # Check stream statistics
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Uploaded {status.record_count or 0} records, {status.byte_size or 0} bytes")
Uploaded 1000 records, 524288 bytes
>>> # Close the stream after verifying the upload
>>> client.close_write_stream(...)

Monitor multiple parallel streams during upload to identify which streams have encountered errors:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams and write data
>>> for i in range(3):
...     stream_resp = client.create_write_stream(session_id, stream_id=i)
>>> # Check status of all streams
>>> for i in range(3):
...     status = client.get_write_stream(session_id, stream_id=str(i), stream_version=0)
...     if status.status == 'ERROR':
...         print(f"Stream {i} failed: {status.error_message}")
...     else:
...         print(f"Stream {i}: {status.status}, {status.record_count or 0} records")
Stream 0: OPEN, 500 records
Stream 1: OPEN, 300 records
Stream 2: ERROR, Schema validation failed

If a stream shows ERROR status, use get_write_stream to get detailed error information before deciding to retry:

>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'ERROR':
...     print(f"Stream error [{status.error_code}]: {status.error_message}")
...     # Decide whether to retry with new version or abort session
...     if status.error_code == 'SCHEMA_MISMATCH':
...         # Retry with corrected data
...         client.create_write_stream(session_id, stream_id=0, stream_version=1)
...     else:
...         # Abort the entire session
...         client.abort_write_session(session_id)
preview_table(limit=None, partition=None, columns=None) StreamReader[source]

Preview table data without creating a session.

This method provides a lightweight way to sample table data without the overhead of creating a read session. Unlike read_rows_stream, preview_table directly returns an Arrow IPC stream, making it ideal for quick data exploration, schema inspection, or testing table connectivity.

Parameters:
  • limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).

  • partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.

  • columns (list of str, optional) – Specific columns to preview. If empty, all columns are returned.

Returns:

Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.

Return type:

StreamReader

Raises:

ValueError – If called on an instance-based client (preview only works on tables).

See also

read_rows_stream

Read full table data with session management.

create_read_session

Create a session for large-scale reading.

Notes

Preview is optimized for quick sampling and may not return exact row counts specified in limit. For production data reading with guaranteed row counts and retry support, use create_read_session and read_rows_stream instead.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Preview the first few rows of a table to quickly explore its data:

>>> stream_reader = client.preview_table(limit=10)
>>> arrow_reader = ArrowReader(stream_reader)
>>> batch = arrow_reader.read()
>>> if batch is not None:
...     df = batch.to_pandas()
...     print(df)
   id    name    value
0   1  Alice      100
1   2    Bob      200
2   3  Carol      150

Preview specific columns to check their data types and values without downloading all columns:

>>> reader = ArrowReader(client.preview_table(
...     limit=5, columns=["id", "name"]
... ))
>>> batch = reader.read()
>>> if batch is not None:
...     print(batch.schema)
id: int64
name: string

Preview data from a specific partition to test partition filtering before running a full query:

>>> reader = ArrowReader(client.preview_table(
...     limit=20, partition="pt=20230101"
... ))
>>> batch = reader.read()
>>> df = batch.to_pandas() if batch is not None else None
>>> print(f"Previewed {len(df) if df is not None else 0} rows")
Previewed 20 rows

Use preview to inspect table schema by requesting a small sample. This is useful for understanding column names and types before creating a read session:

>>> reader = ArrowReader(client.preview_table(limit=1))
>>> batch = reader.read()
>>> if batch is not None:
...     for field in batch.schema:
...         print(f"{field.name}: {field.type}")
id: int64
name: string
value: double
pt: string
read_rows_stream(session_id=None, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) StreamReader[source]

Read data from a specific split in a read session.

This method reads a chunk of data from a table or instance result by specifying which split to read. The data is returned as a stream that can be processed incrementally, supporting efficient handling of large datasets. Each split can be read independently, enabling parallel processing of the same session across multiple workers.

Parameters:
  • session_id (str) – The read session identifier from create_read_session.

  • split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.

  • row_offset (int, optional) – Starting row offset within the split. Defaults to 0.

  • row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.

  • max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.

  • skip_row_num (int, default 0) – Number of rows to skip before reading.

  • max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.

  • data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).

  • data_columns (list of str, optional) – Specific columns to read. Must match session schema.

  • compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.

Returns:

Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.

Return type:

StreamReader

Raises:

ValueError – If the session or split parameters are invalid.

See also

create_read_session

Create a read session first.

ArrowReader

Wrap StreamReader to read Arrow batches.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Read data from a specific split using ArrowReader for convenient batch processing:

>>> reader = client.read_rows_stream(session_id, split_index=0)
>>> arrow_reader = ArrowReader(reader)
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process dataframe
>>> arrow_reader.get_request_id()  # Get request ID after completion

For parallel processing, distribute splits across multiple workers. Each worker reads a different split index from the same session:

>>> # Worker 1 reads split 0
>>> reader1 = client.read_rows_stream(session_id, split_index=0)
>>> # Worker 2 reads split 1
>>> reader2 = client.read_rows_stream(session_id, split_index=1)

Control memory usage by limiting batch size with max_batch_rows. Smaller batches reduce memory footprint but may have lower throughput:

>>> reader = client.read_rows_stream(session_id, split_index=0, max_batch_rows=1024)
>>> arrow_reader = ArrowReader(reader)
>>> # Read batches one at a time to control memory
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process df and then discard to free memory

Read a specific range of rows within a split by using row_offset and row_count parameters:

>>> reader = client.read_rows_stream(
...     session_id,
...     split_index=0,
...     row_offset=1000,  # Skip first 1000 rows
...     row_count=500,    # Read 500 rows
... )
property route_token

The stored route token for session affinity.

This is automatically updated from response headers. Pass route_token=... to individual methods to override, or rely on this stored value when not specified.

write_blob_batch(items: List[BlobWriteItem], session_id=None, stream_id=None, stream_version=0, partition_values=None, column_index=0) WriteBlobResponse[source]

Upload multiple blobs in a single batch request for efficiency.

This method uploads multiple binary blobs (such as images, videos, or files) in one consolidated request, reducing network overhead compared to individual uploads. Each blob is packaged with metadata (partition location, column index, MIME type) and optional checksum verification.

Parameters:
  • items (list of BlobWriteItem) – List of blob items to upload. Each item contains data (bytes), partition_values (list of str), column_index (int), distribution_key (str), mime_type (str), and checksum_type (ChecksumType). See BlobWriteItem.

  • session_id (str, optional) – The write session identifier from create_write_session.

  • stream_id (str or int, optional) – Stream identifier for this batch upload.

  • stream_version (int, default 0) – Version number for the stream.

  • partition_values (list of str, optional) – Default partition values for blobs (can be overridden per item). Format: [‘pt=20230101’, ‘region=us-west’].

  • column_index (int, default 0) – Default column index for blobs (can be overridden per item).

Returns:

Response with blob_reference, blob_references, warning_message, and size. See WriteBlobResponse.

Return type:

WriteBlobResponse

Raises:

ValueError – If called on an instance-based client.

See also

write_blob_stream

Upload single blob via streaming.

read_blobs

Read uploaded blobs using the references.

BlobWriteItem

Class to construct blob items.

Notes

Batch upload is more efficient than individual uploads when uploading many small blobs. The method uses a custom wire format with header, data, and footer sections for each blob. Checksums (CRC32 or MD5) are computed and included in the footer for server-side verification. The order of blob_references matches the input items order.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, BlobWriteItem, ChecksumType
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Upload multiple image blobs in a single batch request for efficient network usage:

>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Prepare multiple blob items
>>> items = []
>>> for image_path in ['img1.jpg', 'img2.jpg', 'img3.jpg']:
...     with open(image_path, 'rb') as f:
...         image_data = f.read()
...     item = BlobWriteItem(
...         data=image_data,
...         partition_values=['pt=20230101'],
...         column_index=0,
...         mime_type='image/jpeg',
...         checksum_type=ChecksumType.CRC32  # Verify integrity
...     )
...     items.append(item)
>>> # 3. Upload all blobs in one batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Uploaded {len(batch_resp.blob_references)} blobs")
>>> print(f"Total size: {batch_resp.size} bytes")
Uploaded 3 blobs
Total size: 1572864 bytes
>>> # 4. Store references for later reading
>>> blob_refs = batch_resp.blob_references
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)

Upload blobs to different partitions and columns by customizing each BlobWriteItem’s metadata:

>>> items = [
...     BlobWriteItem(
...         data=blob1_data,
...         partition_values=['pt=20230101'],
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=blob2_data,
...         partition_values=['pt=20230102'],
...         column_index=1
...     ),
...     BlobWriteItem(
...         data=blob3_data,
...         partition_values=['pt=20230103'],
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Each blob is stored in its specified partition/column

Use MD5 checksum for stronger integrity verification on critical data uploads. The checksum is computed and sent with the blob:

>>> critical_data = read_critical_file()
>>> item = BlobWriteItem(
...     data=critical_data,
...     column_index=0,
...     checksum_type=ChecksumType.MD5
... )
>>> response = client.write_blob_batch([item], session_id, stream_id=0)
>>> # Server verifies MD5 checksum matches computed value

Add MIME type metadata to help applications understand blob content type without examining the binary data:

>>> items = [
...     BlobWriteItem(
...         data=json_data_bytes,
...         mime_type='application/json',
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=pdf_data_bytes,
...         mime_type='application/pdf',
...         column_index=1
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # MIME types stored with blobs for content type hints

Use distribution_key for hash-based storage to ensure blobs are distributed across storage locations based on the key:

>>> items = [
...     BlobWriteItem(
...         data=blob_data,
...         distribution_key='user123',
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Blob stored at location determined by distribution key hash

Upload a large number of small files efficiently by batching them instead of uploading individually:

>>> # Prepare 100 small file blobs
>>> items = []
>>> for file_path in small_files:  # 100 small files
...     with open(file_path, 'rb') as f:
...         data = f.read()
...     items.append(BlobWriteItem(data=data, column_index=0))
>>> # Upload all in one batch (much faster than 100 individual uploads)
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Batch uploaded {len(response.blob_references)} blobs")
Batch uploaded 100 blobs

Check for warnings in the response to detect partial upload failures or other issues:

>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> if response.warning_message:
...     print(f"Upload warning: {response.warning_message}")
...     # Some blobs may have had issues
...     # Check if all blob_references are present
...     if len(response.blob_references) < len(items):
...         print(f"Only {len(response.blob_references)} of {len(items)} uploaded")
Upload warning: 2 blobs exceeded size limit
Only 98 of 100 uploaded

Use the returned blob_references to read the blobs later by calling read_blobs with the reference list:

>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> blob_refs = batch_resp.blob_references
>>> # Later, read the blobs back
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     # Process each blob data
...     print(f"Read blob: {len(data)} bytes, type: {mime_type}")
write_blob_stream(session_id, stream_id, stream_version=0, partition_values=None, column_index=0, compression=None) BlobStreamWriter[source]

Upload a single blob via streaming upload.

This method creates a streaming writer for uploading a single binary blob (such as an image, video, or large binary file) to a specific column in a MaxCompute table. The data is optionally compressed using the specified compression algorithm and verified with MD5 checksum to ensure upload integrity.

Parameters:
  • session_id (str) – The write session identifier from create_write_session.

  • stream_id (str or int) – Stream identifier for this upload.

  • stream_version (int, default 0) – Version number for the stream.

  • partition_values (list of str, optional) – Partition values for the blob location. Format: [‘pt=20230101’, ‘region=us-west’].

  • column_index (int, default 0) – Column index in the table schema where the blob will be stored.

  • compression (Compression, default None) – Compression algorithm to use. None means Compression.UNCOMPRESSED. See Compression. Supported values: Compression.ZSTD, Compression.LZ4_FRAME, Compression.UNCOMPRESSED.

Returns:

Blob stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See BlobStreamWriter.

Return type:

BlobStreamWriter

Raises:
  • ValueError – If called on an instance-based client.

  • errors.DependencyNotInstalledError – If zstandard library is not installed (required for ZSTD compression).

  • errors.ChecksumError – If MD5 checksum verification fails after finish().

See also

write_blob_batch

Upload multiple blobs in one request.

read_blobs

Read uploaded blobs by reference.

BlobWriteItem

Helper class for batch blob uploads.

Notes

Blob uploads default to no compression (Compression.UNCOMPRESSED). Compression algorithms like ZSTD or LZ4_FRAME can be enabled for efficient transfer. Set compression=Compression.ZSTD for zstd compression or compression=Compression.LZ4_FRAME for lz4. The writer computes MD5 checksum incrementally during write and verifies against server response when finished. If the checksum doesn’t match, a ChecksumError is raised, indicating data corruption during upload.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow for uploading a single image blob to a table with a binary column:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Upload blob data
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101'],
...     column_index=0  # First column is the blob column
... )
>>> # Read image file and upload in chunks
>>> with open('image.jpg', 'rb') as f:
...     while True:
...         chunk = f.read(8192)  # 8KB chunks
...         if not chunk:
...             break
...         blob_writer.write(chunk)
>>> # 4. Finish and verify MD5
>>> response = blob_writer.finish()
>>> print(f"Blob reference: {response.blob_reference}")
>>> print(f"Uploaded size: {response.size} bytes")
Blob reference: blob_ref_abc123
Uploaded size: 524288 bytes
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)

Upload blob data directly from memory without reading from a file, useful for dynamically generated binary data:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Generate or prepare binary data
>>> binary_data = b"generated binary content..."
>>> blob_writer.write(binary_data)
>>> # Finish upload
>>> response = blob_writer.finish()
>>> print(f"Blob uploaded: {response.blob_reference}")

Upload a blob to a specific partition by providing partition values. The blob is stored in the specified partition’s location:

>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101', 'region=us-west'],
...     column_index=0
... )
>>> blob_writer.write(image_data)
>>> response = blob_writer.finish()
>>> # Blob is now in partition pt=20230101/region=us-west

Handle checksum errors when the upload integrity check fails, indicating data corruption during transfer:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> blob_writer.write(data)
>>> try:
...     response = blob_writer.finish()
...     print("Upload successful")
... except errors.ChecksumError as e:
...     print(f"Checksum mismatch: {e}")
...     # Data was corrupted during upload
...     # Retry the upload or abort the session
...     client.abort_write_session(session_id)
Checksum mismatch: MD5 value mismatch, expected: abc123, actual: def456

Upload large blobs incrementally by reading and writing in chunks to avoid loading the entire blob into memory:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Upload a large video file in 64KB chunks
>>> chunk_size = 65536
>>> with open('large_video.mp4', 'rb') as f:
...     while True:
...         chunk = f.read(chunk_size)
...         if not chunk:
...             break
...         success = blob_writer.write(chunk)
...         if not success:
...             print("Writer closed unexpectedly")
...             break
>>> response = blob_writer.finish()
>>> print(f"Large video uploaded: {response.size} bytes")
Large video uploaded: 104857600 bytes

Upload text data by converting strings to bytes. The writer automatically handles the conversion:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> text_content = "This is text data to store as a blob"
>>> blob_writer.write(text_content)  # Automatically converted to bytes
>>> response = blob_writer.finish()

Monitor writer status during long uploads to detect early failures:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> for chunk in large_data_chunks:
...     success = blob_writer.write(chunk)
...     if not success or blob_writer.get_status() != Status.RUNNING:
...         print("Upload stopped unexpectedly")
...         break
>>> response = blob_writer.finish()
>>> request_id = blob_writer.get_request_id()
write_rows_stream(session_id, stream_id, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) StreamWriter[source]

Write row data to a write stream via streaming upload.

This method creates a streaming writer for uploading row data to a write stream. The writer accepts Arrow record batches or raw binary data and uploads it incrementally, enabling efficient handling of large datasets without loading all data into memory at once.

Parameters:
  • session_id (str) – The write session identifier.

  • stream_id (str or int) – The stream identifier from create_write_stream.

  • stream_version (int, default 0) – Version of the stream (should match create_write_stream).

  • record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.

  • compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.

Returns:

Stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See StreamWriter.

Return type:

StreamWriter

Raises:

ValueError – If called on an instance-based client.

See also

create_write_stream

Create a stream before writing.

ArrowWriter

Convert Arrow batches to stream format.

close_write_stream

Close the stream after finishing write.

Notes

The returned StreamWriter accepts binary data directly. For Arrow workflows, wrap it with ArrowWriter which converts record batches to Arrow IPC format. The writer must be finished before closing the stream. Call finish() after writing all batches.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow using ArrowWriter to upload Arrow record batches:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write Arrow data
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
...     pa.field("value", pa.float64()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
...     pa.array([100.0, 200.0, 150.0]),
... ], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Upload successful: {success}")
Upload successful: True
>>> # 4. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
class odps.apis.storage_api_v2.StorageApiArrowClient(odps: ODPS, table_or_instance=None, rest_endpoint: str = None, quota_name: str = None, tags: None | str | List[str] = None)[source]

Arrow batch client for the Storage API V2.

Extends StorageApiClient with convenience methods that wrap raw stream I/O with ArrowReader and ArrowWriter, so you can work directly with PyArrow RecordBatch objects instead of raw bytes.

Parameters:
  • odps (ODPS) – ODPS entry object.

  • table_or_instance (Table or Instance, optional) – MaxCompute table or SQL instance to operate on. If an Instance is provided, only read operations are available.

  • rest_endpoint (str, optional) – Custom REST endpoint for the storage API.

  • quota_name (str, optional) – Quota name for resource management.

  • tags (str or list of str, optional) – Tags for request tracking.

See also

StorageApiClient

Base client with raw stream I/O.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiArrowClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiArrowClient(odps, table)

Read data using ArrowReader:

>>> read_resp = client.create_read_session()
>>> reader = client.read_rows_arrow(read_resp.session_id, split_index=0)
>>> while True:
...     batch = reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()

Write data using ArrowWriter:

>>> write_resp = client.create_write_session()
>>> writer = client.write_rows_arrow(
...     write_resp.session_id, stream_id="0", record_count=100
... )
>>> writer.write(record_batch)
>>> writer.finish()
read_blobs(blob_references=None, compression=None, stream=False)

Download binary blobs by their blob references.

This method retrieves blob data uploaded via write_blob_stream or write_blob_batch by providing the blob references returned during upload. The data is returned as an iterator that yields tuples of (data_bytes, mime_type), handling protocol framing and CRC stripping automatically.

Parameters:
  • blob_references (list of str or bytes, optional) – List of blob references obtained from previous upload operations (write_blob_stream or write_blob_batch). References may be UTF-8 strings or raw bytes and will be decoded automatically if needed.

  • compression (Compression, default None) – Compression algorithm for the response data. None means Compression.UNCOMPRESSED.

  • stream (bool, optional) – If True, return a BlobStreamReader instead of BlobDataIterator. The reader provides file-like read() access per blob, a mime_type property, and a next() method to advance to the next blob. Default is False.

Returns:

When stream is False (default), returns an iterator yielding (data_bytes, mime_type) tuples per blob. When stream is True, returns a BlobStreamReader for incremental, file-like reading of each blob.

Return type:

BlobDataIterator or BlobStreamReader

See also

write_blob_batch

Upload blobs and get references.

write_blob_stream

Upload single blob and get reference.

BlobWriteItem

Helper for batch uploads.

Notes

The download stream uses a multi-layer protocol: 1. CRC32C checksums are embedded every 4096 bytes (stripped automatically) 2. Protocol framing wraps each blob (parsed automatically)

For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Read blobs using references obtained from previous uploads. First upload blobs, then read them back using the references:

>>> # Upload blobs (from earlier write_blob_batch example)
>>> blob_refs = ['blob_ref_001', 'blob_ref_002', 'blob_ref_003']
>>> # Read the blobs back using references
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     print(f"Blob size: {len(data)} bytes, MIME type: {mime_type}")
...     # Save blob to file
...     filename = f"blob_{blob_refs[i]}"
...     with open(filename, 'wb') as f:
...         f.write(data)
Blob size: 1024 bytes, MIME type: image/jpeg
Blob size: 2048 bytes, MIME type: image/png
Blob size: 512 bytes, MIME type: application/json

Download a single blob by passing a single-element reference list. The iterator yields one (data, mime_type) tuple:

>>> single_ref = ['blob_ref_abc123']
>>> blob_iterator = client.read_blobs(blob_references=single_ref)
>>> data, mime_type = next(blob_iterator)
>>> print(f"Downloaded {len(data)} bytes")
Downloaded 524288 bytes
>>> # Process the blob data
>>> if mime_type == 'image/jpeg':
...     # Process as JPEG image
...     process_image(data)

Download all blobs from a batch upload by storing references during upload and using them to read later:

>>> # Earlier: upload batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> all_refs = batch_resp.blob_references  # Save these
>>> # Later: read all blobs from the batch
>>> blob_iterator = client.read_blobs(blob_references=all_refs)
>>> downloaded_blobs = []
>>> for data, mime_type in blob_iterator:
...     downloaded_blobs.append({
...         'data': data,
...         'mime_type': mime_type,
...         'size': len(data)
...     })
>>> print(f"Downloaded {len(downloaded_blobs)} blobs")
Downloaded 10 blobs

Process blobs incrementally without loading all into memory by iterating and processing one at a time:

>>> blob_iterator = client.read_blobs(blob_references=large_blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     # Process each blob and discard data after processing
...     result = analyze_blob(data)
...     print(f"Blob {i}: {result}")
...     # data is freed after this iteration
Blob 0: Analysis complete
Blob 1: Analysis complete

Check MIME type to determine how to handle blob content when the type was set during upload:

>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     if mime_type == 'application/json':
...         # Parse as JSON
...         import json
...         json_obj = json.loads(data.decode('utf-8'))
...         process_json(json_obj)
...     elif mime_type == 'image/jpeg':
...         # Process as image
...         display_image(data)
...     elif mime_type is None:
...         # No type hint, use generic binary handling
...         process_binary(data)
...     else:
...         print(f"Unknown MIME type: {mime_type}")

Convert blob iterator to list for multiple iterations or indexing, though this loads all data into memory:

>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> all_blobs = list(blob_iterator)  # [(data1, mime1), (data2, mime2), ...]
>>> # Now can access by index
>>> first_blob_data = all_blobs[0][0]
>>> first_blob_mime = all_blobs[0][1]
>>> # Can iterate multiple times
>>> for data, mime_type in all_blobs:
...     process_blob(data)
>>> for data, mime_type in all_blobs:
...     validate_blob(data)

Handle empty blob reference list by checking iterator length or attempting to iterate:

>>> empty_refs = []
>>> blob_iterator = client.read_blobs(blob_references=empty_refs)
>>> count = 0
>>> for _ in blob_iterator:
...     count += 1
>>> print(f"Downloaded {count} blobs")
Downloaded 0 blobs

Use blob references from a table query to download specific blobs referenced in table rows:

>>> # Query table to get blob references
>>> instance = odps.execute_sql("SELECT blob_ref FROM your_table WHERE id=1")
>>> with instance.open_reader() as reader:
...     for record in reader:
...         blob_ref = record[0]  # blob_ref column
...         # Download the referenced blob
...         blob_iterator = client.read_blobs(blob_references=[blob_ref])
...         data, mime_type = next(blob_iterator)
...         print(f"Downloaded blob {blob_ref}: {len(data)} bytes")

Stream blobs incrementally using file-like read interface with stream=True:

>>> reader = client.read_blobs(blob_references=refs, stream=True)
>>> while reader is not None:
...     print(f"MIME: {reader.mime_type}")
...     chunk = reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = reader.read(4096)
...     reader = reader.next()
abort_write_session(session_id: str, route_token=None)

Abort a write session to discard all uploaded data.

This method cancels an active write session and discards all data uploaded through its streams. Use abort when you encounter errors during the upload process or when you want to cancel the operation without making the data visible in the table. Aborting releases all resources associated with the session.

Parameters:

session_id (str) – The unique identifier of the write session to abort. This can be any active or expired session.

Returns:

Method returns None after aborting the session. All uploaded data is discarded and session resources are released.

Return type:

None

Raises:

ValueError – If session_id is None or empty, or if called on an instance-based client.

See also

create_write_session

Create a write session.

commit_write_session

Commit the session instead of aborting.

get_write_session

Check session status before aborting.

Notes

After aborting, the session cannot be committed. All data uploaded is permanently discarded. Call abort as soon as you detect errors to release resources quickly. It’s safe to abort a session multiple times if the first abort attempt fails.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Abort a session when an error occurs during data upload to prevent partial data from being committed:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Create stream and write data
...     stream_resp = client.create_write_stream(...)
...     # Write data that fails due to schema mismatch
...     writer = client.write_rows_stream(...)
...     writer.write(invalid_data)
...     writer.finish()
... except Exception as e:
...     print(f"Upload failed: {e}")
...     # Abort to discard all uploaded data
...     client.abort_write_session(session_id)
...     print("Session aborted, no data committed to table")
Upload failed: Schema mismatch error
Session aborted, no data committed to table

Use abort in cleanup code to ensure resources are released even if commit never happens:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Upload data...
...     # Commit if everything succeeds
...     client.commit_write_session(session_id)
... finally:
...     # If commit didn't happen (exception or early return),
...     # ensure session is aborted to release resources
...     try:
...         status = client.get_write_session(session_id)
...         if status.streams is None or len(status.streams) == 0:
...             client.abort_write_session(session_id)
...     except:
...         # Session might already be committed or aborted
...         pass

Abort a session after detecting validation errors in the data before attempting to commit:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Write data to streams
>>> writer = client.write_rows_stream(...)
>>> # Validate the data locally
>>> if data_has_errors:
...     print("Data validation failed, aborting session")
...     client.abort_write_session(session_id)
... else:
...     # Data is valid, proceed with commit
...     client.close_write_stream(...)
...     client.commit_write_session(session_id)
close_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) CloseWriteStreamResponse

Close a write stream to finalize the data upload for that stream.

After finishing the write operation via the stream writer, call this method to formally close the stream. This signals to the server that no more data will be uploaded to this stream and marks the stream as ready for the session commit. All streams must be closed before the session can be committed.

Parameters:
  • session_id (str) – The write session identifier.

  • stream_id (str or int) – The stream identifier to close.

  • stream_version (int, default 0) – Version of the stream (should match create_write_stream).

Returns:

Response with warning_message and request_id. See CloseWriteStreamResponse.

Return type:

CloseWriteStreamResponse

Raises:

ValueError – If called on an instance-based client.

See also

write_rows_stream

Finish writing before closing the stream.

create_write_stream

Create the stream before writing.

commit_write_session

Commit session after all streams closed.

get_write_stream

Verify stream is closed.

Notes

The stream writer’s finish() method and close_write_stream serve different purposes: finish() completes the data upload, while close_write_stream formally closes the stream on the server side. Both must be called in sequence: finish the writer, then close the stream. After closing, the stream cannot accept more data.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow showing the relationship between writer finish() and stream close. First finish the writer, then close the stream:

>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Write Arrow data
>>> batch = pa.record_batch([...], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> # 3. Finish the writer to complete data upload
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Writer finished: {success}")
Writer finished: True
>>> # 4. Close the stream to mark it ready for commit
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Warning: {close_resp.warning_message}")
>>> # 5. Commit the session to make data visible
>>> client.commit_write_session(session_id)

Close multiple parallel streams after each finishes uploading. All streams must be closed before committing the session:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams
>>> for stream_id in range(3):
...     # Create stream, write data, finish writer
...     client.create_write_stream(session_id, stream_id=stream_id)
...     writer = ArrowWriter(
...         client.write_rows_stream(session_id, stream_id=stream_id),
...         Compression.UNCOMPRESSED
...     )
...     writer.write(batch)
...     writer.finish()
>>> # Close all 3 streams
>>> for stream_id in range(3):
...     close_resp = client.close_write_stream(session_id, stream_id=stream_id)
...     print(f"Stream {stream_id} closed")
Stream 0 closed
Stream 1 closed
Stream 2 closed
>>> # Now commit the session
>>> client.commit_write_session(session_id)

Check for warnings when closing streams to detect potential issues that may affect the commit:

>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Stream close warning: {close_resp.warning_message}")
...     # Decide whether to proceed with commit or abort
...     if "data_incomplete" in close_resp.warning_message:
...         client.abort_write_session(session_id)
...     else:
...         # Warning is informational, proceed with commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)

Use get_write_stream to verify a stream is properly closed before attempting to commit the session:

>>> # Close stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # Verify it's closed
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'CLOSED':
...     print("Stream properly closed, safe to commit")
...     client.commit_write_session(session_id)
... else:
...     print(f"Unexpected stream status: {status.status}")
Stream properly closed, safe to commit

Handle the case where close fails due to stream errors by checking the warning message and potentially aborting:

>>> try:
...     close_resp = client.close_write_stream(session_id, stream_id=0)
...     if close_resp.warning_message and "error" in close_resp.warning_message.lower():
...         print(f"Stream had errors: {close_resp.warning_message}")
...         # Abort session instead of committing incomplete data
...         client.abort_write_session(session_id)
... except Exception as e:
...     print(f"Failed to close stream: {e}")
...     client.abort_write_session(session_id)
commit_write_session(session_id: str, stream_ids=None, stream_versions=None, route_token=None)

Commit a write session to finalize all uploaded data.

This is the final step in the write workflow. Calling commit atomically makes all data uploaded through the session’s write streams visible in the table. The commit operation ensures that either all writes succeed (transaction committed) or none of them appear in the table (transaction aborted).

Parameters:
  • session_id (str) – The unique identifier of the write session to commit. All write streams in this session must be closed before committing.

  • stream_ids (list of str, optional) – List of stream identifiers to commit. Used for transactional/delta tables to specify which streams to include in the commit. When provided, stream_versions must also be provided with matching length.

  • stream_versions (list of int, optional) – List of stream version numbers corresponding to stream_ids. Must be provided together with stream_ids and must have the same length.

Returns:

Method returns None on successful commit. The commit operation makes all uploaded data visible in the table atomically.

Return type:

None

Raises:
  • ValueError – If session_id is None or empty, or if called on an instance-based client.

  • errors.ODPSError – If any write streams are still open or if the session has expired.

See also

create_write_session

Create a write session.

close_write_stream

Close all streams before committing.

abort_write_session

Abort instead of commit to discard writes.

get_write_session

Verify all streams are closed.

Notes

Before committing, ensure all write streams are closed. Use get_write_session to verify stream states. If commit fails, call abort_write_session to clean up resources.

Commit is a synchronous operation. Once it returns successfully, the data is immediately visible in the table and cannot be rolled back.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete write workflow: create session, create stream, write Arrow data, close stream, and commit:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write data using Arrow writer
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
... ], schema=schema)
>>> writer = ArrowWriter(
...     client.write_rows_stream(session_id, stream_id=0, record_count=3),
...     Compression.UNCOMPRESSED
... )
>>> writer.write(batch)
>>> commit_msg, success = writer.finish()
>>> # 4. Close the write stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # 5. Commit the session to finalize all writes
>>> client.commit_write_session(session_id)
>>> print("Data successfully uploaded to table")
Data successfully uploaded to table

For transactional/delta tables, specify stream_ids and stream_versions when committing:

>>> client.commit_write_session(
...     session_id,
...     stream_ids=["stream-1"],
...     stream_versions=[1],
... )

For multi-stream uploads, ensure all streams are closed before committing. Use get_write_session to verify:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create multiple streams and write data to each
>>> for stream_id in range(3):
...     # Create stream, write data, close stream...
...     pass
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> all_closed = all(
...     s.get('Status') == 'CLOSED'
...     for s in (status.streams or [])
... )
>>> if all_closed:
...     # Commit session to make data visible
...     client.commit_write_session(session_id)
...     print("Session committed successfully")
Session committed successfully

If commit fails due to errors, abort the session to clean up:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Write data...
...     client.commit_write_session(session_id)
... except Exception as e:
...     print(f"Commit failed: {e}")
...     # Abort to discard all writes and release resources
...     client.abort_write_session(session_id)
Commit failed: [error message]
create_read_session(required_data_columns=None, required_partition_columns=None, required_partitions=None, required_bucket_ids=None, split_options=None, arrow_options=None, filter_predicate=None, filter_predicate_fallback=None, split_max_file_num=None, incremental_read=None, incremental_read_options=None) CreateReadSessionResponse

Create a read session for table or instance data retrieval.

A read session is a prerequisite for reading data from a MaxCompute table or SQL instance result. The session determines how data will be split into readable chunks and what data schema will be returned. Sessions have an expiration time and must be refreshed if they expire during long-running read operations.

Parameters:
  • required_data_columns (list of str, optional) – List of column names to read. If empty, all columns are returned.

  • required_partition_columns (list of str, optional) – Partition columns to include in the result.

  • required_partitions (list of str, optional) – Specific partition values to read (e.g., [‘pt=20230101’]).

  • required_bucket_ids (list of str, optional) – Bucket IDs to read for bucket-based tables.

  • split_options (SplitOptions, optional) – Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.

  • arrow_options (ArrowOptions, optional) – Arrow format settings like timestamp precision.

  • filter_predicate (str, optional) – SQL-like filter condition to apply during reading.

  • filter_predicate_fallback (bool, optional) – Whether to fallback to server-side filtering if predicate pushdown fails.

  • split_max_file_num (int, optional) – Maximum number of files per split for file-based splitting.

  • incremental_read (bool, optional) – Enable incremental reading mode for capturing table changes.

  • incremental_read_options (IncrementalReadOptions, optional) – Options for incremental read mode (version range, timestamp range).

Returns:

Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.

Return type:

CreateReadSessionResponse

Raises:

ValueError – If the table or instance is not properly configured.

See also

get_read_session

Get current read session status.

read_rows_stream

Read data from a specific split.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, SplitOptions
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a basic read session that reads all columns with default splitting:

>>> response = client.create_read_session()
>>> print(f"Session ID: {response.session_id}")
Session ID: session_12345
>>> print(f"Available splits: {response.splits_count}")
Available splits: 5

Specify which columns to read and how to split the data. Use split_options to control parallelism by setting the split number instead of split size:

>>> split_opts = SplitOptions()
>>> split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
>>> split_opts.split_number = 10  # Create 10 splits
>>> response = client.create_read_session(
...     required_data_columns=["id", "name", "value"],
...     split_options=split_opts,
... )
>>> print(f"Created {response.splits_count} splits")
Created 10 splits

Read from a specific partition by providing partition values in the required_partitions parameter:

>>> response = client.create_read_session(
...     required_partitions=["pt=20230101", "region=us-west"]
... )
>>> print(f"Records in partition: {response.record_count}")
Records in partition: 1000

For incremental data capture, enable incremental_read to read changes since a specific version or timestamp:

>>> from odps.apis.storage_api_v2 import IncrementalReadOptions
>>> incr_opts = IncrementalReadOptions()
>>> incr_opts.start_version = 100
>>> response = client.create_read_session(
...     incremental_read=True,
...     incremental_read_options=incr_opts,
... )
>>> print(f"Latest version: {response.latest_version}")
Latest version: 150
create_write_session(partial_partition_spec=None, flags=None) CreateWriteSessionResponse

Create a write session for uploading data to a table.

A write session is the first step in the data upload workflow. It establishes a transactional context for writing data to a MaxCompute table, ensuring atomicity and consistency. After creating a session, you must create one or more write streams within it, upload data to those streams, close the streams, and finally commit the session.

Parameters:
  • partial_partition_spec (str, optional) – Partition specification for writing to a specific partition. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’. If empty, writes to the table’s default location.

  • flags (dict, optional) – Additional flags for session configuration. Common flags include ‘overwrite’ to replace existing partition data.

Returns:

Response with session_id, warning_message, and request_id. See CreateWriteSessionResponse.

Return type:

CreateWriteSessionResponse

Raises:

ValueError – If called on an instance-based client (writes only support tables).

See also

create_write_stream

Create a stream within the session.

commit_write_session

Commit the session to finalize writes.

abort_write_session

Abort the session to discard all writes.

Notes

Write sessions are transactional. All writes within a session are committed atomically when commit_write_session is called. If the session is aborted, all uploaded data is discarded. Sessions have a limited lifetime and must be committed before expiration.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a basic write session for uploading data to the table’s default location:

>>> response = client.create_write_session()
>>> session_id = response.session_id
>>> print(f"Write session ID: {session_id}")
Write session ID: write_session_12345

Write to a specific partition by specifying the partition spec. This is required for partitioned tables unless writing to the default partition:

>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101"
... )
>>> session_id = response.session_id
>>> print(f"Writing to partition pt=20230101")

Use the ‘overwrite’ flag to replace existing data in a partition instead of appending:

>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101",
...     flags={"overwrite": True}
... )
>>> session_id = response.session_id
>>> # After committing, existing partition data will be replaced

Complete write workflow: create session, create stream, write data, close stream, and commit:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create write stream (see create_write_stream example)
>>> # Write data (see write_rows_stream example)
>>> # Close stream (see close_write_stream example)
>>> # Commit session to finalize
>>> client.commit_write_session(session_id)

If an error occurs during the write process, abort the session to discard all uploaded data and release resources:

>>> session_resp = client.create_write_session()
>>> try:
...     # Write data...
...     # If error occurs:
...     client.abort_write_session(session_resp.session_id)
... except Exception as e:
...     client.abort_write_session(session_resp.session_id)
...     raise
create_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) CreateWriteStreamResponse

Create a write stream within an active write session.

A write stream is a data upload channel within a write session. You can create multiple streams in parallel to upload data concurrently, improving throughput for large datasets. Each stream must be closed individually after writing data, and the session must be committed after all streams are closed.

Parameters:
  • session_id (str) – The write session identifier from create_write_session.

  • stream_id (str or int) – Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.

  • stream_version (int, default 0) – Version number for the stream. Increment if retrying after a failed upload to the same stream_id.

Returns:

Response with data_schema, table_id, schema_version, and route_token. See CreateWriteStreamResponse.

Return type:

CreateWriteStreamResponse

Raises:

ValueError – If called on an instance-based client.

See also

create_write_session

Create the parent write session first.

write_rows_stream

Write data to the created stream.

close_write_stream

Close the stream after writing.

get_write_stream

Get stream status.

Notes

Each stream can be written to independently, enabling parallel uploads from multiple workers. The stream_version allows retrying failed uploads without creating a new session. After a stream is closed, it cannot be reopened with the same stream_id and version.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

First create a write session, then create a single stream for sequential data upload:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> print(f"Stream created, schema: {stream_resp.data_schema}")
Stream created, schema: {'Columns': [...]}

For parallel upload from multiple workers, create multiple streams with different stream_ids. Each worker processes one stream:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Worker 1 creates stream 0
>>> stream_resp0 = client.create_write_stream(session_id, stream_id=0)
>>> # Worker 2 creates stream 1
>>> stream_resp1 = client.create_write_stream(session_id, stream_id=1)
>>> # Worker 3 creates stream 2
>>> stream_resp2 = client.create_write_stream(session_id, stream_id=2)
>>> print(f"Created 3 parallel streams")

Use stream_version to retry failed uploads. If a stream fails, increment the version to create a new stream with the same ID:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0, stream_version=0)
>>> # Attempt to write data (fails)
>>> try:
...     writer = client.write_rows_stream(...)
...     writer.write(data)
... except Exception:
...     # Retry with incremented version
...     retry_resp = client.create_write_stream(
...         session_id, stream_id=0, stream_version=1
...     )
...     writer = client.write_rows_stream(...)
...     writer.write(data)

Check the data_schema before writing to ensure your data matches the expected table schema:

>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> schema = stream_resp.data_schema
>>> if schema:
...     columns = schema.get('Columns', [])
...     for col in columns:
...         print(f"Column: {col.get('Name')}, Type: {col.get('Type')}")
Column: id, Type: bigint
Column: name, Type: string
get_read_session(session_id: str, refresh: bool = False) CreateReadSessionResponse

Get current status and metadata of an existing read session.

This method retrieves the current state of a read session, including its status, split count, schema, and expiration time. Use this to check if a session is still valid before attempting to read data, or to refresh a session that is approaching expiration.

Parameters:
  • session_id (str) – The unique identifier of the read session to retrieve. This is obtained from create_read_session response.

  • refresh (bool, default False) – Whether to refresh the session expiration time. Set to True if the session is about to expire or has expired. When True, the server extends the session lifetime, allowing continued reading operations.

Returns:

Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.

Return type:

CreateReadSessionResponse

Raises:

ValueError – If session_id is None or empty.

See also

create_read_session

Create a new read session.

read_rows_stream

Read data from a specific split.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

First create a read session, then check its status:

>>> create_response = client.create_read_session()
>>> session_id = create_response.session_id
>>> status_response = client.get_read_session(session_id)
>>> print(f"Session status: {status_response.session_status}")
Session status: NORMAL
>>> print(f"Expiration: {status_response.expiration_time}")
Expiration: 2023-01-01T12:00:00Z

When a session expires during a long-running read operation, use the refresh parameter to extend its lifetime:

>>> # Check if session expired
>>> response = client.get_read_session(session_id)
>>> if response.session_status == SessionStatus.EXPIRED:
...     # Refresh the session to continue reading
...     refreshed_response = client.get_read_session(session_id, refresh=True)
...     print(f"New expiration: {refreshed_response.expiration_time}")
New expiration: 2023-01-01T14:00:00Z

Monitor session status during a parallel read operation to ensure all splits are processed before expiration:

>>> create_response = client.create_read_session()
>>> for split_index in range(create_response.splits_count):
...     # Check session health before each split
...     status = client.get_read_session(create_response.session_id)
...     if status.session_status == SessionStatus.EXPIRED:
...         client.get_read_session(create_response.session_id, refresh=True)
...     # Read the split data...
...     reader = client.read_rows_stream(...)
get_write_session(session_id: str) GetWriteSessionResponse

Get the current status and stream information of a write session.

This method retrieves metadata about a write session, including the list of active write streams and their states. Use this to monitor the progress of a multi-stream upload operation or to verify that all streams have been properly closed before committing the session.

Parameters:

session_id (str) – The unique identifier of the write session to query. This is obtained from create_write_session response.

Returns:

Response with streams (list of stream info dicts), warning_message, and request_id. See GetWriteSessionResponse.

Return type:

GetWriteSessionResponse

Raises:

ValueError – If session_id is None or empty, or if called on an instance-based client.

See also

create_write_session

Create a write session.

commit_write_session

Commit the session after all streams closed.

create_write_stream

Create a new stream in the session.

Notes

Before calling commit_write_session, ensure all write streams are closed. Use get_write_session to verify that no streams are still in the RUNNING or OPEN state.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a write session and check its status before creating streams:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> status_resp = client.get_write_session(session_id)
>>> print(f"Active streams: {status_resp.streams}")
Active streams: None

After creating multiple write streams for parallel upload, use get_write_session to track which streams are active:

>>> # Create multiple streams (see create_write_stream examples)
>>> stream_resp1 = client.create_write_stream(...)
>>> stream_resp2 = client.create_write_stream(...)
>>> status = client.get_write_session(session_id)
>>> print(f"Streams: {len(status.streams) if status.streams else 0}")
Streams: 2

Before committing the session, verify all streams are closed to ensure data upload is complete:

>>> # Close all streams after writing data
>>> client.close_write_stream(...)
>>> client.close_write_stream(...)
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> if status.streams:
...     # Check if any stream is still open
...     open_streams = [s for s in status.streams if s.get('Status') == 'OPEN']
...     if open_streams:
...         print(f"Warning: {len(open_streams)} streams still open")
...     else:
...         # All streams closed, safe to commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)

Monitor the progress of a long-running parallel upload by periodically checking stream status:

>>> import time
>>> while True:
...     status = client.get_write_session(session_id)
...     if status.streams:
...         closed_count = sum(
...             1 for s in status.streams
...             if s.get('Status') == 'CLOSED'
...         )
...         print(f"Progress: {closed_count}/{len(status.streams)} streams closed")
...         if closed_count == len(status.streams):
...             break
...     time.sleep(5)
>>> # All streams closed, commit the session
>>> client.commit_write_session(session_id)
get_write_stream(session_id: str, stream_id: str, stream_version: int, route_token=None, exactly_once_mode=False) GetWriteStreamResponse

Get the current status and metadata of a write stream.

This method retrieves information about a specific write stream, including its current state, statistics, and any error information. Use this to monitor stream health during uploads or to verify stream state before closing.

Parameters:
  • session_id (str) – The unique identifier of the write session.

  • stream_id (str) – The identifier of the stream to query (typically 0, 1, 2, …).

  • stream_version (int) – The version number of the stream.

Returns:

Response with status, record_count, byte_size, error_code, and error_message. See GetWriteStreamResponse.

Return type:

GetWriteStreamResponse

Raises:

ValueError – If any parameter is None or empty, or if called on an instance-based client.

See also

create_write_stream

Create a stream before getting its status.

close_write_stream

Close the stream after verifying status.

get_write_session

Get all streams in a session.

Notes

Use get_write_stream to check if a stream has encountered errors before attempting to close it. If a stream is in ERROR state, it may need to be recreated with an incremented stream_version.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Create a write stream and check its status before writing data:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Stream status: {status.status}")
Stream status: OPEN

After writing data, check the stream statistics to verify upload progress before closing:

>>> # Write data to the stream
>>> writer = client.write_rows_stream(...)
>>> writer.write(batch)
>>> writer.finish()
>>> # Check stream statistics
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Uploaded {status.record_count or 0} records, {status.byte_size or 0} bytes")
Uploaded 1000 records, 524288 bytes
>>> # Close the stream after verifying the upload
>>> client.close_write_stream(...)

Monitor multiple parallel streams during upload to identify which streams have encountered errors:

>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams and write data
>>> for i in range(3):
...     stream_resp = client.create_write_stream(session_id, stream_id=i)
>>> # Check status of all streams
>>> for i in range(3):
...     status = client.get_write_stream(session_id, stream_id=str(i), stream_version=0)
...     if status.status == 'ERROR':
...         print(f"Stream {i} failed: {status.error_message}")
...     else:
...         print(f"Stream {i}: {status.status}, {status.record_count or 0} records")
Stream 0: OPEN, 500 records
Stream 1: OPEN, 300 records
Stream 2: ERROR, Schema validation failed

If a stream shows ERROR status, use get_write_stream to get detailed error information before deciding to retry:

>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'ERROR':
...     print(f"Stream error [{status.error_code}]: {status.error_message}")
...     # Decide whether to retry with new version or abort session
...     if status.error_code == 'SCHEMA_MISMATCH':
...         # Retry with corrected data
...         client.create_write_stream(session_id, stream_id=0, stream_version=1)
...     else:
...         # Abort the entire session
...         client.abort_write_session(session_id)
preview_table(limit=None, partition=None, columns=None) StreamReader

Preview table data without creating a session.

This method provides a lightweight way to sample table data without the overhead of creating a read session. Unlike read_rows_stream, preview_table directly returns an Arrow IPC stream, making it ideal for quick data exploration, schema inspection, or testing table connectivity.

Parameters:
  • limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).

  • partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.

  • columns (list of str, optional) – Specific columns to preview. If empty, all columns are returned.

Returns:

Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.

Return type:

StreamReader

Raises:

ValueError – If called on an instance-based client (preview only works on tables).

See also

read_rows_stream

Read full table data with session management.

create_read_session

Create a session for large-scale reading.

Notes

Preview is optimized for quick sampling and may not return exact row counts specified in limit. For production data reading with guaranteed row counts and retry support, use create_read_session and read_rows_stream instead.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Preview the first few rows of a table to quickly explore its data:

>>> stream_reader = client.preview_table(limit=10)
>>> arrow_reader = ArrowReader(stream_reader)
>>> batch = arrow_reader.read()
>>> if batch is not None:
...     df = batch.to_pandas()
...     print(df)
   id    name    value
0   1  Alice      100
1   2    Bob      200
2   3  Carol      150

Preview specific columns to check their data types and values without downloading all columns:

>>> reader = ArrowReader(client.preview_table(
...     limit=5, columns=["id", "name"]
... ))
>>> batch = reader.read()
>>> if batch is not None:
...     print(batch.schema)
id: int64
name: string

Preview data from a specific partition to test partition filtering before running a full query:

>>> reader = ArrowReader(client.preview_table(
...     limit=20, partition="pt=20230101"
... ))
>>> batch = reader.read()
>>> df = batch.to_pandas() if batch is not None else None
>>> print(f"Previewed {len(df) if df is not None else 0} rows")
Previewed 20 rows

Use preview to inspect table schema by requesting a small sample. This is useful for understanding column names and types before creating a read session:

>>> reader = ArrowReader(client.preview_table(limit=1))
>>> batch = reader.read()
>>> if batch is not None:
...     for field in batch.schema:
...         print(f"{field.name}: {field.type}")
id: int64
name: string
value: double
pt: string
preview_table_arrow(limit=None, partition=None, columns=None) ArrowReader[source]

Preview table data as Arrow batches.

This is the Arrow convenience wrapper for preview_table(). It reads a small sample of rows and returns them via an ArrowReader that yields pyarrow.RecordBatch objects.

Parameters:
  • limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).

  • partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.

  • columns (list of str, optional) – Specific columns to preview. If empty, all columns are returned.

Returns:

Arrow batch reader with read() method yielding RecordBatch objects. See ArrowReader.

Return type:

ArrowReader

read_rows_arrow(session_id, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) ArrowReader[source]

Read one split of the read session as Arrow batches.

This is the Arrow convenience wrapper for read_rows_stream(). It reads raw bytes from the server and wraps them in an ArrowReader that yields pyarrow.RecordBatch objects.

Parameters:
  • session_id (str) – The read session identifier from create_read_session.

  • split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.

  • row_offset (int, optional) – Starting row offset within the split. Defaults to 0.

  • row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.

  • max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.

  • skip_row_num (int, default 0) – Number of rows to skip before reading.

  • max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.

  • data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).

  • data_columns (list of str, optional) – Specific columns to read. Must match session schema.

  • compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.

Returns:

Arrow batch reader with read() method yielding RecordBatch objects. See ArrowReader.

Return type:

ArrowReader

read_rows_stream(session_id=None, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) StreamReader

Read data from a specific split in a read session.

This method reads a chunk of data from a table or instance result by specifying which split to read. The data is returned as a stream that can be processed incrementally, supporting efficient handling of large datasets. Each split can be read independently, enabling parallel processing of the same session across multiple workers.

Parameters:
  • session_id (str) – The read session identifier from create_read_session.

  • split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.

  • row_offset (int, optional) – Starting row offset within the split. Defaults to 0.

  • row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.

  • max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.

  • skip_row_num (int, default 0) – Number of rows to skip before reading.

  • max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.

  • data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).

  • data_columns (list of str, optional) – Specific columns to read. Must match session schema.

  • compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.

Returns:

Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.

Return type:

StreamReader

Raises:

ValueError – If the session or split parameters are invalid.

See also

create_read_session

Create a read session first.

ArrowReader

Wrap StreamReader to read Arrow batches.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Read data from a specific split using ArrowReader for convenient batch processing:

>>> reader = client.read_rows_stream(session_id, split_index=0)
>>> arrow_reader = ArrowReader(reader)
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process dataframe
>>> arrow_reader.get_request_id()  # Get request ID after completion

For parallel processing, distribute splits across multiple workers. Each worker reads a different split index from the same session:

>>> # Worker 1 reads split 0
>>> reader1 = client.read_rows_stream(session_id, split_index=0)
>>> # Worker 2 reads split 1
>>> reader2 = client.read_rows_stream(session_id, split_index=1)

Control memory usage by limiting batch size with max_batch_rows. Smaller batches reduce memory footprint but may have lower throughput:

>>> reader = client.read_rows_stream(session_id, split_index=0, max_batch_rows=1024)
>>> arrow_reader = ArrowReader(reader)
>>> # Read batches one at a time to control memory
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process df and then discard to free memory

Read a specific range of rows within a split by using row_offset and row_count parameters:

>>> reader = client.read_rows_stream(
...     session_id,
...     split_index=0,
...     row_offset=1000,  # Skip first 1000 rows
...     row_count=500,    # Read 500 rows
... )
property route_token

The stored route token for session affinity.

This is automatically updated from response headers. Pass route_token=... to individual methods to override, or rely on this stored value when not specified.

write_blob_batch(items: List[BlobWriteItem], session_id=None, stream_id=None, stream_version=0, partition_values=None, column_index=0) WriteBlobResponse

Upload multiple blobs in a single batch request for efficiency.

This method uploads multiple binary blobs (such as images, videos, or files) in one consolidated request, reducing network overhead compared to individual uploads. Each blob is packaged with metadata (partition location, column index, MIME type) and optional checksum verification.

Parameters:
  • items (list of BlobWriteItem) – List of blob items to upload. Each item contains data (bytes), partition_values (list of str), column_index (int), distribution_key (str), mime_type (str), and checksum_type (ChecksumType). See BlobWriteItem.

  • session_id (str, optional) – The write session identifier from create_write_session.

  • stream_id (str or int, optional) – Stream identifier for this batch upload.

  • stream_version (int, default 0) – Version number for the stream.

  • partition_values (list of str, optional) – Default partition values for blobs (can be overridden per item). Format: [‘pt=20230101’, ‘region=us-west’].

  • column_index (int, default 0) – Default column index for blobs (can be overridden per item).

Returns:

Response with blob_reference, blob_references, warning_message, and size. See WriteBlobResponse.

Return type:

WriteBlobResponse

Raises:

ValueError – If called on an instance-based client.

See also

write_blob_stream

Upload single blob via streaming.

read_blobs

Read uploaded blobs using the references.

BlobWriteItem

Class to construct blob items.

Notes

Batch upload is more efficient than individual uploads when uploading many small blobs. The method uses a custom wire format with header, data, and footer sections for each blob. Checksums (CRC32 or MD5) are computed and included in the footer for server-side verification. The order of blob_references matches the input items order.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, BlobWriteItem, ChecksumType
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Upload multiple image blobs in a single batch request for efficient network usage:

>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Prepare multiple blob items
>>> items = []
>>> for image_path in ['img1.jpg', 'img2.jpg', 'img3.jpg']:
...     with open(image_path, 'rb') as f:
...         image_data = f.read()
...     item = BlobWriteItem(
...         data=image_data,
...         partition_values=['pt=20230101'],
...         column_index=0,
...         mime_type='image/jpeg',
...         checksum_type=ChecksumType.CRC32  # Verify integrity
...     )
...     items.append(item)
>>> # 3. Upload all blobs in one batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Uploaded {len(batch_resp.blob_references)} blobs")
>>> print(f"Total size: {batch_resp.size} bytes")
Uploaded 3 blobs
Total size: 1572864 bytes
>>> # 4. Store references for later reading
>>> blob_refs = batch_resp.blob_references
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)

Upload blobs to different partitions and columns by customizing each BlobWriteItem’s metadata:

>>> items = [
...     BlobWriteItem(
...         data=blob1_data,
...         partition_values=['pt=20230101'],
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=blob2_data,
...         partition_values=['pt=20230102'],
...         column_index=1
...     ),
...     BlobWriteItem(
...         data=blob3_data,
...         partition_values=['pt=20230103'],
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Each blob is stored in its specified partition/column

Use MD5 checksum for stronger integrity verification on critical data uploads. The checksum is computed and sent with the blob:

>>> critical_data = read_critical_file()
>>> item = BlobWriteItem(
...     data=critical_data,
...     column_index=0,
...     checksum_type=ChecksumType.MD5
... )
>>> response = client.write_blob_batch([item], session_id, stream_id=0)
>>> # Server verifies MD5 checksum matches computed value

Add MIME type metadata to help applications understand blob content type without examining the binary data:

>>> items = [
...     BlobWriteItem(
...         data=json_data_bytes,
...         mime_type='application/json',
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=pdf_data_bytes,
...         mime_type='application/pdf',
...         column_index=1
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # MIME types stored with blobs for content type hints

Use distribution_key for hash-based storage to ensure blobs are distributed across storage locations based on the key:

>>> items = [
...     BlobWriteItem(
...         data=blob_data,
...         distribution_key='user123',
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Blob stored at location determined by distribution key hash

Upload a large number of small files efficiently by batching them instead of uploading individually:

>>> # Prepare 100 small file blobs
>>> items = []
>>> for file_path in small_files:  # 100 small files
...     with open(file_path, 'rb') as f:
...         data = f.read()
...     items.append(BlobWriteItem(data=data, column_index=0))
>>> # Upload all in one batch (much faster than 100 individual uploads)
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Batch uploaded {len(response.blob_references)} blobs")
Batch uploaded 100 blobs

Check for warnings in the response to detect partial upload failures or other issues:

>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> if response.warning_message:
...     print(f"Upload warning: {response.warning_message}")
...     # Some blobs may have had issues
...     # Check if all blob_references are present
...     if len(response.blob_references) < len(items):
...         print(f"Only {len(response.blob_references)} of {len(items)} uploaded")
Upload warning: 2 blobs exceeded size limit
Only 98 of 100 uploaded

Use the returned blob_references to read the blobs later by calling read_blobs with the reference list:

>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> blob_refs = batch_resp.blob_references
>>> # Later, read the blobs back
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     # Process each blob data
...     print(f"Read blob: {len(data)} bytes, type: {mime_type}")
write_blob_stream(session_id, stream_id, stream_version=0, partition_values=None, column_index=0, compression=None) BlobStreamWriter

Upload a single blob via streaming upload.

This method creates a streaming writer for uploading a single binary blob (such as an image, video, or large binary file) to a specific column in a MaxCompute table. The data is optionally compressed using the specified compression algorithm and verified with MD5 checksum to ensure upload integrity.

Parameters:
  • session_id (str) – The write session identifier from create_write_session.

  • stream_id (str or int) – Stream identifier for this upload.

  • stream_version (int, default 0) – Version number for the stream.

  • partition_values (list of str, optional) – Partition values for the blob location. Format: [‘pt=20230101’, ‘region=us-west’].

  • column_index (int, default 0) – Column index in the table schema where the blob will be stored.

  • compression (Compression, default None) – Compression algorithm to use. None means Compression.UNCOMPRESSED. See Compression. Supported values: Compression.ZSTD, Compression.LZ4_FRAME, Compression.UNCOMPRESSED.

Returns:

Blob stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See BlobStreamWriter.

Return type:

BlobStreamWriter

Raises:
  • ValueError – If called on an instance-based client.

  • errors.DependencyNotInstalledError – If zstandard library is not installed (required for ZSTD compression).

  • errors.ChecksumError – If MD5 checksum verification fails after finish().

See also

write_blob_batch

Upload multiple blobs in one request.

read_blobs

Read uploaded blobs by reference.

BlobWriteItem

Helper class for batch blob uploads.

Notes

Blob uploads default to no compression (Compression.UNCOMPRESSED). Compression algorithms like ZSTD or LZ4_FRAME can be enabled for efficient transfer. Set compression=Compression.ZSTD for zstd compression or compression=Compression.LZ4_FRAME for lz4. The writer computes MD5 checksum incrementally during write and verifies against server response when finished. If the checksum doesn’t match, a ChecksumError is raised, indicating data corruption during upload.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow for uploading a single image blob to a table with a binary column:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Upload blob data
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101'],
...     column_index=0  # First column is the blob column
... )
>>> # Read image file and upload in chunks
>>> with open('image.jpg', 'rb') as f:
...     while True:
...         chunk = f.read(8192)  # 8KB chunks
...         if not chunk:
...             break
...         blob_writer.write(chunk)
>>> # 4. Finish and verify MD5
>>> response = blob_writer.finish()
>>> print(f"Blob reference: {response.blob_reference}")
>>> print(f"Uploaded size: {response.size} bytes")
Blob reference: blob_ref_abc123
Uploaded size: 524288 bytes
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)

Upload blob data directly from memory without reading from a file, useful for dynamically generated binary data:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Generate or prepare binary data
>>> binary_data = b"generated binary content..."
>>> blob_writer.write(binary_data)
>>> # Finish upload
>>> response = blob_writer.finish()
>>> print(f"Blob uploaded: {response.blob_reference}")

Upload a blob to a specific partition by providing partition values. The blob is stored in the specified partition’s location:

>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101', 'region=us-west'],
...     column_index=0
... )
>>> blob_writer.write(image_data)
>>> response = blob_writer.finish()
>>> # Blob is now in partition pt=20230101/region=us-west

Handle checksum errors when the upload integrity check fails, indicating data corruption during transfer:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> blob_writer.write(data)
>>> try:
...     response = blob_writer.finish()
...     print("Upload successful")
... except errors.ChecksumError as e:
...     print(f"Checksum mismatch: {e}")
...     # Data was corrupted during upload
...     # Retry the upload or abort the session
...     client.abort_write_session(session_id)
Checksum mismatch: MD5 value mismatch, expected: abc123, actual: def456

Upload large blobs incrementally by reading and writing in chunks to avoid loading the entire blob into memory:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Upload a large video file in 64KB chunks
>>> chunk_size = 65536
>>> with open('large_video.mp4', 'rb') as f:
...     while True:
...         chunk = f.read(chunk_size)
...         if not chunk:
...             break
...         success = blob_writer.write(chunk)
...         if not success:
...             print("Writer closed unexpectedly")
...             break
>>> response = blob_writer.finish()
>>> print(f"Large video uploaded: {response.size} bytes")
Large video uploaded: 104857600 bytes

Upload text data by converting strings to bytes. The writer automatically handles the conversion:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> text_content = "This is text data to store as a blob"
>>> blob_writer.write(text_content)  # Automatically converted to bytes
>>> response = blob_writer.finish()

Monitor writer status during long uploads to detect early failures:

>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> for chunk in large_data_chunks:
...     success = blob_writer.write(chunk)
...     if not success or blob_writer.get_status() != Status.RUNNING:
...         print("Upload stopped unexpectedly")
...         break
>>> response = blob_writer.finish()
>>> request_id = blob_writer.get_request_id()
write_rows_arrow(session_id=None, stream_id=None, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) ArrowWriter[source]

Write Arrow batches to a write stream.

This is the Arrow convenience wrapper for write_rows_stream(). It creates an ArrowWriter that serializes pyarrow.RecordBatch objects into Arrow IPC format and writes them to the underlying stream.

Parameters:
  • session_id (str) – The write session identifier.

  • stream_id (str or int) – The stream identifier from create_write_stream.

  • stream_version (int, default 0) – Version of the stream (should match create_write_stream).

  • record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.

  • compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.

  • row_offset (int, default -1) – Row offset for Exactly-Once mode. -1 means not used.

  • access_token (str, optional) – Access token for Exactly-Once mode.

Returns:

Arrow batch writer with write() and finish() methods. See ArrowWriter.

Return type:

ArrowWriter

write_rows_stream(session_id, stream_id, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) StreamWriter

Write row data to a write stream via streaming upload.

This method creates a streaming writer for uploading row data to a write stream. The writer accepts Arrow record batches or raw binary data and uploads it incrementally, enabling efficient handling of large datasets without loading all data into memory at once.

Parameters:
  • session_id (str) – The write session identifier.

  • stream_id (str or int) – The stream identifier from create_write_stream.

  • stream_version (int, default 0) – Version of the stream (should match create_write_stream).

  • record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.

  • compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.

Returns:

Stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See StreamWriter.

Return type:

StreamWriter

Raises:

ValueError – If called on an instance-based client.

See also

create_write_stream

Create a stream before writing.

ArrowWriter

Convert Arrow batches to stream format.

close_write_stream

Close the stream after finishing write.

Notes

The returned StreamWriter accepts binary data directly. For Arrow workflows, wrap it with ArrowWriter which converts record batches to Arrow IPC format. The writer must be finished before closing the stream. Call finish() after writing all batches.

Examples

>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)

Complete workflow using ArrowWriter to upload Arrow record batches:

>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write Arrow data
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
...     pa.field("value", pa.float64()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
...     pa.array([100.0, 200.0, 150.0]),
... ], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Upload successful: {success}")
Upload successful: True
>>> # 4. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)

Read / Write Sessions

class odps.apis.storage_api_v2.CreateReadSessionRequest(**kwargs)[source]

Request for creating a read session.

required_data_columns

List of column names to read. If empty, all columns are returned.

Type:

list of str

required_partition_columns

Partition columns to include in the result.

Type:

list of str

required_partitions

Specific partition values to read (e.g., [‘pt=20230101’]).

Type:

list of str

required_bucket_ids

Bucket IDs to read for bucket-based tables.

Type:

list of str

split_options

Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.

Type:

SplitOptions

arrow_options

Arrow format settings like timestamp precision.

Type:

ArrowOptions

filter_predicate

SQL-like filter condition to apply during reading.

Type:

str

filter_predicate_fallback

Whether to fallback to server-side filtering if predicate pushdown fails.

Type:

bool

split_max_file_num

Maximum number of files per split for file-based splitting.

Type:

int

incremental_read

Enable incremental reading mode for capturing table changes.

Type:

bool

incremental_read_options

Options for incremental read mode (version range, timestamp range).

Type:

IncrementalReadOptions

class odps.apis.storage_api_v2.CreateReadSessionResponse[source]

Response from creating a read session.

session_id

Unique identifier for this read session. Required for all subsequent read operations.

Type:

str

session_status

Current status (INIT, NORMAL, EXPIRED, etc).

Type:

SessionStatus

splits_count

Number of data splits available for parallel reading.

Type:

int

record_count

Total number of records across all splits.

Type:

int

data_schema

Schema information for data and partition columns.

Type:

DataSchema

expiration_time

When the session expires and needs refresh.

Type:

str

download_id

Download ID (used for instance-based reads, mapped to session_id).

Type:

str

status

Status field for instance-based reads (mapped to session_status).

Type:

SessionStatus

session_type

Type of the read session.

Type:

str

supported_data_format

List of data formats supported by the server.

Type:

list of DataFormat

split_mode

Mode used for splitting the data.

Type:

str

split_bucket_id

Bucket ID used for bucket-based splitting.

Type:

str

session_stats

Statistics about the session (estimated size, row count).

Type:

SessionStats

latest_version

Latest data version for incremental reads.

Type:

int

message

Server message, if any.

Type:

str

enable_large_string

Whether large string support is enabled.

Type:

bool

incremental_read_options

Options for incremental read from the server response.

Type:

IncrementalReadOptions

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.CreateWriteSessionRequest(**kwargs)[source]

Request for creating a write session.

partial_partition_spec

Partition specification for writing to a specific partition. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’. If empty, writes to the table’s default location.

Type:

str

flags

Additional flags for session configuration. Common flags include ‘overwrite’ to replace existing partition data.

Type:

dict

class odps.apis.storage_api_v2.CreateWriteSessionResponse(**kwargs)[source]

Response from creating a write session.

session_id

Unique identifier for this write session. Required for creating write streams and committing the session.

Type:

str

warning_message

Warning message if the session has any issues.

Type:

str

route_token

Routing token for load balancing, extracted from response headers. Pass this token to subsequent operations (create_write_stream, commit_write_session, etc.) to ensure session affinity.

Type:

str

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.GetWriteSessionResponse(**kwargs)[source]

Response from getting write session status.

streams

Information about all write streams created in this session. Each stream dict contains stream_id, stream_version, and stream status.

Type:

list of dict

warning_message

Warning message if any streams have issues.

Type:

str

route_token

Routing token for load balancing, extracted from response headers. Pass this token to subsequent operations to ensure session affinity.

Type:

str

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.SessionStatus(value)[source]
class odps.apis.storage_api_v2.SessionStats(**kwargs)[source]

Read / Write Streams

class odps.apis.storage_api_v2.CreateWriteStreamRequest(session_id, stream_id, stream_version=0, exactly_once_mode=False)[source]

Request for creating a write stream within an active write session.

session_id

The write session identifier from create_write_session.

Type:

str

stream_id

Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.

Type:

str or int

stream_version

Version number for the stream. Increment if retrying after a failed upload to the same stream_id. Default 0.

Type:

int

exactly_once_mode

Whether to enable Exactly-Once semantics for this stream. When enabled, the server returns an access token and tracks row offsets for idempotent writes. Default False.

Type:

bool

class odps.apis.storage_api_v2.CreateWriteStreamResponse(**kwargs)[source]

Response from creating a write stream.

data_schema

The table schema for this stream, including column names and types. Use this to validate data before writing.

Type:

dict

table_id

Internal table identifier.

Type:

str

schema_version

Schema version number.

Type:

str

route_token

Routing token for load balancing, extracted from response headers.

Type:

str

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.GetWriteStreamResponse(**kwargs)[source]

Response from getting write stream status.

status

Current stream state (OPEN, CLOSED, ERROR).

Type:

str

record_count

Number of records written to this stream.

Type:

int

byte_size

Total bytes uploaded through this stream.

Type:

int

error_code

Error code if stream is in ERROR state.

Type:

str

error_message

Detailed error message if stream failed.

Type:

str

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.ReadStreamRequest(session_id, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None)[source]

Request for reading data from a specific split in a read session.

session_id

The read session identifier from create_read_session.

Type:

str

split_index

Which split to read (0 to splits_count-1). If None, reads all data in the session.

Type:

int, optional

row_offset

Starting row offset within the split. Defaults to 0.

Type:

int, optional

row_count

Maximum number of rows to read. If None, reads all rows in the split.

Type:

int, optional

max_batch_rows

Maximum rows per Arrow batch in the stream. Controls memory usage during reading. Default 4096.

Type:

int

skip_row_num

Number of rows to skip before reading. Default 0.

Type:

int

max_batch_raw_size

Maximum raw byte size per batch. 0 means no limit. Default 0.

Type:

int

data_format

Format of returned data (Arrow V5 is default).

Type:

DataFormat, optional

data_columns

Specific columns to read. Must match session schema.

Type:

list of str

compression

Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.

Type:

Compression, default None

class odps.apis.storage_api_v2.WriteStreamRequest(session_id, stream_id, stream_version=0, record_count=0, compression=None, row_offset=-1, access_token=None)[source]

Request for writing row data to a write stream via streaming upload.

session_id

The write session identifier.

Type:

str

stream_id

The stream identifier from create_write_stream.

Type:

str or int

stream_version

Version of the stream (should match create_write_stream). Default 0.

Type:

int

record_count

Total number of records to be written. Set this to the expected count for validation, or 0 if unknown. Default 0.

Type:

int

compression

Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.

Type:

Compression, default None

row_offset

Row offset for Exactly-Once mode. Set to -1 (default) to not send the offset. In Exactly-Once mode, this tracks the starting row position for idempotent writes.

Type:

int

access_token

Access token for Exactly-Once mode. Obtained from CreateWriteStreamResponse or GetWriteStreamResponse.

Type:

str

class odps.apis.storage_api_v2.CloseWriteStreamRequest(session_id, stream_id, stream_version=0)[source]

Request for closing a write stream to finalize the data upload.

session_id

The write session identifier.

Type:

str

stream_id

The stream identifier to close.

Type:

str or int

stream_version

Version of the stream (should match create_write_stream). Default 0.

Type:

int

class odps.apis.storage_api_v2.CloseWriteStreamResponse(**kwargs)[source]

Response from closing a write stream.

warning_message

Warning message if the stream closure has any issues. Check this for potential problems even if close succeeds.

Type:

str

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.StreamReader(download)[source]

Stream reader for reading Arrow IPC formatted data from the server.

read(nbytes=None)[source]

Read data from the stream. Returns bytes.

get_status()[source]

Check reading status (RUNNING or OK).

get_request_id()[source]

Get request ID after completion.

close()[source]

Close the stream.

readable()[source]

Check if the stream can still be read.

close()[source]

Flush and close the IO object.

This method has no effect if the file is already closed.

readable()[source]

Return whether object was opened for reading.

If False, read() will raise OSError.

class odps.apis.storage_api_v2.StreamWriter(upload, on_route_token=None)[source]

Stream writer for uploading binary data to the server.

Expects Arrow IPC formatted binary data. Use ArrowWriter to convert Arrow record batches to this format.

write(data)[source]

Write binary data to the stream. Returns True on success, False if writer is closed or error occurred.

finish()[source]

Finish writing and close the upload. Returns (commit_message, success_bool). In Exactly-Once mode, the response body is also parsed into a WriteStreamResponse accessible via get_write_stream_response().

get_status()[source]

Check writer status (RUNNING during write, OK after finish).

get_request_id()[source]

Get request ID after finish() for debugging.

get_route_token()[source]

Get route token from write response headers for session affinity. Call after finish(). Returns None if not available.

get_write_stream_response()[source]

Get the parsed WriteStreamResponse after finish(). Returns None if finish() has not been called or if the response did not contain exactly-once data.

writable()[source]

Check if the writer can still accept data.

get_write_stream_response()[source]

Get the parsed WriteStreamResponse after finish().

In Exactly-Once mode, the server response contains ExactlyOnceRowOffset which tracks the committed row position. This method returns that parsed response, or None if not available.

writable()[source]

Return whether object was opened for writing.

If False, write() will raise OSError.

class odps.apis.storage_api_v2.ArrowReader(stream_reader)[source]

Arrow batch reader that wraps a StreamReader.

read()[source]

Read the next Arrow record batch from the stream. Returns None when all batches have been read.

class odps.apis.storage_api_v2.ArrowWriter(stream_writer, compression)[source]

Arrow batch writer that wraps a StreamWriter.

Converts Arrow record batches to Arrow IPC format and writes them to the underlying stream.

write(record_batch)[source]

Write an Arrow RecordBatch to the stream. Returns True on success, False if writer is closed or error occurred.

finish()[source]

Finish writing and close the upload. Returns (commit_message, success_bool).

get_write_stream_response()[source]

Get the parsed WriteStreamResponse after finish().

In Exactly-Once mode, the server response contains ExactlyOnceRowOffset which tracks the committed row position. Delegates to the underlying StreamWriter.

Blob I/O

class odps.apis.storage_api_v2.BlobDataIterator(raw_stream)[source]

Iterator that parses the framed blob download protocol.

The download stream is processed through the following layers:
  1. Decompression (if the response is compressed, handled upstream via get_decompress_stream before passing to this iterator)

  2. CRC32C stripping – strips per-block checksums

  3. This iterator parses [HeaderLen][Header][DataLen][Data][FooterLen][Footer] frames

Yields (data_bytes, mime_type) tuples for each blob, where:

  • data_bytesbytes

    The raw binary data of the blob.

  • mime_typestr or None

    MIME type metadata if it was provided during upload, otherwise None.

For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.

read_data(size=-1)[source]

Read up to size bytes of the current blob’s data from the stream.

This is used by BlobStreamReader for chunked reads. The stream position must be within the current blob’s data region.

Parameters:

size (int) – Maximum bytes to read. -1 reads all remaining data of the current blob.

Skip remaining_bytes of unread data and the trailing footer.

Parameters:

remaining_bytes (int) – Number of data bytes still unread in the current frame.

class odps.apis.storage_api_v2.BlobStreamReader(iterator)[source]

File-like reader for streaming blob data from a BlobDataIterator.

Provides read(size) for incremental reads of the current blob, a mime_type property, and a next() method to advance to the next blob.

Unlike iterating over BlobDataIterator which materializes each blob entirely in memory, this reader reads data from the underlying stream in chunks, avoiding buffering the entire blob.

Calling next() before the current blob is fully exhausted raises an IOError. When all blobs have been read, next() returns None.

Parameters:

iterator (BlobDataIterator) – The underlying blob data iterator.

Examples

>>> blob_reader = client.read_blobs(blob_references=refs, stream=True)
>>> while blob_reader is not None:
...     print(f"MIME: {blob_reader.mime_type}")
...     chunk = blob_reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = blob_reader.read(4096)
...     blob_reader = blob_reader.next()
property mime_type

MIME type of the current blob.

Type:

str or None

next()[source]

Advance to the next blob in-place and return self.

Raises IOError if the current blob has not been fully read. Returns None when there are no more blobs.

After a successful call, self is updated to reference the next blob’s data and mime_type. Do not retain references to the reader before calling next() — the same object is mutated.

Return type:

BlobStreamReader or None

read(size=-1)[source]

Read up to size bytes from the current blob.

Parameters:

size (int, optional) – Maximum number of bytes to read. -1 (default) reads all remaining bytes of the current blob.

Returns:

Data read. Empty bytes b"" when the current blob is exhausted.

Return type:

bytes

class odps.apis.storage_api_v2.BlobStreamWriter(upload, compression=Compression.UNCOMPRESSED)[source]

Stream writer for single blob upload with MD5 checksum verification.

Automatically computes MD5 checksum and verifies it against the server response. Data can be compressed during upload using the Compression enum.

write(data)[source]

Write binary data chunks to the blob stream. Accepts bytes or string (converted to bytes). Returns True on success.

finish()[source]

Finish writing and get server response with MD5 verification. Returns WriteBlobResponse. Raises ChecksumError if MD5 mismatch detected.

get_status()[source]

Check writer status (RUNNING or OK).

get_request_id()[source]

Get request ID after finish.

writable()[source]

Check if writer can accept more data.

finish()[source]

Finish writing and verify MD5 checksum against server response.

Returns:

WriteBlobResponse on success, None on failure.

writable()[source]

Return whether object was opened for writing.

If False, write() will raise OSError.

class odps.apis.storage_api_v2.BlobWriteItem(data, partition_values=None, column_index=0, distribution_key=None, mime_type=None, checksum_type=ChecksumType.NONE, size=None)[source]

A single blob item for batch upload.

Wire format when serialized:

[8-byte LE header_len][header JSON][8-byte LE data_len][data bytes][8-byte LE footer_len][footer JSON]

serialize()[source]

Serialize this item into the binary frame format.

static write_blobs(items)[source]

Serialize a list of BlobWriteItems into a single byte array.

write_frame_to(stream, chunk_size=262144)[source]

Write this item’s frame to a file-like stream.

Writes: [8-byte LE header_len][header JSON][8-byte LE data_len][data][8-byte LE footer_len][footer JSON]

For bytes data, writes directly. For file-like data, reads and writes in chunks, computing checksums incrementally.

class odps.apis.storage_api_v2.ReadBlobRequest(blob_references)[source]

Request for downloading binary blobs by their references.

blob_references

List of blob reference strings obtained from previous upload operations (write_blob_stream or write_blob_batch). These references uniquely identify the blobs to download. Bytes references are automatically decoded to UTF-8 strings.

Type:

list of str

class odps.apis.storage_api_v2.WriteBlobRequest(session_id, stream_id, stream_version=0, partition_values=None, column_index=0)[source]

Request for uploading blob data (streaming or batch).

session_id

The write session identifier from create_write_session.

Type:

str

stream_id

Stream identifier for this upload.

Type:

str or int

stream_version

Version number for the stream. Default 0.

Type:

int

partition_values

Partition values for the blob location. Format: [‘pt=20230101’, ‘region=us-west’].

Type:

list of str

column_index

Column index in the table schema where the blob will be stored. Default 0.

Type:

int

class odps.apis.storage_api_v2.WriteBlobResponse(**kwargs)[source]

Response from a blob write operation (stream or batch).

blob_reference

Single blob reference if one blob was uploaded.

Type:

str

blob_references

List of blob references matching the input items order. Use these references to read the blobs later.

Type:

list of str

warning_message

Warning message if any blob uploads had issues.

Type:

str

size

Total bytes uploaded.

Type:

int

request_id

Request ID for debugging (set from response headers).

Type:

str

class odps.apis.storage_api_v2.ChecksumType(value)[source]

Preview

class odps.apis.storage_api_v2.PreviewTableRequest(limit=None, partition=None, columns=None)[source]

Request for previewing table data without creating a session.

limit

Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).

Type:

int, optional

partition

Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.

Type:

str, optional

columns

Specific columns to preview. If empty, all columns are returned.

Type:

list of str

Schema and Options

class odps.apis.storage_api_v2.DataSchema(**kwargs)[source]
class odps.apis.storage_api_v2.Column(**kwargs)[source]
class odps.apis.storage_api_v2.SplitOptions(**kwargs)[source]
class SplitMode(value)[source]
class odps.apis.storage_api_v2.ArrowOptions(**kwargs)[source]
class TimestampUnit(value)[source]
class odps.apis.storage_api_v2.Compression(value)[source]

Compression algorithm for Storage API V2 data streams.

UNCOMPRESSED

No compression. Default for all operations.

ZSTD

Zstandard compression. Requires the zstandard library.

LZ4_FRAME

LZ4 frame compression. Requires the lz4 library.

class odps.apis.storage_api_v2.DataFormat(**kwargs)[source]
class odps.apis.storage_api_v2.IncrementalReadOptions(**kwargs)[source]
class odps.apis.storage_api_v2.Status(value)[source]