#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2017 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .core import LazyLoad
from .cache import cache, cache_parent
from .. import serializers, utils, types, errors, compat
from ..compat import Enum, six
# Upper bound, in bytes, for the size of a single resource (512M).
RESOURCE_SIZE_MAX = 512 * 1024 ** 2
class Resource(LazyLoad):
    """
    Abstract base of all resource models; resources are typically consumed
    by UDFs or MapReduce jobs.

    A resource is either backed by a file (``file``, ``py``, ``jar``,
    ``archive``) or by a table.

    .. seealso:: :class:`odps.models.FileResource`, :class:`odps.models.PyResource`,
                 :class:`odps.models.JarResource`, :class:`odps.models.ArchiveResource`,
                 :class:`odps.models.TableResource`
    """

    __slots__ = 'content_md5', 'is_temp_resource', 'volume_path', '_type_indicator'

    class Type(Enum):
        FILE = 'FILE'
        JAR = 'JAR'
        PY = 'PY'
        ARCHIVE = 'ARCHIVE'
        TABLE = 'TABLE'
        VOLUMEFILE = 'VOLUMEFILE'
        VOLUMEARCHIVE = 'VOLUMEARCHIVE'
        UNKOWN = 'UNKOWN'

    _type_indicator = 'type'

    name = serializers.XMLNodeField('Name')
    owner = serializers.XMLNodeField('Owner')
    comment = serializers.XMLNodeField('Comment')
    type = serializers.XMLNodeField('ResourceType', parse_callback=lambda t: Resource.Type(t.upper()))
    creation_time = serializers.XMLNodeField('CreationTime', parse_callback=utils.parse_rfc822)
    last_modified_time = serializers.XMLNodeField('LastModifiedTime', parse_callback=utils.parse_rfc822)
    last_updator = serializers.XMLNodeField('LastUpdator')
    size = serializers.XMLNodeField('ResourceSize', parse_callback=int)
    source_table_name = serializers.XMLNodeField('TableName')

    @classmethod
    def _get_cls(cls, typo):
        """
        Pick the concrete resource class for a resource type.

        :param typo: a :class:`Resource.Type` member, its string name
                     (case-insensitive), or None
        :return: the matching subclass, or ``cls`` itself when ``typo`` is
                 None or has no dedicated subclass
        """
        if typo is None:
            return cls
        if isinstance(typo, six.string_types):
            typo = Resource.Type(typo.upper())

        # The subclasses are defined further down in this module, hence they
        # are resolved by name at call time rather than referenced directly.
        known_types = (
            (Resource.Type.FILE, 'FileResource'),
            (Resource.Type.JAR, 'JarResource'),
            (Resource.Type.PY, 'PyResource'),
            (Resource.Type.ARCHIVE, 'ArchiveResource'),
            (Resource.Type.TABLE, 'TableResource'),
            (Resource.Type.VOLUMEARCHIVE, 'VolumeArchiveResource'),
            (Resource.Type.VOLUMEFILE, 'VolumeFileResource'),
        )
        for res_type, cls_name in known_types:
            if typo == res_type:
                return globals()[cls_name]
        return cls

    def create(self, overwrite=False, **kw):
        """Not implemented on the base class; concrete resources override it."""
        raise NotImplementedError

    @staticmethod
    def _filter_cache(_, **kwargs):
        """Tell whether ``kwargs`` carry a resource type other than ``UNKOWN``."""
        typo = kwargs.get('type')
        return typo is not None and typo != Resource.Type.UNKOWN

    @cache
    def __new__(cls, *args, **kwargs):
        typo = kwargs.get('type')
        if typo is None and not (cls != Resource and issubclass(cls, Resource)):
            # Neither a type nor a concrete subclass was given: build a
            # placeholder, load its metadata to learn the real type and
            # construct again from the extracted fields.
            kwargs['type'] = Resource.Type.UNKOWN
            placeholder = Resource(**kwargs)
            placeholder.reload()
            return Resource(**placeholder.extract())
        return object.__new__(cls._get_cls(typo))

    def __init__(self, **kwargs):
        given_type = kwargs.get('type')
        if isinstance(given_type, six.string_types):
            # Accept the type as a case-insensitive string.
            kwargs['type'] = Resource.Type(given_type.upper())
        super(Resource, self).__init__(**kwargs)

    @property
    def _project(self):
        return self._parent._parent.name

    @property
    def project(self):
        """Name of the project this resource belongs to."""
        return self._project

    def reload(self):
        """
        Request the resource's metadata (``meta`` query parameter) and fill
        the fields of this object from the response headers.
        """
        meta_url = self.resource()
        headers = self._client.get(meta_url, params={'meta': ''}).headers

        self.owner = headers.get('x-odps-owner')
        self.type = Resource.Type(headers.get('x-odps-resource-type').upper())
        self.comment = headers.get('x-odps-comment')
        self.last_updator = headers.get('x-odps-updator')

        raw_size = headers.get('x-odps-resource-size')
        self.size = int(raw_size) if raw_size is not None else None

        self.creation_time = utils.parse_rfc822(headers.get('x-odps-creation-time'))
        self.last_modified_time = utils.parse_rfc822(headers.get('Last-Modified'))

        self.source_table_name = headers.get('x-odps-copy-table-source')
        self.volume_path = headers.get('x-odps-copy-file-source')
        self.content_md5 = headers.get('Content-MD5')

        self._loaded = True

    def _reload_size(self):
        """Refresh only ``size`` from the metadata response headers."""
        meta_url = self.resource()
        headers = self._client.get(meta_url, params={'meta': ''}).headers
        raw_size = headers.get('x-odps-resource-size')
        self.size = int(raw_size) if raw_size is not None else None

    def update(self, **kw):
        """Not implemented on the base class; concrete resources override it."""
        raise NotImplementedError

    def drop(self):
        """Delete this resource through its parent collection."""
        return self.parent.delete(self)
