Storage API V2

MaxCompute Storage API V2 是 MaxCompute 提供的高吞吐数据读写接口。相比于基于 Tunnel 的数据通道, Storage API V2 提供了更细粒度的会话管理、支持 Arrow 格式和 Blob 格式读写、支持增量读取以及表预览等功能, 适用于大规模并行数据读写场景。

备注

Storage API V2 需要服务端支持,请确保 MaxCompute 集群已开启相关功能。

基本概念

Storage API V2 的核心概念包括:

  • 会话(Session):读写操作的事务上下文。读会话管理数据分片,写会话保证数据原子性。

  • 分片(Split):读会话将数据按大小、并行度等方式分为多个分片,每个分片可独立读取,支持并行处理。

  • 流(Stream):写会话中的数据上传通道。一个写会话可创建多个流以支持并行写入。

  • 压缩(Compression):支持 UNCOMPRESSED(默认)、LZ4 和 ZSTD 压缩算法,减少网络传输数据量。

  • 路由令牌(Route Token):服务端返回的路由标识,用于会话亲和性,确保后续请求路由到同一节点。

  • Exactly-Once 模式:写流支持精确一次语义,通过 access_token 和 row_offset 实现幂等写入。

客户端初始化

使用 Storage API V2 需要创建 StorageApiClientStorageApiArrowClient 实例。 两者均需要传入 ODPS 入口对象和表对象(或 Instance 对象)。

from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiClient, StorageApiArrowClient

# 初始化 ODPS 入口对象
odps = ODPS(
    access_id="your_access_id",
    secret_access_key="your_secret_access_key",
    project="your_project",
    endpoint="your_endpoint",
)

# 获取表对象
table = odps.get_table("your_table")

# 创建基础客户端(返回原始字节流)
client = StorageApiClient(odps, table)

# 创建 Arrow 客户端(返回 Arrow RecordBatch,推荐)
arrow_client = StorageApiArrowClient(odps, table)

StorageApiArrowClient 继承自 StorageApiClient,额外提供了 read_rows_arrowwrite_rows_arrowpreview_table_arrow 方法,能够直接读写 PyArrow RecordBatch 对象, 便于与 pandas 等数据处理框架配合使用。大多数场景下推荐使用 StorageApiArrowClient

也可以通过指定 quota_namerest_endpoint 参数来使用指定配额和自定义端点:

client = StorageApiArrowClient(
    odps, table,
    quota_name="your_quota",
    rest_endpoint="https://your-custom-endpoint",
)

还可以通过 tags 参数设置请求标签:

client = StorageApiArrowClient(
    odps, table,
    tags="tag1,tag2",
)

读取数据

完整读取流程

使用 Storage API V2 读取数据需要以下步骤:

  1. 创建读会话

  2. 按分片读取数据

  3. 关闭读会话(会话会自动过期,无需手动关闭)

创建读会话

通过 create_read_session() 方法创建读会话。 读会话确定了数据的分片方式、返回的列和分区等。

from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)

# 创建读会话,使用默认分片选项
read_resp = arrow_client.create_read_session()
print(f"Session ID: {read_resp.session_id}")
print(f"数据分片数: {read_resp.splits_count}")
print(f"总记录数: {read_resp.record_count}")
print(f"会话状态: {read_resp.session_status}")
print(f"过期时间: {read_resp.expiration_time}")

创建读会话时可以指定需要读取的列、分区和分片选项:

from odps.apis.storage_api_v2 import SplitOptions

# 仅读取指定列和分区
read_resp = arrow_client.create_read_session(
    required_data_columns=["id", "name", "value"],
    required_partitions=["pt=20230101"],
)

# 按并行度分片,创建 10 个分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
read_resp = arrow_client.create_read_session(
    split_options=split_opts,
)

# 按行偏移分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 1000000  # 每个分片包含 100 万行
read_resp = arrow_client.create_read_session(
    split_options=split_opts,
)

创建读会话时还可以指定分区列和 Bucket ID:

# 仅读取指定分区列
read_resp = arrow_client.create_read_session(
    required_partition_columns=["pt"],
)

