Storage API V2
MaxCompute Storage API V2 是 MaxCompute 提供的高吞吐数据读写接口。相比于基于 Tunnel 的数据通道, Storage API V2 提供了更细粒度的会话管理、支持 Arrow 格式和 Blob 格式读写、支持增量读取以及表预览等功能, 适用于大规模并行数据读写场景。
备注
Storage API V2 需要服务端支持,请确保 MaxCompute 集群已开启相关功能。
基本概念
Storage API V2 的核心概念包括:
会话(Session):读写操作的事务上下文。读会话管理数据分片,写会话保证数据原子性。
分片(Split):读会话将数据按大小、并行度等方式分为多个分片,每个分片可独立读取,支持并行处理。
流(Stream):写会话中的数据上传通道。一个写会话可创建多个流以支持并行写入。
压缩(Compression):支持 UNCOMPRESSED(默认)、LZ4 和 ZSTD 压缩算法,减少网络传输数据量。
路由令牌(Route Token):服务端返回的路由标识,用于会话亲和性,确保后续请求路由到同一节点。
Exactly-Once 模式:写流支持精确一次语义,通过 access_token 和 row_offset 实现幂等写入。
客户端初始化
使用 Storage API V2 需要创建 StorageApiClient 或 StorageApiArrowClient 实例。
两者均需要传入 ODPS 入口对象和表对象(或 Instance 对象)。
from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiClient, StorageApiArrowClient
# 初始化 ODPS 入口对象
odps = ODPS(
access_id="your_access_id",
secret_access_key="your_secret_access_key",
project="your_project",
endpoint="your_endpoint",
)
# 获取表对象
table = odps.get_table("your_table")
# 创建基础客户端(返回原始字节流)
client = StorageApiClient(odps, table)
# 创建 Arrow 客户端(返回 Arrow RecordBatch,推荐)
arrow_client = StorageApiArrowClient(odps, table)
StorageApiArrowClient 继承自 StorageApiClient,额外提供了 read_rows_arrow、
write_rows_arrow 和 preview_table_arrow 方法,能够直接读写 PyArrow RecordBatch 对象,
便于与 pandas 等数据处理框架配合使用。大多数场景下推荐使用 StorageApiArrowClient。
也可以通过指定 quota_name 和 rest_endpoint 参数来使用指定配额和自定义端点:
client = StorageApiArrowClient(
odps, table,
quota_name="your_quota",
rest_endpoint="https://your-custom-endpoint",
)
还可以通过 tags 参数设置请求标签:
client = StorageApiArrowClient(
odps, table,
tags="tag1,tag2",
)
读取数据
完整读取流程
使用 Storage API V2 读取数据需要以下步骤:
创建读会话
按分片读取数据
关闭读会话(会话会自动过期,无需手动关闭)
创建读会话
通过 create_read_session() 方法创建读会话。
读会话确定了数据的分片方式、返回的列和分区等。
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
# 创建读会话,使用默认分片选项
read_resp = arrow_client.create_read_session()
print(f"Session ID: {read_resp.session_id}")
print(f"数据分片数: {read_resp.splits_count}")
print(f"总记录数: {read_resp.record_count}")
print(f"会话状态: {read_resp.session_status}")
print(f"过期时间: {read_resp.expiration_time}")
创建读会话时可以指定需要读取的列、分区和分片选项:
from odps.apis.storage_api_v2 import SplitOptions
# 仅读取指定列和分区
read_resp = arrow_client.create_read_session(
required_data_columns=["id", "name", "value"],
required_partitions=["pt=20230101"],
)
# 按并行度分片,创建 10 个分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
read_resp = arrow_client.create_read_session(
split_options=split_opts,
)
# 按行偏移分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 1000000 # 每个分片包含 100 万行
read_resp = arrow_client.create_read_session(
split_options=split_opts,
)
创建读会话时还可以指定分区列和 Bucket ID:
# 仅读取指定分区列
read_resp = arrow_client.create_read_session(
required_partition_columns=["pt"],
)
# 读取指定 Bucket(适用于聚簇表)
read_resp = arrow_client.create_read_session(
required_bucket_ids=["0", "1"],
)
使用 Arrow 格式读取数据
使用 StorageApiArrowClient 的 read_rows_arrow 方法可以直接读取为 Arrow RecordBatch:
import pyarrow as pa
read_resp = arrow_client.create_read_session()
# 遍历所有分片读取数据
for split_index in range(read_resp.splits_count):
reader = arrow_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
while True:
batch = reader.read()
if batch is None:
break
df = batch.to_pandas()
# 处理 DataFrame
使用原始字节流读取数据
使用 StorageApiClient 的 read_rows_stream 方法返回 StreamReader,
可以读取原始字节流:
from odps.apis.storage_api_v2 import StorageApiClient, ArrowReader
client = StorageApiClient(odps, table)
read_resp = client.create_read_session()
reader = client.read_rows_stream(read_resp.session_id, split_index=0)
arrow_reader = ArrowReader(reader)
while True:
batch = arrow_reader.read()
if batch is None:
break
df = batch.to_pandas()
# 处理 DataFrame
读取指定范围的数据
可以通过 row_offset 和 row_count 参数读取指定范围的数据:
# 从第 1000 行开始读取 500 行
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
row_offset=1000,
row_count=500,
)
还可以通过 max_batch_rows 控制每个批次的行数,以管理内存使用:
# 每个批次最多 1024 行,减少内存占用
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
max_batch_rows=1024,
)
通过 max_batch_raw_size 可以控制每个批次的原始字节大小:
# 每个批次原始大小不超过 8MB
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
max_batch_raw_size=8 * 1024 * 1024,
)
读取时还可以通过 skip_row_num 跳过指定行数、通过 data_columns 指定返回列、
通过 data_format 指定数据格式:
# 跳过前 100 行
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
skip_row_num=100,
)
# 仅读取指定列
reader = arrow_client.read_rows_arrow(
read_resp.session_id,
split_index=0,
data_columns=["id", "name"],
)
并行读取数据
Storage API V2 的分片机制天然支持并行读取。每个分片可以独立读取,适合多线程或多进程场景:
from concurrent.futures import ThreadPoolExecutor
read_resp = arrow_client.create_read_session()
def read_split(split_index):
reader = arrow_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
batches = []
while True:
batch = reader.read()
if batch is None:
break
batches.append(batch)
return pa.concat_batches(batches) if batches else None
# 使用线程池并行读取所有分片
with ThreadPoolExecutor(max_workers=read_resp.splits_count) as pool:
futures = [
pool.submit(read_split, i)
for i in range(read_resp.splits_count)
]
results = [f.result() for f in futures]
刷新读会话
读会话有过期时间。如果在长时间读取过程中会话过期,可以使用 get_read_session 的
refresh 参数延长会话有效期:
from odps.apis.storage_api_v2 import SessionStatus
# 检查会话状态
status = arrow_client.get_read_session(read_resp.session_id)
if status.session_status == SessionStatus.EXPIRED:
# 刷新会话
status = arrow_client.get_read_session(read_resp.session_id, refresh=True)
增量读取
Storage API V2 支持增量读取模式,可以捕获表的数据变更:
from odps.apis.storage_api_v2 import IncrementalReadOptions
incr_opts = IncrementalReadOptions()
incr_opts.start_version = 100
read_resp = arrow_client.create_read_session(
incremental_read=True,
incremental_read_options=incr_opts,
)
print(f"当前最新版本: {read_resp.latest_version}")
也可以按时间戳范围进行增量读取:
incr_opts = IncrementalReadOptions()
incr_opts.start_time_stamp = "2023-01-01 00:00:00"
incr_opts.end_time_stamp = "2023-01-02 00:00:00"
read_resp = arrow_client.create_read_session(
incremental_read=True,
incremental_read_options=incr_opts,
)
从 SQL Instance 读取数据
Storage API V2 支持从 SQL 执行结果(Instance)中读取数据,此时客户端为只读模式:
# 执行 SQL 获取 Instance
instance = odps.execute_sql("SELECT * FROM your_table LIMIT 1000")
# 使用 Instance 创建客户端
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()
# 读取结果
for split_index in range(read_resp.splits_count):
reader = instance_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
while True:
batch = reader.read()
if batch is None:
break
df = batch.to_pandas()
备注
基于 Instance 的客户端不支持写操作。调用写操作方法会抛出 ValueError。
预览表数据
预览功能提供了一种轻量级的数据浏览方式,无需创建会话即可快速查看表数据:
from odps.apis.storage_api_v2 import ArrowReader
# 使用 Arrow 客户端预览
reader = arrow_client.preview_table_arrow(limit=10)
batch = reader.read()
if batch is not None:
df = batch.to_pandas()
print(df)
# 使用基础客户端预览
stream_reader = client.preview_table(limit=10)
arrow_reader = ArrowReader(stream_reader)
batch = arrow_reader.read()
if batch is not None:
df = batch.to_pandas()
print(df)
可以指定分区和列进行预览:
# 预览指定分区
reader = arrow_client.preview_table_arrow(limit=10, partition="pt=20230101")
# 预览指定列
reader = arrow_client.preview_table_arrow(limit=10, columns=["id", "name"])
备注
预览功能仅适用于表,不支持 Instance 客户端。预览返回的行数可能不精确,
生产环境建议使用 create_read_session + read_rows_arrow 进行精确读取。
写入数据
完整写入流程
使用 Storage API V2 写入数据需要以下步骤:
创建写会话
创建写流
写入数据
关闭写流
提交写会话
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
# 1. 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# 2. 创建写流
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
# 3. 写入数据
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("value", pa.float64()),
])
batch = pa.record_batch([
pa.array([1, 2, 3]),
pa.array(["Alice", "Bob", "Carol"]),
pa.array([100.0, 200.0, 150.0]),
], schema=schema)
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=3,
)
writer.write(batch)
commit_msg, success = writer.finish()
# 4. 关闭写流
arrow_client.close_write_stream(session_id, stream_id=0)
# 5. 提交写会话
arrow_client.commit_write_session(session_id)
写入分区表
写入分区表时需要在创建写会话时指定分区:
# 写入指定分区
write_resp = arrow_client.create_write_session(
partial_partition_spec="pt=20230101"
)
如果需要覆盖已有分区数据,可以使用 flags 参数:
# 覆盖写入分区
write_resp = arrow_client.create_write_session(
partial_partition_spec="pt=20230101",
flags={"overwrite": True},
)
并行写入数据
一个写会话可以创建多个写流,支持并行写入:
from concurrent.futures import ThreadPoolExecutor
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
def write_stream(stream_id, data_batch):
# 创建写流
arrow_client.create_write_stream(session_id, stream_id=stream_id)
# 写入数据
writer = arrow_client.write_rows_arrow(
session_id, stream_id=stream_id, record_count=len(data_batch),
)
writer.write(data_batch)
writer.finish()
# 关闭写流
arrow_client.close_write_stream(session_id, stream_id=stream_id)
# 多线程并行写入
with ThreadPoolExecutor(max_workers=3) as pool:
futures = [
pool.submit(write_stream, i, batch_data[i])
for i in range(3)
]
[f.result() for f in futures]
# 提交写会话
arrow_client.commit_write_session(session_id)
使用原始字节流写入数据
使用 StorageApiClient 的 write_rows_stream 方法可以获取 StreamWriter,
然后使用 ArrowWriter 包装进行写入:
from odps.apis.storage_api_v2 import StorageApiClient, ArrowWriter, Compression
client = StorageApiClient(odps, table)
write_resp = client.create_write_session()
session_id = write_resp.session_id
client.create_write_stream(session_id, stream_id=0)
stream_writer = client.write_rows_stream(
session_id, stream_id=0, record_count=3,
)
arrow_writer = ArrowWriter(stream_writer, Compression.UNCOMPRESSED)
arrow_writer.write(batch)
commit_msg, success = arrow_writer.finish()
client.close_write_stream(session_id, stream_id=0)
client.commit_write_session(session_id)
Exactly-Once 写入模式
Storage API V2 支持 Exactly-Once 语义,确保写入操作的幂等性。启用后,服务端会返回 access_token 和 row_offset,用于重试时保证数据不重复:
# 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# 创建写流,启用 Exactly-Once 模式
stream_resp = arrow_client.create_write_stream(session_id, stream_id=0)
access_token = stream_resp.access_token
# 写入数据时传入 access_token
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=3,
access_token=access_token,
)
writer.write(batch)
commit_msg, success = writer.finish()
# 获取 Exactly-Once 模式的写入偏移量
write_stream_resp = writer.get_write_stream_response()
if write_stream_resp is not None:
print(f"写入偏移量: {write_stream_resp.exactly_once_row_offset}")
# 查询写流状态也可获取 access_token 和 row_offset
stream_status = arrow_client.get_write_stream(
session_id, stream_id=0, stream_version=0,
)
print(f"最新 schema 版本: {stream_status.latest_schema_version}")
print(f"写入偏移量: {stream_status.row_offset}")
备注
Exactly-Once 模式下,写流重试时需要使用上次成功写入返回的 row_offset 和
access_token 来保证幂等性。
中止写会话
如果写入过程中发生错误,可以中止写会话以丢弃所有已上传的数据:
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
try:
# 写入数据...
pass
except Exception as e:
# 发生错误时中止会话
arrow_client.abort_write_session(session_id)
raise
查询写会话状态
可以使用 get_write_session 方法查看写会话中各流的状态:
status = arrow_client.get_write_session(session_id)
if status.streams:
for stream_info in status.streams:
print(f"Stream: {stream_info}")
也可以使用 get_write_stream 方法查询单个写流的详细状态:
stream_status = arrow_client.get_write_stream(
session_id, stream_id=0, stream_version=0,
)
print(f"流状态: {stream_status.status}")
print(f"已写入记录数: {stream_status.record_count}")
print(f"已写入字节数: {stream_status.byte_size}")
对于事务表(Delta 表),提交写会话时可以指定需要提交的流 ID 和版本:
arrow_client.commit_write_session(
session_id,
stream_ids=["stream-1", "stream-2"],
stream_versions=[1, 1],
)
路由令牌与会话亲和性
服务端在创建会话和写流时会返回路由令牌(route token),客户端会自动存储并在后续请求中 携带该令牌,确保请求路由到同一节点,提高性能:
# 路由令牌会自动从响应中提取并存储
write_resp = arrow_client.create_write_session()
print(f"当前路由令牌: {arrow_client.route_token}")
# 也可以手动指定 route_token 参数覆盖自动值
arrow_client.commit_write_session(
session_id, route_token="your-route-token",
)
Blob 数据读写
Storage API V2 支持 Blob 类型数据的读写。Blob 读写适用于包含 BLOB 列的表 (表格式需为 V2 且开启事务性)。
写入 Blob 数据
使用 write_blob_stream 方法以流式方式上传单个 Blob:
# 写入 Blob 数据(默认不压缩,启用 MD5 校验)
blob_writer = arrow_client.write_blob_stream(
session_id, stream_id=0, partition_values=["pt=20230101"],
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
print(f"Blob reference: {response.blob_reference}")
写入时可以指定压缩算法:
from odps.apis.storage_api_v2 import Compression
# 使用 ZSTD 压缩写入 Blob
blob_writer = arrow_client.write_blob_stream(
session_id, stream_id=0, partition_values=["pt=20230101"],
compression=Compression.ZSTD,
)
blob_writer.write(b"your blob data here")
response = blob_writer.finish()
使用 write_blob_batch 方法批量上传多个 Blob:
from odps.apis.storage_api_v2 import BlobWriteItem, ChecksumType
# 创建 BlobWriteItem 列表
items = [
BlobWriteItem(
data=b"blob data 1",
partition_values=["pt=20230101"],
column_index=0,
mime_type="image/png",
checksum_type=ChecksumType.MD5,
distribution_key="key1",
),
BlobWriteItem(
data=b"blob data 2",
partition_values=["pt=20230101"],
column_index=0,
),
]
# 批量写入
response = arrow_client.write_blob_batch(
items, session_id=session_id, stream_id=0,
)
print(f"Blob references: {response.blob_references}")
BlobWriteItem 支持 distribution_key 参数用于指定分布键,
checksum_type 参数支持以下校验类型:
ChecksumType.NONE-- 不校验(默认)ChecksumType.CRC32-- CRC32 校验ChecksumType.MD5-- MD5 校验
读取 Blob 数据
使用 read_blobs 方法通过 Blob 引用下载数据。返回 BlobDataIterator 迭代器,
每次迭代返回 (data_bytes, mime_type) 元组:
# 批量读取 Blob
iterator = arrow_client.read_blobs(blob_references=["ref1", "ref2"])
for data, mime_type in iterator:
print(f"Blob size: {len(data)}, MIME type: {mime_type}")
也可以使用 stream=True 参数获取 BlobStreamReader 进行流式读取:
# 流式读取 Blob
stream_reader = arrow_client.read_blobs(
blob_references=["ref1", "ref2"], stream=True,
)
while stream_reader is not None:
chunk = stream_reader.read(4096) # 每次读取 4KB
if not chunk:
# 当前 Blob 读取完毕,切换到下一个
stream_reader = stream_reader.next()
else:
# 处理数据块
process_chunk(chunk)
print(f"Current blob MIME type: {stream_reader.mime_type}")
读取时可以指定压缩算法:
# 读取使用 ZSTD 压缩的 Blob
iterator = arrow_client.read_blobs(
blob_references=["ref1", "ref2"],
compression=Compression.ZSTD,
)
完整的 Blob 读写示例
from odps.apis.storage_api_v2 import (
StorageApiArrowClient, BlobWriteItem, ChecksumType,
)
arrow_client = StorageApiArrowClient(odps, blob_table)
# 创建写会话和写流
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
arrow_client.create_write_stream(session_id, stream_id=0)
# 写入 Blob 数据
items = [
BlobWriteItem(data=b"hello world", partition_values=["pt=test"]),
BlobWriteItem(data=b"foo bar", partition_values=["pt=test"]),
]
write_resp = arrow_client.write_blob_batch(
items, session_id=session_id, stream_id=0,
)
blob_refs = write_resp.blob_references
# 关闭写流并提交会话
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)
# 读取 Blob 数据
iterator = arrow_client.read_blobs(blob_references=blob_refs)
for data, mime_type in iterator:
print(f"Data: {data!r}, MIME type: {mime_type}")
压缩选项
Storage API V2 内置支持以下压缩算法:
Compression.UNCOMPRESSED-- 不压缩(默认)Compression.ZSTD-- ZSTD 压缩(需要安装zstandard库)Compression.LZ4_FRAME-- LZ4 帧格式压缩(需要安装lz4库)
默认情况下,读写操作不使用压缩(Compression.UNCOMPRESSED)。可以在读取和写入时指定压缩算法:
from odps.apis.storage_api_v2 import Compression
# 写入时使用 ZSTD 压缩
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=100,
compression=Compression.ZSTD,
)
# 读取时使用 LZ4 帧格式解压
reader = arrow_client.read_rows_arrow(
session_id, split_index=0,
compression=Compression.LZ4_FRAME,
)
数据过滤
Storage API V2 支持在创建读会话时通过 filter_predicate 参数指定过滤条件,减少网络传输数据量:
# 使用过滤谓词
read_resp = arrow_client.create_read_session(
filter_predicate="id > 100 AND name = 'test'",
)
如果谓词下推失败,可以设置 filter_predicate_fallback=True 回退到服务端过滤:
read_resp = arrow_client.create_read_session(
filter_predicate="id > 100",
filter_predicate_fallback=True,
)
创建读会话时还可以通过 split_max_file_num 参数限制分片中的最大文件数:
read_resp = arrow_client.create_read_session(
split_max_file_num=1000,
)
Arrow 格式选项
创建读会话时可以通过 arrow_options 参数控制 Arrow 格式的精度:
from odps.apis.storage_api_v2 import ArrowOptions
# 设置时间戳精度为微秒
arrow_opts = ArrowOptions()
arrow_opts.timestamp_unit = ArrowOptions.TimestampUnit.MICRO
arrow_opts.date_time_unit = ArrowOptions.TimestampUnit.MILLI
read_resp = arrow_client.create_read_session(
arrow_options=arrow_opts,
)
ArrowOptions 支持的精度选项:
TimestampUnit.SECOND-- 秒TimestampUnit.MILLI-- 毫秒TimestampUnit.MICRO-- 微秒TimestampUnit.NANO-- 纳秒(默认)
SplitOptions 分片选项
创建读会话时可以通过 split_options 参数控制数据的分片方式:
from odps.apis.storage_api_v2 import SplitOptions
# 按大小分片(默认),每个分片 256MB
split_opts = SplitOptions()
# 按并行度分片,创建指定数量的分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.PARALLELISM
split_opts.split_number = 10
# 按行偏移分片
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.ROW_OFFSET
split_opts.split_number = 100000 # 每个分片 10 万行
# 按 Bucket ID 分片(适用于聚簇表)
split_opts = SplitOptions()
split_opts.split_mode = SplitOptions.SplitMode.BUCKET
# 是否跨分区分片(默认为 True)
split_opts = SplitOptions()
split_opts.cross_partition = False
SplitOptions.SplitMode 支持的模式:
SIZE-- 按数据大小分片(默认),split_number指定每个分片的大小(字节)PARALLELISM-- 按并行度分片,split_number指定分片数量ROW_OFFSET-- 按行偏移分片,split_number指定每个分片的行数BUCKET-- 按 Bucket ID 分片
常见场景
将整张表读取为 pandas DataFrame
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
arrow_client = StorageApiArrowClient(odps, table)
read_resp = arrow_client.create_read_session()
batches = []
for split_index in range(read_resp.splits_count):
reader = arrow_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
while True:
batch = reader.read()
if batch is None:
break
batches.append(batch)
df = pa.concat_batches(batches).to_pandas() if batches else None
从分区表读取特定分区
read_resp = arrow_client.create_read_session(
required_partitions=["pt=20230101"],
)
for split_index in range(read_resp.splits_count):
reader = arrow_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
while True:
batch = reader.read()
if batch is None:
break
# 处理 batch
写入 pandas DataFrame 到表
import pandas as pd
import pyarrow as pa
from odps.apis.storage_api_v2 import StorageApiArrowClient
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
"value": [100.0, 200.0, 150.0],
})
batch = pa.RecordBatch.from_pandas(df)
arrow_client = StorageApiArrowClient(odps, table)
# 创建写会话
write_resp = arrow_client.create_write_session()
session_id = write_resp.session_id
# 创建写流
arrow_client.create_write_stream(session_id, stream_id=0)
# 写入数据
writer = arrow_client.write_rows_arrow(
session_id, stream_id=0, record_count=len(df),
)
writer.write(batch)
writer.finish()
# 关闭写流并提交会话
arrow_client.close_write_stream(session_id, stream_id=0)
arrow_client.commit_write_session(session_id)
读取 SQL 查询结果
instance = odps.execute_sql("SELECT * FROM your_table WHERE id > 100")
instance_client = StorageApiArrowClient(odps, instance)
read_resp = instance_client.create_read_session()
for split_index in range(read_resp.splits_count):
reader = instance_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
while True:
batch = reader.read()
if batch is None:
break
df = batch.to_pandas()
# 处理查询结果
多进程并行读取
from multiprocessing import Pool
read_resp = arrow_client.create_read_session()
def read_split(split_index):
# 每个进程创建独立的客户端
from odps import ODPS
from odps.apis.storage_api_v2 import StorageApiArrowClient
local_odps = ODPS(...) # 初始化 ODPS
local_client = StorageApiArrowClient(local_odps, table)
reader = local_client.read_rows_arrow(
read_resp.session_id, split_index=split_index
)
batches = []
while True:
batch = reader.read()
if batch is None:
break
batches.append(batch.to_pandas())
return batches
if __name__ == "__main__":
with Pool(processes=read_resp.splits_count) as pool:
results = pool.map(read_split, range(read_resp.splits_count))
会话状态说明
读会话状态(SessionStatus)
状态 |
说明 |
|---|---|
INIT |
会话初始化中 |
NORMAL |
会话正常,可以读取数据 |
CRITICAL |
会话出现严重错误 |
EXPIRED |
会话已过期,需要刷新或重新创建 |
COMMITTING |
写会话正在提交中 |
COMMITTED |
写会话已提交完成 |
流状态(Status)
状态 |
说明 |
|---|---|
INIT |
流初始化中 |
OK |
流操作完成 |
WAIT |
等待数据中 |
RUNNING |
流正在运行中 |
参考
完整的 API 参考文档请参见 Storage API V2。