@cache_parent
class FileResource(Resource):
    """
    File resource represents a file.

    Use the ``open`` method to open this resource as a file-like object.
    """

    __slots__ = '_fp', '_mode', '_opened', '_size', '_need_commit', \
                '_open_binary', '_encoding'

    class Mode(Enum):
        READ = 'r'
        WRITE = 'w'
        APPEND = 'a'
        READWRITE = 'r+'
        TRUNCEREADWRITE = 'w+'
        APPENDREADWRITE = 'a+'

    def create(self, overwrite=False, **kw):
        """
        Upload the content of this resource.

        The request is a POST to the parent collection, or a PUT to the
        resource itself when ``overwrite`` is true; after an overwrite the
        metadata is reloaded.

        :param overwrite: replace an existing resource instead of creating one
        :param file_obj: (keyword, alias ``fileobj``) text, bytes or a
                         file-like object holding the content
        :return: self
        :raises ValueError: if no ``file_obj`` is given
        :raises odps.errors.ODPSError: if the resource name is empty
        """
        file_obj = kw.pop('file_obj', kw.pop('fileobj', None))
        if file_obj is None:
            raise ValueError('parameter `file_obj` cannot be None, either string or file-like object')
        # Normalize text -> bytes -> in-memory binary stream.
        if isinstance(file_obj, six.text_type):
            file_obj = file_obj.encode('utf-8')
        if isinstance(file_obj, six.binary_type):
            file_obj = six.BytesIO(file_obj)

        if self.name is None or len(self.name.strip()) == 0:
            raise errors.ODPSError('File Resource Name should not empty.')

        method = self._client.post if not overwrite else self._client.put
        url = self.parent.resource() if not overwrite else self.resource()

        headers = {'Content-Type': 'application/octet-stream',
                   'Content-Disposition': 'attachment;filename=%s' % self.name,
                   'x-odps-resource-type': self.type.value.lower(),
                   'x-odps-resource-name': self.name}
        if self._getattr('comment') is not None:
            headers['x-odps-comment'] = self.comment
        if self._getattr('is_temp_resource'):
            headers['x-odps-resource-istemp'] = 'true' if self.is_temp_resource else 'false'

        if not isinstance(file_obj, six.string_types):
            # Always upload from the beginning, wherever the cursor is.
            file_obj.seek(0)
            content = file_obj.read()
        else:
            content = file_obj
        method(url, content, headers=headers)

        if overwrite:
            self.reload()
        return self

    def __init__(self, **kw):
        super(FileResource, self).__init__(**kw)
        self.type = Resource.Type.FILE

        self._fp = None
        self._mode = FileResource.Mode.READ
        self._open_binary = False
        self._encoding = None
        self._size = 0
        self._opened = False
        self._need_commit = False

    def _is_create(self):
        """
        Decide whether committing has to create the resource.

        :return: False when the object is already loaded or its size can be
                 fetched; True when fetching raises ``errors.NoSuchObject``
        """
        if self._loaded:
            return False
        try:
            self._reload_size()
            return False
        except errors.NoSuchObject:
            return True

    def open(self, mode='r', encoding='utf-8'):
        """
        The argument ``mode`` stands for the open mode for this file resource.
        It can be binary mode if the 'b' is inside. For instance,
        'rb' means opening the resource as read binary mode
        while 'r+b' means opening the resource as read+write binary mode.
        This is most important when the file is actually binary such as tar or jpeg file,
        so be aware of opening this file in the correct mode.

        Basically, the text mode can be 'r', 'w', 'a', 'r+', 'w+', 'a+'
        just like the builtin python ``open`` method.

        * ``r`` means read only
        * ``w`` means write only, the file will be truncated when opening
        * ``a`` means append only
        * ``r+`` means read+write without constraint
        * ``w+`` will truncate first then opening into read+write
        * ``a+`` can read+write, however the written content can only be appended to the end

        :param mode: the mode of opening file, described as above
        :param encoding: utf-8 as default
        :return: file-like object

        :Example:

        >>> with resource.open('r') as fp:
        >>>     fp.read(1)  # read one unicode character
        >>>     fp.write('test')  # wrong, cannot write under read mode
        >>>
        >>> with resource.open('wb') as fp:
        >>>     fp.readlines()  # wrong, cannot read under write mode
        >>>     fp.write('hello world')  # write bytes
        >>>
        >>> with resource.open('r+') as fp:  # open as read-write mode
        >>>     fp.seek(5)
        >>>     fp.truncate()
        >>>     fp.flush()
        """

        # TODO: when reading, do not read all the data at once

        # Recompute the flag on every call: the same object can be reopened,
        # and a previous binary open must not leak into a later text open.
        self._open_binary = 'b' in mode
        mode = mode.replace('b', '')
        self._mode = FileResource.Mode(mode)
        self._encoding = encoding

        if self._mode in (FileResource.Mode.WRITE, FileResource.Mode.TRUNCEREADWRITE):
            # Truncating modes start from an empty in-memory buffer.
            io_clz = six.BytesIO if self._open_binary else six.StringIO
            self._fp = io_clz()
            self._size = 0
        else:
            self._fp = self.parent.read_resource(
                self, text_mode=not self._open_binary, encoding=self._encoding)
            self._reload_size()
            self._sync_size()

        self._opened = True

        return self

    def _check_read(self):
        """Raise ``IOError`` unless the resource is open in a readable mode."""
        if not self._opened:
            raise IOError('I/O operation on non-open resource')
        if self._mode in (FileResource.Mode.WRITE, FileResource.Mode.APPEND):
            raise IOError('Resource not open for reading')

    def _sync_size(self):
        """Set ``_size`` to the buffer's end offset, keeping the cursor in place."""
        curr_pos = self.tell()
        self.seek(0, compat.SEEK_END)
        self._size = self.tell()
        self.seek(curr_pos)

    def read(self, size=-1):
        """
        Read the file resource, read all as default.

        :param size: unicode or byte length depends on text mode or binary mode.
        :return: unicode or bytes depends on text mode or binary mode
        :rtype: str or unicode(Py2), bytes or str(Py3)
        """
        self._check_read()
        return self._fp.read(size)

    def readline(self, size=-1):
        """
        Read a single line.

        :param size: If the size argument is present and non-negative,
                     it is a maximum byte count (including the trailing newline)
                     and an incomplete line may be returned.
                     When size is not 0,
                     an empty string is returned only when EOF is encountered immediately
        :return: unicode or bytes depends on text mode or binary mode
        :rtype: str or unicode(Py2), bytes or str(Py3)
        """
        self._check_read()
        return self._fp.readline(size)

    def readlines(self, sizehint=-1):
        """
        Read as lines.

        :param sizehint: If the optional sizehint argument is present, instead of reading up to EOF,
                         whole lines totalling approximately sizehint bytes
                         (possibly after rounding up to an internal buffer size) are read.
        :return: lines
        :rtype: list
        """
        self._check_read()
        return self._fp.readlines(sizehint)

    def _check_write(self):
        """Raise ``IOError`` unless the resource is open in a writable mode."""
        if not self._opened:
            raise IOError('I/O operation on non-open resource')
        if self._mode == FileResource.Mode.READ:
            raise IOError('Resource not open for writing')

    def _check_size(self):
        """Raise ``IOError`` when the buffered size exceeds ``RESOURCE_SIZE_MAX``."""
        if self._size > RESOURCE_SIZE_MAX:
            # Floor division keeps the message at "512M" on Python 3 as well;
            # true division would render "512.0M".
            raise IOError('Single resource\'s max size is %sM' %
                          (RESOURCE_SIZE_MAX // (1024 ** 2)))

    def _convert(self, content):
        """Encode or decode ``content`` so that it matches the open mode (binary or text)."""
        if self._open_binary and isinstance(content, six.text_type):
            return content.encode(self._encoding)
        elif not self._open_binary and isinstance(content, six.binary_type):
            return content.decode(self._encoding)
        return content

    def write(self, content):
        """
        Write content into the file resource

        :param content: content to write
        :return: whatever the underlying buffer's ``write`` returns
        :raises IOError: if the resource is not open for writing, or the
                         size limit is exceeded after the write
        """
        # Validate the state first: on an unopened resource the encoding is
        # unset, and converting would fail with an unrelated TypeError.
        self._check_write()
        content = self._convert(content)
        length = len(content)

        if self._mode in (FileResource.Mode.APPEND, FileResource.Mode.APPENDREADWRITE):
            # Append modes always write at the end.
            self.seek(0, compat.SEEK_END)

        if length > 0:
            self._need_commit = True

        res = self._fp.write(content)
        self._sync_size()
        self._check_size()
        return res

    def writelines(self, seq):
        """
        Write lines into the file resource.

        :param seq: lines
        :return: whatever the underlying buffer's ``writelines`` returns
        :raises IOError: if the resource is not open for writing, or the
                         size limit is exceeded after the write
        """
        # Validate the state before consuming or converting ``seq``.
        self._check_write()
        seq = [self._convert(s) for s in seq]
        length = sum(len(s) for s in seq)

        if self._mode in (FileResource.Mode.APPEND, FileResource.Mode.APPENDREADWRITE):
            self.seek(0, compat.SEEK_END)

        if length > 0:
            self._need_commit = True

        res = self._fp.writelines(seq)
        self._sync_size()
        self._check_size()
        return res

    def seek(self, pos, whence=compat.SEEK_SET):  # io.SEEK_SET
        """
        Seek to some place.

        :param pos: position to seek
        :param whence: if set to 2, will seek to the end
        :return: whatever the underlying buffer's ``seek`` returns
        """
        return self._fp.seek(pos, whence)

    @staticmethod
    def seekable():
        return True

    def tell(self):
        """
        Tell the current position

        :return: current position
        """
        return self._fp.tell()

    def truncate(self, size=None):
        """
        Truncate the file resource's size.

        :param size: If the optional size argument is present,
                     the file is truncated to (at most) that size.
                     The size defaults to the current position.
        :return: None
        """
        self._check_write()

        curr_pos = self.tell()
        self._fp.truncate(size)
        # Re-measure the buffer, then put the cursor back where it was.
        self.seek(0, compat.SEEK_END)
        self._size = self.tell()
        self.seek(curr_pos)

        self._need_commit = True

    def flush(self):
        """
        Commit the change to ODPS if any change happens.
        Close will do this automatically.

        :return: None
        """
        if self._need_commit:
            is_create = self._is_create()

            resources = self.parent

            if is_create:
                # NOTE(review): passing ``self`` as a keyword to a bound method
                # looks suspicious; confirm against the parent's ``create``.
                resources.create(self=self, file_obj=self._fp)
            else:
                resources.update(obj=self, file_obj=self._fp)

            self._need_commit = False

    def close(self):
        """
        Close this file resource.

        :return: None
        """
        self.flush()

        self._fp = None
        self._size = 0
        self._need_commit = False
        self._opened = False

    def __iter__(self):
        self._check_read()
        return self._fp.__iter__()

    def __next__(self):
        self._check_read()
        return next(self._fp)

    next = __next__

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def update(self, file_obj):
        """
        Replace the content of this resource through the parent collection.

        :param file_obj: new content, handed to the parent's ``update``
        :return: whatever the parent's ``update`` returns
        """
        return self._parent.update(self, file_obj=file_obj)
@cache_parent
class JarResource(FileResource):
    """
    File resource holding a .jar file.
    """

    def __init__(self, **kw):
        super(JarResource, self).__init__(**kw)
        # The parent initializer marks the resource as FILE; narrow it to JAR.
        self.type = Resource.Type.JAR
@cache_parent
class PyResource(FileResource):
    """
    File resource holding a .py file.
    """

    def __init__(self, **kw):
        super(PyResource, self).__init__(**kw)
        # The parent initializer marks the resource as FILE; narrow it to PY.
        self.type = Resource.Type.PY
@cache_parent
class ArchiveResource(FileResource):
    """
    File resource holding a compressed file such as .zip/.tgz/.tar.gz/.tar/jar
    """

    def __init__(self, **kw):
        super(ArchiveResource, self).__init__(**kw)
        # The parent initializer marks the resource as FILE; narrow it to ARCHIVE.
        self.type = Resource.Type.ARCHIVE
@cache_parent
class TableResource(Resource):
    """
    A resource backed by a table, optionally by a single partition of it.
    """

    def __init__(self, **kw):
        # These keywords describe the source table and are consumed here
        # instead of being forwarded to the base initializer.
        source_project = kw.pop('project_name', None)
        source_table = kw.pop('table_name', None)
        source_partition = kw.pop('partition', None)

        super(TableResource, self).__init__(**kw)

        self._init(project_name=source_project, table_name=source_table,
                   partition=source_partition, create=True)

    def create(self, overwrite=False, **kw):
        """
        Register this table resource on the service.

        :param overwrite: issue a PUT on the resource itself instead of a
                          POST on the parent collection
        :return: self, or the entry re-read from the parent collection when
                 ``overwrite`` is true
        :raises odps.errors.ODPSError: if the resource name is empty
        """
        if self.name is None or len(self.name.strip()) == 0:
            raise errors.ODPSError('Table Resource Name should not be empty.')

        if overwrite:
            method, url = self._client.put, self.resource()
        else:
            method, url = self._client.post, self.parent.resource()

        headers = {
            'Content-Type': 'text/plain',
            'x-odps-resource-type': self.type.value.lower(),
            'x-odps-resource-name': self.name,
            'x-odps-copy-table-source': self.source_table_name,
        }
        comment = self._getattr('comment')
        if comment is not None:
            headers['x-odps-comment'] = comment

        method(url, '', headers=headers)

        if not overwrite:
            return self
        # Remove the entry from the parent collection and look it up again.
        del self.parent[self.name]
        return self.parent[self.name]

    def _init(self, create=False, project_name=None, table_name=None, **kw):
        """
        Compose ``source_table_name`` from the given project, table and
        partition, falling back to the currently stored values unless
        ``create`` is true.
        """
        if table_name is not None and '.' in table_name:
            # A qualified ``project.table`` name overrides ``project_name``.
            project_name, table_name = table_name.split('.', 1)

        previous = (None, None, None)
        if not create:
            try:
                previous = self.get_project_table_partition()
            except AttributeError:
                # No source table recorded yet: nothing to fall back to.
                pass
        old_project_name, old_table_name, old_partition = previous

        project_name = project_name or old_project_name or self._project
        table_name = table_name or old_table_name
        partition = kw.get('partition', old_partition)

        if table_name is not None:
            self.source_table_name = '%s.%s' % (project_name, table_name)

        if partition is not None:
            if isinstance(partition, types.PartitionSpec):
                partition_spec = partition
            else:
                partition_spec = types.PartitionSpec(partition)
            # Strip any partition clause already present before appending.
            table_part = self.source_table_name.split(' partition(')[0]
            self.source_table_name = '%s partition(%s)' % (table_part, partition_spec)

    def get_project_table_partition(self):
        """
        Split ``source_table_name`` into its components.

        :return: tuple of (project name, table name, partition string or None)
        :raises AttributeError: if ``source_table_name`` is None
        :raises ValueError: if the table part carries no project prefix
        """
        if self.source_table_name is None:
            raise AttributeError('source_table_name not defined.')

        pieces = self.source_table_name.split(' partition(')
        partition = None
        if len(pieces) >= 2:
            partition = pieces[1].split(')', 1)[0].strip()

        src = pieces[0]
        if '.' not in src:
            raise ValueError('Malformed source table name: %s' % src)

        project_name, table_name = src.split('.', 1)
        return project_name, table_name, partition

    def get_source_table(self):
        """
        Look up the source table object.

        :return: the table, or None when no source table name is defined
        """
        try:
            project_name, table_name, _ = self.get_project_table_partition()
        except AttributeError:
            return None

        from .projects import Projects
        project = Projects(client=self._client)[project_name]
        return project.tables[table_name]

    def get_source_table_partition(self):
        """
        Extract the partition part of ``source_table_name``.

        :return: a ``types.PartitionSpec``, or None when there is no source
                 table name or it carries no partition clause
        """
        if self.source_table_name is None:
            return None

        pieces = self.source_table_name.split(' partition(')
        if len(pieces) < 2:
            return None

        return types.PartitionSpec(pieces[1].split(')', 1)[0].strip())

    @property
    def table(self):
        """
        Get the table object.

        :return: source table
        :rtype: :class:`odps.models.Table`

        .. seealso:: :class:`odps.models.Table`
        """
        return self.get_source_table()

    @property
    def partition(self):
        """
        Get the source table partition.

        :return: the source table partition
        """
        spec = self.get_source_table_partition()
        if spec is None:
            return None
        return self.get_source_table().get_partition(spec)

    def open_reader(self, **kwargs):
        """
        Open reader on the table resource
        """
        source_table = self.get_source_table()
        return source_table.open_reader(partition=self.get_source_table_partition(), **kwargs)

    def open_writer(self, **kwargs):
        """
        Open writer on the table resource
        """
        source_table = self.get_source_table()
        return source_table.open_writer(partition=self.get_source_table_partition(), **kwargs)

    def update(self, project_name=None, table_name=None, *args, **kw):
        """
        Update this resource.

        :param project_name: the source table's project
        :param table_name: the source table's name
        :param partition: the source table's partition
        :return: whatever the parent collection's ``update`` returns
        """
        if args:
            # A third positional argument is taken as the partition.
            kw['partition'] = args[0]
        self._init(project_name=project_name, table_name=table_name, **kw)
        return self.parent.update(self)
@cache_parent
class VolumeResource(Resource):
    """
    Base of the resources whose content is taken from ``volume_path``.
    """

    def create(self, overwrite=False, **kw):
        """
        Register this volume resource on the service.

        :param overwrite: issue a PUT on the resource itself instead of a
                          POST on the parent collection
        :return: self, or the entry re-read from the parent collection when
                 ``overwrite`` is true
        :raises odps.errors.ODPSError: if the resource name is empty
        """
        if self.name is None or len(self.name.strip()) == 0:
            raise errors.ODPSError('Volume Resource Name should not be empty.')

        if overwrite:
            method, url = self._client.put, self.resource()
        else:
            method, url = self._client.post, self.parent.resource()

        headers = {
            'Content-Type': 'text/plain',
            'x-odps-resource-type': self.type.value.lower(),
            'x-odps-resource-name': self.name,
            'x-odps-copy-file-source': self.volume_path,
        }
        comment = self._getattr('comment')
        if comment is not None:
            headers['x-odps-comment'] = comment

        method(url, '', headers=headers)

        if not overwrite:
            return self
        # Remove the entry from the parent collection and look it up again.
        del self.parent[self.name]
        return self.parent[self.name]
@cache_parent
class VolumeFileResource(VolumeResource):
    """
    Volume resource of type ``VOLUMEFILE``.
    """

    def __init__(self, **kw):
        # ``volume_file`` is only meaningful to ``create``; keep it away from
        # the base initializer without touching the caller's dict.
        init_kw = dict((key, value) for key, value in kw.items() if key != 'volume_file')
        super(VolumeFileResource, self).__init__(**init_kw)
        self.type = Resource.Type.VOLUMEFILE

    def create(self, overwrite=False, **kw):
        """
        Create the resource; when a ``volume_file`` keyword is given, its
        ``path`` becomes this resource's ``volume_path`` first.
        """
        try:
            volume_file = kw.pop('volume_file')
        except KeyError:
            pass
        else:
            self.volume_path = volume_file.path
        return super(VolumeFileResource, self).create(overwrite, **kw)
@cache_parent
class VolumeArchiveResource(VolumeFileResource):
    """
    Volume resource of type ``VOLUMEARCHIVE``.
    """

    def __init__(self, **kw):
        super(VolumeArchiveResource, self).__init__(**kw)
        # The parent initializer marks the resource as VOLUMEFILE; narrow it.
        self.type = Resource.Type.VOLUMEARCHIVE