Source code for odps.models.partition

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
from datetime import datetime

from .. import serializers, types, utils
from .core import LazyLoad, XMLRemoteModel, JSONRemoteModel
from .storage_tier import StorageTierInfo


class Partition(LazyLoad):
    """
    A partition is a collection of rows in a table whose partition columns
    are equal to specific values.

    To write data into a partition, call the ``open_writer`` method within a
    **with statement**; likewise, the ``open_reader`` method provides the
    ability to read records from a partition. These methods behave the same
    as their counterparts in the Table class, except that no 'partition'
    parameter is required.
    """

    __slots__ = 'spec', 'creation_time', 'last_meta_modified_time', \
                'last_data_modified_time', 'size', '_is_extend_info_loaded', \
                'is_archived', 'is_exstore', 'lifecycle', 'physical_size', \
                'file_num', 'reserved'

    class Column(XMLRemoteModel):
        name = serializers.XMLNodeAttributeField(attr='Name')
        value = serializers.XMLNodeAttributeField(attr='Value')

    class PartitionMeta(JSONRemoteModel):
        creation_time = serializers.JSONNodeField(
            'createTime', parse_callback=datetime.fromtimestamp,
            set_to_parent=True)
        last_meta_modified_time = serializers.JSONNodeField(
            'lastDDLTime', parse_callback=datetime.fromtimestamp,
            set_to_parent=True)
        last_data_modified_time = serializers.JSONNodeField(
            'lastModifiedTime', parse_callback=datetime.fromtimestamp,
            set_to_parent=True)
        size = serializers.JSONNodeField(
            'partitionSize', parse_callback=int, set_to_parent=True)

    class PartitionExtendedMeta(PartitionMeta):
        is_archived = serializers.JSONNodeField(
            'IsArchived', parse_callback=bool, set_to_parent=True)
        is_exstore = serializers.JSONNodeField(
            'IsExstore', parse_callback=bool, set_to_parent=True)
        lifecycle = serializers.JSONNodeField(
            'LifeCycle', parse_callback=int, set_to_parent=True)
        physical_size = serializers.JSONNodeField(
            'PhysicalSize', parse_callback=int, set_to_parent=True)
        file_num = serializers.JSONNodeField(
            'FileNum', parse_callback=int, set_to_parent=True)
        reserved = serializers.JSONNodeField(
            'Reserved', type='json', set_to_parent=True)

    columns = serializers.XMLNodesReferencesField(Column, 'Column')
    _schema = serializers.XMLNodeReferenceField(PartitionMeta, 'Schema')
    _extended_schema = serializers.XMLNodeReferenceField(
        PartitionExtendedMeta, 'Schema')

    def __init__(self, **kwargs):
        self._is_extend_info_loaded = False
        super(Partition, self).__init__(**kwargs)

    def __str__(self):
        return str(self.partition_spec)

    def __repr__(self):
        return '<Partition %s.`%s`(%s)>' % (
            str(self.table.project.name), str(self.table.name),
            str(self.partition_spec))

    def __getattribute__(self, attr):
        # extended metadata fields are fetched lazily on first access
        if attr in ('is_archived', 'is_exstore', 'lifecycle',
                    'physical_size', 'file_num', 'reserved'):
            if not self._is_extend_info_loaded:
                self.reload_extend_info()
            return object.__getattribute__(self, attr)

        val = object.__getattribute__(self, attr)
        if val is None and not self._loaded:
            # basic metadata fields trigger a full reload when still unset
            if attr in getattr(Partition.PartitionMeta, '__fields'):
                self.reload()
                return object.__getattribute__(self, attr)

        return super(Partition, self).__getattribute__(attr)

    def _set_state(self, name, parent, client):
        self.__init__(spec=name, _parent=parent, _client=client)

    def _name(self):
        return

    @classmethod
    def get_partition_spec(cls, columns=None, spec=None):
        if spec is not None:
            return spec

        spec = types.PartitionSpec()
        for col in columns:
            spec[col.name] = col.value

        return spec

    @property
    def last_modified_time(self):
        warnings.warn(
            "Partition.last_modified_time is deprecated and will be replaced by "
            "Partition.last_data_modified_time.",
            DeprecationWarning,
            stacklevel=3,
        )
        utils.add_survey_call(".".join(
            [type(self).__module__, type(self).__name__, "last_modified_time"]
        ))
        return self.last_data_modified_time

    @property
    def partition_spec(self):
        return self.get_partition_spec(
            self._getattr('columns'), self._getattr('spec'))

    @property
    def name(self):
        return str(self.partition_spec)

    @property
    def table(self):
        return self.parent.parent

    @property
    def project(self):
        return self.table.project

    @property
    def storage_tier_info(self):
        return StorageTierInfo.deserial(self.reserved)

    def reload(self):
        url = self.resource()
        params = {'partition': str(self.partition_spec)}
        resp = self._client.get(
            url, params=params, curr_schema=self._get_schema_name())

        self.parse(self._client, resp, obj=self)
        self._loaded = True

    def reload_extend_info(self):
        url = self.resource()
        params = {'partition': str(self.partition_spec)}
        resp = self._client.get(
            url, action='extended', params=params,
            curr_schema=self._get_schema_name()
        )

        self.parse(self._client, resp, obj=self)
        self._is_extend_info_loaded = True
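    # How the lazy loading above behaves in practice -- a minimal sketch, not
    # part of the original source; ``o`` (an ODPS entry object) and the table
    # and partition names are hypothetical:
    #
    #     part = o.get_table("sale_detail").get_partition("sale_date=20230101")
    #     part.size            # basic field still None -> triggers reload()
    #     part.physical_size   # extended field -> triggers reload_extend_info()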
    def head(self, limit, columns=None):
        """
        Get the head records of a partition.

        :param limit: number of records to fetch, 10000 at most
        :param list columns: a subset of the table columns to retrieve
        :return: records
        :rtype: list

        .. seealso:: :class:`odps.models.Record`
        """
        return self.table.head(
            limit, partition=self.partition_spec, columns=columns)
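    # A usage sketch for ``head`` (hypothetical ``part`` and column names,
    # following the example above):
    #
    #     for record in part.head(10, columns=["shop_name"]):
    #         print(record["shop_name"])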
    def to_df(self):
        """
        Create a PyODPS DataFrame from this partition.

        :return: DataFrame object
        """
        from ..df import DataFrame

        return DataFrame(self.table).filter_parts(self)
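    # A usage sketch for ``to_df``, assuming the hypothetical ``part`` above;
    # ``filter_parts`` restricts the DataFrame to this partition only:
    #
    #     df = part.to_df()
    #     df.head(5)  # fetch the first 5 rows of this partition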
    @utils.with_wait_argument
    def drop(self, async_=False, if_exists=False):
        """
        Drop this partition.

        :param async_: run asynchronously if True
        :param if_exists: do not raise an error if the partition does not exist
        :return: None
        """
        return self.parent.delete(self, if_exists=if_exists, async_=async_)
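    # A usage sketch for ``drop`` (hypothetical ``part`` as above):
    #
    #     part.drop(if_exists=True)  # no error raised if already dropped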
    def open_reader(self, **kw):
        """
        Open a reader to read records from this partition.

        :param reopen: by default the previous reader is reused; set to True
            to open a new reader.
        :type reopen: bool
        :param endpoint: the tunnel service URL
        :param compress_option: compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :param compress_algo: compression algorithm, takes effect when
            ``compress_option`` is not provided; can be ``zlib`` or ``snappy``
        :param compress_level: used for ``zlib``, takes effect when
            ``compress_option`` is not provided
        :param compress_strategy: used for ``zlib``, takes effect when
            ``compress_option`` is not provided
        :return: reader whose ``count`` attribute is the total number of
            records and ``status`` is the tunnel status

        :Example:

        >>> with partition.open_reader() as reader:
        >>>     count = reader.count  # number of records in the partition
        >>>     for record in reader[0: count]:
        >>>         # read all data; in practice, reading in batches is better
        """
        return self.table.open_reader(str(self), **kw)
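    # A sketch with explicit compression, assuming the hypothetical ``part``
    # above; ``zlib`` is one of the algorithms listed in the docstring:
    #
    #     with part.open_reader(compress_algo="zlib") as reader:
    #         for record in reader[0:100]:  # read the first 100 records
    #             pass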
    def open_writer(self, blocks=None, **kw):
        return self.table.open_writer(
            self.partition_spec, blocks=blocks, **kw)

    @utils.with_wait_argument
    def truncate(self, async_=False):
        return self.table.truncate(self.partition_spec, async_=async_)

    @utils.with_wait_argument
    def set_storage_tier(self, storage_tier, async_=False):
        # force extended info (including storage tier) to be reloaded next time
        self._is_extend_info_loaded = False
        return self.table.set_storage_tier(
            storage_tier, partition_spec=self.partition_spec, async_=async_
        )
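# A usage sketch for the writer and maintenance helpers above (the entry
# object ``o`` and all names are hypothetical; ``open_writer`` delegates to
# Table.open_writer with this partition's spec filled in):
#
#     part = o.get_table("sale_detail").get_partition("sale_date=20230101")
#     with part.open_writer() as writer:
#         writer.write([["shop1", 100]])  # values must match the table schema
#     part.truncate()  # remove all records in this partition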