# 读取指定 Bucket(适用于聚簇表)
read_resp = arrow_client.create_read_session(
    required_bucket_ids=["0", "1"],
)

使用 Arrow 格式读取数据

使用 StorageApiArrowClientread_rows_arrow 方法可以直接读取为 Arrow RecordBatch:

import pyarrow as pa

read_resp = arrow_client.create_read_session()

# 遍历所有分片读取数据
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # 处理 DataFrame

使用原始字节流读取数据

使用 StorageApiClientread_rows_stream 方法返回 StreamReader, 可以读取原始字节流:

from odps.apis.storage_api_v2 import StorageApiClient, ArrowReader

client = StorageApiClient(odps, table)
read_resp = client.create_read_session()

reader = client.read_rows_stream(read_resp.session_id, split_index=0)
arrow_reader = ArrowReader(reader)
while True:
    batch = arrow_reader.read()
    if batch is None:
        break
    df = batch.to_pandas()
    # 处理 DataFrame

读取指定范围的数据

可以通过 row_offsetrow_count 参数读取指定范围的数据:

# 从第 1000 行开始读取 500 行
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    row_offset=1000,
    row_count=500,
)

还可以通过 max_batch_rows 控制每个批次的行数,以管理内存使用:

# 每个批次最多 1024 行,减少内存占用
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    max_batch_rows=1024,
)

通过 max_batch_raw_size 可以控制每个批次的原始字节大小:

# 每个批次原始大小不超过 8MB
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    max_batch_raw_size=8 * 1024 * 1024,
)

读取时还可以通过 skip_row_num 跳过指定行数、通过 data_columns 指定返回列、 通过 data_format 指定数据格式:

# 跳过前 100 行
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    skip_row_num=100,
)

# 仅读取指定列
reader = arrow_client.read_rows_arrow(
    read_resp.session_id,
    split_index=0,
    data_columns=["id", "name"],
)

并行读取数据

Storage API V2 的分片机制天然支持并行读取。每个分片可以独立读取,适合多线程或多进程场景:

from concurrent.futures import ThreadPoolExecutor

read_resp = arrow_client.create_read_session()

def read_split(split_index):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    batches = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)
    return pa.concat_batches(batches) if batches else None

# 使用线程池并行读取所有分片
with ThreadPoolExecutor(max_workers=read_resp.splits_count) as pool:
    futures = [
        pool.submit(read_split, i)
        for i in range(read_resp.splits_count)
    ]
    results = [f.result() for f in futures]

刷新读会话

读会话有过期时间。如果在长时间读取过程中会话过期,可以使用 get_read_sessionrefresh 参数延长会话有效期:

from odps.apis.storage_api_v2 import SessionStatus

# 检查会话状态
status = arrow_client.get_read_session(read_resp.session_id)
if status.session_status == SessionStatus.EXPIRED:
    # 刷新会话
    status = arrow_client.get_read_session(read_resp.session_id, refresh=True)

增量读取

Storage API V2 支持增量读取模式,可以捕获表的数据变更:

from odps.apis.storage_api_v2 import IncrementalReadOptions

incr_opts = IncrementalReadOptions()
incr_opts.start_version = 100
read_resp = arrow_client.create_read_session(
    incremental_read=True,
    incremental_read_options=incr_opts,
)
print(f"当前最新版本: {read_resp.latest_version}")

也可以按时间戳范围进行增量读取:

incr_opts = IncrementalReadOptions()
incr_opts.start_time_stamp = "2023-01-01 00:00:00"
incr_opts.end_time_stamp = "2023-01-02 00:00:00"
read_resp = arrow_client.create_read_session(
    incremental_read=True,
    incremental_read_options=incr_opts,
)

从 SQL Instance 读取数据

Storage API V2 支持从 SQL 执行结果(Instance)中读取数据,此时客户端为只读模式:

# 执行 SQL 获取 Instance
instance = odps.execute_sql("SELECT * FROM your_table LIMIT 1000")

