Definitions
- class odps.ODPS(access_id=None, secret_access_key=None, project=None, endpoint=None, schema=None, app_account=None, logview_host=None, tunnel_endpoint=None, region_name=None, **kw)[source]
Main entrance to ODPS.
Convenient operations on ODPS objects are provided. Please refer to ODPS docs for more details.
Generally, basic operations such as list, get, exist, create and delete are provided for each ODPS object. Take Table as an example.
To create an ODPS instance, access_id and access_key are required and must be correct, otherwise a SignatureNotMatch error will be thrown. If tunnel_endpoint is not set, the tunnel API will route the service URL automatically.
- Parameters:
access_id – Aliyun Access ID
secret_access_key – Aliyun Access Key
project – default project name
endpoint – Rest service URL
tunnel_endpoint – Tunnel service URL
logview_host – Logview host URL
app_account – Application account, instance of odps.accounts.AppAccount used for dual authentication
- Example:
>>> odps = ODPS('**your access id**', '**your access key**', 'default_project')
>>>
>>> for table in odps.list_tables():
>>>     # handle each table
>>>
>>> table = odps.get_table('dual')
>>> odps.exist_table('dual') is True
>>> odps.create_table('test_table', schema)
>>> odps.delete_table('test_table')
- as_account(access_id=None, secret_access_key=None, account=None, app_account=None)[source]
Creates a new ODPS entry object with new account information.
- Parameters:
access_id – Aliyun Access ID of the new account
secret_access_key – Aliyun Access Key of the new account
account – new account object, if access_id and secret_access_key are not supplied
app_account – Application account, instance of odps.accounts.AppAccount used for dual authentication
- Returns:
- attach_session(session_name, taskname=None, hints=None)[source]
Attach to an existing session.
- Parameters:
session_name – The session name.
taskname – The created sqlrt task name. If not provided, the default value is used. In most cases the default works.
- Returns:
A SessionInstance you may execute select tasks within.
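A minimal sketch of attaching to a session and running a query within it; the session name below is hypothetical, and run_sql on the returned SessionInstance is assumed to behave as in create_session:
- Example:
>>> session = odps.attach_session('my_session')  # 'my_session' is a hypothetical name
>>> inst = session.run_sql('select * from dual')
>>> inst.wait_for_success()
>>> with inst.open_reader() as reader:
>>>     for record in reader:
>>>         # handle each record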
- copy_offline_model(name, new_name, project=None, new_project=None, async_=False)[source]
Copy current model into a new location.
- Parameters:
new_name – name of the new model
new_project – new project name. If absent, the original project name will be used.
async – if True, return the copy instance; otherwise return the newly-copied model
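A hedged sketch of copying a model asynchronously and waiting for completion; the model names are hypothetical:
- Example:
>>> # model names below are hypothetical
>>> inst = odps.copy_offline_model('old_model', 'new_model', async_=True)
>>> inst.wait_for_success()
>>> new_model = odps.get_offline_model('new_model')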
- create_external_volume(name, project=None, schema=None, location=None, rolearn=None, **kwargs)[source]
Create a file system volume based on external storage (for instance, OSS) in a project.
- Parameters:
name (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
volume
- Return type:
odps.models.FSVolume
See also
odps.models.FSVolume
- create_fs_volume(name, project=None, schema=None, **kwargs)[source]
Create a new-fashioned file system volume in a project.
- Parameters:
name (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
volume
- Return type:
odps.models.FSVolume
See also
odps.models.FSVolume
- create_function(name, project=None, schema=None, **kwargs)[source]
Create a function by given name.
- Parameters:
name – function name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
class_type (str) – main class
resources (list) – the resources that the function needs to use
- Returns:
the created function
- Return type:
- Example:
>>> res = odps.get_resource('test_func.py')
>>> func = odps.create_function('test_func', class_type='test_func.Test', resources=[res, ])
- create_mars_cluster(worker_num=1, worker_cpu=8, worker_mem=32, cache_mem=None, min_worker_num=None, disk_num=1, disk_size=100, supervisor_num=1, supervisor_cpu=None, supervisor_mem=None, with_notebook=False, notebook_cpu=None, notebook_mem=None, with_graphscope=False, coordinator_cpu=None, coordinator_mem=None, timeout=None, extra_modules=None, resources=None, instance_id=None, name='default', if_exists='reuse', project=None, **kw)
Create a Mars cluster and a Mars session as the default session; all tasks will then be submitted to the cluster.
- Parameters:
worker_num – number of workers in the Mars cluster
worker_cpu – number of CPU cores on each Mars worker
worker_mem – memory size on each Mars worker
cache_mem – cache memory size on each Mars worker
disk_num – number of mounted disks
min_worker_num – return when the number of cluster workers reaches min_worker_num
resources – resource names
extra_modules – user-defined module paths
supervisor_num – the number of supervisors, 1 by default
with_notebook – whether to launch a Jupyter notebook, default is False
instance_id – instance id of an existing Mars cluster
name – cluster name, 'default' will be the default name
if_exists – 'reuse', 'raise' or 'ignore'. If 'reuse', will reuse the first created cluster with the same name, or create a new one if none exists; if 'raise', will fail if a cluster with the same name has already been created; if 'ignore', will always create a new cluster
project – project name
- Returns:
MarsClient instance
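A hedged usage sketch; the resource sizes are illustrative only, and stop_server is assumed as the way to release the cluster:
- Example:
>>> client = odps.create_mars_cluster(worker_num=2, worker_cpu=4, worker_mem=16)
>>> # submit Mars tasks through the created default session, then release the cluster
>>> client.stop_server()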
- create_parted_volume(name, project=None, schema=None, **kwargs)[source]
Create an old-fashioned partitioned volume in a project.
- Parameters:
name (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
volume
- Return type:
odps.models.PartedVolume
See also
odps.models.PartedVolume
- create_resource(name, type=None, project=None, schema=None, **kwargs)[source]
Create a resource by given name and given type.
Currently, the resource type can be file, jar, py, archive or table. The file, jar, py and archive types are classified as file resources. To initialize a file resource, you have to provide another parameter which is a file-like object. For a table resource, the table name, project name and partition should be provided, of which the partition is optional.
- Parameters:
name – resource name
type – resource type, currently supports file, jar, py, archive and table
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
kwargs – optional arguments, illustrated in the example below
- Returns:
resource depending on the type; if file, an odps.models.FileResource will be returned, and so on
- Return type:
subclasses of odps.models.Resource
- Example:
>>> from odps.models.resource import *
>>>
>>> res = odps.create_resource('test_file_resource', 'file', fileobj=open('/to/path/file'))
>>> assert isinstance(res, FileResource)
>>>
>>> res = odps.create_resource('test_py_resource.py', 'py', fileobj=StringIO('import this'))
>>> assert isinstance(res, PyResource)
>>>
>>> res = odps.create_resource('test_table_resource', 'table', table_name='test_table', partition='pt=test')
>>> assert isinstance(res, TableResource)
- create_role(name, project=None)[source]
Create a role in a project
- Parameters:
name – name of the role to create
project – project name, if not provided, will be the default project
- Returns:
role object created
- create_schema(name, project=None, async_=False)[source]
Create a schema with given name
- Parameters:
name – schema name
project – project name, if not provided, will be the default project
async – if True, will run asynchronously
- Returns:
if async_ is True, return instance, otherwise return Schema object.
- create_session(session_worker_count, session_worker_memory, session_name=None, worker_spare_span=None, taskname=None, hints=None)[source]
Create session.
- Parameters:
session_worker_count – How many workers are assigned to the session.
session_worker_memory – How much memory each worker consumes.
session_name – The session name. If not specified, the instance ID is used as the name.
worker_spare_span – format "00-24"; allocated workers will be reduced during this time. If not specified, this feature is disabled.
taskname – The created sqlrt task name. If not provided, the default value is used. In most cases the default works.
hints – Extra hints provided to the session. Parameters of this method will override certain hints.
- Returns:
A SessionInstance you may execute select tasks within.
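A hedged sketch of creating a session and running a query in it; the worker counts and session name are illustrative only:
- Example:
>>> session = odps.create_session(4, 8, session_name='my_session')  # values are hypothetical
>>> inst = session.run_sql('select * from dual')
>>> inst.wait_for_success()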
- create_table(name, table_schema=None, project=None, schema=None, comment=None, if_not_exists=False, lifecycle=None, shard_num=None, hub_lifecycle=None, hints=None, transactional=False, primary_key=None, storage_tier=None, async_=False, **kw)[source]
Create a table by given schema and other optional parameters.
- Parameters:
name – table name
table_schema – table schema. Can be an instance of odps.models.TableSchema or a string like 'col1 string, col2 bigint'
project – project name, if not provided, will be the default project
comment – table comment
schema (str) – schema name, if not provided, will be the default schema
if_not_exists (bool) – will not create if this table already exists, default False
lifecycle (int) – table's lifecycle. If absent, options.lifecycle will be used.
shard_num (int) – table's shard num
hub_lifecycle (int) – hub lifecycle
hints (dict) – hints for the task
transactional (bool) – make table transactional
primary_key (list) – primary key of the table, only for transactional tables
storage_tier (str) – storage tier of the table
async (bool) – if True, will run asynchronously
- Returns:
the created Table if async_ is False, otherwise the running instance
- Return type:
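A hedged sketch of creating tables from schema strings; the table names are illustrative, and the (columns, partitions) tuple form is assumed to be accepted for partitioned tables:
- Example:
>>> table = odps.create_table('test_table', 'num bigint, name string', if_not_exists=True)
>>> # partitioned table: pass a (columns, partitions) tuple as the schema
>>> table = odps.create_table('test_parted', ('num bigint', 'pt string'), if_not_exists=True)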
- create_user(name, project=None)[source]
Add a user into the project
- Parameters:
name – user name
project – project name, if not provided, will be the default project
- Returns:
user created
- create_volume_directory(volume, path=None, project=None, schema=None)[source]
Create a directory under a file system volume.
- Parameters:
volume (str) – name of the volume.
path (str) – path of the directory to be created.
project (str) – project name, if not provided, will be the default project.
schema (str) – schema name, if not provided, will be the default schema
- Returns:
directory object.
- default_session()[source]
Attach to the default session of your project.
- Returns:
A SessionInstance you may execute select tasks within.
- delete_function(name, project=None, schema=None)[source]
Delete a function by given name.
- Parameters:
name – function name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
None
- delete_offline_model(name, project=None, if_exists=False)[source]
Delete the offline model by given name.
- Parameters:
name – offline model's name
if_exists – will not raise errors when the offline model does not exist, default False
project – project name, if not provided, will be the default project
- Returns:
None
- delete_resource(name, project=None, schema=None)[source]
Delete resource by given name.
- Parameters:
name – resource name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
None
- delete_role(name, project=None)[source]
Delete a role in a project
- Parameters:
name – name of the role to delete
project – project name, if not provided, will be the default project
- delete_schema(name, project=None, async_=False)[source]
Delete the schema with given name
- Parameters:
name – schema name
project – project name, if not provided, will be the default project
async (bool) – if True, will run asynchronously
- delete_table(name, project=None, if_exists=False, schema=None, hints=None, async_=False)[source]
Delete the table with given name
- Parameters:
name – table name
project – project name, if not provided, will be the default project
if_exists (bool) – will not raise errors when the table does not exist, default False
schema (str) – schema name, if not provided, will be the default schema
hints (dict) – hints for the task
async (bool) – if True, will run asynchronously
- Returns:
None if async_ is False, otherwise the running instance
- delete_user(name, project=None)[source]
Delete a user from the project
- Parameters:
name – user name
project – project name, if not provided, will be the default project
- delete_volume(name, project=None, schema=None)[source]
Delete volume by given name.
- Parameters:
name – volume name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
None
- delete_volume_file(volume, path=None, recursive=False, project=None, schema=None)[source]
Delete a file / directory object under a file system volume.
- Parameters:
volume (str) – name of the volume.
path (str) – path of the file / directory to be deleted.
recursive (bool) – if True, recursively delete files
project (str) – project name, if not provided, will be the default project.
schema (str) – schema name, if not provided, will be the default schema
- Returns:
directory object.
- delete_volume_partition(volume, partition=None, project=None, schema=None)[source]
Delete partition in a volume by given name
- Parameters:
volume (str) – volume name
partition (str) – partition name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- delete_xflow(name, project=None)[source]
Delete xflow by given name.
- Parameters:
name – xflow name
project – project name, if not provided, will be the default project
- Returns:
None
- execute_archive_table(table, partition=None, project=None, schema=None, hints=None, priority=None)[source]
Execute a task to archive tables and wait for termination.
- Parameters:
table – name of the table to archive
partition – partition to archive
project – project name, if not provided, will be the default project
hints – settings for the table archive task.
priority – instance priority, 9 as default
- Returns:
instance
- Return type:
- execute_merge_files(table, partition=None, project=None, schema=None, hints=None, priority=None, running_cluster=None, compact_type=None)[source]
Execute a task to merge multiple files in tables and wait for termination.
- Parameters:
table – name of the table to optimize
partition – partition to optimize
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
hints – settings for the merge task.
priority – instance priority, 9 as default
running_cluster – cluster to run this instance
compact_type – compact option for transactional tables, can be major or minor.
- Returns:
instance
- Return type:
- execute_security_query(query, project=None, schema=None, token=None, hints=None, output_json=True)[source]
Execute a security query to grant / revoke / query privileges and return the result string or JSON value.
- Parameters:
query (str) – query text
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
output_json (bool) – parse JSON for the output
- Returns:
result string / JSON object
- execute_sql(sql, project=None, priority=None, running_cluster=None, hints=None, **kwargs)[source]
Run a given SQL statement and block until the SQL has executed successfully.
- Parameters:
sql (str) – SQL statement
project – project name, if not provided, will be the default project
priority (int) – instance priority, 9 as default
running_cluster – cluster to run this instance
hints (dict) – settings for SQL, e.g. odps.mapred.map.split.size
- Returns:
instance
- Return type:
- Example:
>>> instance = odps.execute_sql('select * from dual')
>>> with instance.open_reader() as reader:
>>>     for record in reader:  # iterate to handle result with schema
>>>         # handle each record
>>>
>>> instance = odps.execute_sql('desc dual')
>>> with instance.open_reader() as reader:
>>>     print(reader.raw)  # without schema, just get the raw result
- execute_sql_cost(sql, project=None, hints=None, **kwargs)[source]
Estimate the cost of an SQL statement.
- Parameters:
sql (str) – SQL statement
project – project name, if not provided, will be the default project
hints (dict) – settings for SQL, e.g. odps.mapred.map.split.size
- Returns:
cost info in dict format
- Return type:
cost: dict
- Example:
>>> sql_cost = odps.execute_sql_cost('select * from dual')
>>> sql_cost.udf_num
0
>>> sql_cost.complexity
1.0
>>> sql_cost.input_size
100
- execute_sql_interactive(sql, hints=None, fallback=True, wait_fallback=True, **kwargs)[source]
Run an SQL query in interactive mode (a.k.a. MaxCompute QueryAcceleration). If the query is not supported or fails, and fallback is True, it will fall back to offline mode automatically.
- Parameters:
sql – the SQL query.
hints – settings for the SQL query.
fallback – fall back to non-interactive mode, True by default. Both a boolean and policy names separated by commas are acceptable.
wait_fallback (bool) – wait for the fallback instance to finish, True by default.
- Returns:
instance.
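A hedged usage sketch, mirroring the execute_sql example above:
- Example:
>>> instance = odps.execute_sql_interactive('select * from dual')
>>> with instance.open_reader() as reader:
>>>     for record in reader:
>>>         # handle each record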
- execute_xflow(xflow_name, xflow_project=None, parameters=None, project=None, hints=None, priority=None)[source]
Run xflow by given name, xflow project and parameters, blocking until the xflow has executed successfully.
- Parameters:
xflow_name (str) – XFlow name
xflow_project (str) – the project where the XFlow is deployed
parameters (dict) – parameters
project – project name, if not provided, will be the default project
hints (dict) – execution hints
priority (int) – instance priority, 9 as default
- Returns:
instance
- Return type:
- exist_function(name, project=None, schema=None)[source]
Check if the function with the given name exists.
- Parameters:
name (str) – function name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
True if the function exists, False otherwise
- Return type:
bool
- exist_instance(id_, project=None)[source]
Check if the instance with the given id exists.
- Parameters:
id – instance id
project – project name, if not provided, will be the default project
- Returns:
True if it exists, False otherwise
- Return type:
bool
- exist_offline_model(name, project=None)[source]
Check if the offline model with the given name exists.
- Parameters:
name – offline model's name
project – project name, if not provided, will be the default project
- Returns:
True if the offline model exists, False otherwise
- Return type:
bool
- exist_project(name)[source]
Check if the project with the given name exists.
- Parameters:
name – project name
- Returns:
True if it exists, False otherwise
- Return type:
bool
- exist_resource(name, project=None, schema=None)[source]
Check if the resource with the given name exists.
- Parameters:
name – resource name
schema (str) – schema name, if not provided, will be the default schema
project – project name, if not provided, will be the default project
- Returns:
True if it exists, False otherwise
- Return type:
bool
- exist_role(name, project=None)[source]
Check if a role exists in a project
- Parameters:
name – name of the role
project – project name, if not provided, will be the default project
- exist_schema(name, project=None)[source]
Check if the schema with the given name exists.
- Parameters:
name – schema name
project – project name, if not provided, will be the default project
- Returns:
True if it exists, False otherwise
- Return type:
bool
- exist_table(name, project=None, schema=None)[source]
Check if the table with the given name exists.
- Parameters:
name – table name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
True if the table exists, False otherwise
- Return type:
bool
- exist_user(name, project=None)[source]
Check if a user exists in the project
- Parameters:
name – user name
project – project name, if not provided, will be the default project
- exist_volume(name, schema=None, project=None)[source]
Check if the volume with the given name exists.
- Parameters:
name (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
True if it exists, False otherwise
- Return type:
bool
- exist_volume_partition(volume, partition=None, project=None, schema=None)[source]
Check if the partition with the given name exists in a volume.
- Parameters:
volume (str) – volume name
partition (str) – partition name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- exist_xflow(name, project=None)[source]
Check if the xflow with the given name exists.
- Parameters:
name – xflow name
project – project name, if not provided, will be the default project
- Returns:
True if it exists, False otherwise
- Return type:
bool
- get_function(name, project=None, schema=None)[source]
Get the function by given name
- Parameters:
name – function name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
the right function
- Raise:
odps.errors.NoSuchObject if not exists
- get_instance(id_, project=None)[source]
Get instance by given instance id.
- Parameters:
id – instance id
project – project name, if not provided, will be the default project
- Returns:
the right instance
- Return type:
- Raise:
odps.errors.NoSuchObject if not exists
- get_logview_address(instance_id, hours=None, project=None)[source]
Get Logview address by given instance id and hours.
- Parameters:
instance_id – instance id
hours
project – project name, if not provided, will be the default project
- Returns:
Logview address
- Return type:
str
- get_offline_model(name, project=None)[source]
Get offline model by given name
- Parameters:
name – offline model name
project – project name, if not provided, will be the default project
- Returns:
offline model
- Return type:
- Raise:
odps.errors.NoSuchObject if not exists
- get_project(name=None, default_schema=None)[source]
Get project by given name.
- Parameters:
name – project name, if not provided, will be the default project
default_schema – default schema name, if not provided, will be the schema specified in the ODPS object
- Returns:
the right project
- Return type:
- Raise:
odps.errors.NoSuchObject if not exists
- get_project_policy(project=None)[source]
Get policy of a project
- Parameters:
project – project name, if not provided, will be the default project
- Returns:
JSON object
- get_resource(name, project=None, schema=None)[source]
Get a resource by given name
- Parameters:
name – resource name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
the right resource
- Return type:
- Raise:
odps.errors.NoSuchObject if not exists
- get_role_policy(name, project=None)[source]
Get policy object of a role
- Parameters:
name – name of the role
project – project name, if not provided, will be the default project
- Returns:
JSON object
- get_schema(name=None, project=None)[source]
Get the schema by given name.
- Parameters:
name – schema name, if not provided, will be the default schema
project – project name, if not provided, will be the default project
- Returns:
the Schema object
- get_security_option(option_name, project=None)[source]
Get one security option of a project
- Parameters:
option_name – name of the security option. Please refer to ODPS options for more details.
project – project name, if not provided, will be the default project
- Returns:
option value
- get_security_options(project=None)[source]
Get all security options of a project
- Parameters:
project – project name, if not provided, will be the default project
- Returns:
SecurityConfiguration object
- get_table(name, project=None, schema=None)[source]
Get table by given name.
- Parameters:
name – table name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
the right table
- Return type:
- Raise:
odps.errors.NoSuchObject if not exists
- get_volume(name, project=None, schema=None)[source]
Get volume by given name.
- Parameters:
name (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
volume object. Return type depends on the type of the volume.
- Return type:
odps.models.Volume
- get_volume_file(volume, path=None, project=None, schema=None)[source]
Get a file under a partition of a parted volume, or a file / directory object under a file system volume.
- Parameters:
volume (str) – name of the volume.
path (str) – path of the file / directory to be retrieved.
project (str) – project name, if not provided, will be the default project.
schema (str) – schema name, if not provided, will be the default schema
- Returns:
file / directory object.
- get_volume_partition(volume, partition=None, project=None, schema=None)[source]
Get partition in a parted volume by given name.
- Parameters:
volume (str) – volume name
partition (str) – partition name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
partition
- Return type:
odps.models.VolumePartition
- get_xflow(name, project=None)[source]
Get xflow by given name
- Parameters:
name – xflow name
project – project name, if not provided, will be the default project
- Returns:
xflow
- Return type:
odps.models.XFlow
- Raise:
odps.errors.NoSuchObject if not exists
See also
odps.models.XFlow
- get_xflow_results(instance, project=None)[source]
Get the results of an xflow instance.
- Parameters:
instance (odps.models.Instance) – instance of xflow
project – project name, if not provided, will be the default project
- Returns:
xflow result
- Return type:
dict
- get_xflow_sub_instances(instance, project=None)[source]
Get the sub-instances of an xflow instance.
- Parameters:
instance (odps.models.Instance) – instance of xflow
project – project name, if not provided, will be the default project
- Returns:
dictionary of sub-instances
- iter_xflow_sub_instances(instance, interval=1, project=None, check=False)[source]
Iterate over sub-instances of an xflow instance, waiting until the instance finishes.
- Parameters:
instance (odps.models.Instance) – instance of xflow
interval – time interval between checks
project – project name, if not provided, will be the default project
check (bool) – check if the instance is successful
- Returns:
dictionary of sub-instances
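A hedged sketch tying the xflow methods together; the xflow name and parameters below are hypothetical:
- Example:
>>> inst = odps.execute_xflow('my_xflow', parameters={'param': 'value'})  # names are hypothetical
>>> results = odps.get_xflow_results(inst)
>>> for name, sub_inst in odps.get_xflow_sub_instances(inst).items():
>>>     # inspect each sub-instance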
- list_functions(project=None, prefix=None, owner=None, schema=None)[source]
List all functions of a project.
- Parameters:
project (str) – project name, if not provided, will be the default project
prefix (str) – the listed functions start with this prefix
owner (str) – Aliyun account, the owner to which listed functions belong
schema (str) – schema name, if not provided, will be the default schema
- Returns:
functions
- Return type:
generator
- list_instance_queueing_infos(project=None, status=None, only_owner=None, quota_index=None)[source]
List instance queueing information.
- Parameters:
project – project name, if not provided, will be the default project
status – including 'Running', 'Suspended', 'Terminated'
only_owner (bool) – True will filter the instances created by the current user
quota_index (str)
- Returns:
instance queueing infos
- Return type:
list
- list_instances(project=None, start_time=None, end_time=None, status=None, only_owner=None, quota_index=None, **kw)[source]
List instances of a project by given optional conditions, including start time, end time, status and whether to list only the current owner's instances.
- Parameters:
project – project name, if not provided, will be the default project
start_time (datetime, int or float) – the start time of filtered instances
end_time (datetime, int or float) – the end time of filtered instances
status – including 'Running', 'Suspended', 'Terminated'
only_owner (bool) – True will filter the instances created by the current user
quota_index (str)
- Returns:
instances
- Return type:
list
- list_mars_instances(project=None, days=3, return_task_name=False)
List all running Mars instances in your project.
- Parameters:
project – default project name
days – the day range of filtered instances
return_task_name – if True, also return task names
- Returns:
instances.
- list_offline_models(project=None, prefix=None, owner=None)[source]
List offline models of a project by optional filter conditions, including prefix and owner.
- Parameters:
project – project name, if not provided, will be the default project
prefix – prefix of offline models' names
owner – Aliyun account
- Returns:
offline models
- Return type:
list
- list_projects(owner=None, user=None, group=None, prefix=None, max_items=None, region_id=None, tenant_id=None)[source]
List projects.
- Parameters:
owner – Aliyun account, the owner to which listed projects belong
user – name of the user who has access to listed projects
group – name of the group listed projects belong to
prefix – prefix of names of listed projects
max_items – the maximal size of the result set
- Returns:
projects in this endpoint.
- Return type:
generator
- list_resources(project=None, prefix=None, owner=None, schema=None)[source]
List all resources of a project.
- Parameters:
project – project name, if not provided, will be the default project
prefix (str) – the listed resources start with this prefix
owner (str) – Aliyun account, the owner to which listed resources belong
schema (str) – schema name, if not provided, will be the default schema
- Returns:
resources
- Return type:
generator
- list_role_users(name, project=None)[source]
List users who have the specified role.
- Parameters:
name – name of the role
project – project name, if not provided, will be the default project
- Returns:
collection of User objects
- list_roles(project=None)[source]
List all roles in a project
- Parameters:
project – project name, if not provided, will be the default project
- Returns:
collection of role objects
- list_schemas(project=None, prefix=None, owner=None)[source]
List all schemas of a project.
- Parameters:
project – project name, if not provided, will be the default project
prefix (str) – the listed schemas start with this prefix
owner (str) – Aliyun account, the owner to which listed schemas belong
- Returns:
schemas
- list_tables(project=None, prefix=None, owner=None, schema=None, type=None, extended=False)[source]
List all tables of a project. If prefix is provided, the listed tables will all start with this prefix. If owner is provided, the listed tables will belong to that owner.
- Parameters:
project (str) – project name, if not provided, will be the default project
prefix (str) – the listed tables start with this prefix
owner (str) – Aliyun account, the owner to which listed tables belong
schema (str) – schema name, if not provided, will be the default schema
type (str) – type of the table
extended (bool) – if True, load extended information for tables
- Returns:
tables in this project, filtered by the optional prefix and owner.
- Return type:
generator
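A hedged sketch of iterating over the returned generator with a prefix filter; the prefix is illustrative:
- Example:
>>> for table in odps.list_tables(prefix='test_'):
>>>     print(table.name)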
- list_tables_model(prefix='', project=None)
List all TablesModel objects in the given project.
- Parameters:
prefix – model prefix
project (str) – project name, if you want to look up in another project
- Return type:
list[str]
- list_user_roles(name, project=None)[source]
List roles of the specified user
- Parameters:
name – user name
project – project name, if not provided, will be the default project
- Returns:
collection of Role objects
- list_users(project=None)[source]
List users in the project
- Parameters:
project – project name, if not provided, will be the default project
- Returns:
collection of User objects
- list_volume_files(volume, partition=None, project=None, schema=None)[source]
List files in a volume. In partitioned volumes, the function returns files under the specified partition. In file system volumes, the function returns files under the specified path.
- Parameters:
volume (str) – volume name
partition (str) – partition name for partitioned volumes, or path for file system volumes.
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
files
- Return type:
list
- Example:
>>> # List files under a partition in a partitioned volume. The two calls are equivalent.
>>> odps.list_volume_files('parted_volume', 'partition_name')
>>> odps.list_volume_files('/parted_volume/partition_name')
>>> # List files under a path in a file system volume. The two calls are equivalent.
>>> odps.list_volume_files('fs_volume', 'dir1/dir2')
>>> odps.list_volume_files('/fs_volume/dir1/dir2')
- list_volume_partitions(volume, project=None, schema=None)[source]
List partitions of a volume.
- Parameters:
volume (str) – volume name
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
partitions
- Return type:
list
- list_volumes(project=None, schema=None, owner=None)[source]
List volumes of a project.
- Parameters:
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
owner (str) – Aliyun account
- Returns:
volumes
- Return type:
list
- list_xflows(project=None, owner=None)[source]
List xflows of a project, which can be filtered by the xflow owner.
- Parameters:
project (str) – project name, if not provided, will be the default project
owner (str) – Aliyun account
- Returns:
xflows
- Return type:
list
- move_volume_file(old_path, new_path, replication=None, project=None, schema=None)[source]
Move a file / directory object under a file system volume to another location in the same volume.
- Parameters:
old_path (str) – old path of the volume file
new_path (str) – target path of the moved file
replication (int) – file replication
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
- Returns:
directory object
- open_resource(name, project=None, mode='r+', encoding='utf-8', schema=None, type='file', stream=False, comment=None)[source]
Open a file resource as a file-like object. This is an elegant and pythonic way to handle file resources.
The argument mode stands for the open mode for this file resource. It can be a binary mode if 'b' is included. For instance, 'rb' means opening the resource in read binary mode, while 'r+b' means opening the resource in read+write binary mode. This matters most when the file is actually binary, such as a tar or jpeg file, so be sure to open the file with the correct mode.
Basically, the text mode can be 'r', 'w', 'a', 'r+', 'w+', 'a+', just like the builtin Python open function.
'r' means read only
'w' means write only, the file will be truncated when opening
'a' means append only
'r+' means read+write without constraint
'w+' will truncate first, then open for read+write
'a+' can read+write, however the written content can only be appended to the end
- Parameters:
name (odps.models.FileResource or str) – file resource or file resource name
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
mode (str) – the mode of opening the file, described above
encoding (str) – utf-8 as default
type (str) – resource type, can be "file", "archive", "jar" or "py"
stream (bool) – if True, use stream to upload, False by default
comment (str) – comment of the resource
- Returns:
file-like object
- Example:
>>> with odps.open_resource('test_resource', mode='r') as fp:
>>>     fp.read(1)  # read one unicode character
>>>     fp.write('test')  # wrong, cannot write under read mode
>>>
>>> with odps.open_resource('test_resource', mode='wb') as fp:
>>>     fp.readlines()  # wrong, cannot read under write mode
>>>     fp.write('hello world')  # write bytes
>>>
>>> with odps.open_resource('test_resource') as fp:  # opens in read-write mode by default
>>>     fp.seek(5)
>>>     fp.truncate()
>>>     fp.flush()
- open_volume_reader(volume, partition=None, file_name=None, project=None, schema=None, start=None, length=None, **kwargs)[source]
Open a volume file for reading. A file-like object will be returned which can be used to read contents from volume files.
- Parameters:
volume (str) – name of the volume
partition (str) – name of the partition
file_name (str) – name of the file
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
start – start position
length – length limit
compress_option (CompressOption) – the compression algorithm, level and strategy
- Example:
>>> with odps.open_volume_reader('parted_volume', 'partition', 'file') as reader:
>>>     [print(line) for line in reader]
- open_volume_writer(volume, partition=None, project=None, schema=None, **kwargs)[source]
Write data into a volume. This function behaves differently under different types of volumes.
Under partitioned volumes, all files under a partition should be uploaded in one submission. The method returns a writer object; with its open method you can open a file inside the volume and write to it, or you can use its write method to write to specific files.
Under file system volumes, the method returns a file-like object.
- Parameters:
volume (str) – name of the volume
partition (str) – partition name for partitioned volumes, or path for file system volumes
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
compress_option (odps.tunnel.CompressOption) – the compression algorithm, level and strategy
- Example:
>>> # Writing to partitioned volumes
>>> with odps.open_volume_writer('parted_volume', 'partition') as writer:
>>>     # both write methods are acceptable
>>>     writer.open('file1').write('some content')
>>>     writer.write('file2', 'some content')
>>> # Writing to file system volumes
>>> with odps.open_volume_writer('/fs_volume/dir1/file_name') as writer:
>>>     writer.write('some content')
- persist_mars_dataframe(df, table_name, overwrite=False, partition=None, write_batch_size=None, unknown_as_string=None, as_type=None, drop_table=False, create_table=True, drop_partition=False, create_partition=None, lifecycle=None, tunnel_quota_name=None, runtime_endpoint=None, **kw)
Write a Mars DataFrame to a table.
- Parameters:
df – Mars DataFrame
table_name – table to write
overwrite – whether to overwrite existing data, False as default
partition – partition spec
write_batch_size – batch size of records to write, 1024 as default
unknown_as_string – set columns to string type if their type is Object
as_type – specify column dtypes. {'a': 'string'} will set column a as string type
drop_table – drop the table if it exists, False as default
create_table – create the table first if it does not exist, True as default
drop_partition – drop the partition if it exists, False as default
create_partition – create the partition if it does not exist, None as default
lifecycle – table lifecycle. If absent, options.lifecycle will be used
tunnel_quota_name – name of tunnel quota
- Returns:
None
- read_table(name, limit=None, start=0, step=None, project=None, schema=None, partition=None, **kw)[source]
Read a table's records.
- Parameters:
name (odps.models.table.Table or str) – table or table name
limit – the number of records to read; if None, all records will be read from the table
start – the record index where reading starts
step – step between records, default as 1
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
partition – the partition of this table to read
columns (list) – the column names, a subset of the table's columns
compress (bool) – if True, the data will be compressed during downloading
compress_option (odps.tunnel.CompressOption) – the compression algorithm, level and strategy
endpoint – tunnel service URL
reopen – reading the table will reuse the download session opened last time; if set to True, a new download session will be opened, default as False
- Returns:
records
- Return type:
generator
- Example:
>>> for record in odps.read_table('test_table', 100):
>>>     # deal with the 100 records
>>> for record in odps.read_table('test_table', partition='pt=test', start=100, limit=100):
>>>     # read the `pt=test` partition, skip 100 records and read 100 records
- run_archive_table(table, partition=None, project=None, schema=None, hints=None, priority=None)[source]
Start running a task to archive tables.
- Parameters:
table – name of the table to archive
partition – partition to archive
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
hints – settings for the table archive task
priority – instance priority, 9 as default
- Returns:
instance
- Return type:
- run_merge_files(table, partition=None, project=None, schema=None, hints=None, priority=None, running_cluster=None, compact_type=None)[source]
Start running a task to merge multiple files in tables.
- Parameters:
table – name of the table to optimize
partition – partition to optimize
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
hints – settings for the merge task
priority – instance priority, 9 as default
running_cluster – cluster to run this instance
compact_type – compact option for transactional tables, can be major or minor
- Returns:
instance
- Return type:
- run_security_query(query, project=None, schema=None, token=None, hints=None, output_json=True)[source]
Run a security query to grant / revoke / query privileges. If the query installs or uninstalls a package, a waitable AuthQueryInstance object is returned; otherwise the result string or JSON value is returned.
- Parameters:
query (str) – query text
project (str) – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
output_json (bool) – parse JSON for the output
- Returns:
result string / JSON object
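A minimal sketch of composing query text for this method: the helper below builds a GRANT statement string. The exact privilege names, object types and account spellings are assumptions here, not taken from this document; consult the MaxCompute security documentation for the statement forms your project accepts.

```python
def grant_statement(privileges, object_type, object_name, user):
    """Build a GRANT statement string to pass to run_security_query.

    All argument spellings (e.g. 'Select', 'TABLE', account names) are
    hypothetical placeholders for illustration.
    """
    return "GRANT %s ON %s %s TO USER %s;" % (
        ", ".join(privileges), object_type, object_name, user)
```

It would then be submitted as, e.g., odps.run_security_query(grant_statement(['Select'], 'TABLE', 'dual', 'user_a')).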
- run_sql(sql, project=None, priority=None, running_cluster=None, hints=None, aliases=None, default_schema=None, **kwargs)[source]
Run a given SQL statement asynchronously.
- Parameters:
sql (str) – SQL statement
project – project name, if not provided, will be the default project
priority (int) – instance priority, 9 as default
running_cluster – cluster to run this instance
hints (dict) – settings for SQL, e.g. odps.mapred.map.split.size
aliases (dict)
- Returns:
instance
- Return type:
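Since run_sql returns immediately with an instance object, waiting for the result is the caller's job. The following duck-typed sketch shows the common submit-then-wait pattern; it only assumes the run_sql and wait_for_success signatures documented in this section.

```python
def submit_and_wait(odps, sql, hints=None, priority=9):
    """Submit a SQL statement asynchronously and block until it succeeds.

    The default priority of 9 mirrors the documented default. `odps` is
    any object exposing run_sql() as documented above.
    """
    instance = odps.run_sql(sql, hints=hints, priority=priority)
    instance.wait_for_success()  # raises on failure, per wait_for_success docs
    return instance
```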
- run_sql_interactive(sql, hints=None, **kwargs)[source]
Run a SQL query in interactive mode (a.k.a. MaxCompute Query Acceleration). It will not fall back to offline mode automatically if the query is not supported or fails.
- Parameters:
sql – the SQL query
hints – settings for the SQL query
- Returns:
instance
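Because no automatic fallback happens, callers who want offline execution as a backup must code it themselves. A minimal sketch, assuming only the run_sql_interactive and run_sql entry points documented in this section (catching bare Exception is a simplification; in practice you would narrow it to the ODPS error types you expect):

```python
def run_sql_with_fallback(odps, sql, hints=None):
    """Try Query Acceleration first; rerun offline if the interactive
    attempt raises, since run_sql_interactive does not fall back itself."""
    try:
        instance = odps.run_sql_interactive(sql, hints=hints)
        instance.wait_for_success()
        return instance
    except Exception:
        # Interactive mode unsupported or failed: submit a normal offline run.
        instance = odps.run_sql(sql, hints=hints)
        instance.wait_for_success()
        return instance
```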
- run_xflow(xflow_name, xflow_project=None, parameters=None, project=None, hints=None, priority=None)[source]
Run an XFlow asynchronously by the given name, XFlow project and parameters.
- Parameters:
xflow_name (str) – XFlow name
xflow_project (str) – the project where the XFlow is deployed
parameters (dict) – parameters
project – project name, if not provided, will be the default project
hints (dict) – execution hints
priority (int) – instance priority, 9 as default
- Returns:
instance
- Return type:
Get or set default schema name of the ODPS object
- set_project_policy(policy, project=None)[source]
Set the policy of a project.
- Parameters:
policy – policy string or JSON object
project – project name, if not provided, will be the default project
- Returns:
JSON object
- set_role_policy(name, policy, project=None)[source]
Set the policy of a role in a project.
- Parameters:
name – name of the role
policy – policy string or JSON object
project – project name, if not provided, will be the default project
- set_security_option(option_name, value, project=None)[source]
Set a security option of a project.
- Parameters:
option_name – name of the security option. Please refer to ODPS options for more details.
value – value of the security option to be set
project – project name, if not provided, will be the default project
- stop_instance(id_, project=None)[source]
Stop the running instance by the given instance id.
- Parameters:
id – instance id
project – project name, if not provided, will be the default project
- Returns:
None
- stop_job(id_, project=None)
Stop the running instance by the given instance id.
- Parameters:
id – instance id
project – project name, if not provided, will be the default project
- Returns:
None
- to_mars_dataframe(table_name, shape=None, partition=None, chunk_bytes=None, sparse=False, columns=None, add_offset=None, calc_nrows=True, index_type='chunk_incremental', use_arrow_dtype=False, string_as_binary=None, chunk_size=None, memory_scale=None, runtime_endpoint=None, append_partitions=False, with_split_meta_on_tile=False, tunnel_quota_name=None, extra_params=None, **kw)
Read a table into a Mars DataFrame.
- Parameters:
table_name – table name
shape – table shape. A tuple like (1000, 3) means the table has 1000 rows and 3 columns
partition – partition spec
chunk_bytes – bytes to read for each chunk, default value is '16M'
chunk_size – desired chunk size in rows
sparse – whether to read as a sparse DataFrame
columns – selected columns
add_offset – whether to standardize the DataFrame's index to a RangeIndex, False as default
index_type – type of the retrieved index
calc_nrows – whether to calculate the number of rows if shape is not specified
use_arrow_dtype – read into arrow dtypes, which reduces memory in some cases
string_as_binary – read string columns as binary type
memory_scale – scale of real memory occupation divided by raw file size
append_partitions – append partition name when reading partitioned tables
tunnel_quota_name – name of tunnel quota
- Returns:
Mars DataFrame
- property tunnel_endpoint
Get or set tunnel endpoint of the ODPS object
- write_table(name, *block_records, **kw)[source]
Write records into the given table.
- Parameters:
name (models.table.Table or str) – table or table name
block_records – if only records are given, the block id will be 0 as default
project – project name, if not provided, will be the default project
schema (str) – schema name, if not provided, will be the default schema
partition – the partition of this table to write
overwrite (bool) – if True, will overwrite existing data
compress (bool) – if True, the data will be compressed during uploading
compress_option (odps.tunnel.CompressOption) – the compression algorithm, level and strategy
endpoint – tunnel service URL
reopen – writing the table will reuse the upload session opened last time; if set to True, a new upload session will be opened, default as False
- Returns:
None
- Example:
>>> odps.write_table('test_table', records)  # write to block 0 as default
>>>
>>> odps.write_table('test_table', 0, records)  # write to block 0 explicitly
>>>
>>> odps.write_table('test_table', 0, records1, 1, records2)  # write to multiple blocks
>>>
>>> odps.write_table('test_table', records, partition='pt=test')  # write to a certain partition
- class odps.models.Project(**kwargs)[source]
Project is the counterpart of a database in an RDBMS.
With a Project object, users can get properties like name, owner, comment, creation_time, last_modified_time, and so on.
These properties are not loaded from the remote ODPS service until users try to get them explicitly. If users want to check the newest status, try the reload method.
- Example:
>>> project = odps.get_project('my_project')
>>> project.last_modified_time  # this property will be fetched from the remote ODPS service
>>> project.last_modified_time  # once loaded, the property will not trigger a remote call
>>> project.owner  # the same holds for the other properties, they are fetched together
>>> project.reload()  # force an update of all properties
>>> project.last_modified_time  # already updated
- class AuthQueryStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
- class odps.models.Table(**kwargs)[source]
Table is the counterpart of a table in an RDBMS; besides, a table can consist of partitions.
A Table's properties behave the same as those of odps.models.Project: they are not loaded from the remote ODPS service until users try to get them.
In order to write data into a table, users should call the open_writer method within a with statement. Correspondingly, the open_reader method provides the ability to read records from a table or its partition.
- Example:
>>> table = odps.get_table('my_table')
>>> table.owner  # the first access will load from remote
>>> table.reload()  # reload to update the properties
>>>
>>> for record in table.head(5):
>>>     # check the first 5 records
>>> for record in table.head(5, partition='pt=test', columns=['my_column']):
>>>     # only check the `my_column` column from a certain partition of this table
>>>
>>> with table.open_reader() as reader:
>>>     count = reader.count  # how many records in a table or its partition
>>>     for record in reader[0: count]:
>>>         # read all data; actually better to split into several reads
>>>
>>> with table.open_writer() as writer:
>>>     writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]) as writer:
>>>     writer.write(0, gen_records(block=0))
>>>     writer.write(1, gen_records(block=1))  # we can do this in parallel
- class Type(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
- create_partition(partition_spec, if_not_exists=False, async_=False, hints=None)[source]
Create a partition within the table.
- Parameters:
partition_spec – specification of the partition
if_not_exists – do not raise an error if the partition already exists
hints – settings for the task
async_ – run asynchronously if True
- Returns:
partition object
- Return type:
- delete_partition(partition_spec, if_exists=False, async_=False, hints=None)[source]
Delete a partition within the table.
- Parameters:
partition_spec – specification of the partition
if_exists – do not raise an error if the partition does not exist
hints – settings for the task
async_ – run asynchronously if True
- drop(async_=False, if_exists=False, hints=None)[source]
Drop this table.
- Parameters:
async_ – run asynchronously if True
if_exists – do not raise an error if the table does not exist
hints – settings for the task
- Returns:
None
- exist_partition(partition_spec)[source]
Check if a partition exists within the table.
- Parameters:
partition_spec – specification of the partition
- exist_partitions(prefix_spec=None)[source]
Check if partitions matching the provided conditions exist.
- Parameters:
prefix_spec – prefix of the partition
- Returns:
whether partitions exist
- get_ddl(with_comments=True, if_not_exists=False, force_table_ddl=False)[source]
Get the DDL SQL statement for the given table.
- Parameters:
with_comments – append comments for the table and each column
if_not_exists – add an "if not exists" clause to the generated DDL
force_table_ddl – force generating table DDL even if the object is a view
- Returns:
DDL statement
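A common use of get_ddl is saving the statement so the table can be recreated in another project. The sketch below is duck-typed: it only assumes an object exposing get_ddl() with the parameters documented above, and the file path is the caller's choice.

```python
def export_ddl(table, path, if_not_exists=True):
    """Write a table's DDL statement to a local file.

    `table` is any object exposing get_ddl() as documented above;
    `path` is a local file path chosen by the caller.
    """
    ddl = table.get_ddl(with_comments=True, if_not_exists=if_not_exists)
    with open(path, "w") as f:
        f.write(ddl + "\n")
    return ddl
```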
- get_max_partition(spec=None, skip_empty=True, reverse=False)[source]
Get the partition with maximal values within a certain spec.
- Parameters:
spec – parent partitions. If specified, the partition with maximal values within the specified parent partition is returned
skip_empty – if True, will skip partitions without data
reverse – if True, will return the partition with minimal values
- Returns:
Partition
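After picking the maximal partition, you often need its individual column values. The helper below parses a partition spec of the 'k1=v1,k2=v2' form used throughout these docs into a dict; it is a minimal sketch and does not handle quoting.

```python
def parse_partition_spec(spec):
    """Parse a partition spec like 'pt=20240101,region=cn' into a dict.

    Assumes the plain 'k1=v1,k2=v2' form; values containing '=' are kept
    intact because each part is split only once.
    """
    return dict(part.split("=", 1) for part in spec.split(","))
```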
- get_partition(partition_spec)[source]
Get a partition with the given specifications.
- Parameters:
partition_spec – specification of the partition
- Returns:
partition object
- Return type:
- head(limit, partition=None, columns=None, use_legacy=True, timeout=None)[source]
Get the head records of a table or its partition.
- Parameters:
limit (int) – number of records, 10000 at most
partition – partition of this table
columns (list) – the columns, a subset of the table's columns
- Returns:
records
- Return type:
list
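The records returned by head support positional access, so converting a preview into plain dicts is straightforward. A minimal sketch, assuming only that each record supports integer indexing (which the Record class documented below provides):

```python
def records_to_dicts(columns, records):
    """Pair column names with the values of each record by position.

    Works with any positionally indexable records (tuples, lists, or
    the Record objects returned by head()).
    """
    return [{name: rec[i] for i, name in enumerate(columns)} for rec in records]
```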
- iterate_partitions(spec=None, reverse=False)[source]
Create an iterable object to iterate over partitions.
- Parameters:
spec – specification of the partition
reverse – output partitions in reversed order
- new_record(values=None)[source]
Generate a record of the table.
- Parameters:
values (list) – the values of this record
- Returns:
record
- Return type:
- Example:
>>> table = odps.create_table('test_table', schema=TableSchema.from_lists(['name', 'id'], ['string', 'string']))
>>> record = table.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = table.new_record(['my_name', 'my_id'])
- open_reader(partition=None, reopen=False, endpoint=None, download_id=None, timeout=None, arrow=False, columns=None, quota_name=None, async_mode=True, **kw)[source]
Open a reader to read the entire records from this table or its partition.
- Parameters:
partition – partition of this table
reopen (bool) – the reader will reuse the last one by default; reopen=True means opening a new reader
endpoint – the tunnel service URL
download_id – use an existing download_id to download table contents
arrow – use arrow tunnel to read data
quota_name – name of tunnel quota
async_mode – enable async mode to create tunnels; can be set to True if session creation takes a long time
compress_option (odps.tunnel.CompressOption) – compression algorithm, level and strategy
compress_algo – compression algorithm, used when compress_option is not provided, can be zlib or snappy
compress_level – used for zlib, works when compress_option is not provided
compress_strategy – used for zlib, works when compress_option is not provided
- Returns:
reader; its count attribute means the full size and its status attribute means the tunnel status
- Example:
>>> with table.open_reader() as reader:
>>>     count = reader.count  # how many records in a table or its partition
>>>     for record in reader[0: count]:
>>>         # read all data; actually better to split into several reads
- open_writer(partition=None, blocks=None, reopen=False, create_partition=False, commit=True, endpoint=None, upload_id=None, arrow=False, quota_name=None, mp_context=None, **kw)[source]
Open a writer to write records into this table or its partition.
- Parameters:
partition – partition of this table
blocks – block ids to open
reopen (bool) – the writer will reuse the last one by default; reopen=True means opening a new writer
create_partition (bool) – if true, the partition will be created if it does not exist
endpoint – the tunnel service URL
upload_id – use an existing upload_id to upload data
arrow – use arrow tunnel to write data
quota_name – name of tunnel quota
overwrite (bool) – if True, will overwrite existing data
compress_option (odps.tunnel.CompressOption) – compression algorithm, level and strategy
compress_algo – compression algorithm, used when compress_option is not provided, can be zlib or snappy
compress_level – used for zlib, works when compress_option is not provided
compress_strategy – used for zlib, works when compress_option is not provided
- Returns:
writer; its status attribute means the tunnel writer status
- Example:
>>> with table.open_writer() as writer:
>>>     writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]) as writer:
>>>     writer.write(0, gen_records(block=0))
>>>     writer.write(1, gen_records(block=1))  # we can do this in parallel
- class odps.models.TableSchema(**kwargs)[source]
Schema includes the column and partition information of an odps.models.Table.
There are two ways to initialize a Schema object: the first is to provide columns and partitions, the second is to call the class method from_lists. See the examples below:
- Example:
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = TableSchema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
>>>
>>> schema = TableSchema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
- class odps.models.table.TableSchema(**kwargs)[source]
Schema includes the column and partition information of an odps.models.Table.
There are two ways to initialize a Schema object: the first is to provide columns and partitions, the second is to call the class method from_lists. See the examples below:
- Example:
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = TableSchema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
>>>
>>> schema = TableSchema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
- class odps.models.partition.Partition(**kwargs)[source]
A partition is a collection of rows in a table whose partition columns are equal to specific values.
In order to write data into a partition, users should call the open_writer method within a with statement. Correspondingly, the open_reader method provides the ability to read records from a partition. The behavior of these methods is the same as those in the Table class, except that there are no 'partition' params.
- drop(async_=False, if_exists=False)[source]
Drop this partition.
- Parameters:
async_ – run asynchronously if True
if_exists – do not raise an error if the partition does not exist
- Returns:
None
- head(limit, columns=None)[source]
Get the head records of a partition.
- Parameters:
limit – number of records, 10000 at most
columns (list) – the columns, a subset of the table's columns
- Returns:
records
- Return type:
list
- open_reader(**kw)[source]
Open a reader to read the entire records from this partition.
- Parameters:
reopen (bool) – the reader will reuse the last one by default; reopen=True means opening a new reader
endpoint – the tunnel service URL
compress_option (odps.tunnel.CompressOption) – compression algorithm, level and strategy
compress_algo – compression algorithm, used when compress_option is not provided, can be zlib or snappy
compress_level – used for zlib, works when compress_option is not provided
compress_strategy – used for zlib, works when compress_option is not provided
- Returns:
reader; its count attribute means the full size and its status attribute means the tunnel status
- Example:
>>> with partition.open_reader() as reader:
>>>     count = reader.count  # how many records in a partition
>>>     for record in reader[0: count]:
>>>         # read all data; actually better to split into several reads
- class odps.models.Record(columns=None, schema=None, values=None, max_field_size=None)[source]
A record generally means the data of a single line in a table.
- Example:
>>> schema = TableSchema.from_lists(['name', 'id'], ['string', 'string'])
>>> record = Record(schema=schema, values=['test', 'test2'])
>>> record[0] = 'test'
>>> record[0]
'test'
>>> record['name']
'test'
>>> record[0:2]
('test', 'test2')
>>> record[0, 1]
('test', 'test2')
>>> record['name', 'id']
('test', 'test2')
>>> for field in record:
>>>     print(field)
('name', u'test')
('id', u'test2')
>>> len(record)
2
>>> 'name' in record
True
- class odps.models.Instance(**kwargs)[source]
An ODPS task usually runs as an instance.
The status attribute reflects the current situation of an instance. The is_terminated method indicates whether the instance has finished, the is_successful method indicates whether the instance ran successfully, and the wait_for_success method blocks the main process until the instance has finished.
For a SQL instance, we can use open_reader to read the results.
- Example:
>>> instance = odps.execute_sql('select * from dual')  # this sql returns structured data
>>> with instance.open_reader() as reader:
>>>     # handle each record
>>>
>>> instance = odps.execute_sql('desc dual')  # this sql does not return structured data
>>> with instance.open_reader() as reader:
>>>     print(reader.raw)  # just return the raw result
- exception DownloadSessionCreationError(msg, request_id=None, code=None, host_id=None, instance_id=None, endpoint=None, tag=None)[source]
- class Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
- class Task(**kwargs)[source]
Task stands for each task inside an instance.
It has a name, a task type, the start and end times, and a running status.
- class TaskProgress(**kwargs)[source]
TaskProgress represents the progress of a task.
A single TaskProgress may consist of several stages.
- Example:
>>> progress = instance.get_task_progress('task_name')
>>> progress.get_stage_progress_formatted_string()
2015-11-19 16:39:07 M1_Stg1_job0:0/0/1[0%] R2_1_Stg1_job0:0/0/1[0%]
- get_logview_address(hours=None)[source]
Get the logview address of the instance object by hours.
- Parameters:
hours
- Returns:
logview address
- Return type:
str
- get_sql_task_cost()[source]
Get cost information of the SQL cost task, including input data size, number of UDFs, and complexity of the SQL task.
NOTE: do NOT use this function directly, as it cannot be applied to instances returned from SQL. Use o.execute_sql_cost instead.
- Returns:
cost info in dict format
- get_task_cost(task_name)[source]
Get task cost.
- Parameters:
task_name – name of the task
- Returns:
task cost
- Return type:
Instance.TaskCost
- Example:
>>> cost = instance.get_task_cost(instance.get_task_names()[0])
>>> cost.cpu_cost
200
>>> cost.memory_cost
4096
>>> cost.input_size
0
- get_task_detail(task_name)[source]
Get a task's detail.
- Parameters:
task_name – task name
- Returns:
the task's detail
- Return type:
list or dict according to the JSON
- get_task_detail2(task_name)[source]
Get a task's detail, version 2.
- Parameters:
task_name – task name
- Returns:
the task's detail
- Return type:
list or dict according to the JSON
- get_task_info(task_name, key)[source]
Get task-related information.
- Parameters:
task_name – name of the task
key – key of the information item
- Returns:
a string of the task information
- get_task_progress(task_name)[source]
Get a task's current progress.
- Parameters:
task_name – task name
- Returns:
the task's progress
- Return type:
- get_task_quota(task_name)[source]
Get queueing info of the task. Note that the time between two calls should be larger than 30 seconds, otherwise an empty dict is returned.
- Parameters:
task_name – name of the task
- Returns:
quota info in dict format
- get_task_result(task_name, timeout=None)[source]
Get a single task result.
- Parameters:
task_name – task name
- Returns:
task result
- Return type:
str
- get_task_results(timeout=None)[source]
Get all the task results.
- Returns:
a dict whose keys are task names and whose values are task results as strings
- Return type:
dict
- get_task_statuses()[source]
Get all tasks' statuses.
- Returns:
a dict whose keys are task names and whose values are odps.models.Instance.Task objects
- Return type:
dict
- get_task_summary(task_name)[source]
Get a task's summary, mostly used for MapReduce.
- Parameters:
task_name – task name
- Returns:
summary as a dict parsed from JSON
- Return type:
dict
- get_task_workers(task_name=None, json_obj=None)[source]
Get workers from the task.
- Parameters:
task_name – task name
json_obj – JSON object parsed from get_task_detail2
- Returns:
list of workers
- get_worker_log(log_id, log_type, size=0)[source]
Get logs from a worker.
- Parameters:
log_id – id of the log, can be retrieved from details
log_type – type of the log. Possible log types include coreinfo, hs_err_log, jstack, pstack, stderr, stdout, waterfall_summary
size – length of the log to retrieve
- Returns:
log content
- is_running(retry=False)[source]
Check if this instance is still running.
- Returns:
True if still running, else False
- Return type:
bool
- is_successful(retry=False)[source]
Check if the instance ran successfully.
- Returns:
True if successful, else False
- Return type:
bool
- is_terminated(retry=False)[source]
Check if this instance has finished.
- Returns:
True if finished, else False
- Return type:
bool
- open_reader(*args, **kwargs)[source]
Open a reader to read records from the result of the instance. If tunnel is True, the instance tunnel will be used; otherwise the conventional routine will be used. If the instance tunnel is not available and tunnel is not specified, the method will fall back to the conventional routine. Note that the number of records returned is limited unless options.limited_instance_tunnel is set to False or limit=False is configured under instance tunnel mode; otherwise the number of records returned is always limited.
- Parameters:
tunnel – if true, use the instance tunnel to read from the instance; if false, use the conventional routine; if absent, options.tunnel.use_instance_tunnel will be used and automatic fallback is enabled
limit (bool) – if True, enable the limitation
reopen (bool) – the reader will reuse the last one by default; reopen=True means opening a new reader
endpoint – the tunnel service URL
compress_option (odps.tunnel.CompressOption) – compression algorithm, level and strategy
compress_algo – compression algorithm, used when compress_option is not provided, can be zlib or snappy
compress_level – used for zlib, works when compress_option is not provided
compress_strategy – used for zlib, works when compress_option is not provided
- Returns:
reader; its count attribute means the full size and its status attribute means the tunnel status
- Example:
>>> with instance.open_reader() as reader:
>>>     count = reader.count  # how many records in the result
>>>     for record in reader[0: count]:
>>>         # read all data; actually better to split into several reads
- put_task_info(task_name, key, value, check_location=False)[source]
Put information into a task.
- Parameters:
task_name – name of the task
key – key of the information item
value – value of the information item
- wait_for_completion(interval=1, timeout=None, max_interval=None)[source]
Wait for the instance to complete, regardless of the outcome.
- Parameters:
interval – time interval to check
max_interval – if specified, the next check interval will be multiplied by 2 until max_interval is reached
timeout – maximum time to wait
- Returns:
None
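The interval-doubling behaviour described above can be sketched generically. The helper below is not the library's implementation; it just reproduces the documented semantics (poll, sleep, double the interval up to max_interval, stop at timeout) against any zero-argument check function.

```python
import time


def wait_with_backoff(check_done, interval=1, max_interval=None, timeout=None):
    """Poll check_done() until it returns True.

    Sleeps `interval` seconds between checks, doubling the interval each
    round up to `max_interval` when given, and raises TimeoutError once
    `timeout` seconds have elapsed.
    """
    start = time.time()
    while not check_done():
        if timeout is not None and time.time() - start >= timeout:
            raise TimeoutError("timed out waiting for completion")
        time.sleep(interval)
        if max_interval is not None:
            interval = min(interval * 2, max_interval)
```

With a real instance, check_done would be something like instance.is_terminated.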
- wait_for_success(interval=1, timeout=None, max_interval=None)[source]
Wait for the instance to complete, and check if the instance is successful.
- Parameters:
interval – time interval to check
max_interval – if specified, the next check interval will be multiplied by 2 until max_interval is reached
timeout – maximum time to wait
- Returns:
None
- Raises:
odps.errors.ODPSError if the instance failed
- class odps.models.Resource(**kwargs)[source]
Resource is useful when writing UDFs or MapReduce jobs. This is an abstract class.
Basically, a resource is either a file resource or a table resource. Specifically, a file resource can be of type file, py, jar, or archive.
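Concrete resources are managed through the ODPS entry object; for example, collecting resource names (a sketch assuming a configured entry `o`; `o.list_resources()` is the PyODPS call, and each resource object exposes a `name` attribute):

```python
def resource_names(o):
    """Collect the names of all resources in the default project."""
    return [res.name for res in o.list_resources()]
```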
- class odps.models.FileResource(**kwargs)[source]
File resource represents a file.
Use the open method to open this resource as a file-like object.
- class Mode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
- flush()[source]
Commit changes to ODPS if any change has happened. close will do this automatically.
- Returns:
None
- open(mode='r', encoding='utf-8', stream=False, overwrite=None)[source]
The argument mode stands for the open mode of this file resource. It can be a binary mode if 'b' is included: for instance, 'rb' means opening the resource in read binary mode, while 'r+b' means opening it in read+write binary mode. This matters most when the file is actually binary, such as a tar or jpeg file, so be sure to open the file in the correct mode.
Basically, the text mode can be 'r', 'w', 'a', 'r+', 'w+', 'a+', just like the built-in Python open function.
r means read only
w means write only; the file is truncated when opened
a means append only
r+ means read+write without constraint
w+ truncates first, then opens for read+write
a+ can read+write, but written content can only be appended to the end
- Parameters:
mode – the open mode, described above
encoding – utf-8 by default
stream – open in stream mode
overwrite – if True, overwrite the existing resource. True by default.
- Returns:
file-like object
- Example:
>>> with resource.open('r') as fp:
>>>     fp.read(1)  # read one unicode character
>>>     fp.write('test')  # wrong, cannot write under read mode
>>>
>>> with resource.open('wb') as fp:
>>>     fp.readlines()  # wrong, cannot read under write mode
>>>     fp.write('hello world')  # write bytes
>>>
>>> with resource.open('r+') as fp:  # open in read-write mode
>>>     fp.seek(5)
>>>     fp.truncate()
>>>     fp.flush()
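Before a file resource can be opened as above, it has to exist. A sketch of creating (or replacing) one through the entry object, assuming a configured entry `o` (`o.exist_resource`, `o.delete_resource` and `o.create_resource` are the PyODPS calls; the helper and resource name are illustrative):

```python
def recreate_file_resource(o, name, content):
    """Delete a file resource if it exists, then create it with `content`."""
    if o.exist_resource(name):
        o.delete_resource(name)
    # type 'file' creates a plain file resource; file_obj may be a str or a file object
    return o.create_resource(name, "file", file_obj=content)
```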
- read(size=-1)[source]
Read the file resource; reads all content by default.
- Parameters:
size – length in unicode characters or bytes, depending on text or binary mode
- Returns:
unicode or bytes, depending on text or binary mode
- Return type:
str or unicode (Py2), bytes or str (Py3)
- readline(size=-1)[source]
Read a single line.
- Parameters:
size – if present and non-negative, it is a maximum byte count (including the trailing newline) and an incomplete line may be returned. When size is not 0, an empty string is returned only when EOF is encountered immediately.
- Returns:
unicode or bytes, depending on text or binary mode
- Return type:
str or unicode (Py2), bytes or str (Py3)
- readlines(sizehint=-1)[source]
Read the resource as a list of lines.
- Parameters:
sizehint – if present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read
- Returns:
lines
- Return type:
list
- seek(pos, whence=0)[source]
Seek to a given position.
- Parameters:
pos – position to seek to
whence – if set to 2, seeks relative to the end of the file
- Returns:
None
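FileResource's read/readline/seek follow the semantics of Python's built-in file objects, so the same calls behave identically on io.StringIO, used here as a local stand-in for illustration:

```python
import io

buf = io.StringIO("hello\nworld\n")
first = buf.readline()   # reads up to and including the newline
buf.seek(0)              # whence defaults to 0: absolute position from the start
ch = buf.read(1)         # a single character in text mode
buf.seek(0, 2)           # whence=2: seek relative to the end
at_end = buf.read()      # nothing left to read at EOF
```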
- class odps.models.ArchiveResource(**kwargs)[source]
File resource representing a compressed file such as .zip/.tgz/.tar.gz/.tar/.jar
- class odps.models.TableResource(**kwargs)[source]
Take a table as a resource.
- property partition
Get the source table partition.
- Returns:
the source table partition
- property table
Get the table object.
- Returns:
source table
- Return type:
odps.models.Table
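A table resource is typically created through the entry object; a sketch assuming a configured entry `o` (`o.create_resource` with type 'table' and a `table_name` keyword is the PyODPS call; the wrapper and names are illustrative, and `partition` narrows the source to a single partition):

```python
def create_table_resource(o, name, table_name, partition=None):
    """Register `table_name` (optionally one partition of it) as a table resource."""
    return o.create_resource(name, "table", table_name=table_name, partition=partition)
```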
- class odps.models.Function(**kwargs)[source]
Functions can be used as UDFs when users write SQL.
- property resources
Return all the resources which this function refers to.
- Returns:
resources
- Return type:
list
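Registering a function that refers to resources can be sketched as follows, assuming a configured entry `o` (`o.get_resource` and `o.create_function` with `class_type` and `resources` keywords are the PyODPS calls; the wrapper and names are illustrative):

```python
def create_udf(o, name, class_type, resource_names):
    """Create a UDF backed by the named resources, e.g. class_type='my_udf.Lower'."""
    resources = [o.get_resource(n) for n in resource_names]
    return o.create_function(name, class_type=class_type, resources=resources)
```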
- class odps.models.Worker(**kwargs)[source]
Class for retrieving worker information and logs.
- class odps.models.ml.OfflineModel(**kwargs)[source]
Represents an ODPS offline model.