Storage API V2
- class odps.apis.storage_api_v2.StorageApiClient(odps: ODPS, table_or_instance=None, rest_endpoint: str = None, quota_name: str = None, tags: None | str | List[str] = None)
Client for the MaxCompute Storage API V2.
- read_blobs(blob_references=None, compression=None, stream=False)
Download binary blobs by their blob references.
This method retrieves blob data uploaded via write_blob_stream or write_blob_batch by providing the blob references returned during upload. The data is returned as an iterator that yields tuples of (data_bytes, mime_type), handling protocol framing and CRC stripping automatically.
- Parameters:
blob_references (list of str or bytes, optional) – List of blob references obtained from previous upload operations (write_blob_stream or write_blob_batch). References may be UTF-8 strings or raw bytes and will be decoded automatically if needed.
compression (Compression, default None) – Compression algorithm for the response data. None means Compression.UNCOMPRESSED.
stream (bool, optional) – If True, return a BlobStreamReader instead of a BlobDataIterator. The reader provides file-like read() access per blob, a mime_type property, and a next() method to advance to the next blob. Default is False.
- Returns:
When stream is False (default), returns an iterator yielding (data_bytes, mime_type) tuples per blob. When stream is True, returns a BlobStreamReader for incremental, file-like reading of each blob.
- Return type:
BlobDataIterator or BlobStreamReader
See also
write_blob_batch – Upload blobs and get references.
write_blob_stream – Upload a single blob and get its reference.
BlobWriteItem – Helper for batch uploads.
Notes
The download stream uses a multi-layer protocol:
1. CRC32C checksums are embedded every 4096 bytes (stripped automatically).
2. Protocol framing wraps each blob (parsed automatically).
For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.
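To make the checksum layer concrete, here is a minimal sketch of stripping and verifying per-chunk checksums. The layout is an assumption made purely for illustration: each chunk of up to 4096 payload bytes is followed by a 4-byte little-endian CRC32C of that chunk. The real wire format is handled internally by read_blobs and may differ. The names crc32c, add_checksums and strip_checksums are hypothetical helpers, not part of the client.

```python
import struct

CHUNK_SIZE = 4096  # payload bytes per checksummed chunk (from the notes above)
CRC_SIZE = 4       # ASSUMPTION: a 4-byte little-endian CRC32C follows each chunk


def _build_table():
    # Lookup table for reflected CRC-32C (Castagnoli), polynomial 0x82F63B78.
    table = []
    for n in range(256):
        crc = n
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _build_table()


def crc32c(data: bytes) -> int:
    """Compute the CRC-32C checksum of data."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def add_checksums(payload: bytes) -> bytes:
    """Append a checksum after every chunk (inverse of strip_checksums)."""
    out = bytearray()
    for start in range(0, len(payload), CHUNK_SIZE):
        chunk = payload[start:start + CHUNK_SIZE]
        out += chunk + struct.pack("<I", crc32c(chunk))
    return bytes(out)


def strip_checksums(framed: bytes) -> bytes:
    """Verify and remove the per-chunk checksums, returning the raw payload."""
    out = bytearray()
    pos = 0
    while pos < len(framed):
        # The final chunk may be shorter than CHUNK_SIZE.
        chunk_len = min(CHUNK_SIZE, len(framed) - pos - CRC_SIZE)
        if chunk_len < 0:
            raise ValueError("truncated stream: missing checksum")
        chunk = framed[pos:pos + chunk_len]
        (expected,) = struct.unpack_from("<I", framed, pos + chunk_len)
        if crc32c(chunk) != expected:
            raise ValueError(f"checksum mismatch at offset {pos}")
        out += chunk
        pos += chunk_len + CRC_SIZE
    return bytes(out)
```

Callers of read_blobs never need to do this themselves; the sketch only shows why the bytes yielded by the iterator are shorter than the bytes on the wire.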
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Read blobs using references obtained from previous uploads. First upload blobs, then read them back using the references:
>>> # References returned by an earlier upload (see the write_blob_batch example)
>>> blob_refs = ['blob_ref_001', 'blob_ref_002', 'blob_ref_003']
>>> # Read the blobs back using the references
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     print(f"Blob size: {len(data)} bytes, MIME type: {mime_type}")
...     # Save each blob to a file named after its reference
...     filename = f"blob_{blob_refs[i]}"
...     with open(filename, 'wb') as f:
...         f.write(data)
Blob size: 1024 bytes, MIME type: image/jpeg
Blob size: 2048 bytes, MIME type: image/png
Blob size: 512 bytes, MIME type: application/json
Download a single blob by passing a single-element reference list. The iterator yields one (data, mime_type) tuple:
>>> single_ref = ['blob_ref_abc123']
>>> blob_iterator = client.read_blobs(blob_references=single_ref)
>>> data, mime_type = next(blob_iterator)
>>> print(f"Downloaded {len(data)} bytes")
Downloaded 524288 bytes
>>> # Process the blob data
>>> if mime_type == 'image/jpeg':
...     # Process as JPEG image
...     process_image(data)
Download all blobs from a batch upload by storing references during upload and using them to read later:
>>> # Earlier: upload batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> all_refs = batch_resp.blob_references  # Save these
>>> # Later: read all blobs from the batch
>>> blob_iterator = client.read_blobs(blob_references=all_refs)
>>> downloaded_blobs = []
>>> for data, mime_type in blob_iterator:
...     downloaded_blobs.append({
...         'data': data,
...         'mime_type': mime_type,
...         'size': len(data)
...     })
>>> print(f"Downloaded {len(downloaded_blobs)} blobs")
Downloaded 10 blobs
Process blobs incrementally without loading all into memory by iterating and processing one at a time:
>>> blob_iterator = client.read_blobs(blob_references=large_blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     # Process each blob and discard data after processing
...     result = analyze_blob(data)
...     print(f"Blob {i}: {result}")
...     # data is freed after this iteration
Blob 0: Analysis complete
Blob 1: Analysis complete
Check MIME type to determine how to handle blob content when the type was set during upload:
>>> import json
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     if mime_type == 'application/json':
...         # Parse as JSON
...         json_obj = json.loads(data.decode('utf-8'))
...         process_json(json_obj)
...     elif mime_type == 'image/jpeg':
...         # Process as image
...         display_image(data)
...     elif mime_type is None:
...         # No type hint, use generic binary handling
...         process_binary(data)
...     else:
...         print(f"Unknown MIME type: {mime_type}")
Convert blob iterator to list for multiple iterations or indexing, though this loads all data into memory:
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> all_blobs = list(blob_iterator)  # [(data1, mime1), (data2, mime2), ...]
>>> # Now can access by index
>>> first_blob_data = all_blobs[0][0]
>>> first_blob_mime = all_blobs[0][1]
>>> # Can iterate multiple times
>>> for data, mime_type in all_blobs:
...     process_blob(data)
>>> for data, mime_type in all_blobs:
...     validate_blob(data)
Handle an empty blob reference list by iterating as usual. The iterator has no length to check; it simply yields nothing:
>>> empty_refs = []
>>> blob_iterator = client.read_blobs(blob_references=empty_refs)
>>> count = 0
>>> for _ in blob_iterator:
...     count += 1
>>> print(f"Downloaded {count} blobs")
Downloaded 0 blobs
Use blob references from a table query to download specific blobs referenced in table rows:
>>> # Query table to get blob references
>>> instance = odps.execute_sql("SELECT blob_ref FROM your_table WHERE id=1")
>>> with instance.open_reader() as reader:
...     for record in reader:
...         blob_ref = record[0]  # blob_ref column
...         # Download the referenced blob
...         blob_iterator = client.read_blobs(blob_references=[blob_ref])
...         data, mime_type = next(blob_iterator)
...         print(f"Downloaded blob {blob_ref}: {len(data)} bytes")
Stream blobs incrementally using the file-like read interface with stream=True:
>>> reader = client.read_blobs(blob_references=refs, stream=True)
>>> while reader is not None:
...     print(f"MIME: {reader.mime_type}")
...     chunk = reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = reader.read(4096)
...     reader = reader.next()
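The streaming loop can be wrapped in a small reusable function. This is a sketch only: stream_blobs and on_chunk are hypothetical names, and it assumes, as the example above does, that next() returns the reader for the following blob or None when no blobs remain.

```python
def stream_blobs(reader, on_chunk, chunk_size=4096):
    """Feed every blob to on_chunk(index, mime_type, chunk) in small pieces.

    `reader` is the object returned by read_blobs(..., stream=True), or None.
    Returns a list with the total number of bytes seen for each blob.
    """
    sizes = []
    index = 0
    while reader is not None:
        total = 0
        chunk = reader.read(chunk_size)
        while chunk:
            # Only one chunk is held in memory at a time.
            on_chunk(index, reader.mime_type, chunk)
            total += len(chunk)
            chunk = reader.read(chunk_size)
        sizes.append(total)
        # ASSUMPTION: next() yields the next blob's reader, or None at the end.
        reader = reader.next()
        index += 1
    return sizes
```

A caller might pass an on_chunk that appends each chunk to a file opened per blob index, which keeps memory use bounded by chunk_size regardless of blob size.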
- abort_write_session(session_id: str, route_token=None)
Abort a write session to discard all uploaded data.
This method cancels an active write session and discards all data uploaded through its streams. Use abort when you encounter errors during the upload process or when you want to cancel the operation without making the data visible in the table. Aborting releases all resources associated with the session.
- Parameters:
session_id (str) – The unique identifier of the write session to abort. This can be any active or expired session.
- Returns:
Method returns None after aborting the session. All uploaded data is discarded and session resources are released.
- Return type:
None
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
See also
create_write_session – Create a write session.
commit_write_session – Commit the session instead of aborting.
get_write_session – Check session status before aborting.
Notes
After aborting, the session cannot be committed. All data uploaded is permanently discarded. Call abort as soon as you detect errors to release resources quickly. It’s safe to abort a session multiple times if the first abort attempt fails.
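Because repeating an abort is described as safe, a caller can retry it when the first attempt fails. The helper below is a hedged sketch: abort_with_retry, attempts and delay_seconds are hypothetical names, and only abort_write_session(session_id) from this page is called on the client.

```python
import time


def abort_with_retry(client, session_id, attempts=3, delay_seconds=0.0):
    """Call client.abort_write_session until it succeeds or attempts run out.

    Returns the number of attempts used. A ValueError (invalid session_id or
    instance-based client) is re-raised immediately, since retrying cannot help.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            client.abort_write_session(session_id)
            return attempt
        except ValueError:
            raise
        except Exception as exc:  # e.g. a transient network or server error
            last_error = exc
            if attempt < attempts and delay_seconds > 0:
                time.sleep(delay_seconds)
    if last_error is None:
        raise ValueError("attempts must be at least 1")
    raise last_error
```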
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Abort a session when an error occurs during data upload to prevent partial data from being committed:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Create stream and write data
...     stream_resp = client.create_write_stream(...)
...     # Write data that fails due to schema mismatch
...     writer = client.write_rows_stream(...)
...     writer.write(invalid_data)
...     writer.finish()
... except Exception as e:
...     print(f"Upload failed: {e}")
...     # Abort to discard all uploaded data
...     client.abort_write_session(session_id)
...     print("Session aborted, no data committed to table")
Upload failed: Schema mismatch error
Session aborted, no data committed to table
Use abort in cleanup code to ensure resources are released even if commit never happens:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> committed = False
>>> try:
...     # Upload data...
...     # Commit if everything succeeds
...     client.commit_write_session(session_id)
...     committed = True
... finally:
...     # If commit didn't happen (exception or early return),
...     # abort the session to release resources
...     if not committed:
...         try:
...             client.abort_write_session(session_id)
...         except Exception:
...             # Session might already be aborted or expired
...             pass
Abort a session after detecting validation errors in the data before attempting to commit:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Write data to streams
>>> writer = client.write_rows_stream(...)
>>> # Validate the data locally
>>> if data_has_errors:
...     print("Data validation failed, aborting session")
...     client.abort_write_session(session_id)
... else:
...     # Data is valid, proceed with commit
...     client.close_write_stream(...)
...     client.commit_write_session(session_id)
- close_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) -> CloseWriteStreamResponse
Close a write stream to finalize the data upload for that stream.
After finishing the write operation via the stream writer, call this method to formally close the stream. This signals to the server that no more data will be uploaded to this stream and marks the stream as ready for the session commit. All streams must be closed before the session can be committed.
- Parameters:
session_id (str) – The write session identifier.
stream_id (str or int) – The stream identifier to close.
stream_version (int, default 0) – Version of the stream (should match create_write_stream).
- Returns:
Response with warning_message and request_id. See CloseWriteStreamResponse.
- Return type:
CloseWriteStreamResponse
- Raises:
ValueError – If called on an instance-based client.
See also
write_rows_stream – Finish writing before closing the stream.
create_write_stream – Create the stream before writing.
commit_write_session – Commit the session after all streams are closed.
get_write_stream – Verify the stream is closed.
Notes
The stream writer’s finish() method and close_write_stream serve different purposes: finish() completes the data upload, while close_write_stream formally closes the stream on the server side. Both must be called in sequence: finish the writer, then close the stream. After closing, the stream cannot accept more data.
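The required ordering can be captured in one helper. This is a minimal sketch, assuming the writer's finish() returns a (commit_message, success) pair as in the ArrowWriter examples on this page; finish_and_close is a hypothetical name.

```python
def finish_and_close(client, writer, session_id, stream_id, stream_version=0):
    """Finish the writer first, then close the stream on the server side.

    Raises RuntimeError if the writer reports failure, in which case the
    stream is left open so the caller can decide whether to retry or abort.
    """
    # Step 1: complete the data upload.
    commit_msg, success = writer.finish()
    if not success:
        raise RuntimeError(f"writer did not finish cleanly: {commit_msg!r}")
    # Step 2: formally close the stream so the session can be committed.
    return client.close_write_stream(
        session_id, stream_id=stream_id, stream_version=stream_version
    )
```

Keeping both calls in one place makes it harder to commit a session whose writer finished but whose stream was never closed.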
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow showing the relationship between writer finish() and stream close. First finish the writer, then close the stream:
>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Write Arrow data
>>> batch = pa.record_batch([...], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> # 3. Finish the writer to complete data upload
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Writer finished: {success}")
Writer finished: True
>>> # 4. Close the stream to mark it ready for commit
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Warning: {close_resp.warning_message}")
>>> # 5. Commit the session to make data visible
>>> client.commit_write_session(session_id)
Close multiple parallel streams after each finishes uploading. All streams must be closed before committing the session:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams
>>> for stream_id in range(3):
...     # Create stream, write data, finish writer
...     client.create_write_stream(session_id, stream_id=stream_id)
...     writer = ArrowWriter(
...         client.write_rows_stream(session_id, stream_id=stream_id),
...         Compression.UNCOMPRESSED
...     )
...     writer.write(batch)
...     writer.finish()
>>> # Close all 3 streams
>>> for stream_id in range(3):
...     close_resp = client.close_write_stream(session_id, stream_id=stream_id)
...     print(f"Stream {stream_id} closed")
Stream 0 closed
Stream 1 closed
Stream 2 closed
>>> # Now commit the session
>>> client.commit_write_session(session_id)
Check for warnings when closing streams to detect potential issues that may affect the commit:
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Stream close warning: {close_resp.warning_message}")
...     # Decide whether to proceed with commit or abort
...     if "data_incomplete" in close_resp.warning_message:
...         client.abort_write_session(session_id)
...     else:
...         # Warning is informational, proceed with commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)
Use get_write_stream to verify a stream is properly closed before attempting to commit the session:
>>> # Close stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # Verify it's closed
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'CLOSED':
...     print("Stream properly closed, safe to commit")
...     client.commit_write_session(session_id)
... else:
...     print(f"Unexpected stream status: {status.status}")
Stream properly closed, safe to commit
Handle the case where close fails due to stream errors by checking the warning message and potentially aborting:
>>> try:
...     close_resp = client.close_write_stream(session_id, stream_id=0)
...     if close_resp.warning_message and "error" in close_resp.warning_message.lower():
...         print(f"Stream had errors: {close_resp.warning_message}")
...         # Abort session instead of committing incomplete data
...         client.abort_write_session(session_id)
... except Exception as e:
...     print(f"Failed to close stream: {e}")
...     client.abort_write_session(session_id)
- commit_write_session(session_id: str, stream_ids=None, stream_versions=None, route_token=None)
Commit a write session to finalize all uploaded data.
This is the final step in the write workflow. Calling commit atomically makes all data uploaded through the session’s write streams visible in the table. The commit operation ensures that either all writes succeed (transaction committed) or none of them appear in the table (transaction aborted).
- Parameters:
session_id (str) – The unique identifier of the write session to commit. All write streams in this session must be closed before committing.
stream_ids (list of str, optional) – List of stream identifiers to commit. Used for transactional/delta tables to specify which streams to include in the commit. When provided, stream_versions must also be provided with matching length.
stream_versions (list of int, optional) – List of stream version numbers corresponding to stream_ids. Must be provided together with stream_ids and must have the same length.
- Returns:
Method returns None on successful commit. The commit operation makes all uploaded data visible in the table atomically.
- Return type:
None
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
errors.ODPSError – If any write streams are still open or if the session has expired.
See also
create_write_session – Create a write session.
close_write_stream – Close all streams before committing.
abort_write_session – Abort instead of commit to discard writes.
get_write_session – Verify all streams are closed.
Notes
Before committing, ensure all write streams are closed. Use get_write_session to verify stream states. If commit fails, call abort_write_session to clean up resources.
Commit is a synchronous operation. Once it returns successfully, the data is immediately visible in the table and cannot be rolled back.
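The argument rules and the abort-on-failure advice can be combined in a thin wrapper. This is an illustrative sketch rather than library behavior: commit_or_abort is a hypothetical name, and the pre-checks simply mirror the parameter descriptions above.

```python
def commit_or_abort(client, session_id, stream_ids=None, stream_versions=None):
    """Commit the session; if the commit raises, abort it and re-raise."""
    # stream_ids and stream_versions must be given together, with equal length.
    if (stream_ids is None) != (stream_versions is None):
        raise ValueError("stream_ids and stream_versions must be provided together")
    if stream_ids is not None and len(stream_ids) != len(stream_versions):
        raise ValueError("stream_ids and stream_versions must have the same length")
    try:
        client.commit_write_session(
            session_id, stream_ids=stream_ids, stream_versions=stream_versions
        )
    except Exception:
        # Clean up server-side resources before surfacing the original error.
        client.abort_write_session(session_id)
        raise
```

The local checks fail before any request is sent, so a mismatched argument pair does not trigger an abort of an otherwise healthy session.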
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete write workflow: create session, create stream, write Arrow data, close stream, and commit:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write data using Arrow writer
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
... ], schema=schema)
>>> writer = ArrowWriter(
...     client.write_rows_stream(session_id, stream_id=0, record_count=3),
...     Compression.UNCOMPRESSED
... )
>>> writer.write(batch)
>>> commit_msg, success = writer.finish()
>>> # 4. Close the write stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # 5. Commit the session to finalize all writes
>>> client.commit_write_session(session_id)
>>> print("Data successfully uploaded to table")
Data successfully uploaded to table
For transactional/delta tables, specify stream_ids and stream_versions when committing:
>>> client.commit_write_session(
...     session_id,
...     stream_ids=["stream-1"],
...     stream_versions=[1],
... )
For multi-stream uploads, ensure all streams are closed before committing. Use get_write_session to verify:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create multiple streams and write data to each
>>> for stream_id in range(3):
...     # Create stream, write data, close stream...
...     pass
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> all_closed = all(
...     s.get('Status') == 'CLOSED'
...     for s in (status.streams or [])
... )
>>> if all_closed:
...     # Commit session to make data visible
...     client.commit_write_session(session_id)
...     print("Session committed successfully")
Session committed successfully
If commit fails due to errors, abort the session to clean up:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Write data...
...     client.commit_write_session(session_id)
... except Exception as e:
...     print(f"Commit failed: {e}")
...     # Abort to discard all writes and release resources
...     client.abort_write_session(session_id)
Commit failed: [error message]
- create_read_session(required_data_columns=None, required_partition_columns=None, required_partitions=None, required_bucket_ids=None, split_options=None, arrow_options=None, filter_predicate=None, filter_predicate_fallback=None, split_max_file_num=None, incremental_read=None, incremental_read_options=None) -> CreateReadSessionResponse
Create a read session for table or instance data retrieval.
A read session is a prerequisite for reading data from a MaxCompute table or SQL instance result. The session determines how data will be split into readable chunks and what data schema will be returned. Sessions have an expiration time and must be refreshed if they expire during long-running read operations.
- Parameters:
required_data_columns (list of str, optional) – List of column names to read. If empty, all columns are returned.
required_partition_columns (list of str, optional) – Partition columns to include in the result.
required_partitions (list of str, optional) – Specific partition values to read (e.g., [‘pt=20230101’]).
required_bucket_ids (list of str, optional) – Bucket IDs to read for bucket-based tables.
split_options (SplitOptions, optional) – Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.
arrow_options (ArrowOptions, optional) – Arrow format settings like timestamp precision.
filter_predicate (str, optional) – SQL-like filter condition to apply during reading.
filter_predicate_fallback (bool, optional) – Whether to fall back to server-side filtering if predicate pushdown fails.
split_max_file_num (int, optional) – Maximum number of files per split for file-based splitting.
incremental_read (bool, optional) – Enable incremental reading mode for capturing table changes.
incremental_read_options (IncrementalReadOptions, optional) – Options for incremental read mode (version range, timestamp range).
- Returns:
Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.
- Return type:
CreateReadSessionResponse
- Raises:
ValueError – If the table or instance is not properly configured.
See also
get_read_session – Get current read session status.
read_rows_stream – Read data from a specific split.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, SplitOptions
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a basic read session that reads all columns with default splitting:
>>> response = client.create_read_session()
>>> print(f"Session ID: {response.session_id}")
Session ID: session_12345
>>> print(f"Available splits: {response.splits_count}")
Available splits: 5
Specify which columns to read and how to split the data. Use split_options to control parallelism by setting the split number instead of the split size:
>>> split_opts = SplitOptions()
>>> split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
>>> split_opts.split_number = 10  # Create 10 splits
>>> response = client.create_read_session(
...     required_data_columns=["id", "name", "value"],
...     split_options=split_opts,
... )
>>> print(f"Created {response.splits_count} splits")
Created 10 splits
Read from a specific partition by providing partition values in the required_partitions parameter:
>>> response = client.create_read_session(
...     required_partitions=["pt=20230101", "region=us-west"]
... )
>>> print(f"Records in partition: {response.record_count}")
Records in partition: 1000
For incremental data capture, enable incremental_read to read changes since a specific version or timestamp:
>>> from odps.apis.storage_api_v2 import IncrementalReadOptions
>>> incr_opts = IncrementalReadOptions()
>>> incr_opts.start_version = 100
>>> response = client.create_read_session(
...     incremental_read=True,
...     incremental_read_options=incr_opts,
... )
>>> print(f"Latest version: {response.latest_version}")
Latest version: 150
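Once a session reports its splits_count, the split indices are typically divided among parallel readers. The sketch below shows one simple round-robin scheme, assuming split indices run from 0 to splits_count - 1; assign_splits is a hypothetical helper and does not call the API.

```python
def assign_splits(splits_count, num_workers):
    """Distribute split indices 0..splits_count-1 round-robin across workers.

    Returns one list of split indices per worker; some lists may be empty
    when there are more workers than splits.
    """
    if splits_count < 0:
        raise ValueError("splits_count must not be negative")
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    return [list(range(worker, splits_count, num_workers))
            for worker in range(num_workers)]


# Example: 5 splits shared by 2 workers.
plan = assign_splits(5, 2)
print(plan)  # [[0, 2, 4], [1, 3]]
```

Each worker would then read only its own indices with read_rows_stream, sharing the same session_id.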
- create_write_session(partial_partition_spec=None, flags=None) -> CreateWriteSessionResponse
Create a write session for uploading data to a table.
A write session is the first step in the data upload workflow. It establishes a transactional context for writing data to a MaxCompute table, ensuring atomicity and consistency. After creating a session, you must create one or more write streams within it, upload data to those streams, close the streams, and finally commit the session.
- Parameters:
partial_partition_spec (str, optional) – Partition specification for writing to a specific partition. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’. If empty, writes to the table’s default location.
flags (dict, optional) – Additional flags for session configuration. Common flags include ‘overwrite’ to replace existing partition data.
- Returns:
Response with session_id, warning_message, and request_id. See CreateWriteSessionResponse.
- Return type:
CreateWriteSessionResponse
- Raises:
ValueError – If called on an instance-based client (writes only support tables).
See also
create_write_stream – Create a stream within the session.
commit_write_session – Commit the session to finalize writes.
abort_write_session – Abort the session to discard all writes.
Notes
Write sessions are transactional. All writes within a session are committed atomically when commit_write_session is called. If the session is aborted, all uploaded data is discarded. Sessions have a limited lifetime and must be committed before expiration.
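The partial_partition_spec string uses the 'col=value,col=value' form shown in the parameter description. A small builder such as the following can assemble it from structured data. format_partition_spec is a hypothetical helper, and rejecting ',' and '=' inside names or values is an assumption of this sketch, not a documented rule.

```python
def format_partition_spec(parts):
    """Build a spec like 'pt=20230101,region=us-west'.

    `parts` is a mapping or a sequence of (column, value) pairs, given in
    partition-level order.
    """
    items = parts.items() if hasattr(parts, "items") else parts
    pieces = []
    for column, value in items:
        column, value = str(column).strip(), str(value).strip()
        if not column or not value:
            raise ValueError("partition column and value must be non-empty")
        # ASSUMPTION: separators are not allowed inside names or values.
        if any(ch in column + value for ch in ",="):
            raise ValueError(f"unsupported character in {column!r}={value!r}")
        pieces.append(f"{column}={value}")
    return ",".join(pieces)


spec = format_partition_spec({"pt": "20230101", "region": "us-west"})
print(spec)  # pt=20230101,region=us-west
```

The result can be passed as partial_partition_spec when creating the write session.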
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a basic write session for uploading data to the table’s default location:
>>> response = client.create_write_session()
>>> session_id = response.session_id
>>> print(f"Write session ID: {session_id}")
Write session ID: write_session_12345
Write to a specific partition by specifying the partition spec. This is required for partitioned tables unless writing to the default partition:
>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101"
... )
>>> session_id = response.session_id
>>> print("Writing to partition pt=20230101")
Writing to partition pt=20230101
Use the ‘overwrite’ flag to replace existing data in a partition instead of appending:
>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101",
...     flags={"overwrite": True}
... )
>>> session_id = response.session_id
>>> # After committing, existing partition data will be replaced
Complete write workflow: create session, create stream, write data, close stream, and commit:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create write stream (see create_write_stream example)
>>> # Write data (see write_rows_stream example)
>>> # Close stream (see close_write_stream example)
>>> # Commit session to finalize
>>> client.commit_write_session(session_id)
If an error occurs during the write process, abort the session to discard all uploaded data and release resources:
>>> session_resp = client.create_write_session()
>>> try:
...     # Write data, close streams, then commit...
...     client.commit_write_session(session_resp.session_id)
... except Exception:
...     # Discard everything uploaded so far, then re-raise the error
...     client.abort_write_session(session_resp.session_id)
...     raise
- create_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) -> CreateWriteStreamResponse
Create a write stream within an active write session.
A write stream is a data upload channel within a write session. You can create multiple streams in parallel to upload data concurrently, improving throughput for large datasets. Each stream must be closed individually after writing data, and the session must be committed after all streams are closed.
- Parameters:
session_id (str) – The write session identifier from create_write_session.
stream_id (str or int) – Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.
stream_version (int, default 0) – Version number for the stream. Increment if retrying after a failed upload to the same stream_id.
- Returns:
Response with data_schema, table_id, schema_version, and route_token. See CreateWriteStreamResponse.
- Return type:
CreateWriteStreamResponse
- Raises:
ValueError – If called on an instance-based client.
See also
create_write_session – Create the parent write session first.
write_rows_stream – Write data to the created stream.
close_write_stream – Close the stream after writing.
get_write_stream – Get stream status.
Notes
Each stream can be written to independently, enabling parallel uploads from multiple workers. The stream_version allows retrying failed uploads without creating a new session. After a stream is closed, it cannot be reopened with the same stream_id and version.
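The retry pattern enabled by stream_version can be sketched as a loop that bumps the version on each failed attempt. write_stream_with_retry and write_fn are hypothetical names; write_fn stands in for whatever code uploads data to the stream and is expected to raise on failure.

```python
def write_stream_with_retry(client, session_id, stream_id, write_fn, max_attempts=3):
    """Create the stream and run write_fn, incrementing stream_version per retry.

    write_fn(session_id, stream_id, stream_version) performs the upload.
    Returns the stream_version that succeeded, which the caller should reuse
    when closing the stream.
    """
    last_error = None
    for version in range(max_attempts):
        try:
            client.create_write_stream(
                session_id, stream_id=stream_id, stream_version=version
            )
            write_fn(session_id, stream_id, version)
            return version
        except Exception as exc:
            # Keep the error and try again with the next version number.
            last_error = exc
    if last_error is None:
        raise ValueError("max_attempts must be at least 1")
    raise last_error
```

Returning the successful version matters because close_write_stream expects the same stream_version that was used at creation.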
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
First create a write session, then create a single stream for sequential data upload:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> print(f"Stream created, schema: {stream_resp.data_schema}")
Stream created, schema: {'Columns': [...]}
For parallel upload from multiple workers, create multiple streams with different stream_ids. Each worker processes one stream:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Worker 1 creates stream 0
>>> stream_resp0 = client.create_write_stream(session_id, stream_id=0)
>>> # Worker 2 creates stream 1
>>> stream_resp1 = client.create_write_stream(session_id, stream_id=1)
>>> # Worker 3 creates stream 2
>>> stream_resp2 = client.create_write_stream(session_id, stream_id=2)
>>> print("Created 3 parallel streams")
Created 3 parallel streams
Use stream_version to retry failed uploads. If a stream fails, increment the version to create a new stream with the same ID:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0, stream_version=0)
>>> # Attempt to write data (fails)
>>> try:
...     writer = client.write_rows_stream(...)
...     writer.write(data)
... except Exception:
...     # Retry with incremented version
...     retry_resp = client.create_write_stream(
...         session_id, stream_id=0, stream_version=1
...     )
...     writer = client.write_rows_stream(...)
...     writer.write(data)
Check the data_schema before writing to ensure your data matches the expected table schema:
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> schema = stream_resp.data_schema
>>> if schema:
...     columns = schema.get('Columns', [])
...     for col in columns:
...         print(f"Column: {col.get('Name')}, Type: {col.get('Type')}")
Column: id, Type: bigint
Column: name, Type: string
- get_read_session(session_id: str, refresh: bool = False) -> CreateReadSessionResponse
Get current status and metadata of an existing read session.
This method retrieves the current state of a read session, including its status, split count, schema, and expiration time. Use this to check if a session is still valid before attempting to read data, or to refresh a session that is approaching expiration.
- Parameters:
session_id (str) – The unique identifier of the read session to retrieve. This is obtained from create_read_session response.
refresh (bool, default False) – Whether to refresh the session expiration time. Set to True if the session is about to expire or has expired. When True, the server extends the session lifetime, allowing continued reading operations.
- Returns:
Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.
- Return type:
CreateReadSessionResponse
- Raises:
ValueError – If session_id is None or empty.
See also
create_read_sessionCreate a new read session.
read_rows_streamRead data from a specific split.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
First create a read session, then check its status:
>>> create_response = client.create_read_session()
>>> session_id = create_response.session_id
>>> status_response = client.get_read_session(session_id)
>>> print(f"Session status: {status_response.session_status}")
Session status: NORMAL
>>> print(f"Expiration: {status_response.expiration_time}")
Expiration: 2023-01-01T12:00:00Z
When a session expires during a long-running read operation, use the refresh parameter to extend its lifetime:
>>> # Check if session expired
>>> response = client.get_read_session(session_id)
>>> if response.session_status == SessionStatus.EXPIRED:
...     # Refresh the session to continue reading
...     refreshed_response = client.get_read_session(session_id, refresh=True)
...     print(f"New expiration: {refreshed_response.expiration_time}")
New expiration: 2023-01-01T14:00:00Z
Monitor session status during a parallel read operation to ensure all splits are processed before expiration:
>>> create_response = client.create_read_session()
>>> for split_index in range(create_response.splits_count):
...     # Check session health before each split
...     status = client.get_read_session(create_response.session_id)
...     if status.session_status == SessionStatus.EXPIRED:
...         client.get_read_session(create_response.session_id, refresh=True)
...     # Read the split data...
...     reader = client.read_rows_stream(...)
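The check-then-refresh step can be factored into one helper so each worker calls it before reading a split. The sketch below is illustrative only: `ensure_session_alive` and `FakeClient` are hypothetical names, and the fake client uses plain strings for the status so the example runs offline. With the real client you would pass `SessionStatus.EXPIRED` as `expired`.

```python
from types import SimpleNamespace


def ensure_session_alive(client, session_id, expired):
    """Return a session response, refreshing once if the status equals `expired`."""
    response = client.get_read_session(session_id)
    if response.session_status == expired:
        response = client.get_read_session(session_id, refresh=True)
    return response


class FakeClient:
    """Hypothetical stand-in: reports EXPIRED until a refresh is requested."""

    def __init__(self):
        self.calls = []

    def get_read_session(self, session_id, refresh=False):
        self.calls.append(refresh)
        status = "NORMAL" if refresh else "EXPIRED"
        return SimpleNamespace(session_id=session_id, session_status=status)


fake = FakeClient()
result = ensure_session_alive(fake, "session-1", expired="EXPIRED")
```

The helper makes at most two requests per call, so it adds one round trip per split in the common case where the session is still valid.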
- get_write_session(session_id: str) GetWriteSessionResponse[source]
Get the current status and stream information of a write session.
This method retrieves metadata about a write session, including the list of active write streams and their states. Use this to monitor the progress of a multi-stream upload operation or to verify that all streams have been properly closed before committing the session.
- Parameters:
session_id (str) – The unique identifier of the write session to query. This is obtained from create_write_session response.
- Returns:
Response with streams (list of stream info dicts), warning_message, and request_id. See GetWriteSessionResponse.
- Return type:
GetWriteSessionResponse
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
See also
create_write_sessionCreate a write session.
commit_write_sessionCommit the session after all streams closed.
create_write_streamCreate a new stream in the session.
Notes
Before calling commit_write_session, ensure all write streams are closed. Use get_write_session to verify that no streams are still in the RUNNING or OPEN state.
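The pre-commit check described in this note can be sketched as a pure function over the `streams` list. This is an assumption-laden illustration: the `'Status'` key follows the examples in this section, while `unfinished_streams` and the `'StreamId'` key in the sample data are hypothetical.

```python
def unfinished_streams(streams, open_states=("OPEN", "RUNNING")):
    """Return stream info dicts whose 'Status' is still in an open state."""
    return [s for s in (streams or []) if s.get("Status") in open_states]


# Sample data shaped like the stream info dicts used in the examples.
streams = [
    {"StreamId": "0", "Status": "CLOSED"},
    {"StreamId": "1", "Status": "OPEN"},
    {"StreamId": "2", "Status": "RUNNING"},
]
pending = unfinished_streams(streams)
safe_to_commit = not pending
```

Treating a `None` stream list as "nothing pending" mirrors the examples, where a session with no streams is committed directly.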
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a write session and check its status before creating streams:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> status_resp = client.get_write_session(session_id)
>>> print(f"Active streams: {status_resp.streams}")
Active streams: None
After creating multiple write streams for parallel upload, use get_write_session to track which streams are active:
>>> # Create multiple streams (see create_write_stream examples)
>>> stream_resp1 = client.create_write_stream(...)
>>> stream_resp2 = client.create_write_stream(...)
>>> status = client.get_write_session(session_id)
>>> print(f"Streams: {len(status.streams) if status.streams else 0}")
Streams: 2
Before committing the session, verify all streams are closed to ensure data upload is complete:
>>> # Close all streams after writing data
>>> client.close_write_stream(...)
>>> client.close_write_stream(...)
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> if status.streams:
...     # Check if any stream is still open or running (see Notes)
...     open_streams = [
...         s for s in status.streams
...         if s.get('Status') in ('OPEN', 'RUNNING')
...     ]
...     if open_streams:
...         print(f"Warning: {len(open_streams)} streams still open")
...     else:
...         # All streams closed, safe to commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)
Monitor the progress of a long-running parallel upload by periodically checking stream status:
>>> import time
>>> while True:
...     status = client.get_write_session(session_id)
...     if status.streams:
...         closed_count = sum(
...             1 for s in status.streams
...             if s.get('Status') == 'CLOSED'
...         )
...         print(f"Progress: {closed_count}/{len(status.streams)} streams closed")
...         if closed_count == len(status.streams):
...             break
...     time.sleep(5)
>>> # All streams closed, commit the session
>>> client.commit_write_session(session_id)
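The polling loop above never exits if a stream stays open or the session has no streams. A bounded variant is sketched here under stated assumptions: `wait_for_streams_closed` is a hypothetical helper, `get_streams` is any callable returning the stream list (for example `lambda: client.get_write_session(session_id).streams`), and the `'Status'` values follow the examples in this section.

```python
import time


def wait_for_streams_closed(get_streams, timeout=60.0, interval=5.0, sleep=time.sleep):
    """Poll get_streams() until every stream reports 'CLOSED' or the timeout elapses.

    Returns True when all streams are closed, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        streams = get_streams() or []
        if streams and all(s.get("Status") == "CLOSED" for s in streams):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)


# Simulated responses: the second stream closes on the third poll.
snapshots = iter([
    [{"Status": "CLOSED"}, {"Status": "OPEN"}],
    [{"Status": "CLOSED"}, {"Status": "OPEN"}],
    [{"Status": "CLOSED"}, {"Status": "CLOSED"}],
])
done = wait_for_streams_closed(lambda: next(snapshots), timeout=5.0, interval=0.0)

# An empty stream list never counts as "all closed", so this call times out.
timed_out = wait_for_streams_closed(lambda: [], timeout=0.0, interval=0.0)
```

Returning a boolean instead of raising leaves the caller free to choose between retrying, aborting the session, or escalating.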
- get_write_stream(session_id: str, stream_id: str, stream_version: int, route_token=None, exactly_once_mode=False) GetWriteStreamResponse[source]
Get the current status and metadata of a write stream.
This method retrieves information about a specific write stream, including its current state, statistics, and any error information. Use this to monitor stream health during uploads or to verify stream state before closing.
- Parameters:
session_id (str) – The unique identifier of the write session.
stream_id (str) – The identifier of the stream to query (typically 0, 1, 2, …).
stream_version (int) – The version number of the stream.
- Returns:
Response with status, record_count, byte_size, error_code, and error_message. See GetWriteStreamResponse.
- Return type:
GetWriteStreamResponse
- Raises:
ValueError – If any parameter is None or empty, or if called on an instance-based client.
See also
create_write_streamCreate a stream before getting its status.
close_write_streamClose the stream after verifying status.
get_write_sessionGet all streams in a session.
Notes
Use get_write_stream to check if a stream has encountered errors before attempting to close it. If a stream is in ERROR state, it may need to be recreated with an incremented stream_version.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a write stream and check its status before writing data:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Stream status: {status.status}")
Stream status: OPEN
After writing data, check the stream statistics to verify upload progress before closing:
>>> # Write data to the stream
>>> writer = client.write_rows_stream(...)
>>> writer.write(batch)
>>> writer.finish()
>>> # Check stream statistics
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Uploaded {status.record_count or 0} records, {status.byte_size or 0} bytes")
Uploaded 1000 records, 524288 bytes
>>> # Close the stream after verifying the upload
>>> client.close_write_stream(...)
Monitor multiple parallel streams during upload to identify which streams have encountered errors:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams and write data
>>> for i in range(3):
...     stream_resp = client.create_write_stream(session_id, stream_id=i)
>>> # Check status of all streams
>>> for i in range(3):
...     status = client.get_write_stream(session_id, stream_id=str(i), stream_version=0)
...     if status.status == 'ERROR':
...         print(f"Stream {i} failed: {status.error_message}")
...     else:
...         print(f"Stream {i}: {status.status}, {status.record_count or 0} records")
Stream 0: OPEN, 500 records
Stream 1: OPEN, 300 records
Stream 2 failed: Schema validation failed
If a stream shows ERROR status, use get_write_stream to get detailed error information before deciding to retry:
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'ERROR':
...     print(f"Stream error [{status.error_code}]: {status.error_message}")
...     # Decide whether to retry with new version or abort session
...     if status.error_code == 'SCHEMA_MISMATCH':
...         # Retry with corrected data
...         client.create_write_stream(session_id, stream_id=0, stream_version=1)
...     else:
...         # Abort the entire session
...         client.abort_write_session(session_id)
- preview_table(limit=None, partition=None, columns=None) StreamReader[source]
Preview table data without creating a session.
This method provides a lightweight way to sample table data without the overhead of creating a read session. Unlike read_rows_stream, preview_table directly returns an Arrow IPC stream, making it ideal for quick data exploration, schema inspection, or testing table connectivity.
- Parameters:
limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).
partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.
columns (list of str, optional) – Specific columns to preview. If None or empty, all columns are returned.
- Returns:
Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.
- Return type:
StreamReader
- Raises:
ValueError – If called on an instance-based client (preview only works on tables).
See also
read_rows_streamRead full table data with session management.
create_read_sessionCreate a session for large-scale reading.
Notes
Preview is optimized for quick sampling and may not return exact row counts specified in limit. For production data reading with guaranteed row counts and retry support, use create_read_session and read_rows_stream instead.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Preview the first few rows of a table to quickly explore its data:
>>> stream_reader = client.preview_table(limit=10)
>>> arrow_reader = ArrowReader(stream_reader)
>>> batch = arrow_reader.read()
>>> if batch is not None:
...     df = batch.to_pandas()
...     print(df)
   id   name  value
0   1  Alice    100
1   2    Bob    200
2   3  Carol    150
Preview specific columns to check their data types and values without downloading all columns:
>>> reader = ArrowReader(client.preview_table(
...     limit=5, columns=["id", "name"]
... ))
>>> batch = reader.read()
>>> if batch is not None:
...     print(batch.schema)
id: int64
name: string
Preview data from a specific partition to test partition filtering before running a full query:
>>> reader = ArrowReader(client.preview_table(
...     limit=20, partition="pt=20230101"
... ))
>>> batch = reader.read()
>>> df = batch.to_pandas() if batch is not None else None
>>> print(f"Previewed {len(df) if df is not None else 0} rows")
Previewed 20 rows
Use preview to inspect table schema by requesting a small sample. This is useful for understanding column names and types before creating a read session:
>>> reader = ArrowReader(client.preview_table(limit=1))
>>> batch = reader.read()
>>> if batch is not None:
...     for field in batch.schema:
...         print(f"{field.name}: {field.type}")
id: int64
name: string
value: double
pt: string
- read_rows_stream(session_id=None, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) StreamReader[source]
Read data from a specific split in a read session.
This method reads a chunk of data from a table or instance result by specifying which split to read. The data is returned as a stream that can be processed incrementally, supporting efficient handling of large datasets. Each split can be read independently, enabling parallel processing of the same session across multiple workers.
- Parameters:
session_id (str) – The read session identifier from create_read_session.
split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.
row_offset (int, optional) – Starting row offset within the split. Defaults to 0.
row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.
max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.
skip_row_num (int, default 0) – Number of rows to skip before reading.
max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.
data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).
data_columns (list of str, optional) – Specific columns to read. Must match session schema.
compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.
- Returns:
Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.
- Return type:
StreamReader
- Raises:
ValueError – If the session or split parameters are invalid.
See also
create_read_sessionCreate a read session first.
ArrowReaderWrap StreamReader to read Arrow batches.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Read data from a specific split using ArrowReader for convenient batch processing:
>>> reader = client.read_rows_stream(session_id, split_index=0)
>>> arrow_reader = ArrowReader(reader)
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process dataframe
>>> arrow_reader.get_request_id()  # Get request ID after completion
For parallel processing, distribute splits across multiple workers. Each worker reads a different split index from the same session:
>>> # Worker 1 reads split 0
>>> reader1 = client.read_rows_stream(session_id, split_index=0)
>>> # Worker 2 reads split 1
>>> reader2 = client.read_rows_stream(session_id, split_index=1)
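One way to fan split indices out to workers within a single process is a thread pool. This is only a sketch: `read_splits_in_parallel` and `fake_read_split` are hypothetical, and whether one client instance can safely be shared across threads is not stated in this reference, so a real `read_split` may need to build its own client.

```python
from concurrent.futures import ThreadPoolExecutor


def read_splits_in_parallel(read_split, splits_count, max_workers=4):
    """Run read_split(split_index) for every split and return results in split order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so result i belongs to split i.
        return list(pool.map(read_split, range(splits_count)))


def fake_read_split(split_index):
    # Hypothetical stand-in for reading one split; returns a row count.
    return (split_index + 1) * 100


row_counts = read_splits_in_parallel(fake_read_split, splits_count=3)
```

In practice `read_split` would call `read_rows_stream(session_id, split_index=...)`, drain the reader, and return whatever summary or data the caller needs.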
Control memory usage by limiting batch size with max_batch_rows. Smaller batches reduce memory footprint but may have lower throughput:
>>> reader = client.read_rows_stream(session_id, split_index=0, max_batch_rows=1024)
>>> arrow_reader = ArrowReader(reader)
>>> # Read batches one at a time to control memory
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process df and then discard to free memory
Read a specific range of rows within a split by using the row_offset and row_count parameters:
>>> reader = client.read_rows_stream(
...     session_id,
...     split_index=0,
...     row_offset=1000,  # Skip first 1000 rows
...     row_count=500,    # Read 500 rows
... )
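To read a split in several fixed-size pieces, the `(row_offset, row_count)` pairs can be computed up front. The helper below is a hypothetical sketch (`plan_row_ranges` is not a library function) and assumes the number of rows in the split is already known to the caller.

```python
def plan_row_ranges(total_rows, chunk_rows):
    """Split [0, total_rows) into (row_offset, row_count) pairs of at most chunk_rows."""
    if chunk_rows <= 0:
        raise ValueError("chunk_rows must be positive")
    return [
        (offset, min(chunk_rows, total_rows - offset))
        for offset in range(0, total_rows, chunk_rows)
    ]


ranges = plan_row_ranges(total_rows=2500, chunk_rows=1000)
```

Each pair can then be passed as `row_offset` and `row_count` to a separate `read_rows_stream` call; the last pair is shorter when the total is not a multiple of the chunk size.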
- property route_token
The stored route token for session affinity.
This is automatically updated from response headers. Pass route_token=... to individual methods to override, or rely on this stored value when not specified.
- write_blob_batch(items: List[BlobWriteItem], session_id=None, stream_id=None, stream_version=0, partition_values=None, column_index=0) WriteBlobResponse[source]
Upload multiple blobs in a single batch request for efficiency.
This method uploads multiple binary blobs (such as images, videos, or files) in one consolidated request, reducing network overhead compared to individual uploads. Each blob is packaged with metadata (partition location, column index, MIME type) and optional checksum verification.
- Parameters:
items (list of BlobWriteItem) – List of blob items to upload. Each item contains data (bytes), partition_values (list of str), column_index (int), distribution_key (str), mime_type (str), and checksum_type (ChecksumType). See BlobWriteItem.
session_id (str, optional) – The write session identifier from create_write_session.
stream_id (str or int, optional) – Stream identifier for this batch upload.
stream_version (int, default 0) – Version number for the stream.
partition_values (list of str, optional) – Default partition values for blobs (can be overridden per item). Format: [‘pt=20230101’, ‘region=us-west’].
column_index (int, default 0) – Default column index for blobs (can be overridden per item).
- Returns:
Response with blob_reference, blob_references, warning_message, and size. See WriteBlobResponse.
- Return type:
WriteBlobResponse
- Raises:
ValueError – If called on an instance-based client.
See also
write_blob_streamUpload single blob via streaming.
read_blobsRead uploaded blobs using the references.
BlobWriteItemClass to construct blob items.
Notes
Batch upload is more efficient than individual uploads when uploading many small blobs. The method uses a custom wire format with header, data, and footer sections for each blob. Checksums (CRC32 or MD5) are computed and included in the footer for server-side verification. The order of blob_references matches the input items order.
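For reference, the two checksum kinds named in this note can be computed locally with the Python standard library. This sketch only shows the digests themselves; how the library encodes them in the footer is not described here, and `crc32_hex` and `md5_hex` are hypothetical helper names.

```python
import hashlib
import zlib


def crc32_hex(data: bytes) -> str:
    """CRC32 of data as 8 lowercase hex digits."""
    return format(zlib.crc32(data) & 0xFFFFFFFF, "08x")


def md5_hex(data: bytes) -> str:
    """MD5 digest of data as 32 lowercase hex digits."""
    return hashlib.md5(data).hexdigest()


payload = b"hello"
crc = crc32_hex(payload)
md5 = md5_hex(payload)
```

CRC32 is cheap and catches accidental corruption; MD5 produces a longer digest and is the option the examples in this section use for critical data.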
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, BlobWriteItem, ChecksumType
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Upload multiple image blobs in a single batch request for efficient network usage:
>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Prepare multiple blob items
>>> items = []
>>> for image_path in ['img1.jpg', 'img2.jpg', 'img3.jpg']:
...     with open(image_path, 'rb') as f:
...         image_data = f.read()
...     item = BlobWriteItem(
...         data=image_data,
...         partition_values=['pt=20230101'],
...         column_index=0,
...         mime_type='image/jpeg',
...         checksum_type=ChecksumType.CRC32  # Verify integrity
...     )
...     items.append(item)
>>> # 3. Upload all blobs in one batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Uploaded {len(batch_resp.blob_references)} blobs")
Uploaded 3 blobs
>>> print(f"Total size: {batch_resp.size} bytes")
Total size: 1572864 bytes
>>> # 4. Store references for later reading
>>> blob_refs = batch_resp.blob_references
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
Upload blobs to different partitions and columns by customizing each BlobWriteItem’s metadata:
>>> items = [
...     BlobWriteItem(
...         data=blob1_data,
...         partition_values=['pt=20230101'],
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=blob2_data,
...         partition_values=['pt=20230102'],
...         column_index=1
...     ),
...     BlobWriteItem(
...         data=blob3_data,
...         partition_values=['pt=20230103'],
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Each blob is stored in its specified partition/column
Use MD5 checksum for stronger integrity verification on critical data uploads. The checksum is computed and sent with the blob:
>>> critical_data = read_critical_file()
>>> item = BlobWriteItem(
...     data=critical_data,
...     column_index=0,
...     checksum_type=ChecksumType.MD5
... )
>>> response = client.write_blob_batch([item], session_id, stream_id=0)
>>> # Server verifies MD5 checksum matches computed value
Add MIME type metadata to help applications understand blob content type without examining the binary data:
>>> items = [
...     BlobWriteItem(
...         data=json_data_bytes,
...         mime_type='application/json',
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=pdf_data_bytes,
...         mime_type='application/pdf',
...         column_index=1
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # MIME types stored with blobs for content type hints
Use distribution_key for hash-based storage to ensure blobs are distributed across storage locations based on the key:
>>> items = [
...     BlobWriteItem(
...         data=blob_data,
...         distribution_key='user123',
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Blob stored at location determined by distribution key hash
Upload a large number of small files efficiently by batching them instead of uploading individually:
>>> # Prepare 100 small file blobs
>>> items = []
>>> for file_path in small_files:  # 100 small files
...     with open(file_path, 'rb') as f:
...         data = f.read()
...     items.append(BlobWriteItem(data=data, column_index=0))
>>> # Upload all in one batch (much faster than 100 individual uploads)
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Batch uploaded {len(response.blob_references)} blobs")
Batch uploaded 100 blobs
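When the number of files is much larger, it may be preferable to send several moderate batches rather than one very large request. The sketch below shows only the grouping step; `chunked` is a hypothetical helper, and the batch size of 100 is an arbitrary illustration, since this reference does not state a request size limit.

```python
def chunked(items, batch_size):
    """Yield consecutive slices of items with at most batch_size elements each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder names standing in for a list of BlobWriteItem objects.
file_names = [f"file_{i}.bin" for i in range(250)]
batches = list(chunked(file_names, 100))
```

Each yielded slice would be passed to its own `write_blob_batch` call, and the returned `blob_references` lists concatenated in order to keep the mapping back to the input items.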
Check for warnings in the response to detect partial upload failures or other issues:
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> if response.warning_message:
...     print(f"Upload warning: {response.warning_message}")
...     # Some blobs may have had issues
...     # Check if all blob_references are present
...     if len(response.blob_references) < len(items):
...         print(f"Only {len(response.blob_references)} of {len(items)} uploaded")
Upload warning: 2 blobs exceeded size limit
Only 98 of 100 uploaded
Use the returned blob_references to read the blobs later by calling read_blobs with the reference list:
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> blob_refs = batch_resp.blob_references
>>> # Later, read the blobs back
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     # Process each blob data
...     print(f"Read blob: {len(data)} bytes, type: {mime_type}")
- write_blob_stream(session_id, stream_id, stream_version=0, partition_values=None, column_index=0, compression=None) BlobStreamWriter[source]
Upload a single blob via streaming upload.
This method creates a streaming writer for uploading a single binary blob (such as an image, video, or large binary file) to a specific column in a MaxCompute table. The data is optionally compressed using the specified compression algorithm and verified with MD5 checksum to ensure upload integrity.
- Parameters:
session_id (str) – The write session identifier from create_write_session.
stream_id (str or int) – Stream identifier for this upload.
stream_version (int, default 0) – Version number for the stream.
partition_values (list of str, optional) – Partition values for the blob location. Format: [‘pt=20230101’, ‘region=us-west’].
column_index (int, default 0) – Column index in the table schema where the blob will be stored.
compression (Compression, default None) – Compression algorithm to use. None means Compression.UNCOMPRESSED. See Compression. Supported values: Compression.ZSTD, Compression.LZ4_FRAME, Compression.UNCOMPRESSED.
- Returns:
Blob stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See BlobStreamWriter.
- Return type:
BlobStreamWriter
- Raises:
ValueError – If called on an instance-based client.
errors.DependencyNotInstalledError – If zstandard library is not installed (required for ZSTD compression).
errors.ChecksumError – If MD5 checksum verification fails after finish().
See also
write_blob_batchUpload multiple blobs in one request.
read_blobsRead uploaded blobs by reference.
BlobWriteItemHelper class for batch blob uploads.
Notes
Blob uploads default to no compression (Compression.UNCOMPRESSED). Pass compression=Compression.ZSTD or compression=Compression.LZ4_FRAME to compress the data for transfer. The writer computes an MD5 checksum incrementally during write and verifies it against the server response when finished. If the checksums do not match, a ChecksumError is raised, indicating data corruption during upload.
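The incremental checksum idea in this note can be illustrated with `hashlib` alone: feeding the data chunk by chunk yields the same digest as hashing it in one call. This is a standalone sketch of the technique, not the writer's actual implementation, and `md5_of_stream` is a hypothetical name.

```python
import hashlib
import io


def md5_of_stream(fileobj, chunk_size=8192):
    """Feed a file-like object to MD5 chunk by chunk; return (hex digest, byte count)."""
    digest = hashlib.md5()
    total = 0
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        digest.update(chunk)
        total += len(chunk)
    return digest.hexdigest(), total


data = bytes(range(256)) * 100  # 25,600 bytes of sample data
incremental, size = md5_of_stream(io.BytesIO(data), chunk_size=4096)
one_shot = hashlib.md5(data).hexdigest()
```

Because the digest state is updated per chunk, memory use stays bounded by `chunk_size` regardless of the blob size.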
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow for uploading a single image blob to a table with a binary column:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Upload blob data
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101'],
...     column_index=0  # First column is the blob column
... )
>>> # Read image file and upload in chunks
>>> with open('image.jpg', 'rb') as f:
...     while True:
...         chunk = f.read(8192)  # 8KB chunks
...         if not chunk:
...             break
...         blob_writer.write(chunk)
>>> # 4. Finish and verify MD5
>>> response = blob_writer.finish()
>>> print(f"Blob reference: {response.blob_reference}")
Blob reference: blob_ref_abc123
>>> print(f"Uploaded size: {response.size} bytes")
Uploaded size: 524288 bytes
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
Upload blob data directly from memory without reading from a file, useful for dynamically generated binary data:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Generate or prepare binary data
>>> binary_data = b"generated binary content..."
>>> blob_writer.write(binary_data)
>>> # Finish upload
>>> response = blob_writer.finish()
>>> print(f"Blob uploaded: {response.blob_reference}")
Upload a blob to a specific partition by providing partition values. The blob is stored in the specified partition’s location:
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101', 'region=us-west'],
...     column_index=0
... )
>>> blob_writer.write(image_data)
>>> response = blob_writer.finish()
>>> # Blob is now in partition pt=20230101/region=us-west
Handle checksum errors when the upload integrity check fails, indicating data corruption during transfer:
>>> from odps import errors
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> blob_writer.write(data)
>>> try:
...     response = blob_writer.finish()
...     print("Upload successful")
... except errors.ChecksumError as e:
...     print(f"Checksum mismatch: {e}")
...     # Data was corrupted during upload
...     # Retry the upload or abort the session
...     client.abort_write_session(session_id)
Checksum mismatch: MD5 value mismatch, expected: abc123, actual: def456
Upload large blobs incrementally by reading and writing in chunks to avoid loading the entire blob into memory:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Upload a large video file in 64KB chunks
>>> chunk_size = 65536
>>> with open('large_video.mp4', 'rb') as f:
...     while True:
...         chunk = f.read(chunk_size)
...         if not chunk:
...             break
...         success = blob_writer.write(chunk)
...         if not success:
...             print("Writer closed unexpectedly")
...             break
>>> response = blob_writer.finish()
>>> print(f"Large video uploaded: {response.size} bytes")
Large video uploaded: 104857600 bytes
Upload text data by passing a string directly; the writer converts it to bytes automatically:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> text_content = "This is text data to store as a blob"
>>> blob_writer.write(text_content)  # Automatically converted to bytes
>>> response = blob_writer.finish()
Monitor writer status during long uploads to detect early failures:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> for chunk in large_data_chunks:
...     success = blob_writer.write(chunk)
...     if not success or blob_writer.get_status() != Status.RUNNING:
...         print("Upload stopped unexpectedly")
...         break
>>> response = blob_writer.finish()
>>> request_id = blob_writer.get_request_id()
- write_rows_stream(session_id, stream_id, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) StreamWriter[source]
Write row data to a write stream via streaming upload.
This method creates a streaming writer for uploading row data to a write stream. The writer accepts Arrow record batches or raw binary data and uploads it incrementally, enabling efficient handling of large datasets without loading all data into memory at once.
- Parameters:
session_id (str) – The write session identifier.
stream_id (str or int) – The stream identifier from create_write_stream.
stream_version (int, default 0) – Version of the stream (should match create_write_stream).
record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.
compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.
- Returns:
Stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See StreamWriter.
- Return type:
StreamWriter
- Raises:
ValueError – If called on an instance-based client.
See also
create_write_streamCreate a stream before writing.
ArrowWriterConvert Arrow batches to stream format.
close_write_streamClose the stream after finishing write.
Notes
The returned StreamWriter accepts binary data directly. For Arrow workflows, wrap it with ArrowWriter which converts record batches to Arrow IPC format. The writer must be finished before closing the stream. Call finish() after writing all batches.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow using ArrowWriter to upload Arrow record batches:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write Arrow data
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
...     pa.field("value", pa.float64()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
...     pa.array([100.0, 200.0, 150.0]),
... ], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Upload successful: {success}")
Upload successful: True
>>> # 4. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
- class odps.apis.storage_api_v2.StorageApiArrowClient(odps: ODPS, table_or_instance=None, rest_endpoint: str = None, quota_name: str = None, tags: None | str | List[str] = None)[source]
Arrow batch client for the Storage API V2.
Extends StorageApiClient with convenience methods that wrap raw stream I/O with ArrowReader and ArrowWriter, so you can work directly with PyArrow RecordBatch objects instead of raw bytes.
- Parameters:
odps (ODPS) – ODPS entry object.
table_or_instance (Table or Instance, optional) – MaxCompute table or SQL instance to operate on. If an Instance is provided, only read operations are available.
rest_endpoint (str, optional) – Custom REST endpoint for the storage API.
quota_name (str, optional) – Quota name for resource management.
tags (str or list of str, optional) – Tags for request tracking.
See also
StorageApiClient – Base client with raw stream I/O.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiArrowClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiArrowClient(odps, table)
Read data using ArrowReader:
>>> read_resp = client.create_read_session()
>>> reader = client.read_rows_arrow(read_resp.session_id, split_index=0)
>>> while True:
...     batch = reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
Write data using ArrowWriter:
>>> write_resp = client.create_write_session()
>>> writer = client.write_rows_arrow(
...     write_resp.session_id, stream_id="0", record_count=100
... )
>>> writer.write(record_batch)
>>> writer.finish()
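The read example above covers a single split. A minimal sketch for walking every split of a session follows, assuming that read_rows_arrow returns a reader whose read() returns None once the split is exhausted, as shown above. The helper name iter_batches is hypothetical and not part of PyODPS.

```python
def iter_batches(client, session_id, splits_count):
    """Yield (split_index, batch) for every batch in every split.

    Hypothetical helper: `client` is any object exposing
    read_rows_arrow(session_id, split_index=...), as in the example above.
    """
    for split_index in range(splits_count):
        reader = client.read_rows_arrow(session_id, split_index=split_index)
        while True:
            batch = reader.read()
            if batch is None:  # this split is exhausted
                break
            yield split_index, batch
```

Typical use would pass read_resp.session_id and read_resp.splits_count from create_read_session.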
- read_blobs(blob_references=None, compression=None, stream=False)
Download binary blobs by their blob references.
This method retrieves blob data uploaded via write_blob_stream or write_blob_batch by providing the blob references returned during upload. The data is returned as an iterator that yields tuples of (data_bytes, mime_type), handling protocol framing and CRC stripping automatically.
- Parameters:
blob_references (list of str or bytes, optional) – List of blob references obtained from previous upload operations (write_blob_stream or write_blob_batch). References may be UTF-8 strings or raw bytes and will be decoded automatically if needed.
compression (Compression, default None) – Compression algorithm for the response data. None means Compression.UNCOMPRESSED.
stream (bool, optional) – If True, return a BlobStreamReader instead of BlobDataIterator. The reader provides file-like read() access per blob, a mime_type property, and a next() method to advance to the next blob. Default is False.
- Returns:
When stream is False (default), returns an iterator yielding (data_bytes, mime_type) tuples per blob. When stream is True, returns a BlobStreamReader for incremental, file-like reading of each blob.
- Return type:
BlobDataIterator or BlobStreamReader
See also
write_blob_batch – Upload blobs and get references.
write_blob_stream – Upload a single blob and get its reference.
BlobWriteItem – Helper for batch uploads.
Notes
The download stream uses a multi-layer protocol:
1. CRC32C checksums are embedded every 4096 bytes (stripped automatically).
2. Protocol framing wraps each blob (parsed automatically).
For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Read blobs using references obtained from previous uploads. First upload blobs, then read them back using the references:
>>> # References returned by an earlier write_blob_batch call
>>> blob_refs = ['blob_ref_001', 'blob_ref_002', 'blob_ref_003']
>>> # Read the blobs back using references
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     print(f"Blob size: {len(data)} bytes, MIME type: {mime_type}")
...     # Save blob to a file named after its reference
...     filename = f"blob_{blob_refs[i]}"
...     with open(filename, 'wb') as f:
...         f.write(data)
Blob size: 1024 bytes, MIME type: image/jpeg
Blob size: 2048 bytes, MIME type: image/png
Blob size: 512 bytes, MIME type: application/json
Download a single blob by passing a single-element reference list. The iterator yields one (data, mime_type) tuple:
>>> single_ref = ['blob_ref_abc123']
>>> blob_iterator = client.read_blobs(blob_references=single_ref)
>>> data, mime_type = next(blob_iterator)
>>> print(f"Downloaded {len(data)} bytes")
Downloaded 524288 bytes
>>> # Process the blob data
>>> if mime_type == 'image/jpeg':
...     # Process as JPEG image
...     process_image(data)
Download all blobs from a batch upload by storing references during upload and using them to read later:
>>> # Earlier: upload batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> all_refs = batch_resp.blob_references  # Save these
>>> # Later: read all blobs from the batch
>>> blob_iterator = client.read_blobs(blob_references=all_refs)
>>> downloaded_blobs = []
>>> for data, mime_type in blob_iterator:
...     downloaded_blobs.append({
...         'data': data,
...         'mime_type': mime_type,
...         'size': len(data)
...     })
>>> print(f"Downloaded {len(downloaded_blobs)} blobs")
Downloaded 10 blobs
Process blobs incrementally without loading all into memory by iterating and processing one at a time:
>>> blob_iterator = client.read_blobs(blob_references=large_blob_refs)
>>> for i, (data, mime_type) in enumerate(blob_iterator):
...     # Process each blob and discard data after processing
...     result = analyze_blob(data)
...     print(f"Blob {i}: {result}")
...     # data is freed after this iteration
Blob 0: Analysis complete
Blob 1: Analysis complete
Check the MIME type to determine how to handle blob content when the type was set during upload:
>>> import json
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     if mime_type == 'application/json':
...         # Parse as JSON
...         json_obj = json.loads(data.decode('utf-8'))
...         process_json(json_obj)
...     elif mime_type == 'image/jpeg':
...         # Process as image
...         display_image(data)
...     elif mime_type is None:
...         # No type hint, use generic binary handling
...         process_binary(data)
...     else:
...         print(f"Unknown MIME type: {mime_type}")
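When blobs are saved to disk, the MIME type can also choose a file extension. The sketch below uses only the standard library mimetypes module. The blob_NNNN naming scheme and the ".bin" fallback are illustrative assumptions, not behavior of the Storage API.

```python
import mimetypes


def blob_filename(index, mime_type, default_ext=".bin"):
    """Build a file name such as 'blob_0003.json' from a blob's MIME type.

    Falls back to `default_ext` when the MIME type is None or unknown.
    """
    ext = mimetypes.guess_extension(mime_type) if mime_type else None
    return f"blob_{index:04d}{ext or default_ext}"
```

The extension chosen for a given type depends on the platform's MIME database, so treat the result as a hint rather than a guarantee.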
Convert the blob iterator to a list for multiple iterations or indexing, though this loads all data into memory:
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> all_blobs = list(blob_iterator)  # [(data1, mime1), (data2, mime2), ...]
>>> # Now can access by index
>>> first_blob_data = all_blobs[0][0]
>>> first_blob_mime = all_blobs[0][1]
>>> # Can iterate multiple times
>>> for data, mime_type in all_blobs:
...     process_blob(data)
>>> for data, mime_type in all_blobs:
...     validate_blob(data)
Handle an empty blob reference list by iterating as usual; the loop body simply never runs:
>>> empty_refs = []
>>> blob_iterator = client.read_blobs(blob_references=empty_refs)
>>> count = 0
>>> for _ in blob_iterator:
...     count += 1
>>> print(f"Downloaded {count} blobs")
Downloaded 0 blobs
Use blob references from a table query to download specific blobs referenced in table rows:
>>> # Query table to get blob references
>>> instance = odps.execute_sql("SELECT blob_ref FROM your_table WHERE id=1")
>>> with instance.open_reader() as reader:
...     for record in reader:
...         blob_ref = record[0]  # blob_ref column
...         # Download the referenced blob
...         blob_iterator = client.read_blobs(blob_references=[blob_ref])
...         data, mime_type = next(blob_iterator)
...         print(f"Downloaded blob {blob_ref}: {len(data)} bytes")
Stream blobs incrementally using the file-like read interface with stream=True:
>>> reader = client.read_blobs(blob_references=refs, stream=True)
>>> while reader is not None:
...     print(f"MIME: {reader.mime_type}")
...     chunk = reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = reader.read(4096)
...     reader = reader.next()
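The nested loop above can be wrapped in a generator so callers see a flat sequence of chunks. This is a sketch under the assumptions visible in the example: read(n) returns an empty value at the end of the current blob, and next() returns the reader for the following blob or None when no blobs remain. The helper name iter_blob_chunks is hypothetical.

```python
def iter_blob_chunks(reader, chunk_size=4096):
    """Yield (blob_index, mime_type, chunk) while walking a stream reader.

    Hypothetical helper: `reader` is any object with read(n), a mime_type
    attribute, and next(), as the stream=True reader is described above.
    """
    index = 0
    while reader is not None:
        mime_type = reader.mime_type
        while True:
            chunk = reader.read(chunk_size)
            if not chunk:  # end of the current blob
                break
            yield index, mime_type, chunk
        reader = reader.next()  # None once the last blob is consumed
        index += 1
```

Only one chunk is held in memory at a time, which is the point of passing stream=True.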
- abort_write_session(session_id: str, route_token=None)
Abort a write session to discard all uploaded data.
This method cancels an active write session and discards all data uploaded through its streams. Use abort when you encounter errors during the upload process or when you want to cancel the operation without making the data visible in the table. Aborting releases all resources associated with the session.
- Parameters:
session_id (str) – The unique identifier of the write session to abort. This can be any active or expired session.
- Returns:
Method returns None after aborting the session. All uploaded data is discarded and session resources are released.
- Return type:
None
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
See also
create_write_session – Create a write session.
commit_write_session – Commit the session instead of aborting.
get_write_session – Check session status before aborting.
Notes
After aborting, the session cannot be committed. All data uploaded is permanently discarded. Call abort as soon as you detect errors to release resources quickly. It’s safe to abort a session multiple times if the first abort attempt fails.
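The commit-or-abort rule can be packaged as a context manager. This is a minimal sketch, assuming only the create_write_session, commit_write_session, and abort_write_session methods documented in this reference. The name write_session is hypothetical and not part of PyODPS.

```python
from contextlib import contextmanager


@contextmanager
def write_session(client, **session_kwargs):
    """Yield a session id; commit on success, abort on any exception.

    Hypothetical helper: `session_kwargs` are passed through to
    create_write_session unchanged.
    """
    session_id = client.create_write_session(**session_kwargs).session_id
    try:
        yield session_id
        client.commit_write_session(session_id)
    except BaseException:
        # Covers failures in the with-body and in the commit itself.
        client.abort_write_session(session_id)
        raise
```

Streams still have to be created, written, finished, and closed inside the with block before the commit runs on exit.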
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Abort a session when an error occurs during data upload to prevent partial data from being committed:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Create stream and write data
...     stream_resp = client.create_write_stream(...)
...     # Write data that fails due to schema mismatch
...     writer = client.write_rows_stream(...)
...     writer.write(invalid_data)
...     writer.finish()
... except Exception as e:
...     print(f"Upload failed: {e}")
...     # Abort to discard all uploaded data
...     client.abort_write_session(session_id)
...     print("Session aborted, no data committed to table")
Upload failed: Schema mismatch error
Session aborted, no data committed to table
Use abort in cleanup code to ensure resources are released even if commit never happens. Track whether the commit succeeded rather than inferring it from the session state:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> committed = False
>>> try:
...     # Upload data...
...     # Commit if everything succeeds
...     client.commit_write_session(session_id)
...     committed = True
... finally:
...     # If commit didn't happen (exception or early return),
...     # abort the session to release resources
...     if not committed:
...         try:
...             client.abort_write_session(session_id)
...         except Exception:
...             # Session might already be aborted or expired
...             pass
Abort a session after detecting validation errors in the data before attempting to commit:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Write data to streams
>>> writer = client.write_rows_stream(...)
>>> # Validate the data locally
>>> if data_has_errors:
...     print("Data validation failed, aborting session")
...     client.abort_write_session(session_id)
... else:
...     # Data is valid, proceed with commit
...     client.close_write_stream(...)
...     client.commit_write_session(session_id)
- close_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) → CloseWriteStreamResponse
Close a write stream to finalize the data upload for that stream.
After finishing the write operation via the stream writer, call this method to formally close the stream. This signals to the server that no more data will be uploaded to this stream and marks the stream as ready for the session commit. All streams must be closed before the session can be committed.
- Parameters:
session_id (str) – The write session identifier.
stream_id (str or int) – The stream identifier to close.
stream_version (int, default 0) – Version of the stream (should match create_write_stream).
- Returns:
Response with warning_message and request_id. See CloseWriteStreamResponse.
- Return type:
CloseWriteStreamResponse
- Raises:
ValueError – If called on an instance-based client.
See also
write_rows_stream – Finish writing before closing the stream.
create_write_stream – Create the stream before writing.
commit_write_session – Commit the session after all streams are closed.
get_write_stream – Verify the stream is closed.
Notes
The stream writer’s finish() method and close_write_stream serve different purposes: finish() completes the data upload, while close_write_stream formally closes the stream on the server side. Both must be called in sequence: finish the writer, then close the stream. After closing, the stream cannot accept more data.
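The required order (finish the writer, then close the stream) can be enforced in one place. The sketch below assumes that finish() returns a (commit_msg, success) pair, as ArrowWriter.finish() does in the examples of this reference. The helper name finish_and_close is hypothetical.

```python
def finish_and_close(client, writer, session_id, stream_id, stream_version=0):
    """Finish the writer, then close its stream; return the close response.

    Raises RuntimeError if finish() reports failure, so the caller can
    abort the session instead of closing and committing.
    """
    _commit_msg, success = writer.finish()
    if not success:
        raise RuntimeError(f"writer for stream {stream_id} did not finish cleanly")
    return client.close_write_stream(
        session_id, stream_id=stream_id, stream_version=stream_version
    )
```

Raising before the close call keeps a failed stream from being marked ready for commit.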
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow showing the relationship between writer finish() and stream close. First finish the writer, then close the stream:
>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Write Arrow data
>>> batch = pa.record_batch([...], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> # 3. Finish the writer to complete data upload
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Writer finished: {success}")
Writer finished: True
>>> # 4. Close the stream to mark it ready for commit
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Warning: {close_resp.warning_message}")
>>> # 5. Commit the session to make data visible
>>> client.commit_write_session(session_id)
Close multiple parallel streams after each finishes uploading. All streams must be closed before committing the session:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams
>>> for stream_id in range(3):
...     # Create stream, write data, finish writer
...     client.create_write_stream(session_id, stream_id=stream_id)
...     writer = ArrowWriter(
...         client.write_rows_stream(session_id, stream_id=stream_id),
...         Compression.UNCOMPRESSED
...     )
...     writer.write(batch)
...     writer.finish()
>>> # Close all 3 streams
>>> for stream_id in range(3):
...     close_resp = client.close_write_stream(session_id, stream_id=stream_id)
...     print(f"Stream {stream_id} closed")
Stream 0 closed
Stream 1 closed
Stream 2 closed
>>> # Now commit the session
>>> client.commit_write_session(session_id)
Check for warnings when closing streams to detect potential issues that may affect the commit:
>>> close_resp = client.close_write_stream(session_id, stream_id=0)
>>> if close_resp.warning_message:
...     print(f"Stream close warning: {close_resp.warning_message}")
...     # Decide whether to proceed with commit or abort
...     if "data_incomplete" in close_resp.warning_message:
...         client.abort_write_session(session_id)
...     else:
...         # Warning is informational, proceed with commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)
Use get_write_stream to verify a stream is properly closed before attempting to commit the session:
>>> # Close stream
>>> client.close_write_stream(session_id, stream_id=0)
>>> # Verify it's closed
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'CLOSED':
...     print("Stream properly closed, safe to commit")
...     client.commit_write_session(session_id)
... else:
...     print(f"Unexpected stream status: {status.status}")
Stream properly closed, safe to commit
Handle the case where close fails due to stream errors by checking the warning message and potentially aborting:
>>> try:
...     close_resp = client.close_write_stream(session_id, stream_id=0)
...     if close_resp.warning_message and "error" in close_resp.warning_message.lower():
...         print(f"Stream had errors: {close_resp.warning_message}")
...         # Abort session instead of committing incomplete data
...         client.abort_write_session(session_id)
... except Exception as e:
...     print(f"Failed to close stream: {e}")
...     client.abort_write_session(session_id)
- commit_write_session(session_id: str, stream_ids=None, stream_versions=None, route_token=None)
Commit a write session to finalize all uploaded data.
This is the final step in the write workflow. Calling commit atomically makes all data uploaded through the session’s write streams visible in the table. The commit operation ensures that either all writes succeed (transaction committed) or none of them appear in the table (transaction aborted).
- Parameters:
session_id (str) – The unique identifier of the write session to commit. All write streams in this session must be closed before committing.
stream_ids (list of str, optional) – List of stream identifiers to commit. Used for transactional/delta tables to specify which streams to include in the commit. When provided, stream_versions must also be provided with matching length.
stream_versions (list of int, optional) – List of stream version numbers corresponding to stream_ids. Must be provided together with stream_ids and must have the same length.
- Returns:
Method returns None on successful commit. The commit operation makes all uploaded data visible in the table atomically.
- Return type:
None
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
errors.ODPSError – If any write streams are still open or if the session has expired.
See also
create_write_session – Create a write session.
close_write_stream – Close all streams before committing.
abort_write_session – Abort instead of commit to discard writes.
get_write_session – Verify all streams are closed.
Notes
Before committing, ensure all write streams are closed. Use get_write_session to verify stream states. If commit fails, call abort_write_session to clean up resources.
Commit is a synchronous operation. Once it returns successfully, the data is immediately visible in the table and cannot be rolled back.
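The check-then-commit rule can be written as a guard. This sketch assumes the response shape used in this section's multi-stream example: get_write_session returns an object whose streams attribute is a list of dicts with a 'Status' key equal to 'CLOSED' for closed streams. That shape is an assumption carried over from the example, and the helper name commit_if_all_closed is hypothetical.

```python
def commit_if_all_closed(client, session_id):
    """Commit only when every stream reports CLOSED; return the open streams.

    Returns an empty list after a commit, otherwise the stream entries that
    are not yet closed (nothing is committed in that case).
    """
    status = client.get_write_session(session_id)
    # Assumed shape: each entry is a dict with a 'Status' key.
    still_open = [s for s in (status.streams or []) if s.get("Status") != "CLOSED"]
    if not still_open:
        client.commit_write_session(session_id)
    return still_open
```

A non-empty return value tells the caller which streams to close or retry before trying again.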
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete write workflow: create session, create stream, write Arrow data, close stream, and commit:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id

>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)

>>> # 3. Write data using Arrow writer
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
... ], schema=schema)
>>> writer = ArrowWriter(
...     client.write_rows_stream(session_id, stream_id=0, record_count=3),
...     Compression.UNCOMPRESSED
... )
>>> writer.write(batch)
>>> commit_msg, success = writer.finish()

>>> # 4. Close the write stream
>>> client.close_write_stream(session_id, stream_id=0)

>>> # 5. Commit the session to finalize all writes
>>> client.commit_write_session(session_id)
>>> print("Data successfully uploaded to table")
Data successfully uploaded to table
For transactional/delta tables, specify stream_ids and stream_versions when committing:
>>> client.commit_write_session(
...     session_id,
...     stream_ids=["stream-1"],
...     stream_versions=[1],
... )
For multi-stream uploads, ensure all streams are closed before committing. Use get_write_session to verify:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create multiple streams and write data to each
>>> for stream_id in range(3):
...     # Create stream, write data, close stream...
...     pass
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> all_closed = all(
...     s.get('Status') == 'CLOSED'
...     for s in (status.streams or [])
... )
>>> if all_closed:
...     # Commit session to make data visible
...     client.commit_write_session(session_id)
...     print("Session committed successfully")
Session committed successfully
If commit fails due to errors, abort the session to clean up:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> try:
...     # Write data...
...     client.commit_write_session(session_id)
... except Exception as e:
...     print(f"Commit failed: {e}")
...     # Abort to discard all writes and release resources
...     client.abort_write_session(session_id)
Commit failed: [error message]
- create_read_session(required_data_columns=None, required_partition_columns=None, required_partitions=None, required_bucket_ids=None, split_options=None, arrow_options=None, filter_predicate=None, filter_predicate_fallback=None, split_max_file_num=None, incremental_read=None, incremental_read_options=None) → CreateReadSessionResponse
Create a read session for table or instance data retrieval.
A read session is a prerequisite for reading data from a MaxCompute table or SQL instance result. The session determines how data will be split into readable chunks and what data schema will be returned. Sessions have an expiration time and must be refreshed if they expire during long-running read operations.
- Parameters:
required_data_columns (list of str, optional) – List of column names to read. If empty, all columns are returned.
required_partition_columns (list of str, optional) – Partition columns to include in the result.
required_partitions (list of str, optional) – Specific partition values to read (e.g., ['pt=20230101']).
required_bucket_ids (list of str, optional) – Bucket IDs to read for bucket-based tables.
split_options (SplitOptions, optional) – Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.
arrow_options (ArrowOptions, optional) – Arrow format settings like timestamp precision.
filter_predicate (str, optional) – SQL-like filter condition to apply during reading.
filter_predicate_fallback (bool, optional) – Whether to fall back to server-side filtering if predicate pushdown fails.
split_max_file_num (int, optional) – Maximum number of files per split for file-based splitting.
incremental_read (bool, optional) – Enable incremental reading mode for capturing table changes.
incremental_read_options (IncrementalReadOptions, optional) – Options for incremental read mode (version range, timestamp range).
- Returns:
Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.
- Return type:
CreateReadSessionResponse
- Raises:
ValueError – If the table or instance is not properly configured.
See also
get_read_session – Get current read session status.
read_rows_stream – Read data from a specific split.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, SplitOptions
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a basic read session that reads all columns with default splitting:
>>> response = client.create_read_session()
>>> print(f"Session ID: {response.session_id}")
Session ID: session_12345
>>> print(f"Available splits: {response.splits_count}")
Available splits: 5
Specify which columns to read and how to split the data. Use split_options to control parallelism by setting the split number instead of split size:
>>> split_opts = SplitOptions()
>>> split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
>>> split_opts.split_number = 10  # Create 10 splits
>>> response = client.create_read_session(
...     required_data_columns=["id", "name", "value"],
...     split_options=split_opts,
... )
>>> print(f"Created {response.splits_count} splits")
Created 10 splits
Read from a specific partition by providing partition values in the required_partitions parameter:
>>> response = client.create_read_session(
...     required_partitions=["pt=20230101", "region=us-west"]
... )
>>> print(f"Records in partition: {response.record_count}")
Records in partition: 1000
For incremental data capture, enable incremental_read to read changes since a specific version or timestamp:
>>> from odps.apis.storage_api_v2 import IncrementalReadOptions
>>> incr_opts = IncrementalReadOptions()
>>> incr_opts.start_version = 100
>>> response = client.create_read_session(
...     incremental_read=True,
...     incremental_read_options=incr_opts,
... )
>>> print(f"Latest version: {response.latest_version}")
Latest version: 150
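Once a session reports its splits_count, the split indexes have to be shared among the workers that will read them. One simple option is round-robin assignment, sketched below in plain Python. The helper name assign_splits is hypothetical, and the API itself does not prescribe any scheduling strategy.

```python
def assign_splits(splits_count, num_workers):
    """Distribute split indexes 0..splits_count-1 round-robin over workers.

    Returns one list of split indexes per worker; a worker's list may be
    empty when there are more workers than splits.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    return [list(range(w, splits_count, num_workers)) for w in range(num_workers)]
```

For example, `assign_splits(10, 3)` gives `[[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]`, and each worker then reads its own split indexes.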
- create_write_session(partial_partition_spec=None, flags=None) → CreateWriteSessionResponse
Create a write session for uploading data to a table.
A write session is the first step in the data upload workflow. It establishes a transactional context for writing data to a MaxCompute table, ensuring atomicity and consistency. After creating a session, you must create one or more write streams within it, upload data to those streams, close the streams, and finally commit the session.
- Parameters:
partial_partition_spec (str, optional) – Partition specification for writing to a specific partition. Format: 'pt=20230101' or 'pt=20230101,region=us-west'. If empty, writes to the table’s default location.
flags (dict, optional) – Additional flags for session configuration. Common flags include 'overwrite' to replace existing partition data.
- Returns:
Response with session_id, warning_message, and request_id. See CreateWriteSessionResponse.
- Return type:
CreateWriteSessionResponse
- Raises:
ValueError – If called on an instance-based client (writes only support tables).
See also
create_write_stream – Create a stream within the session.
commit_write_session – Commit the session to finalize writes.
abort_write_session – Abort the session to discard all writes.
Notes
Write sessions are transactional. All writes within a session are committed atomically when commit_write_session is called. If the session is aborted, all uploaded data is discarded. Sessions have a limited lifetime and must be committed before expiration.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a basic write session for uploading data to the table’s default location:
>>> response = client.create_write_session()
>>> session_id = response.session_id
>>> print(f"Write session ID: {session_id}")
Write session ID: write_session_12345
Write to a specific partition by specifying the partition spec. This is required for partitioned tables unless writing to the default partition:
>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101"
... )
>>> session_id = response.session_id
>>> print("Writing to partition pt=20230101")
Use the 'overwrite' flag to replace existing data in a partition instead of appending:
>>> response = client.create_write_session(
...     partial_partition_spec="pt=20230101",
...     flags={"overwrite": True}
... )
>>> session_id = response.session_id
>>> # After committing, existing partition data will be replaced
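The partition spec is a comma-separated list of key=value pairs, so it can be built from a mapping rather than by hand. This is a sketch: the helper name build_partition_spec is hypothetical, and because this reference does not describe any escaping rules, the helper rejects values containing ',' or '=' instead of guessing.

```python
def build_partition_spec(parts):
    """Join a mapping like {'pt': '20230101'} into 'pt=20230101'.

    Keys are emitted in the mapping's insertion order, which should match
    the table's partition column order.
    """
    for key, value in parts.items():
        # Assumption: no escaping scheme is known, so refuse separators.
        if "," in str(value) or "=" in str(value):
            raise ValueError(f"unsupported character in partition value for {key!r}")
    return ",".join(f"{key}={value}" for key, value in parts.items())
```

The result can be passed as partial_partition_spec, for example `build_partition_spec({"pt": "20230101", "region": "us-west"})`.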
Complete write workflow: create session, create stream, write data, close stream, and commit:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create write stream (see create_write_stream example)
>>> # Write data (see write_rows_stream example)
>>> # Close stream (see close_write_stream example)
>>> # Commit session to finalize
>>> client.commit_write_session(session_id)
If an error occurs during the write process, abort the session to discard all uploaded data and release resources:
>>> session_resp = client.create_write_session()
>>> try:
...     # Write data...
...     client.commit_write_session(session_resp.session_id)
... except Exception:
...     # Discard everything uploaded in this session, then re-raise
...     client.abort_write_session(session_resp.session_id)
...     raise
- create_write_stream(session_id=None, stream_id=None, stream_version=0, route_token=None) → CreateWriteStreamResponse
Create a write stream within an active write session.
A write stream is a data upload channel within a write session. You can create multiple streams in parallel to upload data concurrently, improving throughput for large datasets. Each stream must be closed individually after writing data, and the session must be committed after all streams are closed.
- Parameters:
session_id (str) – The write session identifier from create_write_session.
stream_id (str or int) – Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.
stream_version (int, default 0) – Version number for the stream. Increment if retrying after a failed upload to the same stream_id.
- Returns:
Response with data_schema, table_id, schema_version, and route_token. See CreateWriteStreamResponse.
- Return type:
CreateWriteStreamResponse
- Raises:
ValueError – If called on an instance-based client.
See also
create_write_session – Create the parent write session first.
write_rows_stream – Write data to the created stream.
close_write_stream – Close the stream after writing.
get_write_stream – Get stream status.
Notes
Each stream can be written to independently, enabling parallel uploads from multiple workers. The stream_version allows retrying failed uploads without creating a new session. After a stream is closed, it cannot be reopened with the same stream_id and version.
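The retry rule (same stream_id, incremented stream_version) can be expressed as a small loop. This is a sketch under stated assumptions: upload is a caller-supplied callable that writes and finishes the data for the given version, and any exception it raises is treated as retryable. The name upload_with_retry is hypothetical.

```python
def upload_with_retry(client, session_id, stream_id, upload, max_attempts=3):
    """Create the stream and run `upload`, bumping stream_version on failure.

    `upload(stream_version)` is supplied by the caller. Returns the
    stream_version that succeeded; raises RuntimeError when all attempts fail.
    """
    last_error = None
    for version in range(max_attempts):
        client.create_write_stream(
            session_id, stream_id=stream_id, stream_version=version
        )
        try:
            upload(version)
            return version
        except Exception as exc:  # assumption: every upload error is retryable
            last_error = exc
    raise RuntimeError(
        f"stream {stream_id} failed after {max_attempts} attempts"
    ) from last_error
```

The returned version is the one to pass as stream_version when closing the stream.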
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
First create a write session, then create a single stream for sequential data upload:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> print(f"Stream created, schema: {stream_resp.data_schema}")
Stream created, schema: {'Columns': [...]}
For parallel upload from multiple workers, create multiple streams with different stream_ids. Each worker processes one stream:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Worker 1 creates stream 0
>>> stream_resp0 = client.create_write_stream(session_id, stream_id=0)
>>> # Worker 2 creates stream 1
>>> stream_resp1 = client.create_write_stream(session_id, stream_id=1)
>>> # Worker 3 creates stream 2
>>> stream_resp2 = client.create_write_stream(session_id, stream_id=2)
>>> print("Created 3 parallel streams")
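The one-stream-per-worker pattern can be driven from a thread pool. The sketch below assumes the client may be shared across threads, which this reference does not state; if it may not, create one client per worker instead. upload_chunk is a caller-supplied callable and upload_in_parallel is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


def upload_in_parallel(client, session_id, chunks, upload_chunk, max_workers=4):
    """Create one stream per chunk and upload the chunks concurrently.

    `upload_chunk(stream_id, chunk)` is supplied by the caller and should
    write and finish the data for that stream. Returns the stream ids in
    chunk order.
    """
    def work(item):
        stream_id, chunk = item
        client.create_write_stream(session_id, stream_id=stream_id)
        upload_chunk(stream_id, chunk)
        return stream_id

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order and re-raises worker exceptions.
        return list(pool.map(work, enumerate(chunks)))
```

Every returned stream id still has to be closed before the session is committed.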
Use stream_version to retry failed uploads. If a stream fails, increment the version to create a new stream with the same ID:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0, stream_version=0)
>>> # Attempt to write data (fails)
>>> try:
...     writer = client.write_rows_stream(...)
...     writer.write(data)
... except Exception:
...     # Retry with incremented version
...     retry_resp = client.create_write_stream(
...         session_id, stream_id=0, stream_version=1
...     )
...     writer = client.write_rows_stream(...)
...     writer.write(data)
Check the data_schema before writing to ensure your data matches the expected table schema:
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> schema = stream_resp.data_schema
>>> if schema:
...     columns = schema.get('Columns', [])
...     for col in columns:
...         print(f"Column: {col.get('Name')}, Type: {col.get('Type')}")
Column: id, Type: bigint
Column: name, Type: string
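The retry pattern described above (same stream_id, incremented stream_version) can be wrapped in a small loop. The following is a minimal sketch, not part of the library: `write_with_version_retry`, `open_stream`, and `upload` are hypothetical names, and the stand-in callables replace real calls to create_write_stream and write_rows_stream so the example runs without a MaxCompute connection.

```python
from typing import Callable


def write_with_version_retry(
    open_stream: Callable[[int], object],
    upload: Callable[[object], None],
    max_attempts: int = 3,
) -> int:
    """Try stream versions 0, 1, 2, ... until an upload succeeds.

    Returns the stream_version that succeeded.
    """
    last_error = None
    for version in range(max_attempts):
        # In real code this would wrap
        # client.create_write_stream(session_id, stream_id=..., stream_version=version)
        stream = open_stream(version)
        try:
            upload(stream)
            return version
        except Exception as exc:  # narrow this to the errors you expect
            last_error = exc
    raise RuntimeError(f"upload failed after {max_attempts} attempts") from last_error


# Stand-ins (assumptions) that simulate two failed uploads, then a success.
attempts = []


def fake_open_stream(version):
    attempts.append(version)
    return {"stream_id": 0, "stream_version": version}


def fake_upload(stream):
    if stream["stream_version"] < 2:
        raise IOError("simulated network failure")


used_version = write_with_version_retry(fake_open_stream, fake_upload)
print(used_version, attempts)  # 2 [0, 1, 2]
```

Bounding the number of attempts keeps a persistent failure (for example, a schema problem) from looping forever; in that case aborting the session is usually the better choice.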
- get_read_session(session_id: str, refresh: bool = False) CreateReadSessionResponse
Get current status and metadata of an existing read session.
This method retrieves the current state of a read session, including its status, split count, schema, and expiration time. Use this to check if a session is still valid before attempting to read data, or to refresh a session that is approaching expiration.
- Parameters:
session_id (str) – The unique identifier of the read session to retrieve. This is obtained from create_read_session response.
refresh (bool, default False) – Whether to refresh the session expiration time. Set to True if the session is about to expire or has expired. When True, the server extends the session lifetime, allowing continued reading operations.
- Returns:
Response with session_id, session_status, splits_count, record_count, data_schema, and expiration_time. See CreateReadSessionResponse.
- Return type:
CreateReadSessionResponse
- Raises:
ValueError – If session_id is None or empty.
See also
create_read_sessionCreate a new read session.
read_rows_streamRead data from a specific split.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import StorageApiClient
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
First create a read session, then check its status:
>>> create_response = client.create_read_session()
>>> session_id = create_response.session_id
>>> status_response = client.get_read_session(session_id)
>>> print(f"Session status: {status_response.session_status}")
Session status: NORMAL
>>> print(f"Expiration: {status_response.expiration_time}")
Expiration: 2023-01-01T12:00:00Z
When a session expires during a long-running read operation, use the refresh parameter to extend its lifetime:
>>> # SessionStatus must be imported first; its import is not shown in this section
>>> # Check if session expired
>>> response = client.get_read_session(session_id)
>>> if response.session_status == SessionStatus.EXPIRED:
...     # Refresh the session to continue reading
...     refreshed_response = client.get_read_session(session_id, refresh=True)
...     print(f"New expiration: {refreshed_response.expiration_time}")
New expiration: 2023-01-01T14:00:00Z
Monitor session status during a parallel read operation to ensure all splits are processed before expiration:
>>> create_response = client.create_read_session()
>>> for split_index in range(create_response.splits_count):
...     # Check session health before each split
...     status = client.get_read_session(create_response.session_id)
...     if status.session_status == SessionStatus.EXPIRED:
...         client.get_read_session(create_response.session_id, refresh=True)
...     # Read the split data...
...     reader = client.read_rows_stream(...)
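Rather than waiting for a session to report EXPIRED, a reader can refresh shortly before the expiration time. The sketch below assumes expiration_time is an ISO 8601 UTC string like the one printed in the example above; the actual type may differ, and `needs_refresh` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(expiration_time: str, now: datetime, margin: timedelta) -> bool:
    """Return True when the session expires within `margin` of `now`."""
    # datetime.fromisoformat() only accepts a trailing "Z" on Python 3.11+,
    # so normalize it to an explicit UTC offset first.
    expires_at = datetime.fromisoformat(expiration_time.replace("Z", "+00:00"))
    return expires_at - now <= margin


now = datetime(2023, 1, 1, 11, 50, tzinfo=timezone.utc)
margin = timedelta(minutes=15)

print(needs_refresh("2023-01-01T12:00:00Z", now, margin))  # True: 10 minutes left
print(needs_refresh("2023-01-01T14:00:00Z", now, margin))  # False: over 2 hours left
```

When the helper returns True, calling get_read_session(session_id, refresh=True) before starting the next split avoids an expiry in the middle of a read.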
- get_write_session(session_id: str) GetWriteSessionResponse
Get the current status and stream information of a write session.
This method retrieves metadata about a write session, including the list of active write streams and their states. Use this to monitor the progress of a multi-stream upload operation or to verify that all streams have been properly closed before committing the session.
- Parameters:
session_id (str) – The unique identifier of the write session to query. This is obtained from create_write_session response.
- Returns:
Response with streams (list of stream info dicts), warning_message, and request_id. See GetWriteSessionResponse.
- Return type:
GetWriteSessionResponse
- Raises:
ValueError – If session_id is None or empty, or if called on an instance-based client.
See also
create_write_sessionCreate a write session.
commit_write_sessionCommit the session after all streams closed.
create_write_streamCreate a new stream in the session.
Notes
Before calling commit_write_session, ensure all write streams are closed. Use get_write_session to verify that no streams are still in the RUNNING or OPEN state.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a write session and check its status before creating streams:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> status_resp = client.get_write_session(session_id)
>>> print(f"Active streams: {status_resp.streams}")
Active streams: None
After creating multiple write streams for parallel upload, use get_write_session to track which streams are active:
>>> # Create multiple streams (see create_write_stream examples)
>>> stream_resp1 = client.create_write_stream(...)
>>> stream_resp2 = client.create_write_stream(...)
>>> status = client.get_write_session(session_id)
>>> print(f"Streams: {len(status.streams) if status.streams else 0}")
Streams: 2
Before committing the session, verify all streams are closed to ensure data upload is complete:
>>> # Close all streams after writing data
>>> client.close_write_stream(...)
>>> client.close_write_stream(...)
>>> # Verify all streams are closed
>>> status = client.get_write_session(session_id)
>>> if status.streams:
...     # Check if any stream is still open
...     open_streams = [s for s in status.streams if s.get('Status') == 'OPEN']
...     if open_streams:
...         print(f"Warning: {len(open_streams)} streams still open")
...     else:
...         # All streams closed, safe to commit
...         client.commit_write_session(session_id)
... else:
...     client.commit_write_session(session_id)
Monitor the progress of a long-running parallel upload by periodically checking stream status:
>>> import time
>>> while True:
...     status = client.get_write_session(session_id)
...     if status.streams:
...         closed_count = sum(
...             1 for s in status.streams
...             if s.get('Status') == 'CLOSED'
...         )
...         print(f"Progress: {closed_count}/{len(status.streams)} streams closed")
...         if closed_count == len(status.streams):
...             break
...     time.sleep(5)
>>> # All streams closed, commit the session
>>> client.commit_write_session(session_id)
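The pre-commit check in the examples above can be factored into a helper that tallies stream states. This is a sketch under stated assumptions: the 'Status' key and the 'OPEN' / 'CLOSED' values are taken from the examples in this section, `summarize_streams` is a hypothetical name, and the sample dicts stand in for a real `GetWriteSessionResponse.streams` value.

```python
from collections import Counter


def summarize_streams(streams):
    """Count streams per state and report whether a commit looks safe.

    `streams` may be None (no streams created yet), as in the first example.
    """
    counts = Counter(s.get("Status", "UNKNOWN") for s in (streams or []))
    # Safe only if every observed state is CLOSED; an empty session
    # is treated as safe, matching the else-branch of the example above.
    safe_to_commit = all(state == "CLOSED" for state in counts)
    return dict(counts), safe_to_commit


sample = [{"Status": "CLOSED"}, {"Status": "OPEN"}, {"Status": "CLOSED"}]
print(summarize_streams(sample))  # ({'CLOSED': 2, 'OPEN': 1}, False)
print(summarize_streams(None))    # ({}, True)
```

Checking for "everything is CLOSED" rather than "nothing is OPEN" also blocks a commit when a stream reports an unexpected state such as ERROR.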
- get_write_stream(session_id: str, stream_id: str, stream_version: int, route_token=None, exactly_once_mode=False) GetWriteStreamResponse
Get the current status and metadata of a write stream.
This method retrieves information about a specific write stream, including its current state, statistics, and any error information. Use this to monitor stream health during uploads or to verify stream state before closing.
- Parameters:
session_id (str) – The unique identifier of the write session.
stream_id (str) – The identifier of the stream to query, passed as a string (typically "0", "1", "2", …).
stream_version (int) – The version number of the stream.
- Returns:
Response with status, record_count, byte_size, error_code, and error_message. See GetWriteStreamResponse.
- Return type:
GetWriteStreamResponse
- Raises:
ValueError – If any parameter is None or empty, or if called on an instance-based client.
See also
create_write_streamCreate a stream before getting its status.
close_write_streamClose the stream after verifying status.
get_write_sessionGet all streams in a session.
Notes
Use get_write_stream to check if a stream has encountered errors before attempting to close it. If a stream is in ERROR state, it may need to be recreated with an incremented stream_version.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Create a write stream and check its status before writing data:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Stream status: {status.status}")
Stream status: OPEN
After writing data, check the stream statistics to verify upload progress before closing:
>>> # Write data to the stream
>>> writer = client.write_rows_stream(...)
>>> writer.write(batch)
>>> writer.finish()
>>> # Check stream statistics
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> print(f"Uploaded {status.record_count or 0} records, {status.byte_size or 0} bytes")
Uploaded 1000 records, 524288 bytes
>>> # Close the stream after verifying the upload
>>> client.close_write_stream(...)
Monitor multiple parallel streams during upload to identify which streams have encountered errors:
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # Create 3 parallel streams and write data
>>> for i in range(3):
...     stream_resp = client.create_write_stream(session_id, stream_id=i)
>>> # Check status of all streams
>>> for i in range(3):
...     status = client.get_write_stream(session_id, stream_id=str(i), stream_version=0)
...     if status.status == 'ERROR':
...         print(f"Stream {i} failed: {status.error_message}")
...     else:
...         print(f"Stream {i}: {status.status}, {status.record_count or 0} records")
Stream 0: OPEN, 500 records
Stream 1: OPEN, 300 records
Stream 2 failed: Schema validation failed
If a stream shows ERROR status, use get_write_stream to get detailed error information before deciding to retry:
>>> status = client.get_write_stream(session_id, stream_id="0", stream_version=0)
>>> if status.status == 'ERROR':
...     print(f"Stream error [{status.error_code}]: {status.error_message}")
...     # Decide whether to retry with new version or abort session
...     if status.error_code == 'SCHEMA_MISMATCH':
...         # Retry with corrected data
...         client.create_write_stream(session_id, stream_id=0, stream_version=1)
...     else:
...         # Abort the entire session
...         client.abort_write_session(session_id)
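When a caller needs to wait for one stream to reach a terminal state, a bounded polling loop avoids waiting forever. The following is a minimal sketch: `poll_until` is a hypothetical helper, the state names come from the examples above, and `get_status` stands in for a call such as `client.get_write_stream(...).status`.

```python
import time


def poll_until(get_status, terminal_states, interval=5.0, timeout=60.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call get_status() until it returns a terminal state or the timeout passes."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"stream still {status!r} after {timeout}s")
        sleep(interval)


# Stand-in status source (assumption): two OPEN polls, then CLOSED.
statuses = iter(["OPEN", "OPEN", "CLOSED"])
result = poll_until(lambda: next(statuses), {"CLOSED", "ERROR"}, interval=0, timeout=5)
print(result)  # CLOSED
```

Passing `sleep` and `clock` as parameters is a design choice that makes the loop testable without real delays.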
- preview_table(limit=None, partition=None, columns=None) StreamReader
Preview table data without creating a session.
This method provides a lightweight way to sample table data without the overhead of creating a read session. Unlike read_rows_stream, preview_table directly returns an Arrow IPC stream, making it ideal for quick data exploration, schema inspection, or testing table connectivity.
- Parameters:
limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).
partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.
columns (list of str, optional) – Specific columns to preview. If empty, all columns are returned.
- Returns:
Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.
- Return type:
StreamReader
- Raises:
ValueError – If called on an instance-based client (preview only works on tables).
See also
read_rows_streamRead full table data with session management.
create_read_sessionCreate a session for large-scale reading.
Notes
Preview is optimized for quick sampling and may not return exactly the number of rows specified in limit. For production data reading with guaranteed row counts and retry support, use create_read_session and read_rows_stream instead.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Preview the first few rows of a table to quickly explore its data:
>>> stream_reader = client.preview_table(limit=10)
>>> arrow_reader = ArrowReader(stream_reader)
>>> batch = arrow_reader.read()
>>> if batch is not None:
...     df = batch.to_pandas()
...     print(df)
   id   name  value
0   1  Alice    100
1   2    Bob    200
2   3  Carol    150
Preview specific columns to check their data types and values without downloading all columns:
>>> reader = ArrowReader(client.preview_table(
...     limit=5, columns=["id", "name"]
... ))
>>> batch = reader.read()
>>> if batch is not None:
...     print(batch.schema)
id: int64
name: string
Preview data from a specific partition to test partition filtering before running a full query:
>>> reader = ArrowReader(client.preview_table(
...     limit=20, partition="pt=20230101"
... ))
>>> batch = reader.read()
>>> df = batch.to_pandas() if batch is not None else None
>>> print(f"Previewed {len(df) if df is not None else 0} rows")
Previewed 20 rows
Use preview to inspect table schema by requesting a small sample. This is useful for understanding column names and types before creating a read session:
>>> reader = ArrowReader(client.preview_table(limit=1))
>>> batch = reader.read()
>>> if batch is not None:
...     for field in batch.schema:
...         print(f"{field.name}: {field.type}")
id: int64
name: string
value: double
pt: string
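The partition argument uses the comma-separated key=value format shown in the parameter description. As a small sketch, a helper can build that string from a dict; `partition_spec` is a hypothetical name, and rejecting values that contain ',' or '=' is an assumption made here to keep the string unambiguous, not a documented rule.

```python
def partition_spec(parts: dict) -> str:
    """Build a spec such as 'pt=20230101,region=us-west' from a dict."""
    if not parts:
        raise ValueError("at least one partition column is required")
    for key, value in parts.items():
        text = str(value)
        # Assumption: separators inside a value would make the spec ambiguous.
        if "," in text or "=" in text:
            raise ValueError(f"unsupported character in value for {key!r}: {text!r}")
    # Dicts preserve insertion order, so partition levels keep their order.
    return ",".join(f"{key}={value}" for key, value in parts.items())


spec = partition_spec({"pt": "20230101", "region": "us-west"})
print(spec)  # pt=20230101,region=us-west
```

The resulting string can then be passed as `partition=spec` to preview_table.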
- preview_table_arrow(limit=None, partition=None, columns=None) ArrowReader[source]
Preview table data as Arrow batches.
This is the Arrow convenience wrapper for preview_table(). It reads a small sample of rows and returns them via an ArrowReader that yields pyarrow.RecordBatch objects.
- Parameters:
limit (int, optional) – Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).
partition (str, optional) – Partition specification to preview specific partition data. Format: ‘pt=20230101’ or ‘pt=20230101,region=us-west’.
columns (list of str, optional) – Specific columns to preview. If empty, all columns are returned.
- Returns:
Arrow batch reader with read() method yielding RecordBatch objects. See ArrowReader.
- Return type:
ArrowReader
- read_rows_arrow(session_id, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) ArrowReader[source]
Read one split of the read session as Arrow batches.
This is the Arrow convenience wrapper for read_rows_stream(). It reads raw bytes from the server and wraps them in an ArrowReader that yields pyarrow.RecordBatch objects.
- Parameters:
session_id (str) – The read session identifier from create_read_session.
split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.
row_offset (int, optional) – Starting row offset within the split. Defaults to 0.
row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.
max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.
skip_row_num (int, default 0) – Number of rows to skip before reading.
max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.
data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).
data_columns (list of str, optional) – Specific columns to read. Must match session schema.
compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.
- Returns:
Arrow batch reader with read() method yielding RecordBatch objects. See ArrowReader.
- Return type:
ArrowReader
- read_rows_stream(session_id=None, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None, route_token=None) StreamReader
Read data from a specific split in a read session.
This method reads a chunk of data from a table or instance result by specifying which split to read. The data is returned as a stream that can be processed incrementally, supporting efficient handling of large datasets. Each split can be read independently, enabling parallel processing of the same session across multiple workers.
- Parameters:
session_id (str) – The read session identifier from create_read_session.
split_index (int, optional) – Which split to read (0 to splits_count-1). If None, reads all data in the session.
row_offset (int, optional) – Starting row offset within the split. Defaults to 0.
row_count (int, optional) – Maximum number of rows to read. If None, reads all rows in the split.
max_batch_rows (int, default 4096) – Maximum rows per Arrow batch in the stream. Controls memory usage during reading.
skip_row_num (int, default 0) – Number of rows to skip before reading.
max_batch_raw_size (int, default 0) – Maximum raw byte size per batch. 0 means no limit.
data_format (DataFormat, optional) – Format of returned data (Arrow V5 is default).
data_columns (list of str, optional) – Specific columns to read. Must match session schema.
compression (Compression, default None) – Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.
- Returns:
Stream reader with read(), get_status(), get_request_id(), and close() methods. See StreamReader.
- Return type:
StreamReader
- Raises:
ValueError – If the session or split parameters are invalid.
See also
create_read_sessionCreate a read session first.
ArrowReaderWrap StreamReader to read Arrow batches.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowReader
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Read data from a specific split using ArrowReader for convenient batch processing:
>>> reader = client.read_rows_stream(session_id, split_index=0)
>>> arrow_reader = ArrowReader(reader)
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process dataframe
>>> arrow_reader.get_request_id()  # Get request ID after completion
For parallel processing, distribute splits across multiple workers. Each worker reads a different split index from the same session:
>>> # Worker 1 reads split 0
>>> reader1 = client.read_rows_stream(session_id, split_index=0)
>>> # Worker 2 reads split 1
>>> reader2 = client.read_rows_stream(session_id, split_index=1)
Control memory usage by limiting batch size with max_batch_rows. Smaller batches reduce memory footprint but may have lower throughput:
>>> reader = client.read_rows_stream(session_id, split_index=0, max_batch_rows=1024)
>>> arrow_reader = ArrowReader(reader)
>>> # Read batches one at a time to control memory
>>> while True:
...     batch = arrow_reader.read()
...     if batch is None:
...         break
...     df = batch.to_pandas()
...     # Process df and then discard to free memory
Read a specific range of rows within a split by using the row_offset and row_count parameters:
>>> reader = client.read_rows_stream(
...     session_id,
...     split_index=0,
...     row_offset=1000,  # Skip first 1000 rows
...     row_count=500,    # Read 500 rows
... )
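For the parallel-read pattern described above, the split indexes have to be divided among workers somehow. The sketch below uses a simple round-robin assignment and a thread pool; `assign_splits`, `read_split`, and `run_worker` are hypothetical names, and `read_split` is a stand-in that returns a fake row count instead of calling read_rows_stream.

```python
from concurrent.futures import ThreadPoolExecutor


def assign_splits(splits_count: int, num_workers: int):
    """Round-robin assignment: worker w gets splits w, w + n, w + 2n, ..."""
    return [list(range(w, splits_count, num_workers)) for w in range(num_workers)]


def read_split(split_index: int):
    # Stand-in (assumption) for reading one split, e.g.
    # ArrowReader(client.read_rows_stream(session_id, split_index=split_index)).
    return split_index, 100 * (split_index + 1)  # (split index, fake row count)


def run_worker(split_indices):
    return [read_split(i) for i in split_indices]


plan = assign_splits(splits_count=7, num_workers=3)
print(plan)  # [[0, 3, 6], [1, 4], [2, 5]]

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_worker, plan))

total_rows = sum(rows for worker in results for _, rows in worker)
print(total_rows)  # 2800
```

Each split index appears in exactly one worker's list, so no split is read twice and none is skipped.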
- property route_token
The stored route token for session affinity.
This is automatically updated from response headers. Pass route_token=... to individual methods to override, or rely on this stored value when not specified.
- write_blob_batch(items: List[BlobWriteItem], session_id=None, stream_id=None, stream_version=0, partition_values=None, column_index=0) WriteBlobResponse
Upload multiple blobs in a single batch request for efficiency.
This method uploads multiple binary blobs (such as images, videos, or files) in one consolidated request, reducing network overhead compared to individual uploads. Each blob is packaged with metadata (partition location, column index, MIME type) and optional checksum verification.
- Parameters:
items (list of BlobWriteItem) – List of blob items to upload. Each item contains data (bytes), partition_values (list of str), column_index (int), distribution_key (str), mime_type (str), and checksum_type (ChecksumType). See BlobWriteItem.
session_id (str, optional) – The write session identifier from create_write_session.
stream_id (str or int, optional) – Stream identifier for this batch upload.
stream_version (int, default 0) – Version number for the stream.
partition_values (list of str, optional) – Default partition values for blobs (can be overridden per item). Format: [‘pt=20230101’, ‘region=us-west’].
column_index (int, default 0) – Default column index for blobs (can be overridden per item).
- Returns:
Response with blob_reference, blob_references, warning_message, and size. See WriteBlobResponse.
- Return type:
WriteBlobResponse
- Raises:
ValueError – If called on an instance-based client.
See also
write_blob_streamUpload single blob via streaming.
read_blobsRead uploaded blobs using the references.
BlobWriteItemClass to construct blob items.
Notes
Batch upload is more efficient than individual uploads when uploading many small blobs. The method uses a custom wire format with header, data, and footer sections for each blob. Checksums (CRC32 or MD5) are computed and included in the footer for server-side verification. The order of blob_references matches the input items order.
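To make the two checksum types in the note above concrete, the sketch below computes a CRC-32 and an MD5 digest with the Python standard library. It only illustrates the digest algorithms: how the library encodes them in the footer is not shown in this section, and whether ChecksumType.CRC32 uses the same polynomial as `zlib.crc32` is an assumption. The helper names are hypothetical.

```python
import hashlib
import zlib


def crc32_hex(data: bytes) -> str:
    """Standard CRC-32 (as implemented by zlib) as 8 lowercase hex digits."""
    return format(zlib.crc32(data) & 0xFFFFFFFF, "08x")


def md5_hex(data: bytes) -> str:
    """MD5 digest as 32 lowercase hex digits."""
    return hashlib.md5(data).hexdigest()


payload = b"123456789"
print(crc32_hex(payload))  # cbf43926, the standard CRC-32 check value
print(md5_hex(payload))
```

CRC-32 is cheap and catches accidental corruption; MD5 produces a longer digest, which is why the examples suggest it for critical uploads.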
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, BlobWriteItem, ChecksumType
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Upload multiple image blobs in a single batch request for efficient network usage:
>>> # 1. Create write session and stream
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 2. Prepare multiple blob items
>>> items = []
>>> for image_path in ['img1.jpg', 'img2.jpg', 'img3.jpg']:
...     with open(image_path, 'rb') as f:
...         image_data = f.read()
...     item = BlobWriteItem(
...         data=image_data,
...         partition_values=['pt=20230101'],
...         column_index=0,
...         mime_type='image/jpeg',
...         checksum_type=ChecksumType.CRC32  # Verify integrity
...     )
...     items.append(item)
>>> # 3. Upload all blobs in one batch
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Uploaded {len(batch_resp.blob_references)} blobs")
Uploaded 3 blobs
>>> print(f"Total size: {batch_resp.size} bytes")
Total size: 1572864 bytes
>>> # 4. Store references for later reading
>>> blob_refs = batch_resp.blob_references
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
Upload blobs to different partitions and columns by customizing each BlobWriteItem’s metadata:
>>> items = [
...     BlobWriteItem(
...         data=blob1_data,
...         partition_values=['pt=20230101'],
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=blob2_data,
...         partition_values=['pt=20230102'],
...         column_index=1
...     ),
...     BlobWriteItem(
...         data=blob3_data,
...         partition_values=['pt=20230103'],
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Each blob is stored in its specified partition/column
Use MD5 checksum for stronger integrity verification on critical data uploads. The checksum is computed and sent with the blob:
>>> critical_data = read_critical_file()
>>> item = BlobWriteItem(
...     data=critical_data,
...     column_index=0,
...     checksum_type=ChecksumType.MD5
... )
>>> response = client.write_blob_batch([item], session_id, stream_id=0)
>>> # Server verifies MD5 checksum matches computed value
Add MIME type metadata to help applications understand blob content type without examining the binary data:
>>> items = [
...     BlobWriteItem(
...         data=json_data_bytes,
...         mime_type='application/json',
...         column_index=0
...     ),
...     BlobWriteItem(
...         data=pdf_data_bytes,
...         mime_type='application/pdf',
...         column_index=1
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # MIME types stored with blobs for content type hints
Use distribution_key for hash-based storage to ensure blobs are distributed across storage locations based on the key:
>>> items = [
...     BlobWriteItem(
...         data=blob_data,
...         distribution_key='user123',
...         column_index=0
...     ),
... ]
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> # Blob stored at location determined by distribution key hash
Upload a large number of small files efficiently by batching them instead of uploading individually:
>>> # Prepare 100 small file blobs
>>> items = []
>>> for file_path in small_files:  # 100 small files
...     with open(file_path, 'rb') as f:
...         data = f.read()
...     items.append(BlobWriteItem(data=data, column_index=0))
>>> # Upload all in one batch (much faster than 100 individual uploads)
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> print(f"Batch uploaded {len(response.blob_references)} blobs")
Batch uploaded 100 blobs
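When the number of files is large, it can help to cap each batch rather than send everything in one request. The following sketch groups payloads into batches bounded by total bytes and item count; `batch_by_size` is a hypothetical helper, and both limits are illustrative values, not documented server limits.

```python
def batch_by_size(blobs, max_batch_bytes, max_batch_items):
    """Yield lists of blobs whose total size stays within the given limits.

    A single blob larger than max_batch_bytes is still emitted, alone.
    """
    batch, batch_bytes = [], 0
    for blob in blobs:
        too_big = batch_bytes + len(blob) > max_batch_bytes
        too_many = len(batch) >= max_batch_items
        if batch and (too_big or too_many):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(blob)
        batch_bytes += len(blob)
    if batch:
        yield batch


blobs = [b"a" * 400, b"b" * 400, b"c" * 400, b"d" * 900, b"e" * 100]
batches = list(batch_by_size(blobs, max_batch_bytes=1000, max_batch_items=10))
print([[len(b) for b in batch] for batch in batches])
# [[400, 400], [400], [900, 100]]
```

Each resulting batch could then be wrapped in BlobWriteItem objects and passed to one write_blob_batch call.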
Check for warnings in the response to detect partial upload failures or other issues:
>>> response = client.write_blob_batch(items, session_id, stream_id=0)
>>> if response.warning_message:
...     print(f"Upload warning: {response.warning_message}")
...     # Some blobs may have had issues
...     # Check if all blob_references are present
...     if len(response.blob_references) < len(items):
...         print(f"Only {len(response.blob_references)} of {len(items)} uploaded")
Upload warning: 2 blobs exceeded size limit
Only 98 of 100 uploaded
Use the returned blob_references to read the blobs later by calling read_blobs with the reference list:
>>> batch_resp = client.write_blob_batch(items, session_id, stream_id=0)
>>> blob_refs = batch_resp.blob_references
>>> # Later, read the blobs back
>>> blob_iterator = client.read_blobs(blob_references=blob_refs)
>>> for data, mime_type in blob_iterator:
...     # Process each blob data
...     print(f"Read blob: {len(data)} bytes, type: {mime_type}")
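Because the order of blob_references matches the order of the input items, references can be paired with the original file names by position. This is a minimal sketch: `pair_references` is a hypothetical helper, and the reference strings are made-up placeholders rather than real server output.

```python
def pair_references(names, blob_references):
    """Map each input name to its blob reference, relying on matching order."""
    # If the server returned fewer references than items (see the warning
    # example above), positions can no longer be trusted, so fail loudly.
    if len(blob_references) != len(names):
        raise ValueError(
            f"expected {len(names)} references, got {len(blob_references)}"
        )
    return dict(zip(names, blob_references))


names = ["img1.jpg", "img2.jpg", "img3.jpg"]
refs = ["ref-a", "ref-b", "ref-c"]  # placeholder values (assumption)
mapping = pair_references(names, refs)
print(mapping["img2.jpg"])  # ref-b
```

Storing this mapping alongside the table makes it possible to call read_blobs later for one specific file.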
- write_blob_stream(session_id, stream_id, stream_version=0, partition_values=None, column_index=0, compression=None) BlobStreamWriter
Upload a single blob via streaming upload.
This method creates a streaming writer for uploading a single binary blob (such as an image, video, or large binary file) to a specific column in a MaxCompute table. The data is optionally compressed with the specified compression algorithm, and the upload is verified with an MD5 checksum to ensure integrity.
- Parameters:
session_id (str) – The write session identifier from create_write_session.
stream_id (str or int) – Stream identifier for this upload.
stream_version (int, default 0) – Version number for the stream.
partition_values (list of str, optional) – Partition values for the blob location. Format: [‘pt=20230101’, ‘region=us-west’].
column_index (int, default 0) – Column index in the table schema where the blob will be stored.
compression (Compression, default None) – Compression algorithm to use. None means Compression.UNCOMPRESSED. See Compression. Supported values: Compression.ZSTD, Compression.LZ4_FRAME, Compression.UNCOMPRESSED.
- Returns:
Blob stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See BlobStreamWriter.
- Return type:
BlobStreamWriter
- Raises:
ValueError – If called on an instance-based client.
errors.DependencyNotInstalledError – If zstandard library is not installed (required for ZSTD compression).
errors.ChecksumError – If MD5 checksum verification fails after finish().
See also
write_blob_batchUpload multiple blobs in one request.
read_blobsRead uploaded blobs by reference.
BlobWriteItemHelper class for batch blob uploads.
Notes
Blob uploads default to no compression (Compression.UNCOMPRESSED). To reduce transfer size, set compression=Compression.ZSTD for zstd or compression=Compression.LZ4_FRAME for lz4. The writer computes an MD5 checksum incrementally during write and compares it with the server response when finish() is called. If the checksums do not match, a ChecksumError is raised, indicating data corruption during upload.
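The incremental checksum described in the note can be pictured with the standard library's hashlib. This is only a sketch of the general technique, not the writer's actual implementation: `Md5Tracker` is a hypothetical class, and it raises ValueError as a stand-in for the library's ChecksumError.

```python
import hashlib


class Md5Tracker:
    """Accumulate an MD5 digest chunk by chunk, then compare with a reported one."""

    def __init__(self):
        self._md5 = hashlib.md5()
        self.size = 0

    def update(self, chunk: bytes) -> None:
        self._md5.update(chunk)
        self.size += len(chunk)

    def verify(self, reported_hex: str) -> None:
        actual = self._md5.hexdigest()
        if actual != reported_hex.lower():
            # Stand-in for errors.ChecksumError in this sketch.
            raise ValueError(f"MD5 mismatch, expected: {reported_hex}, actual: {actual}")


tracker = Md5Tracker()
for chunk in (b"hello ", b"world"):
    tracker.update(chunk)

# Hashing chunk by chunk gives the same digest as hashing the whole payload.
tracker.verify(hashlib.md5(b"hello world").hexdigest())
print(tracker.size)  # 11
```

Updating the digest per chunk means the whole blob never has to be held in memory just to compute its checksum.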
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient
... )
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow for uploading a single image blob to a table with a binary column:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Upload blob data
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101'],
...     column_index=0  # First column is the blob column
... )
>>> # Read image file and upload in chunks
>>> with open('image.jpg', 'rb') as f:
...     while True:
...         chunk = f.read(8192)  # 8KB chunks
...         if not chunk:
...             break
...         blob_writer.write(chunk)
>>> # 4. Finish and verify MD5
>>> response = blob_writer.finish()
>>> print(f"Blob reference: {response.blob_reference}")
Blob reference: blob_ref_abc123
>>> print(f"Uploaded size: {response.size} bytes")
Uploaded size: 524288 bytes
>>> # 5. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
Upload blob data directly from memory without reading from a file, useful for dynamically generated binary data:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Generate or prepare binary data
>>> binary_data = b"generated binary content..."
>>> blob_writer.write(binary_data)
>>> # Finish upload
>>> response = blob_writer.finish()
>>> print(f"Blob uploaded: {response.blob_reference}")
Upload a blob to a specific partition by providing partition values. The blob is stored in the specified partition’s location:
>>> blob_writer = client.write_blob_stream(
...     session_id, stream_id=0,
...     partition_values=['pt=20230101', 'region=us-west'],
...     column_index=0
... )
>>> blob_writer.write(image_data)
>>> response = blob_writer.finish()
>>> # Blob is now in partition pt=20230101/region=us-west
Handle checksum errors when the upload integrity check fails, indicating data corruption during transfer:
>>> from odps import errors
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> blob_writer.write(data)
>>> try:
...     response = blob_writer.finish()
...     print("Upload successful")
... except errors.ChecksumError as e:
...     print(f"Checksum mismatch: {e}")
...     # Data was corrupted during upload
...     # Retry the upload or abort the session
...     client.abort_write_session(session_id)
Checksum mismatch: MD5 value mismatch, expected: abc123, actual: def456
Upload large blobs incrementally by reading and writing in chunks to avoid loading the entire blob into memory:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> # Upload a large video file in 64KB chunks
>>> chunk_size = 65536
>>> with open('large_video.mp4', 'rb') as f:
...     while True:
...         chunk = f.read(chunk_size)
...         if not chunk:
...             break
...         success = blob_writer.write(chunk)
...         if not success:
...             print("Writer closed unexpectedly")
...             break
>>> response = blob_writer.finish()
>>> print(f"Large video uploaded: {response.size} bytes")
Large video uploaded: 104857600 bytes
Upload text data by converting strings to bytes. The writer automatically handles the conversion:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> text_content = "This is text data to store as a blob"
>>> blob_writer.write(text_content)  # Automatically converted to bytes
>>> response = blob_writer.finish()
Monitor writer status during long uploads to detect early failures:
>>> blob_writer = client.write_blob_stream(session_id, stream_id=0, column_index=0)
>>> for chunk in large_data_chunks:
...     success = blob_writer.write(chunk)
...     if not success or blob_writer.get_status() != Status.RUNNING:
...         print("Upload stopped unexpectedly")
...         break
>>> response = blob_writer.finish()
>>> request_id = blob_writer.get_request_id()
- write_rows_arrow(session_id=None, stream_id=None, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) ArrowWriter[source]
Write Arrow batches to a write stream.
This is the Arrow convenience wrapper for write_rows_stream(). It creates an ArrowWriter that serializes pyarrow.RecordBatch objects into Arrow IPC format and writes them to the underlying stream.
- Parameters:
session_id (str) – The write session identifier.
stream_id (str or int) – The stream identifier from create_write_stream.
stream_version (int, default 0) – Version of the stream (should match create_write_stream).
record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.
compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.
row_offset (int, default -1) – Row offset for Exactly-Once mode. -1 means not used.
access_token (str, optional) – Access token for Exactly-Once mode.
- Returns:
Arrow batch writer with write() and finish() methods. See ArrowWriter.
- Return type:
ArrowWriter
- write_rows_stream(session_id, stream_id, stream_version=0, record_count=0, compression=None, route_token=None, row_offset=-1, access_token=None) StreamWriter
Write row data to a write stream via streaming upload.
This method creates a streaming writer for uploading row data to a write stream. The writer accepts binary data in Arrow IPC format (wrap it with ArrowWriter to write Arrow record batches) and uploads it incrementally, so large datasets can be written without loading all data into memory at once.
- Parameters:
session_id (str) – The write session identifier.
stream_id (str or int) – The stream identifier from create_write_stream.
stream_version (int, default 0) – Version of the stream (should match create_write_stream).
record_count (int, default 0) – Total number of records to be written. Set this to the expected count for validation, or 0 if unknown.
compression (Compression, default None) – Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.
- Returns:
Stream writer with write(), finish(), get_status(), get_request_id(), and writable() methods. See StreamWriter.
- Return type:
StreamWriter
- Raises:
ValueError – If called on an instance-based client.
See also
create_write_stream – Create a stream before writing.
ArrowWriter – Convert Arrow batches to stream format.
close_write_stream – Close the stream after finishing write.
Notes
The returned StreamWriter accepts binary data directly. For Arrow workflows, wrap it with ArrowWriter which converts record batches to Arrow IPC format. The writer must be finished before closing the stream. Call finish() after writing all batches.
Examples
>>> from odps import ODPS
>>> from odps.apis.storage_api_v2 import (
...     StorageApiClient, ArrowWriter, Compression
... )
>>> import pyarrow as pa
>>> odps = ODPS(
...     access_id="your_access_id",
...     secret_access_key="your_secret_access_key",
...     project="your_project",
...     endpoint="your_endpoint"
... )
>>> table = odps.get_table("your_table")
>>> client = StorageApiClient(odps, table)
Complete workflow using ArrowWriter to upload Arrow record batches:
>>> # 1. Create write session
>>> session_resp = client.create_write_session()
>>> session_id = session_resp.session_id
>>> # 2. Create write stream
>>> stream_resp = client.create_write_stream(session_id, stream_id=0)
>>> # 3. Write Arrow data
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("name", pa.string()),
...     pa.field("value", pa.float64()),
... ])
>>> batch = pa.record_batch([
...     pa.array([1, 2, 3]),
...     pa.array(["Alice", "Bob", "Carol"]),
...     pa.array([100.0, 200.0, 150.0]),
... ], schema=schema)
>>> stream_writer = client.write_rows_stream(session_id, stream_id=0, record_count=3)
>>> arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
>>> arrow_writer.write(batch)
>>> commit_msg, success = arrow_writer.finish()
>>> print(f"Upload successful: {success}")
Upload successful: True
>>> # 4. Close stream and commit session
>>> client.close_write_stream(session_id, stream_id=0)
>>> client.commit_write_session(session_id)
Read / Write Sessions
- class odps.apis.storage_api_v2.CreateReadSessionRequest(**kwargs)[source]
Request for creating a read session.
- required_data_columns
List of column names to read. If empty, all columns are returned.
- Type:
list of str
- required_partition_columns
Partition columns to include in the result.
- Type:
list of str
- required_partitions
Specific partition values to read (e.g., ['pt=20230101']).
- Type:
list of str
- required_bucket_ids
Bucket IDs to read for bucket-based tables.
- Type:
list of str
- split_options
Controls how data is split into chunks. Defaults to size-based splitting with 256MB chunks.
- Type:
- arrow_options
Arrow format settings like timestamp precision.
- Type:
- filter_predicate
SQL-like filter condition to apply during reading.
- Type:
str
- filter_predicate_fallback
Whether to fall back to server-side filtering if predicate pushdown fails.
- Type:
bool
- split_max_file_num
Maximum number of files per split for file-based splitting.
- Type:
int
- incremental_read
Enable incremental reading mode for capturing table changes.
- Type:
bool
- incremental_read_options
Options for incremental read mode (version range, timestamp range).
- Type:
- class odps.apis.storage_api_v2.CreateReadSessionResponse[source]
Response from creating a read session.
- session_id
Unique identifier for this read session. Required for all subsequent read operations.
- Type:
str
- session_status
Current status (INIT, NORMAL, EXPIRED, etc.).
- Type:
- splits_count
Number of data splits available for parallel reading.
- Type:
int
- record_count
Total number of records across all splits.
- Type:
int
- data_schema
Schema information for data and partition columns.
- Type:
- expiration_time
Time at which the session expires and must be refreshed.
- Type:
str
- download_id
Download ID (used for instance-based reads, mapped to session_id).
- Type:
str
- status
Status field for instance-based reads (mapped to session_status).
- Type:
- session_type
Type of the read session.
- Type:
str
- supported_data_format
List of data formats supported by the server.
- Type:
list of DataFormat
- split_mode
Mode used for splitting the data.
- Type:
str
- split_bucket_id
Bucket ID used for bucket-based splitting.
- Type:
str
- session_stats
Statistics about the session (estimated size, row count).
- Type:
- latest_version
Latest data version for incremental reads.
- Type:
int
- message
Server message, if any.
- Type:
str
- enable_large_string
Whether large string support is enabled.
- Type:
bool
- incremental_read_options
Options for incremental read from the server response.
- Type:
- request_id
Request ID for debugging (set from response headers).
- Type:
str
- class odps.apis.storage_api_v2.CreateWriteSessionRequest(**kwargs)[source]
Request for creating a write session.
- partial_partition_spec
Partition specification for writing to a specific partition. Format: 'pt=20230101' or 'pt=20230101,region=us-west'. If empty, writes to the table’s default location.
- Type:
str
- flags
Additional flags for session configuration. Common flags include 'overwrite' to replace existing partition data.
- Type:
dict
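The partial_partition_spec format described above ('pt=20230101' or 'pt=20230101,region=us-west') can be built and checked with a few lines of plain Python. The sketch below is illustrative only: `build_partition_spec` and `parse_partition_spec` are hypothetical helpers, not part of the library, and they handle just the comma-separated `column=value` form shown in this reference.

```python
def build_partition_spec(parts):
    """Join ordered (column, value) pairs into 'col=value,col=value' form."""
    return ",".join(f"{name}={value}" for name, value in parts)


def parse_partition_spec(spec):
    """Split 'col=value,col=value' back into a list of (column, value) pairs."""
    if not spec:
        return []  # an empty spec means no partition was specified
    pairs = []
    for item in spec.split(","):
        name, sep, value = item.strip().partition("=")
        if not sep or not name:
            raise ValueError(f"malformed partition item: {item!r}")
        pairs.append((name, value))
    return pairs


spec = build_partition_spec([("pt", "20230101"), ("region", "us-west")])
print(spec)
print(parse_partition_spec(spec))
```

A list of pairs is used instead of a dict so that the partition column order is explicit.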
- class odps.apis.storage_api_v2.CreateWriteSessionResponse(**kwargs)[source]
Response from creating a write session.
- session_id
Unique identifier for this write session. Required for creating write streams and committing the session.
- Type:
str
- warning_message
Warning message if the session has any issues.
- Type:
str
- route_token
Routing token for load balancing, extracted from response headers. Pass this token to subsequent operations (create_write_stream, commit_write_session, etc.) to ensure session affinity.
- Type:
str
- request_id
Request ID for debugging (set from response headers).
- Type:
str
- class odps.apis.storage_api_v2.GetWriteSessionResponse(**kwargs)[source]
Response from getting write session status.
- streams
Information about all write streams created in this session. Each stream dict contains stream_id, stream_version, and stream status.
- Type:
list of dict
- warning_message
Warning message if any streams have issues.
- Type:
str
- route_token
Routing token for load balancing, extracted from response headers. Pass this token to subsequent operations to ensure session affinity.
- Type:
str
- request_id
Request ID for debugging (set from response headers).
- Type:
str
Read / Write Streams
- class odps.apis.storage_api_v2.CreateWriteStreamRequest(session_id, stream_id, stream_version=0, exactly_once_mode=False)[source]
Request for creating a write stream within an active write session.
- session_id
The write session identifier from create_write_session.
- Type:
str
- stream_id
Unique identifier for this stream within the session. Typically an integer (0, 1, 2, …) for parallel streams.
- Type:
str or int
- stream_version
Version number for the stream. Increment if retrying after a failed upload to the same stream_id. Default 0.
- Type:
int
- exactly_once_mode
Whether to enable Exactly-Once semantics for this stream. When enabled, the server returns an access token and tracks row offsets for idempotent writes. Default False.
- Type:
bool
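The stream_version description above suggests incrementing the version when retrying a failed upload to the same stream_id. One way to sketch that retry loop is shown below. `write_with_retries` and `flaky_upload` are hypothetical names, and the sketch assumes failures surface as IOError; a real caller would create the stream and write the data inside the supplied function.

```python
def write_with_retries(upload_once, max_attempts=3):
    """Call upload_once(stream_version) until it succeeds.

    Hypothetical helper: `upload_once` is a caller-supplied function that
    creates the stream with the given version, writes the data, and raises
    on failure. The version starts at 0 and grows by one per retry.
    Returns (stream_version_used, result).
    """
    last_error = None
    for stream_version in range(max_attempts):
        try:
            return stream_version, upload_once(stream_version)
        except IOError as exc:  # assumption: failures surface as IOError
            last_error = exc
    raise RuntimeError(f"upload failed after {max_attempts} attempts") from last_error


attempts = []


def flaky_upload(stream_version):
    """Simulated upload that fails for versions 0 and 1."""
    attempts.append(stream_version)
    if stream_version < 2:
        raise IOError("simulated network failure")
    return "ok"


print(write_with_retries(flaky_upload))
```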
- class odps.apis.storage_api_v2.CreateWriteStreamResponse(**kwargs)[source]
Response from creating a write stream.
- data_schema
The table schema for this stream, including column names and types. Use this to validate data before writing.
- Type:
dict
- table_id
Internal table identifier.
- Type:
str
- schema_version
Schema version number.
- Type:
str
- route_token
Routing token for load balancing, extracted from response headers.
- Type:
str
- request_id
Request ID for debugging (set from response headers).
- Type:
str
- class odps.apis.storage_api_v2.GetWriteStreamResponse(**kwargs)[source]
Response from getting write stream status.
- status
Current stream state (OPEN, CLOSED, ERROR).
- Type:
str
- record_count
Number of records written to this stream.
- Type:
int
- byte_size
Total bytes uploaded through this stream.
- Type:
int
- error_code
Error code if stream is in ERROR state.
- Type:
str
- error_message
Detailed error message if stream failed.
- Type:
str
- request_id
Request ID for debugging (set from response headers).
- Type:
str
- class odps.apis.storage_api_v2.ReadStreamRequest(session_id, split_index=None, row_offset=None, row_count=None, max_batch_rows=4096, skip_row_num=0, max_batch_raw_size=0, data_format=None, data_columns=None, compression=None)[source]
Request for reading data from a specific split in a read session.
- session_id
The read session identifier from create_read_session.
- Type:
str
- split_index
Which split to read (0 to splits_count-1). If None, reads all data in the session.
- Type:
int, optional
- row_offset
Starting row offset within the split. Defaults to 0.
- Type:
int, optional
- row_count
Maximum number of rows to read. If None, reads all rows in the split.
- Type:
int, optional
- max_batch_rows
Maximum rows per Arrow batch in the stream. Controls memory usage during reading. Default 4096.
- Type:
int
- skip_row_num
Number of rows to skip before reading. Default 0.
- Type:
int
- max_batch_raw_size
Maximum raw byte size per batch. 0 means no limit. Default 0.
- Type:
int
- data_format
Format of returned data (Arrow V5 is default).
- Type:
DataFormat, optional
- data_columns
Specific columns to read. Must match session schema.
- Type:
list of str
- compression
Compression algorithm for the stream data. None means Compression.UNCOMPRESSED.
- Type:
Compression, default None
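Because split_index ranges from 0 to splits_count-1, a parallel read can be planned by preparing one set of arguments per split. The sketch below only builds keyword-argument dicts using parameter names from the ReadStreamRequest signature above. `plan_split_reads` is a hypothetical helper and "example-session" is a placeholder session identifier.

```python
def plan_split_reads(session_id, splits_count, max_batch_rows=4096):
    """Return one keyword-argument dict per split, for splits 0..splits_count-1."""
    if splits_count < 0:
        raise ValueError("splits_count must be non-negative")
    return [
        {
            "session_id": session_id,
            "split_index": index,
            "max_batch_rows": max_batch_rows,
        }
        for index in range(splits_count)
    ]


plans = plan_split_reads("example-session", splits_count=3)
for kwargs in plans:
    print(kwargs)
```

Each dict could then be expanded into a request, for example ReadStreamRequest(**kwargs), and handed to a separate worker.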
- class odps.apis.storage_api_v2.WriteStreamRequest(session_id, stream_id, stream_version=0, record_count=0, compression=None, row_offset=-1, access_token=None)[source]
Request for writing row data to a write stream via streaming upload.
- session_id
The write session identifier.
- Type:
str
- stream_id
The stream identifier from create_write_stream.
- Type:
str or int
- stream_version
Version of the stream (should match create_write_stream). Default 0.
- Type:
int
- record_count
Total number of records to be written. Set this to the expected count for validation, or 0 if unknown. Default 0.
- Type:
int
- compression
Compression algorithm for the uploaded data stream. None means Compression.UNCOMPRESSED.
- Type:
Compression, default None
- row_offset
Row offset for Exactly-Once mode. Set to -1 (default) to not send the offset. In Exactly-Once mode, this tracks the starting row position for idempotent writes.
- Type:
int
- access_token
Access token for Exactly-Once mode. Obtained from CreateWriteStreamResponse or GetWriteStreamResponse.
- Type:
str
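In Exactly-Once mode, row_offset is described above as the starting row position of a write. The sketch below shows one way a caller might derive those starting positions for a sequence of batches. It rests on an assumption not stated in this reference: that offsets count rows from zero and each batch starts where the previous one ended. `starting_offsets` is a hypothetical helper.

```python
from itertools import accumulate


def starting_offsets(batch_row_counts, first_offset=0):
    """Return the starting row offset for each batch in sequence.

    Assumption: offsets count rows from `first_offset`, and each batch
    starts where the previous one ended.
    """
    counts = list(batch_row_counts)
    # Running totals give the end of each batch; dropping the last total
    # leaves the start of each batch.
    ends = list(accumulate(counts, initial=first_offset))
    return ends[:-1]


print(starting_offsets([1000, 1000, 250]))
```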
- class odps.apis.storage_api_v2.CloseWriteStreamRequest(session_id, stream_id, stream_version=0)[source]
Request for closing a write stream to finalize the data upload.
- session_id
The write session identifier.
- Type:
str
- stream_id
The stream identifier to close.
- Type:
str or int
- stream_version
Version of the stream (should match create_write_stream). Default 0.
- Type:
int
- class odps.apis.storage_api_v2.CloseWriteStreamResponse(**kwargs)[source]
Response from closing a write stream.
- warning_message
Warning message if the stream closure has any issues. Check this for potential problems even if close succeeds.
- Type:
str
- request_id
Request ID for debugging (set from response headers).
- Type:
str
- class odps.apis.storage_api_v2.StreamReader(download)[source]
Stream reader for reading Arrow IPC formatted data from the server.
- class odps.apis.storage_api_v2.StreamWriter(upload, on_route_token=None)[source]
Stream writer for uploading binary data to the server.
Expects Arrow IPC formatted binary data. Use ArrowWriter to convert Arrow record batches to this format.
- write(data)[source]
Write binary data to the stream. Returns True on success, False if the writer is closed or an error occurred.
- finish()[source]
Finish writing and close the upload. Returns (commit_message, success_bool). In Exactly-Once mode, the response body is also parsed into a WriteStreamResponse accessible via get_write_stream_response().
- get_route_token()[source]
Get route token from write response headers for session affinity. Call after finish(). Returns None if not available.
- get_write_stream_response()[source]
Get the parsed WriteStreamResponse after finish(). Returns None if finish() has not been called or if the response did not contain exactly-once data.
- class odps.apis.storage_api_v2.ArrowReader(stream_reader)[source]
Arrow batch reader that wraps a StreamReader.
- class odps.apis.storage_api_v2.ArrowWriter(stream_writer, compression)[source]
Arrow batch writer that wraps a StreamWriter.
Converts Arrow record batches to Arrow IPC format and writes them to the underlying stream.
Blob I/O
- class odps.apis.storage_api_v2.BlobDataIterator(raw_stream)[source]
Iterator that parses the framed blob download protocol.
- The download stream is processed through the following layers:
1. Decompression (if the response is compressed, handled upstream via get_decompress_stream before passing to this iterator)
2. CRC32C stripping – strips per-block checksums
3. This iterator parses [HeaderLen][Header][DataLen][Data][FooterLen][Footer] frames
Yields (data_bytes, mime_type) tuples for each blob, where:
- data_bytes (bytes)
The raw binary data of the blob.
- mime_type (str or None)
MIME type metadata if it was provided during upload, otherwise None.
For single blob downloads, the server may omit framing and send raw decompressed data directly. The iterator automatically detects this case and returns the entire payload as one blob.
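To make the [HeaderLen][Header][DataLen][Data][FooterLen][Footer] layout concrete, here is a minimal parsing sketch, not the library's implementation. It assumes the same encoding documented for BlobWriteItem (8-byte little-endian lengths, JSON header and footer), assumes decompression and CRC32C stripping have already happened, and uses a hypothetical "mime_type" header key. It does not handle the unframed single-blob case.

```python
import io
import json
import struct

_LEN = struct.Struct("<Q")  # assumed: 8-byte little-endian length prefix


def _read_exact(stream, size):
    """Read exactly `size` bytes, looping over short reads."""
    chunks = []
    remaining = size
    while remaining:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError(f"stream ended with {remaining} bytes still expected")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def iter_frames(stream):
    """Yield (data_bytes, mime_type) for each frame in `stream`."""
    while True:
        first = stream.read(_LEN.size)
        if not first:
            return  # clean end of stream
        # Complete the length prefix if the stream returned a short read.
        prefix = first + _read_exact(stream, _LEN.size - len(first))
        (header_len,) = _LEN.unpack(prefix)
        header_raw = _read_exact(stream, header_len)
        header = json.loads(header_raw) if header_raw else {}
        (data_len,) = _LEN.unpack(_read_exact(stream, _LEN.size))
        data = _read_exact(stream, data_len)
        (footer_len,) = _LEN.unpack(_read_exact(stream, _LEN.size))
        _read_exact(stream, footer_len)  # footer is read and discarded here
        # "mime_type" is a hypothetical header key used for illustration.
        yield data, header.get("mime_type")


def _frame(data, header):
    """Build one frame for the demonstration below."""
    head = json.dumps(header).encode("utf-8")
    foot = b"{}"
    return (_LEN.pack(len(head)) + head + _LEN.pack(len(data)) + data
            + _LEN.pack(len(foot)) + foot)


raw = _frame(b"hello", {"mime_type": "text/plain"}) + _frame(b"\x00\x01", {})
frames = list(iter_frames(io.BytesIO(raw)))
print(frames)
```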
- read_data(size=-1)[source]
Read up to size bytes of the current blob’s data from the stream.
This is used by BlobStreamReader for chunked reads. The stream position must be within the current blob’s data region.
- Parameters:
size (int) – Maximum bytes to read. -1 reads all remaining data of the current blob.
Skip remaining_bytes of unread data and the trailing footer.
- Parameters:
remaining_bytes (int) – Number of data bytes still unread in the current frame.
- class odps.apis.storage_api_v2.BlobStreamReader(iterator)[source]
File-like reader for streaming blob data from a BlobDataIterator.
Provides read(size) for incremental reads of the current blob, a mime_type property, and a next() method to advance to the next blob.
Unlike iterating over BlobDataIterator, which materializes each blob entirely in memory, this reader reads data from the underlying stream in chunks, avoiding buffering the entire blob.
Calling next() before the current blob is fully exhausted raises an IOError. When all blobs have been read, next() returns None.
- Parameters:
iterator (BlobDataIterator) – The underlying blob data iterator.
Examples
>>> blob_reader = client.read_blobs(blob_references=refs, stream=True)
>>> while blob_reader is not None:
...     print(f"MIME: {blob_reader.mime_type}")
...     chunk = blob_reader.read(4096)
...     while chunk:
...         process(chunk)
...         chunk = blob_reader.read(4096)
...     blob_reader = blob_reader.next()
- property mime_type
MIME type of the current blob.
- Type:
str or None
- next()[source]
Advance to the next blob in-place and return self.
Raises IOError if the current blob has not been fully read. Returns None when there are no more blobs.
After a successful call, self is updated to reference the next blob’s data and mime_type. Do not keep separate references to the reader across a call to next(), because the same object is mutated.
- Return type:
BlobStreamReader or None
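Because next() mutates the reader in place, per-blob values such as mime_type should be captured before advancing. The sketch below illustrates that contract. `collect_blobs` is a hypothetical helper, and `FakeBlobReader` is an in-memory stand-in that mimics the behaviour documented above (read() returns empty bytes at the end of a blob, next() raises IOError on unread data and returns None at the end); it is not the library class.

```python
def collect_blobs(reader, chunk_size=4096):
    """Read every blob from a BlobStreamReader-like object.

    Returns a list of (mime_type, total_bytes) pairs. `reader` may be None;
    otherwise it must offer read(size), mime_type, and next().
    """
    results = []
    while reader is not None:
        mime_type = reader.mime_type  # capture before next() mutates the reader
        total = 0
        chunk = reader.read(chunk_size)
        while chunk:  # empty bytes means the current blob is exhausted
            total += len(chunk)
            chunk = reader.read(chunk_size)
        results.append((mime_type, total))
        reader = reader.next()  # same object again, or None at the end
    return results


class FakeBlobReader:
    """Hypothetical in-memory stand-in that mimics the documented contract."""

    def __init__(self, blobs):
        self._blobs = list(blobs)  # list of (data, mime_type) pairs
        self._pos = 0

    @property
    def mime_type(self):
        return self._blobs[0][1]

    def read(self, size=-1):
        data = self._blobs[0][0]
        end = len(data) if size < 0 else self._pos + size
        chunk = data[self._pos:end]
        self._pos += len(chunk)
        return chunk

    def next(self):
        if self._pos < len(self._blobs[0][0]):
            raise IOError("current blob not fully read")
        self._blobs.pop(0)
        self._pos = 0
        return self if self._blobs else None


summary = collect_blobs(FakeBlobReader([(b"a" * 10000, "image/png"), (b"b" * 5, None)]))
print(summary)
```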
- class odps.apis.storage_api_v2.BlobStreamWriter(upload, compression=Compression.UNCOMPRESSED)[source]
Stream writer for single blob upload with MD5 checksum verification.
Automatically computes MD5 checksum and verifies it against the server response. Data can be compressed during upload using the Compression enum.
- write(data)[source]
Write binary data chunks to the blob stream. Accepts bytes or string (converted to bytes). Returns True on success.
- finish()[source]
Finish writing and get the server response with MD5 verification. Returns WriteBlobResponse. Raises ChecksumError if an MD5 mismatch is detected.
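The verification described here amounts to hashing every chunk as it is written and comparing the final digest with the value reported by the server. The sketch below shows that idea with hashlib. `Md5Tracker` is a hypothetical class, ValueError stands in for the library's ChecksumError, and the UTF-8 encoding of text input is an assumption.

```python
import hashlib


class Md5Tracker:
    """Accumulate an MD5 digest and byte count across written chunks."""

    def __init__(self):
        self._md5 = hashlib.md5()
        self.size = 0

    def update(self, chunk):
        if isinstance(chunk, str):
            chunk = chunk.encode("utf-8")  # assumption: text is sent as UTF-8
        self._md5.update(chunk)
        self.size += len(chunk)

    def verify(self, server_md5_hex):
        """Return the local digest, or raise ValueError if the server's differs."""
        local = self._md5.hexdigest()
        if local != server_md5_hex.lower():
            raise ValueError(
                f"MD5 mismatch, local: {local}, server: {server_md5_hex}"
            )
        return local


tracker = Md5Tracker()
for part in (b"hello ", "world"):
    tracker.update(part)
expected = hashlib.md5(b"hello world").hexdigest()
print(tracker.size, tracker.verify(expected))
```

Hashing incrementally keeps memory use constant regardless of blob size, which matters for the large chunked uploads shown earlier.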
- class odps.apis.storage_api_v2.BlobWriteItem(data, partition_values=None, column_index=0, distribution_key=None, mime_type=None, checksum_type=ChecksumType.NONE, size=None)[source]
A single blob item for batch upload.
- Wire format when serialized:
[8-byte LE header_len][header JSON][8-byte LE data_len][data bytes][8-byte LE footer_len][footer JSON]
- write_frame_to(stream, chunk_size=262144)[source]
Write this item’s frame to a file-like stream.
Writes: [8-byte LE header_len][header JSON][8-byte LE data_len][data][8-byte LE footer_len][footer JSON]
For bytes data, writes directly. For file-like data, reads and writes in chunks, computing checksums incrementally.
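The frame layout above can be sketched with the standard struct and json modules. This is an illustration under stated assumptions rather than the library's code: `write_frame` is a hypothetical function, the header and footer keys are placeholders, and checksum computation is omitted. It does show why the data size must be known up front: the length prefix is written before the data is streamed.

```python
import io
import json
import struct


def write_frame(out, data, size, header, footer, chunk_size=262144):
    """Write one length-prefixed frame to `out` and return bytes written.

    `data` is a binary file-like object holding at least `size` bytes.
    `header` and `footer` are dicts with hypothetical keys.
    """
    pack_len = struct.Struct("<Q").pack  # 8-byte little-endian length
    head = json.dumps(header).encode("utf-8")
    foot = json.dumps(footer).encode("utf-8")

    out.write(pack_len(len(head)))
    out.write(head)
    out.write(pack_len(size))  # data length is written before the data itself
    copied = 0
    while copied < size:
        chunk = data.read(min(chunk_size, size - copied))
        if not chunk:
            raise EOFError(f"data ended after {copied} of {size} bytes")
        out.write(chunk)
        copied += len(chunk)
    out.write(pack_len(len(foot)))
    out.write(foot)
    return 8 + len(head) + 8 + size + 8 + len(foot)


buf = io.BytesIO()
payload = b"abc" * 1000
written = write_frame(
    buf, io.BytesIO(payload), len(payload),
    {"mime_type": "application/octet-stream"}, {}, chunk_size=1024,
)
print(written, len(buf.getvalue()))
```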
- class odps.apis.storage_api_v2.ReadBlobRequest(blob_references)[source]
Request for downloading binary blobs by their references.
- blob_references
List of blob reference strings obtained from previous upload operations (write_blob_stream or write_blob_batch). These references uniquely identify the blobs to download. Bytes references are automatically decoded to UTF-8 strings.
- Type:
list of str
- class odps.apis.storage_api_v2.WriteBlobRequest(session_id, stream_id, stream_version=0, partition_values=None, column_index=0)[source]
Request for uploading blob data (streaming or batch).
- session_id
The write session identifier from create_write_session.
- Type:
str
- stream_id
Stream identifier for this upload.
- Type:
str or int
- stream_version
Version number for the stream. Default 0.
- Type:
int
- partition_values
Partition values for the blob location. Format: ['pt=20230101', 'region=us-west'].
- Type:
list of str
- column_index
Column index in the table schema where the blob will be stored. Default 0.
- Type:
int
- class odps.apis.storage_api_v2.WriteBlobResponse(**kwargs)[source]
Response from a blob write operation (stream or batch).
- blob_reference
Single blob reference if one blob was uploaded.
- Type:
str
- blob_references
List of blob references matching the input items order. Use these references to read the blobs later.
- Type:
list of str
- warning_message
Warning message if any blob uploads had issues.
- Type:
str
- size
Total bytes uploaded.
- Type:
int
- request_id
Request ID for debugging (set from response headers).
- Type:
str
Preview
- class odps.apis.storage_api_v2.PreviewTableRequest(limit=None, partition=None, columns=None)[source]
Request for previewing table data without creating a session.
- limit
Maximum number of rows to preview. If None, returns a small default sample (typically 100-1000 rows).
- Type:
int, optional
- partition
Partition specification to preview specific partition data. Format: 'pt=20230101' or 'pt=20230101,region=us-west'.
- Type:
str, optional
- columns
Specific columns to preview. If empty, all columns are returned.
- Type:
list of str