# 使用 Instance 创建客户端
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()

# 读取结果
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()

备注

基于 Instance 的客户端不支持写操作。调用写操作方法会抛出 ValueError

预览表数据

预览功能提供了一种轻量级的数据浏览方式,无需创建会话即可快速查看表数据:

from odps.apis.storage_api_v2 import ArrowReader

# 使用 Arrow 客户端预览
reader = arrow_client.preview_table_arrow(limit=10)
batch = reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)

# 使用基础客户端预览
stream_reader = client.preview_table(limit=10)
arrow_reader = ArrowReader(stream_reader)
batch = arrow_reader.read()
if batch is not None:
    df = batch.to_pandas()
    print(df)

可以指定分区和列进行预览:

# 预览指定分区
reader = arrow_client.preview_table_arrow(limit=10, partition="pt=20230101")

# 预览指定列
reader = arrow_client.preview_table_arrow(limit=10, columns=["id", "name"])

备注

预览功能仅适用于表,不支持 Instance 客户端。预览返回的行数可能不精确, 生产环境建议使用 create_read_session + read_rows_arrow 进行精确读取。

写入数据

完整写入流程

使用 Storage API V2 写入数据需要以下步骤:

  1. 创建写会话

  2. 创建写流

  3. 写入数据

  4. 关闭写流

  5. 提交写会话

import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)

# 1. 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# 2. 创建写流
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)

# 3. 写入数据
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64()),
])
batch = pa.record_batch([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Carol"]),
    pa.array([100.0, 200.0, 150.0]),
], schema=schema)

writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=3,
)
writer.write(batch)
commit_msg, success = writer.finish()

# 4. 关闭写流
arrow_client.close_write_stream(session_id, stream_id=0)

# 5. 提交写会话
arrow_client.commit_write_session(session_id)

写入分区表

写入分区表时需要在创建写会话时指定分区:

# 写入指定分区
write_resp = arrow_client.create_write_session(
    partial_partition_spec="pt=20230101"
)

如果需要覆盖已有分区数据,可以使用 flags 参数:

# 覆盖写入分区
write_resp = arrow_client.create_write_session(
    partial_partition_spec="pt=20230101",
    flags={"overwrite": True},
)

并行写入数据

一个写会话可以创建多个写流,支持并行写入:

from concurrent.futures import ThreadPoolExecutor

write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

def write_stream(stream_id, data_batch):
    # 创建写流
    arrow_client.create_write_stream(session_id, stream_id=stream_id)
    # 写入数据
    writer = arrow_client.write_rows_arrow(
        session_id, stream_id=stream_id, record_count=len(data_batch),
    )
    writer.write(data_batch)
    writer.finish()
    # 关闭写流
    arrow_client.close_write_stream(session_id, stream_id=stream_id)

# 多线程并行写入
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [
        pool.submit(write_stream, i, batch_data[i])
        for i in range(3)
    ]
    [f.result() for f in futures]

# 提交写会话
arrow_client.commit_write_session(session_id)

使用原始字节流写入数据

使用 StorageApiClientwrite_rows_stream 方法可以获取 StreamWriter, 然后使用 ArrowWriter 包装进行写入:

from odps.apis.storage_api_v2 import StorageApiClient, ArrowWriter, Compression

client = StorageApiClient(odps, table)

write_resp = client.create_write_session()
session_id = write_resp.session_id
client.create_write_stream(session_id, stream_id=0)

stream_writer = client.write_rows_stream(
    session_id, stream_id=0, record_count=3,
)
arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
arrow_writer.write(batch)
commit_msg, success = arrow_writer.finish()

client.close_write_stream(session_id, stream_id=0)
client.commit_write_session(session_id)

Exactly-Once 写入模式

Storage API V2 支持 Exactly-Once 语义,确保写入操作的幂等性。启用后,服务端会返回 access_token 和 row_offset,用于重试时保证数据不重复:

# 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# 创建写流,启用 Exactly-Once 模式
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
access_token = stream_resp.access_token

