Source code for odps.models.resource

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
from collections import namedtuple

from .. import serializers, utils, types, errors, compat
from ..compat import Enum, six
from ..config import options
from .core import LazyLoad
from .cache import cache, cache_parent

if sys.version_info[0] < 3:
    _StringIOType = type(compat.StringIO())
    _StringIOType = (compat.StringIO, compat.BytesIO)

[docs] class Resource(LazyLoad): """ Resource is useful when writing UDF or MapReduce. This is an abstract class. Basically, resource can be either a file resource or a table resource. File resource can be ``file``, ``py``, ``jar``, ``archive`` in details. .. seealso:: :class:`odps.models.FileResource`, :class:`odps.models.PyResource`, :class:`odps.models.JarResource`, :class:`odps.models.ArchiveResource`, :class:`odps.models.TableResource` """ __slots__ = ( 'content_md5', 'is_temp_resource', 'volume_path', '_type_indicator' )
_type_indicator = 'type' name = serializers.XMLNodeField('Name') owner = serializers.XMLNodeField('Owner') comment = serializers.XMLNodeField('Comment') type = serializers.XMLNodeField('ResourceType', parse_callback=lambda t: Resource.Type(t.upper())) creation_time = serializers.XMLNodeField('CreationTime', parse_callback=utils.parse_rfc822) last_modified_time = serializers.XMLNodeField('LastModifiedTime', parse_callback=utils.parse_rfc822) last_updator = serializers.XMLNodeField('LastUpdator') size = serializers.XMLNodeField('ResourceSize', parse_callback=int) source_table_name = serializers.XMLNodeField('TableName') @classmethod def _get_cls(cls, typo): if typo is None: return cls if isinstance(typo, six.string_types): typo = Resource.Type(typo.upper()) clz = lambda name: globals()[name] if typo == Resource.Type.FILE: return clz('FileResource') elif typo == Resource.Type.JAR: return clz('JarResource') elif typo == Resource.Type.PY: return clz('PyResource') elif typo == Resource.Type.ARCHIVE: return clz('ArchiveResource') elif typo == Resource.Type.TABLE: return clz('TableResource') elif typo == Resource.Type.VOLUMEARCHIVE: return clz('VolumeArchiveResource') elif typo == Resource.Type.VOLUMEFILE: return clz('VolumeFileResource') else: return cls def create(self, overwrite=False, **kw): raise NotImplementedError @staticmethod def _filter_cache(_, **kwargs): return kwargs.get('type') is not None and kwargs['type'] != Resource.Type.UNKOWN @cache def __new__(cls, *args, **kwargs): typo = kwargs.get('type') if typo is not None or (cls != Resource and issubclass(cls, Resource)): return object.__new__(cls._get_cls(typo)) kwargs['type'] = Resource.Type.UNKOWN obj = Resource(**kwargs) obj.reload() return Resource(**obj.extract()) def __init__(self, **kwargs): typo = kwargs.get('type') if isinstance(typo, six.string_types): kwargs['type'] = Resource.Type(typo.upper()) super(Resource, self).__init__(**kwargs) def reload(self): params = {} schema_name = self._get_schema_name() if schema_name is not None: params['curr_schema'] = schema_name url = self.resource() resp = self._client.get(url, action='meta', params=params) self.owner = resp.headers.get('x-odps-owner') resource_type = resp.headers.get('x-odps-resource-type') self.type = Resource.Type(resource_type.upper()) self.comment = resp.headers.get('x-odps-comment') self.last_updator = resp.headers.get('x-odps-updator') size = resp.headers.get('x-odps-resource-size') self.size = None if size is None else int(size) self.creation_time = utils.parse_rfc822( resp.headers.get('x-odps-creation-time')) self.last_modified_time = utils.parse_rfc822( resp.headers.get('Last-Modified')) self.source_table_name = resp.headers.get('x-odps-copy-table-source') self.volume_path = resp.headers.get('x-odps-copy-file-source') self.content_md5 = resp.headers.get('Content-MD5') self._loaded = True def _reload_size(self): params = {} schema_name = self._get_schema_name() if schema_name is not None: params['curr_schema'] = schema_name url = self.resource() resp = self._client.get(url, action='meta', params=params) size = resp.headers.get('x-odps-resource-size') self.size = None if size is None else int(size) def update(self, **kw): raise NotImplementedError def drop(self): return self.parent.delete(self)
[docs] @cache_parent class FileResource(Resource): """ File resource represents for a file. Use ``open`` method to open this resource as a file-like object. """ __slots__ = ('_fp', 'is_part_resource', 'merge_total_bytes')
[docs] class Mode(Enum): READ = 'r' WRITE = 'w' APPEND = 'a' READWRITE = 'r+' TRUNCEREADWRITE = 'w+' APPENDREADWRITE = 'a+'
def create(self, overwrite=False, **kw): file_obj = kw.pop('file_obj', kw.pop('fileobj', None)) is_part_resource = self._getattr("is_part_resource") is_merge_resource = self._getattr('merge_total_bytes') is not None if file_obj is None: raise ValueError('parameter `file_obj` cannot be None, either string or file-like object') if isinstance(file_obj, six.text_type): file_obj = file_obj.encode('utf-8') if ( options.upload_resource_in_chunks and not is_merge_resource and not is_part_resource and self._get_file_size(file_obj) > options.resource_chunk_size ): self._upload_with_stream(file_obj, overwrite) return self if is None or len( == 0: raise errors.ODPSError('File Resource Name should not empty.') method = if not overwrite else self._client.put url = self.parent.resource() if not overwrite else self.resource() headers = { 'Content-Type': 'application/octet-stream', 'Content-Disposition': 'attachment;filename=%s' %, 'x-odps-resource-type': self.type.value.lower(), 'x-odps-resource-name':, } params = {} if self._getattr('comment') is not None: headers['x-odps-comment'] = self.comment if self._getattr('is_temp_resource'): headers['x-odps-resource-istemp'] = 'true' if self.is_temp_resource else 'false' if is_merge_resource: headers['x-odps-resource-merge-total-bytes'] = str(self.merge_total_bytes) params["rOpMerge"] = "true" if is_part_resource: params["rIsPart"] = "true" if not isinstance(file_obj, (six.string_types, six.binary_type)): content = else: content = file_obj self.size = len(content) method( url, content, headers=headers, params=params, curr_schema=self._get_schema_name() ) if overwrite: self.reload() return self def __init__(self, **kw): super(FileResource, self).__init__(**kw) self.type = Resource.Type.FILE self._fp = None @staticmethod def _get_file_size(file_obj): if isinstance(file_obj, six.binary_type): return len(file_obj) elif isinstance(file_obj, _StringIOType): pos = file_obj.tell(), os.SEEK_END) size = file_obj.tell(), os.SEEK_SET) return size elif hasattr(file_obj, "name"): fn = if fn and os.path.exists(fn): return os.path.getsize(fn) return None def _upload_with_stream(self, file_obj, overwrite=True): chunk_size = options.resource_chunk_size with"wb", stream=True, overwrite=overwrite) as res_file: while True: buf = if not buf: break res_file.write(buf) def _is_create(self): if self._loaded: return False try: self._reload_size() return False except errors.NoSuchObject: return True @property def _size(self): return self._fp.size if self._fp else self.size @property def opened(self): return self._fp is not None @property def mode(self): return self._fp.mode
[docs] def open(self, mode='r', encoding='utf-8', stream=False, overwrite=None): """ The argument ``mode`` stands for the open mode for this file resource. It can be binary mode if the 'b' is inside. For instance, 'rb' means opening the resource as read binary mode while 'r+b' means opening the resource as read+write binary mode. This is most import when the file is actually binary such as tar or jpeg file, so be aware of opening this file as a correct mode. Basically, the text mode can be 'r', 'w', 'a', 'r+', 'w+', 'a+' just like the builtin python ``open`` method. * ``r`` means read only * ``w`` means write only, the file will be truncated when opening * ``a`` means append only * ``r+`` means read+write without constraint * ``w+`` will truncate first then opening into read+write * ``a+`` can read+write, however the written content can only be appended to the end :param mode: the mode of opening file, described as above :param encoding: utf-8 as default :param stream: open in stream mode :param overwrite: if True, will overwrite existing resource. True by default. :return: file-like object :Example: >>> with'r') as fp: >>> # read one unicode character >>> fp.write('test') # wrong, cannot write under read mode >>> >>> with'wb') as fp: >>> fp.readlines() # wrong, cannot read under write mode >>> fp.write('hello world') # write bytes >>> >>> with'test_resource', 'r+') as fp: # open as read-write mode >>> >>> fp.truncate() >>> fp.flush() """ from .resourcefile import LocalResourceFile, StreamResourceFile if stream: res_file_type = StreamResourceFile else: res_file_type = LocalResourceFile self._fp = res_file_type(self, mode, encoding, overwrite=overwrite) return self
def _check_read(self): if not self.opened: raise IOError('I/O operation on non-open resource') if self.mode in (FileResource.Mode.WRITE, FileResource.Mode.APPEND): raise IOError('Resource not open for reading')
[docs] def read(self, size=-1): """ Read the file resource, read all as default. :param size: unicode or byte length depends on text mode or binary mode. :return: unicode or bytes depends on text mode or binary mode :rtype: str or unicode(Py2), bytes or str(Py3) """ self._check_read() return
[docs] def readline(self, size=-1): """ Read a single line. :param size: If the size argument is present and non-negative, it is a maximum byte count (including the trailing newline) and an incomplete line may be returned. When size is not 0, an empty string is returned only when EOF is encountered immediately :return: unicode or bytes depends on text mode or binary mode :rtype: str or unicode(Py2), bytes or str(Py3) """ self._check_read() return self._fp.readline(size)
[docs] def readlines(self, sizehint=-1): """ Read as lines. :param sizehint: If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read. :return: lines :rtype: list """ self._check_read() return self._fp.readlines(sizehint)
def _check_write(self): if not self.opened: raise IOError('I/O operation on non-open resource') if self.mode == FileResource.Mode.READ: raise IOError('Resource not open for writing')
[docs] def write(self, content): """ Write content into the file resource :param content: content to write :return: None """ self._check_write() return self._fp.write(content)
[docs] def writelines(self, seq): """ Write lines into the file resource. :param seq: lines :return: None """ self._check_write() return self._fp.writelines(seq)
[docs] def seek(self, pos, whence=compat.SEEK_SET): # io.SEEK_SET """ Seek to some place. :param pos: position to seek :param whence: if set to 2, will seek to the end :return: None """ return, whence)
def seekable(self): return self._fp.seekable()
[docs] def tell(self): """ Tell the current position :return: current position """ return self._fp.tell()
[docs] def truncate(self, size=None): """ Truncate the file resource's size. :param size: If the optional size argument is present, the file is truncated to (at most) that size. The size defaults to the current position. :return: None """ self._check_write() return self._fp.truncate(size)
[docs] def flush(self): """ Commit the change to ODPS if any change happens. Close will do this automatically. :return: None """ return self._fp.flush()
[docs] def close(self): """ Close this file resource. :return: None """ if self._fp is not None: self._fp.close() self._fp = None
def __iter__(self): self._check_read() return self._fp.__iter__() def __next__(self): self._check_read() return next(self._fp) next = __next__ def __enter__(self): return self def __exit__(self, *_): self.close() def update(self, file_obj): return self._parent.update(self, file_obj=file_obj)
[docs] @cache_parent class JarResource(FileResource): """ File resource representing for the .jar file. """ def __init__(self, **kw): super(JarResource, self).__init__(**kw) self.type = Resource.Type.JAR
[docs] @cache_parent class PyResource(FileResource): """ File resource representing for the .py file. """ def __init__(self, **kw): super(PyResource, self).__init__(**kw) self.type = Resource.Type.PY
[docs] @cache_parent class ArchiveResource(FileResource): """ File resource representing for the compressed file like .zip/.tgz/.tar.gz/.tar/jar """ def __init__(self, **kw): super(ArchiveResource, self).__init__(**kw) self.type = Resource.Type.ARCHIVE
[docs] @cache_parent class TableResource(Resource): """ Take a table as a resource. """ _TableSource = namedtuple("_TableSource", "project schema table partition") def __init__(self, **kw): project_name = kw.pop('project_name', None) schema_name = kw.pop('schema_name', None) table_name = kw.pop('table_name', None) partition_spec = kw.pop('partition', None) super(TableResource, self).__init__(**kw) self._init( table_project_name=project_name, table_schema_name=schema_name, table_name=table_name, partition=partition_spec, create=True, ) def create(self, overwrite=False, **kw): if is None or len( == 0: raise errors.ODPSError('Table Resource Name should not be empty.') method = if not overwrite else self._client.put url = self.parent.resource() if not overwrite else self.resource() headers = { 'Content-Type': 'text/plain', 'x-odps-resource-type': self.type.value.lower(), 'x-odps-resource-name':, 'x-odps-copy-table-source': self.source_table_name, } if self._getattr('comment') is not None: headers['x-odps-comment'] = self._getattr('comment') method(url, '', headers=headers, curr_schema=self._get_schema_name()) if overwrite: del self.parent[] return self.parent[] return self def _init(self, create=False, table_project_name=None, table_schema_name=None, table_name=None, **kw): table_project_name = table_project_name or kw.get("project_name") if table_name is not None and '.' in table_name: parts = table_name.split('.') if len(parts) == 2: assert table_schema_name is None table_project_name, table_name = parts else: table_project_name, table_schema_name, table_name = parts table_project_name = table_project_name.strip() if table_schema_name is not None: table_schema_name = table_schema_name.strip() table_name = table_name.strip() if table_name.startswith("`") and table_name.endswith("`"): table_name = table_name[1:-1] try: if not create: table_source = self._get_table_source() old_table_project_name = table_source.project old_schema_name = table_source.schema old_table_name = table_source.table old_partition = table_source.partition else: old_table_project_name, old_schema_name, old_table_name, old_partition = [None] * 4 except AttributeError: old_table_project_name, old_schema_name, old_table_name, old_partition = [None] * 4 table_project_name = table_project_name or old_table_project_name or table_schema_name = table_schema_name or old_schema_name or self._get_schema_name() table_name = table_name or old_table_name partition = kw.get('partition', old_partition) if table_name is not None: if table_schema_name: self.source_table_name = '.'.join((table_project_name, table_schema_name, table_name)) else: self.source_table_name = '.'.join((table_project_name, table_name)) if partition is not None: if not isinstance(partition, types.PartitionSpec): partition_spec = types.PartitionSpec(partition) else: partition_spec = partition self.source_table_name = ( '%s partition(%s)' % ( self.source_table_name.split(' partition(')[0], partition_spec ) ) def _get_table_source(self): if self.source_table_name is None: raise AttributeError('source_table_name not defined.') splits = self.source_table_name.split(' partition(') if len(splits) < 2: partition = None else: partition = splits[1].split(')', 1)[0].strip() src = splits[0] if '.' not in src: raise ValueError('Malformed source table name: %s' % src) table_parts = src.split('.') if len(table_parts) == 2: schema_name = None project_name, table_name = table_parts else: project_name, schema_name, table_name = table_parts return TableResource._TableSource( project_name, schema_name, table_name, partition ) def get_source_table(self): try: table_source = self._get_table_source() except AttributeError: return tables_parent = self.project.parent[table_source.project] if table_source.schema: tables_parent = tables_parent.schemas[table_source.schema] return tables_parent.tables[table_source.table] def get_source_table_partition(self): if self.source_table_name is None: return splits = self.source_table_name.split(' partition(') if len(splits) < 2: return partition = splits[1].split(')', 1)[0].strip() return types.PartitionSpec(partition) @property def table(self): """ Get the table object. :return: source table :rtype: :class:`odps.models.Table` .. seealso:: :class:`odps.models.Table` """ return self.get_source_table() @property def partition(self): """ Get the source table partition. :return: the source table partition """ pt = self.get_source_table_partition() if pt is None: return return self.get_source_table().get_partition(pt)
[docs] def open_reader(self, **kwargs): """ Open reader on the table resource """ return self.get_source_table().open_reader( partition=self.get_source_table_partition(), **kwargs )
[docs] def open_writer(self, **kwargs): """ Open writer on the table resource """ return self.get_source_table().open_writer( partition=self.get_source_table_partition(), **kwargs )
[docs] def update( self, table_project_name=None, table_schema_name=None, table_name=None, *args, **kw): """ Update this resource. :param table_project_name: the source table's project :param table_name: the source table's name :param partition: the source table's partition :return: self """ if len(args) > 0: kw['partition'] = args[0] self._init( table_project_name=table_project_name, table_schema_name=table_schema_name, table_name=table_name, **kw ) resources = self.parent return resources.update(self)
@cache_parent class VolumeResource(Resource): def create(self, overwrite=False, **kw): if is None or len( == 0: raise errors.ODPSError('Volume Resource Name should not be empty.') method = if not overwrite else self._client.put url = self.parent.resource() if not overwrite else self.resource() headers = { 'Content-Type': 'text/plain', 'x-odps-resource-type': self.type.value.lower(), 'x-odps-resource-name':, 'x-odps-copy-file-source': self.volume_path, } if self._getattr('comment') is not None: headers['x-odps-comment'] = self._getattr('comment') method(url, '', headers=headers, curr_schema=self._get_schema_name()) if overwrite: del self.parent[] return self.parent[] return self @cache_parent class VolumeFileResource(VolumeResource): """ Volume resource represents for a volume archive """ def __init__(self, **kw): okw = kw.copy() okw.pop('volume_file', None) super(VolumeFileResource, self).__init__(**okw) self.type = Resource.Type.VOLUMEFILE def create(self, overwrite=False, **kw): if 'volume_file' in kw: vf = kw.pop('volume_file') self.volume_path = vf.path return super(VolumeFileResource, self).create(overwrite, **kw) @cache_parent class VolumeArchiveResource(VolumeFileResource): """ Volume archive resource represents for a volume archive """ def __init__(self, **kw): super(VolumeArchiveResource, self).__init__(**kw) self.type = Resource.Type.VOLUMEARCHIVE