#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2017 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from .core import LazyLoad, JSONRemoteModel
from .record import Record
from .partitions import Partitions
from .. import types as odps_types, serializers, utils, readers
from ..compat import six, dir2
class TableSchema(odps_types.OdpsSchema, JSONRemoteModel):
"""
Schema includes the column and partition information of a :class:`odps.models.Table`.
There are two ways to initialize a Schema object: the first is to provide columns and partitions directly,
and the second is to call the class method ``from_lists``. See the examples below:
:Example:
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = Schema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
>>>
>>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
"""
class Shard(JSONRemoteModel):
hub_lifecycle = serializers.JSONNodeField('HubLifecycle')
shard_num = serializers.JSONNodeField('ShardNum')
distribute_cols = serializers.JSONNodeField('DistributeCols')
sort_cols = serializers.JSONNodeField('SortCols')
class TableColumn(odps_types.Column, JSONRemoteModel):
name = serializers.JSONNodeField('name')
type = serializers.JSONNodeField('type', parse_callback=odps_types.validate_data_type)
comment = serializers.JSONNodeField('comment')
label = serializers.JSONNodeField('label')
def __init__(self, **kwargs):
JSONRemoteModel.__init__(self, **kwargs)
if self.type is not None:
self.type = odps_types.validate_data_type(self.type)
class TablePartition(odps_types.Partition, TableColumn):
def __init__(self, **kwargs):
TableSchema.TableColumn.__init__(self, **kwargs)
def __init__(self, **kwargs):
kwargs['_columns'] = columns = kwargs.pop('columns', None)
kwargs['_partitions'] = partitions = kwargs.pop('partitions', None)
JSONRemoteModel.__init__(self, **kwargs)
odps_types.OdpsSchema.__init__(self, columns=columns, partitions=partitions)
def load(self):
self.update(self._columns, self._partitions)
self.build_snapshot()
comment = serializers.JSONNodeField('comment', set_to_parent=True)
owner = serializers.JSONNodeField('owner', set_to_parent=True)
creation_time = serializers.JSONNodeField(
'createTime', parse_callback=datetime.fromtimestamp,
set_to_parent=True)
last_modified_time = serializers.JSONNodeField(
'lastModifiedTime', parse_callback=datetime.fromtimestamp,
set_to_parent=True)
last_meta_modified_time = serializers.JSONNodeField(
'lastDDLTime', parse_callback=datetime.fromtimestamp,
set_to_parent=True)
is_virtual_view = serializers.JSONNodeField(
'isVirtualView', parse_callback=bool, set_to_parent=True)
lifecycle = serializers.JSONNodeField(
'lifecycle', parse_callback=int, set_to_parent=True)
view_text = serializers.JSONNodeField('viewText', set_to_parent=True)
size = serializers.JSONNodeField("size", parse_callback=int, set_to_parent=True)
is_archived = serializers.JSONNodeField(
'IsArchived', parse_callback=bool, set_to_parent=True)
physical_size = serializers.JSONNodeField(
'PhysicalSize', parse_callback=int, set_to_parent=True)
file_num = serializers.JSONNodeField(
'FileNum', parse_callback=int, set_to_parent=True)
record_num = serializers.JSONNodeField(
'recordNum', parse_callback=int, set_to_parent=True)
location = serializers.JSONNodeField(
'location', set_to_parent=True)
storage_handler = serializers.JSONNodeField(
'storageHandler', set_to_parent=True)
resources = serializers.JSONNodeField(
'resources', set_to_parent=True)
serde_properties = serializers.JSONNodeField(
'serDeProperties', type='json', set_to_parent=True)
reserved = serializers.JSONNodeField(
'Reserved', type='json', set_to_parent=True)
shard = serializers.JSONNodeReferenceField(
Shard, 'shardInfo', check_before=['shardExist'], set_to_parent=True)
table_label = serializers.JSONNodeField(
'tableLabel', callback=lambda t: t if t != '0' else '', set_to_parent=True)
_columns = serializers.JSONNodesReferencesField(TableColumn, 'columns')
_partitions = serializers.JSONNodesReferencesField(TablePartition, 'partitionKeys')
def __getstate__(self):
return self._columns, self._partitions
def __setstate__(self, state):
columns, partitions = state
self.__init__(columns=columns, partitions=partitions)
def __dir__(self):
return sorted(set(dir2(self)) - set(type(self)._parent_attrs))
class Table(LazyLoad):
"""
A Table is the counterpart of a table in an RDBMS; in addition, a table can consist of partitions.
A table's properties behave like those of :class:`odps.models.Project`:
they will not be loaded from the remote ODPS service until users try to access them.
To write data into a table, users should call the ``open_writer``
method within a **with statement**. Similarly, the ``open_reader`` method
provides the ability to read records from a table or its partition.
:Example:
>>> table = odps.get_table('my_table')
>>> table.owner # the first access loads the properties from the remote service
>>> table.reload() # reload to update the properties
>>>
>>> for record in table.head(5):
>>> # check the first 5 records
>>> for record in table.head(5, partition='pt=test', columns=['my_column']):
>>> # only check the `my_column` column from a certain partition of this table
>>>
>>> with table.open_reader() as reader:
>>> count = reader.count # How many records of a table or its partition
>>> for record in reader[0: count]:
>>> # read all data; in practice it is better to split this into several reads
>>>
>>> with table.open_writer() as writer:
>>> writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]) as writer:
>>> writer.write(0, gen_records(block=0))
>>> writer.write(1, gen_records(block=1)) # we can do this in parallel
"""
_extend_args = 'is_archived', 'physical_size', 'file_num', 'location', \
'storage_handler', 'resources', 'serde_properties', \
'reserved'
__slots__ = '_is_extend_info_loaded', 'last_meta_modified_time', 'is_virtual_view', \
'lifecycle', 'view_text', 'size', 'shard', '_table_tunnel', \
'_id_thread_local', 'record_num'
__slots__ += _extend_args
name = serializers.XMLNodeField('Name')
table_id = serializers.XMLNodeField('TableId')
format = serializers.XMLNodeAttributeField(attr='format')
schema = serializers.XMLNodeReferenceField(TableSchema, 'Schema')
comment = serializers.XMLNodeField('Comment')
owner = serializers.XMLNodeField('Owner')
table_label = serializers.XMLNodeField('TableLabel')
creation_time = serializers.XMLNodeField('CreationTime',
parse_callback=utils.parse_rfc822)
last_modified_time = serializers.XMLNodeField('LastModifiedTime',
parse_callback=utils.parse_rfc822)
_download_ids = utils.thread_local_attribute('_id_thread_local', dict)
_upload_ids = utils.thread_local_attribute('_id_thread_local', dict)
def __init__(self, **kwargs):
self._is_extend_info_loaded = False
super(Table, self).__init__(**kwargs)
try:
del self._id_thread_local
except AttributeError:
pass
def reload(self):
url = self.resource()
resp = self._client.get(url)
self.parse(self._client, resp, obj=self)
self.schema.load()
self._loaded = True
def reload_extend_info(self):
params = {'extended': ''}
resp = self._client.get(self.resource(), params=params)
self.parse(self._client, resp, obj=self)
self._is_extend_info_loaded = True
if not self._loaded:
self.schema = None
def __getattribute__(self, attr):
if attr in type(self)._extend_args:
if not self._is_extend_info_loaded:
self.reload_extend_info()
return object.__getattribute__(self, attr)
val = object.__getattribute__(self, attr)
if val is None and not self._loaded:
if attr in getattr(TableSchema, '__fields'):
self.reload()
return object.__getattribute__(self, attr)
return super(Table, self).__getattribute__(attr)
def _repr(self):
buf = six.StringIO()
buf.write('odps.Table\n')
buf.write(' name: {0}.`{1}`\n'.format(self.project.name, self.name))
name_space = 2 * max(len(col.name) for col in self.schema.columns)
type_space = 2 * max(len(repr(col.type)) for col in self.schema.columns)
not_empty = lambda field: field is not None and len(field.strip()) > 0
buf.write(' schema:\n')
cols_strs = []
for col in self.schema._columns:
cols_strs.append('{0}: {1}{2}'.format(
col.name.ljust(name_space),
repr(col.type).ljust(type_space),
'# {0}'.format(utils.to_str(col.comment)) if not_empty(col.comment) else ''
))
buf.write(utils.indent('\n'.join(cols_strs), 4))
buf.write('\n')
if self.schema.partitions:
buf.write(' partitions:\n')
partition_strs = []
for partition in self.schema.partitions:
partition_strs.append('{0}: {1}{2}'.format(
partition.name.ljust(name_space),
repr(partition.type).ljust(type_space),
'# {0}'.format(utils.to_str(partition.comment)) if not_empty(partition.comment) else ''
))
buf.write(utils.indent('\n'.join(partition_strs), 4))
return buf.getvalue()
@property
def stored_as(self):
return (self.reserved or dict()).get('StoredAs')
@staticmethod
def gen_create_table_sql(table_name, table_schema, comment=None, if_not_exists=False,
lifecycle=None, shard_num=None, hub_lifecycle=None,
with_column_comments=True, project=None, **kw):
from ..utils import escape_odps_string
buf = six.StringIO()
table_name = utils.to_text(table_name)
project = utils.to_text(project)
comment = utils.to_text(comment)
stored_as = kw.get('stored_as')
external_stored_as = kw.get('external_stored_as')
storage_handler = kw.get('storage_handler')
buf.write(u'CREATE%s TABLE ' % (' EXTERNAL' if storage_handler or external_stored_as else ''))
if if_not_exists:
buf.write(u'IF NOT EXISTS ')
if project is not None:
buf.write(u'%s.`%s` ' % (project, table_name))
else:
buf.write(u'`%s` ' % table_name)
if isinstance(table_schema, six.string_types):
buf.write(u'(\n')
buf.write(table_schema)
buf.write(u'\n)\n')
if comment:
buf.write(u"COMMENT '%s'\n" % escape_odps_string(comment))
elif isinstance(table_schema, tuple):
buf.write(u'(\n')
buf.write(table_schema[0])
buf.write(u'\n)\n')
if comment:
buf.write(u"COMMENT '%s'\n" % escape_odps_string(comment))
buf.write(u'PARTITIONED BY ')
buf.write(u'(\n')
buf.write(table_schema[1])
buf.write(u'\n)\n')
else:
def write_columns(col_array):
size = len(col_array)
buf.write(u'(\n')
for idx, column in enumerate(col_array):
buf.write(u' `%s` %s' % (utils.to_text(column.name), utils.to_text(column.type)))
if with_column_comments and column.comment:
buf.write(u" COMMENT '%s'" % utils.to_text(column.comment))
if idx < size - 1:
buf.write(u',\n')
buf.write(u'\n)\n')
write_columns(table_schema.simple_columns)
if comment:
buf.write(u"COMMENT '%s'\n" % comment)
if table_schema.partitions:
buf.write(u'PARTITIONED BY ')
write_columns(table_schema.partitions)
serde_properties = kw.get('serde_properties')
location = kw.get('location')
resources = kw.get('resources')
if storage_handler or external_stored_as:
if storage_handler:
buf.write("STORED BY '%s'\n" % escape_odps_string(storage_handler))
else:
buf.write("STORED AS %s\n" % escape_odps_string(external_stored_as))
if serde_properties:
buf.write('WITH SERDEPROPERTIES (\n')
for idx, k in enumerate(serde_properties):
buf.write(" '%s' = '%s'" % (escape_odps_string(k), escape_odps_string(serde_properties[k])))
if idx + 1 < len(serde_properties):
buf.write(',')
buf.write('\n')
buf.write(')\n')
if location:
buf.write("LOCATION '%s'\n" % location)
if resources:
buf.write("USING '%s'\n" % resources)
if stored_as:
buf.write("STORED AS %s\n" % escape_odps_string(stored_as))
if lifecycle is not None and lifecycle > 0:
buf.write(u'LIFECYCLE %s\n' % lifecycle)
if shard_num is not None:
buf.write(u'INTO %s SHARDS' % shard_num)
if hub_lifecycle is not None:
buf.write(u' HUBLIFECYCLE %s\n' % hub_lifecycle)
return buf.getvalue().strip()
def get_ddl(self, with_comments=True, if_not_exists=False):
"""
Get DDL SQL statement for the given table.
:param with_comments: append comments for the table and each column
:param if_not_exists: add an ``IF NOT EXISTS`` clause to the generated statement
:return: DDL statement
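A minimal usage sketch; ``my_table`` is a placeholder table name and the output is not shown:
:Example:
>>> table = odps.get_table('my_table')  # 'my_table' is a placeholder
>>> print(table.get_ddl())
>>> print(table.get_ddl(with_comments=False, if_not_exists=True))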
"""
shard_num = self.shard.shard_num if self.shard is not None else None
return self.gen_create_table_sql(
self.name, self.schema, self.comment if with_comments else None,
if_not_exists=if_not_exists, with_column_comments=with_comments,
lifecycle=self.lifecycle, shard_num=shard_num, project=self.project.name,
storage_handler=self.storage_handler, serde_properties=self.serde_properties,
location=self.location, resources=self.resources,
)
def head(self, limit, partition=None, columns=None):
"""
Get the head records of a table or its partition.
:param limit: the number of records to fetch, 10000 at most
:param partition: partition of this table
:param columns: the columns, which are a subset of the table columns
:type columns: list
:return: records
:rtype: list
.. seealso:: :class:`odps.models.Record`
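A minimal usage sketch; the partition spec and column name are placeholders:
:Example:
>>> for record in table.head(5, partition='pt=test', columns=['my_column']):
>>>     print(record['my_column'])  # 'pt=test' and 'my_column' are placeholders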
"""
if limit <= 0:
raise ValueError('limit number should be larger than 0.')
params = {'data': '', 'linenum': limit}
if partition is not None:
if not isinstance(partition, odps_types.PartitionSpec):
partition = odps_types.PartitionSpec(partition)
params['partition'] = str(partition)
if columns is not None and len(columns) > 0:
col_name = lambda col: col.name if isinstance(col, odps_types.Column) else col
params['cols'] = ','.join(col_name(col) for col in columns)
resp = self._client.get(self.resource(), params=params, stream=True)
with readers.RecordReader(self.schema, resp) as reader:
for record in reader:
yield record
def _create_table_tunnel(self, endpoint=None):
if self._table_tunnel is not None:
return self._table_tunnel
from ..tunnel import TableTunnel
self._table_tunnel = TableTunnel(client=self._client, project=self.project,
endpoint=endpoint or self.project._tunnel_endpoint)
return self._table_tunnel
def open_reader(self, partition=None, **kw):
"""
Open a reader to read all records from this table or its partition.
:param partition: partition of this table
:param reopen: the reader reuses the last one by default; setting reopen to true opens a new reader.
:type reopen: bool
:param endpoint: the tunnel service URL
:param compress_option: compression algorithm, level and strategy
:type compress_option: :class:`odps.tunnel.CompressOption`
:param compress_algo: compression algorithm, takes effect when ``compress_option`` is not provided,
can be ``zlib`` or ``snappy``
:param compress_level: used for ``zlib``, takes effect when ``compress_option`` is not provided
:param compress_strategy: used for ``zlib``, takes effect when ``compress_option`` is not provided
:param download_id: use existing download_id to download table contents
:return: reader, ``count`` means the full size, ``status`` means the tunnel status
:Example:
>>> with table.open_reader() as reader:
>>> count = reader.count # How many records of a table or its partition
>>> for record in reader[0: count]:
>>> # read all data; in practice it is better to split this into several reads
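A further sketch that reads a partition into pandas (assumes pandas is installed; ``pt=test`` is a placeholder partition spec):
>>> with table.open_reader(partition='pt=test') as reader:
>>>     pdf = reader.to_pandas()  # single-process read; pandas required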
"""
from ..tunnel.tabletunnel import TableDownloadSession
reopen = kw.pop('reopen', False)
endpoint = kw.pop('endpoint', None)
download_id = kw.pop('download_id', None)
timeout = kw.pop('timeout', None)
schema = self.schema
table_name = self.name
rest_client = self._client
project = self.project
tunnel_endpoint = self.project._tunnel_endpoint
tunnel = self._create_table_tunnel(endpoint=endpoint)
if download_id is None:
download_ids = self._download_ids
download_id = download_ids.get(partition) if not reopen else None
download_session = tunnel.create_download_session(
table=self, partition_spec=partition, download_id=download_id, timeout=timeout, **kw)
if download_id and download_session.status != TableDownloadSession.Status.Normal:
download_session = tunnel.create_download_session(
table=self, partition_spec=partition, timeout=timeout, **kw)
download_ids[partition] = download_session.id
class RecordReader(readers.AbstractRecordReader):
def __init__(self, partition_spec=None):
self._it = iter(self)
self._schema = schema
self._partition_spec = partition_spec
@property
def download_id(self):
return download_session.id
@property
def count(self):
return download_session.count
@property
def status(self):
return download_session.status
def __iter__(self):
for record in self.read():
yield record
def __next__(self):
return next(self._it)
next = __next__
def _iter(self, start=None, end=None, step=None):
count = self._calc_count(start, end, step)
return self.read(start=start, count=count, step=step)
def read(self, start=None, count=None, step=None,
compress=False, columns=None):
start = start or 0
step = step or 1
count = count*step if count is not None else self.count-start
if count == 0:
return
with download_session.open_record_reader(
start, count, compress=compress, columns=columns) as reader:
for record in reader[::step]:
yield record
def to_pandas(self, n_process=1):
import pandas as pd
import multiprocessing
from multiprocessing import Pipe
session_id, count = download_session.id, download_session.count
if n_process == 1:
return super(RecordReader, self).to_pandas()
try:
_mp_context = multiprocessing.get_context('fork')
except ValueError:
raise ValueError('`n_process > 1` is not supported on Windows.')
def read_table_split(conn, download_id, start, count, idx, partition_spec=None):
# read part data
from odps.tunnel import TableTunnel
tunnel = TableTunnel(client=rest_client, project=project,
endpoint=tunnel_endpoint)
session = tunnel.create_download_session(table_name, download_id=download_id,
partition_spec=partition_spec)
data = session.open_record_reader(start, count).to_pandas()
conn.send((idx, data))
split_count = count // n_process + (count % n_process != 0)
start = 0
conns = []
for i in range(n_process):
parent_conn, child_conn = Pipe()
p = _mp_context.Process(target=read_table_split, args=(child_conn, session_id,
start, split_count, i, self._partition_spec))
p.start()
start += split_count
conns.append(parent_conn)
results = [c.recv() for c in conns]
splits = sorted(results, key=lambda x: x[0])
return pd.concat([d[1] for d in splits]).reset_index(drop=True)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
return RecordReader(partition)
def open_writer(self, partition=None, blocks=None, **kw):
"""
Open the writer to write records into this table or its partition.
:param partition: partition of this table
:param blocks: block ids to open
:param reopen: the writer reuses the last one by default; setting reopen to true opens a new writer.
:type reopen: bool
:param create_partition: if true, the partition will be created if it does not exist
:type create_partition: bool
:param endpoint: the tunnel service URL
:param upload_id: use existing upload_id to upload data
:param compress_option: compression algorithm, level and strategy
:type compress_option: :class:`odps.tunnel.CompressOption`
:param compress_algo: compression algorithm, takes effect when ``compress_option`` is not provided,
can be ``zlib`` or ``snappy``
:param compress_level: used for ``zlib``, takes effect when ``compress_option`` is not provided
:param compress_strategy: used for ``zlib``, takes effect when ``compress_option`` is not provided
:return: writer, status means the tunnel writer status
:Example:
>>> with table.open_writer() as writer:
>>> writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]) as writer:
>>> writer.write(0, gen_records(block=0))
>>> writer.write(1, gen_records(block=1)) # we can do this in parallel
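A further sketch that writes to a hypothetical partition, creating it first if needed:
>>> with table.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     writer.write(records)  # 'pt=test' is a placeholder partition spec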
"""
from ..tunnel.tabletunnel import TableUploadSession
table_object = self
reopen = kw.pop('reopen', False)
commit = kw.pop('commit', True)
create_partition = kw.pop('create_partition', False)
endpoint = kw.pop('endpoint', None)
upload_id = kw.pop('upload_id', None)
if partition and not isinstance(partition, odps_types.PartitionSpec):
partition = odps_types.PartitionSpec(partition)
if create_partition and not self.exist_partition(create_partition):
self.create_partition(partition, if_not_exists=True)
tunnel = self._create_table_tunnel(endpoint=endpoint)
if upload_id is None:
upload_ids = self._upload_ids
upload_id = upload_ids.get(partition) if not reopen else None
upload_session = tunnel.create_upload_session(table=self, partition_spec=partition,
upload_id=upload_id, **kw)
if upload_id and upload_session.status.value != TableUploadSession.Status.Normal.value:
# check upload session status
upload_session = tunnel.create_upload_session(table=self, partition_spec=partition, **kw)
upload_id = None
upload_ids[partition] = upload_session.id
blocks = blocks or upload_session.blocks or [0, ]
blocks_writes = [False] * len(blocks)
blocks_writers = [None] * len(blocks)
if upload_id:
for block in upload_session.blocks:
blocks_writes[blocks.index(block)] = True
class RecordWriter(object):
def __init__(self, table):
self._table = table
self._closed = False
@property
def upload_id(self):
return upload_session.id
@property
def status(self):
return upload_session.status
def write(self, *args, **kwargs):
from types import GeneratorType
from itertools import chain
if self._closed:
raise IOError('Cannot write to a closed writer.')
block_id = kwargs.get('block_id')
if block_id is None:
if isinstance(args[0], six.integer_types):
block_id = args[0]
args = args[1:]
else:
block_id = 0
if len(args) == 1:
arg = args[0]
if isinstance(arg, Record):
records = [arg, ]
elif isinstance(arg, (list, tuple)):
if isinstance(arg[0], Record):
records = arg
elif isinstance(arg[0], (list, tuple)):
records = (table_object.new_record(vals) for vals in arg)
else:
records = [table_object.new_record(arg), ]
elif isinstance(arg, GeneratorType):
try:
# peek the first element and then put back
next_arg = six.next(arg)
chained = chain((next_arg, ), arg)
if isinstance(next_arg, Record):
records = chained
else:
records = (table_object.new_record(vals) for vals in chained)
except StopIteration:
records = ()
else:
raise ValueError('Unsupported record type.')
elif len(args) > 1:
records = args
else:
raise ValueError('No records specified to write to the table.')
compress = kwargs.get('compress', False)
idx = blocks.index(block_id)
writer = blocks_writers[idx]
if writer is None:
writer = blocks_writers[idx] = \
upload_session.open_record_writer(block_id, compress=compress)
for record in records:
writer.write(record)
blocks_writes[idx] = True
def close(self):
[writer.close() for writer in blocks_writers if writer is not None]
if commit:
written_blocks = [block for block, block_write in zip(blocks, blocks_writes) if block_write]
upload_session.commit(written_blocks)
upload_ids[partition] = None
self._closed = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# if an error occurs inside the with block, we do not commit
if exc_val is not None:
return
self.close()
return RecordWriter(self)
@property
def project(self):
return self.parent.parent
@property
def partitions(self):
return Partitions(parent=self, client=self._client)
def create_partition(self, partition_spec, if_not_exists=False, async_=False, **kw):
"""
Create a partition within the table.
:param partition_spec: specification of the partition.
:param if_not_exists:
:param async_:
:return: partition object
:rtype: odps.models.partition.Partition
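A minimal usage sketch; ``pt=test`` is a placeholder partition spec:
:Example:
>>> table.create_partition('pt=test', if_not_exists=True)  # placeholder spec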
"""
async_ = kw.get('async', async_)
return self.partitions.create(partition_spec, if_not_exists=if_not_exists, async_=async_)
def delete_partition(self, partition_spec, if_exists=False, async_=False, **kw):
"""
Delete a partition within the table.
:param partition_spec: specification of the partition.
:param if_exists:
:param async_:
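A minimal usage sketch; ``pt=test`` is a placeholder partition spec:
:Example:
>>> table.delete_partition('pt=test', if_exists=True)  # placeholder spec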
"""
async_ = kw.get('async', async_)
return self.partitions.delete(partition_spec, if_exists=if_exists, async_=async_)
def exist_partition(self, partition_spec):
"""
Check if a partition exists within the table.
:param partition_spec: specification of the partition.
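A minimal usage sketch; ``pt=test`` is a placeholder partition spec:
:Example:
>>> table.exist_partition('pt=test')  # returns True or False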
"""
return partition_spec in self.partitions
def exist_partitions(self, prefix_spec=None):
"""
Check if partitions with provided conditions exist.
:param prefix_spec: prefix of partition
:return: whether partitions exist
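A minimal usage sketch; ``pt=test`` is a placeholder prefix spec:
:Example:
>>> table.exist_partitions('pt=test')  # True if any partition matches the prefix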
"""
try:
next(self.partitions.iterate_partitions(spec=prefix_spec))
except StopIteration:
return False
return True
def iterate_partitions(self, spec=None):
"""
Create an iterable object to iterate over partitions.
:param spec: specification of the partition.
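A minimal usage sketch; iterating without ``spec`` goes over all partitions:
:Example:
>>> for partition in table.iterate_partitions():
>>>     print(partition)  # each item is an odps.models.partition.Partition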
"""
return self.partitions.iterate_partitions(spec=spec)
def get_partition(self, partition_spec):
"""
Get a partition with given specifications.
:param partition_spec: specification of the partition.
:return: partition object
:rtype: odps.models.partition.Partition
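A minimal usage sketch; ``pt=test`` is a placeholder partition spec:
:Example:
>>> partition = table.get_partition('pt=test')  # placeholder spec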
"""
return self.partitions[partition_spec]
def truncate(self, async_=False, **kw):
"""
Truncate this table.
:param async_: run asynchronously if True
:return: None
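A minimal usage sketch; the asynchronous call returns the running instance:
:Example:
>>> table.truncate()
>>> instance = table.truncate(async_=True)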
"""
from .tasks import SQLTask
async_ = kw.get('async', async_)
task = SQLTask(name='SQLTruncateTableTask', query='truncate table %s.%s' % (self.project.name, self.name))
instance = self.project.parent[self._client.project].instances.create(task=task)
if not async_:
instance.wait_for_success()
else:
return instance
def drop(self, async_=False, if_exists=False, **kw):
"""
Drop this table.
:param async_: run asynchronously if True
:return: None
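A minimal usage sketch:
:Example:
>>> table.drop(if_exists=True)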
"""
async_ = kw.get('async', async_)
return self.parent.delete(self, async_=async_, if_exists=if_exists)
def new_record(self, values=None):
"""
Generate a record of the table.
:param values: the values of this record
:type values: list
:return: record
:rtype: :class:`odps.models.Record`
:Example:
>>> table = odps.create_table('test_table', schema=Schema.from_lists(['name', 'id'], ['string', 'string']))
>>> record = table.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = table.new_record(['my_name', 'my_id'])
.. seealso:: :class:`odps.models.Record`
"""
return Record(schema=self.schema, values=values)
def to_df(self):
"""
Create a PyODPS DataFrame from this table.
:return: DataFrame object
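A minimal usage sketch (requires the PyODPS DataFrame dependencies):
:Example:
>>> df = table.to_df()
>>> df.head(5)  # hypothetical follow-up call to inspect a few rows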
"""
from ..df import DataFrame
return DataFrame(self)