# 写入数据时传入 access_token
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=3,
    access_token=access_token,
)
writer.write(batch)
commit_msg, success = writer.finish()

# 获取 Exactly-Once 模式的写入偏移量
write_stream_resp = writer.get_write_stream_response()
if write_stream_resp is not None:
    print(f"写入偏移量: {write_stream_resp.exactly_once_row_offset}")

# 查询写流状态也可获取 access_token 和 row_offset
stream_status = arrow_client.get_write_stream(
    session_id, stream_id=0, stream_version=0,
)
print(f"最新 schema 版本: {stream_status.latest_schema_version}")
print(f"写入偏移量: {stream_status.row_offset}")

备注

Exactly-Once 模式下,写流重试时需要使用上次成功写入返回的 row_offsetaccess_token 来保证幂等性。

中止写会话

如果写入过程中发生错误,可以中止写会话以丢弃所有已上传的数据:

write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

try:
    # 写入数据...
    pass
except Exception as e:
    # 发生错误时中止会话
    arrow_client.abort_write_session(session_id)
    raise

查询写会话状态

可以使用 get_write_session 方法查看写会话中各流的状态:

status = arrow_client.get_write_session(session_id)
if status.streams:
    for stream_info in status.streams:
        print(f"Stream: {stream_info}")

也可以使用 get_write_stream 方法查询单个写流的详细状态:

stream_status = arrow_client.get_write_stream(
    session_id, stream_id=0, stream_version=0,
)
print(f"流状态: {stream_status.status}")
print(f"已写入记录数: {stream_status.record_count}")
print(f"已写入字节数: {stream_status.byte_size}")

对于事务表(Delta 表),提交写会话时可以指定需要提交的流 ID 和版本:

arrow_client.commit_write_session(
    session_id,
    stream_ids=["stream-1", "stream-2"],
    stream_versions=[1, 1],
)

路由令牌与会话亲和性

服务端在创建会话和写流时会返回路由令牌(route token),客户端会自动存储并在后续请求中 携带该令牌,确保请求路由到同一节点,提高性能:

# 路由令牌会自动从响应中提取并存储
write_resp = arrow_client.create_write_session()
print(f"当前路由令牌: {arrow_client.route_token}")

# 也可以手动指定 route_token 参数覆盖自动值
arrow_client.commit_write_session(
    session_id, route_token="your-route-token",
)

Blob 数据读写

Storage API V2 支持 Blob 类型数据的读写。Blob 读写适用于包含 BLOB 列的表 (表格式需为 V2 且开启事务性)。

写入 Blob 数据

使用 write_blob_stream 方法以流式方式上传单个 Blob:

# 写入 Blob 数据(默认不压缩,启用 MD5 校验)
blob_writer = arrow_client.write_blob_stream(
    session_id, stream_id=0, partition_values=["pt=20230101"],
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
print(f"Blob reference: {response.blob_reference}")

写入时可以指定压缩算法:

from odps.apis.storage_api_v2 import Compression

# 使用 ZSTD 压缩写入 Blob
blob_writer = arrow_client.write_blob_stream(
    session_id, stream_id=0, partition_values=["pt=20230101"],
    compression=Compression.ZSTD,
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()

使用 write_blob_batch 方法批量上传多个 Blob:

from odps.apis.storage_api_v2 import BlobWriteItem, ChecksumType

# 创建 BlobWriteItem 列表
items = [
    BlobWriteItem(
        data=b"blob data 1",
        partition_values=["pt=20230101"],
        column_index=0,
        mime_type="image/png",
        checksum_type=ChecksumType.MD5,
        distribution_key="key1",
    ),
    BlobWriteItem(
        data=b"blob data 2",
        partition_values=["pt=20230101"],
        column_index=0,
    ),
]

# 批量写入
response = arrow_client.write_blob_batch(
    items, session_id=session_id, stream_id=0,
)
print(f"Blob references: {response.blob_references}")

BlobWriteItem 支持 distribution_key 参数用于指定分布键, checksum_type 参数支持以下校验类型:

  • ChecksumType.NONE -- 不校验(默认)

  • ChecksumType.CRC32 -- CRC32 校验

  • ChecksumType.MD5 -- MD5 校验

读取 Blob 数据

使用 read_blobs 方法通过 Blob 引用下载数据。返回 BlobDataIterator 迭代器, 每次迭代返回 (data_bytes, mime_type) 元组:

# 批量读取 Blob
iterator = arrow_client.read_blobs(blob_references=["ref1", "ref2"])
for data, mime_type in iterator:
    print(f"Blob size: {len(data)}, MIME type: {mime_type}")

也可以使用 stream=True 参数获取 BlobStreamReader 进行流式读取:

# 流式读取 Blob
stream_reader = arrow_client.read_blobs(
    blob_references=["ref1", "ref2"], stream=True,
)
while stream_reader is not None:
    chunk = stream_reader.read(4096)  # 每次读取 4KB
    if not chunk:
        # 当前 Blob 读取完毕,切换到下一个
        stream_reader = stream_reader.next()
    else:
        # 处理数据块
        process_chunk(chunk)
        print(f"Current blob MIME type: {stream_reader.mime_type}")

读取时可以指定压缩算法:

# 读取使用 ZSTD 压缩的 Blob
iterator = arrow_client.read_blobs(
    blob_references=["ref1", "ref2"],
    compression=Compression.ZSTD,
)

完整的 Blob 读写示例

from odps.apis.storage_api_v2 import (
    StorageApiArrowClient, BlobWriteItem, ChecksumType,
)

arrow_client = StorageApiArrowClient(odps, blob_table)

# 创建写会话和写流
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
arrow_client.create_write_stream(session_id, stream_id=0)

# 写入 Blob 数据
items = [
    BlobWriteItem(data=b"hello world", partition_values=["pt=test"]),
    BlobWriteItem(data=b"foo bar", partition_values=["pt=test"]),
]
write_resp = arrow_client.write_blob_batch(
    items, session_id=session_id, stream_id=0,
)
blob_refs = write_resp.blob_references

# 关闭写流并提交会话
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)

# 读取 Blob 数据
iterator = arrow_client.read_blobs(blob_references=blob_refs)
for data, mime_type in iterator:
    print(f"Data: {data!r}, MIME type: {mime_type}")

压缩选项

Storage API V2 内置支持以下压缩算法:

  • Compression.UNCOMPRESSED -- 不压缩(默认)

  • Compression.ZSTD -- ZSTD 压缩(需要安装 zstandard 库)

  • Compression.LZ4_FRAME -- LZ4 帧格式压缩(需要安装 lz4 库)

默认情况下,读写操作不使用压缩(Compression.UNCOMPRESSED)。可以在读取和写入时指定压缩算法:

from odps.apis.storage_api_v2 import Compression

# 写入时使用 ZSTD 压缩
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=100,
    compression=Compression.ZSTD,
)

# 读取时使用 LZ4 帧格式解压
reader = arrow_client.read_rows_arrow(
    session_id, split_index=0,
    compression=Compression.LZ4_FRAME,
)

数据过滤

Storage API V2 支持在创建读会话时通过 filter_predicate 参数指定过滤条件,减少网络传输数据量:

# 使用过滤谓词
read_resp = arrow_client.create_read_session(
    filter_predicate="id > 100 AND name = 'test'",
)

如果谓词下推失败,可以设置 filter_predicate_fallback=True 回退到服务端过滤:

read_resp = arrow_client.create_read_session(
    filter_predicate="id > 100",
    filter_predicate_fallback=True,
)

创建读会话时还可以通过 split_max_file_num 参数限制分片中的最大文件数:

read_resp = arrow_client.create_read_session(
    split_max_file_num=1000,
)

Arrow 格式选项

创建读会话时可以通过 arrow_options 参数控制 Arrow 格式的精度:

from odps.apis.storage_api_v2 import ArrowOptions

# 设置时间戳精度为微秒
arrow_opts = ArrowOptions()
arrow_opts.timestamp_unit = ArrowOptions.TimestampUnit.MICRO
arrow_opts.date_time_unit = ArrowOptions.TimestampUnit.MILLI

read_resp = arrow_client.create_read_session(
    arrow_options=arrow_opts,
)

ArrowOptions 支持的精度选项:

  • TimestampUnit.SECOND -- 秒

  • TimestampUnit.MILLI -- 毫秒

  • TimestampUnit.MICRO -- 微秒

  • TimestampUnit.NANO -- 纳秒(默认)

SplitOptions 分片选项

创建读会话时可以通过 split_options 参数控制数据的分片方式:

from odps.apis.storage_api_v2 import SplitOptions

# 按大小分片(默认),每个分片 256MB
split_opts = SplitOptions()

# 按并行度分片,创建指定数量的分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10

# 按行偏移分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 100000  # 每个分片 10 万行

# 按 Bucket ID 分片(适用于聚簇表)
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.BUCKET

# 是否跨分区分片(默认为 True)
split_opts = SplitOptions()
split_opts.cross_partition = False

SplitOptions.SplitMode 支持的模式:

  • SIZE -- 按数据大小分片(默认),split_number 指定每个分片的大小(字节)

  • PARALLELISM -- 按并行度分片,split_number 指定分片数量

  • ROW_OFFSET -- 按行偏移分片,split_number 指定每个分片的行数

  • BUCKET -- 按 Bucket ID 分片

常见场景

将整张表读取为 pandas DataFrame

import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

arrow_client = StorageApiArrowClient(odps, table)
read_resp = arrow_client.create_read_session()

batches = []
for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch)

df = pa.concat_batches(batches).to_pandas() if batches else None

从分区表读取特定分区

read_resp = arrow_client.create_read_session(
    required_partitions=["pt=20230101"],
)

for split_index in range(read_resp.splits_count):
    reader = arrow_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        # 处理 batch

写入 pandas DataFrame 到表

import pandas as pd
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
    "value": [100.0, 200.0, 150.0],
})
batch = pa.RecordBatch.from_pandas(df)

arrow_client = StorageApiArrowClient(odps, table)

# 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id

# 创建写流
arrow_client.create_write_stream(session_id, stream_id=0)

# 写入数据
writer = arrow_client.write_rows_arrow(
    session_id, stream_id=0, record_count=len(df),
)
writer.write(batch)
writer.finish()

# 关闭写流并提交会话
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)

读取 SQL 查询结果

instance = odps.execute_sql("SELECT * FROM your_table WHERE id > 100")
instance_client = StorageApiArrowClient(odps, instance)

read_resp = instance_client.create_read_session()
for split_index in range(read_resp.splits_count):
    reader = instance_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    while True:
        batch = reader.read()
        if batch is None:
            break
        df = batch.to_pandas()
        # 处理查询结果

多进程并行读取

from multiprocessing import Pool

read_resp = arrow_client.create_read_session()

def read_split(split_index):
    # 每个进程创建独立的客户端
    from odps import ODPS
    from odps.apis.storage_api_v2 import StorageApiArrowClient
    local_odps = ODPS(...)  # 初始化 ODPS
    local_client = StorageApiArrowClient(local_odps, table)
    reader = local_client.read_rows_arrow(
        read_resp.session_id, split_index=split_index
    )
    batches = []
    while True:
        batch = reader.read()
        if batch is None:
            break
        batches.append(batch.to_pandas())
    return batches

if __name__ == "__main__":
    with Pool(processes=read_resp.splits_count) as pool:
        results = pool.map(read_split, range(read_resp.splits_count))

会话状态说明

读会话状态(SessionStatus)

状态

说明

INIT

会话初始化中

NORMAL

会话正常,可以读取数据

CRITICAL

会话出现严重错误

EXPIRED

会话已过期,需要刷新或重新创建

COMMITTING

写会话正在提交中

COMMITTED

写会话已提交完成

流状态(Status)

状态

说明

INIT

流初始化中

OK

流操作完成

WAIT

等待数据中

RUNNING

流正在运行中

参考

完整的 API 参考文档请参见 Storage API V2