Source code for odps.df.expr.expressions

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
import inspect
import functools
import operator
import sys
from collections import defaultdict, OrderedDict

from .core import Node, NodeMetaclass
from .errors import ExpressionError
from .utils import get_attrs, is_called_by_inspector, highest_precedence_data_type, new_id, \
    is_changed, get_proxied_expr
from .. import types
from ...compat import reduce, isvalidattr, dir2, lkeys, six, futures, Iterable
from ...config import options
from ...errors import NoSuchObject, DependencyNotInstalledError
from ...utils import TEMP_TABLE_PREFIX, to_binary, deprecated, survey
from ...models import TableSchema


class ReprWrapper(object):
    def __init__(self, func, repr):
        self._func = func
        self._repr = repr
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        return self._func(*args, **kwargs)

    def __repr__(self):
        return self._repr(self._func)


def _wrap_method_repr(func):
    def inner(*args, **kwargs):
        obj = func(*args, **kwargs)
        if inspect.ismethod(obj):
            def _repr(x):
                instance = getattr(x, 'im_self', getattr(x, '__self__', None))
                method = 'bound method' if instance is not None else 'unbound method'
                if instance is not None:
                    return '<%(method)s %(instance)s.%(name)s>' % {
                        'method': method,
                        'instance': getattr(instance, 'node_name', instance.__class__.__name__),
                        'name': x.__name__
                    }
                else:
                    return '<function __main__.%s>' % x.__name__

            return ReprWrapper(obj, _repr)
        return obj

    return inner


def make_max_pt_function(expr=None):
    from ...models import Table
    from .. import func

    def max_pt(table_name=None):
        if isinstance(getattr(expr, 'data', None), Table):
            table_name = table_name or expr.data.name
        return func.max_pt(table_name)

    return max_pt
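
# Illustrative usage (not part of the original module): the `max_pt` function
# built above is injected into the evaluation environment of `query`/`eval`,
# so expression strings can filter on the latest partition. Table and column
# names below are hypothetical.
#
#   >>> df = DataFrame(o.get_table('my_table'))
#   >>> df.query("ds == max_pt()")                       # resolves against df's source table
#   >>> df.query("ds == max_pt('my_project.my_table')")  # or name the table explicitly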


def repr_obj(obj):
    if hasattr(obj, '_repr'):
        try:
            return obj._repr()
        except:
            return object.__repr__(obj)
    elif isinstance(obj, (tuple, list)):
        return ','.join(repr_obj(it) for it in obj)

    return obj


class Expr(Node):
    __slots__ = '_deps', '_ban_optimize', '_engine', '_need_cache', '_mem_cache', '__execution', '_id'

    def _init(self, *args, **kwargs):
        """
        _deps is used for common dependencies.
        When a expr depend on other exprs, and the expr is not calculated from the others,
        the _deps are specified to identify the dependencies.
        """
        self._init_attr('_deps', None)

        self._init_attr('_ban_optimize', False)
        self._init_attr('_engine', None)
        self._init_attr('_Expr__execution', None)

        self._init_attr('_need_cache', False)
        self._init_attr('_mem_cache', False)

        if '_id' not in kwargs:
            kwargs['_id'] = new_id()

        super(Expr, self)._init(*args, **kwargs)

    def __repr__(self):
        if not options.interactive or is_called_by_inspector():
            return self._repr()
        else:
            if isinstance(self.__execution, Exception):
                self.__execution = None
            if self.__execution is None:
                try:
                    self.__execution = self.execute()
                except Exception as e:
                    self.__execution = e
                    raise
            return self.__execution.__repr__()

    def _repr_html_(self):
        if not options.interactive:
            return '<code>' + repr(self) + '</code>'
        else:
            if self.__execution is None:
                self.__execution = self.execute()
            else:
                if isinstance(self.__execution, Exception):
                    try:
                        return
                    finally:
                        self.__execution = None
            if hasattr(self.__execution, '_repr_html_'):
                return self.__execution._repr_html_()
            return repr(self.__execution)

    def _handle_delay_call(self, method, *args, **kwargs):
        delay = kwargs.pop('delay', None)
        if delay is not None:
            future = delay.register_item(method, *args, **kwargs)
            return future
        else:
            from ..engines import get_default_engine
            engine = get_default_engine(self)

            wrapper = kwargs.pop('wrapper', None)
            result = getattr(engine, method)(*args, **kwargs)
            if wrapper is None:
                return result
            async_ = kwargs.get('async_', kwargs.get('async', False))
            if async_:
                user_future = futures.Future()

                def _relay(f):
                    try:
                        user_future.set_result(wrapper(f.result()))
                    except:
                        if hasattr(f, 'exception_info'):
                            user_future.set_exception_info(*f.exception_info())
                        else:
                            user_future.set_exception(f.exception())

                result.add_done_callback(_relay)
                return user_future
            else:
                return wrapper(result)

    def visualize(self):
        from ..engines import get_default_engine

        engine = get_default_engine(self)
        return engine.visualize(self)

    def execute(self, **kwargs):
        """
        :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :param running_cluster: cluster to run this instance
        :return: execution result
        :rtype: :class:`odps.df.backends.frame.ResultFrame`
        """
        _wrapper = kwargs.pop('wrapper', None)

        def wrapper(result):
            self.__execution = result
            if _wrapper is not None:
                return _wrapper(result)
            else:
                return result

        return self._handle_delay_call('execute', self, wrapper=wrapper, **kwargs)
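
    # Illustrative usage (a sketch based on PyODPS's Delay API, not part of this
    # module): passing `delay=...` registers the call on a Delay object and
    # returns a future instead of executing immediately.
    #
    #   >>> from odps.df import Delay
    #   >>> delay = Delay()
    #   >>> future = df[df.id < 10].count().execute(delay=delay)
    #   >>> delay.execute()    # actually runs the registered calls
    #   >>> future.result()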

    def compile(self):
        """
        Compile this expression into an ODPS SQL statement.

        :return: compiled DAG
        :rtype: str
        """

        from ..engines import get_default_engine

        engine = get_default_engine(self)
        return engine.compile(self)

    def persist(self, name, partitions=None, partition=None, lifecycle=None, project=None, **kwargs):
        """
        Persist the execution into a new table. If `partitions` is not specified,
        a new table without partitions will be created if the table does not exist,
        and the SQL result will be inserted into it.
        If `partitions` is specified, those fields will be the partition fields of the new table.
        If `partition` is specified, the data will be inserted into that exact partition of the table.

        :param name: table name
        :param partitions: list of string, the partition fields
        :type partitions: list
        :param partition: persist to a specified partition
        :type partition: string or PartitionSpec
        :param lifecycle: table lifecycle. If absent, `options.lifecycle` will be used.
        :type lifecycle: int
        :param project: project name, if not provided, will be the default project
        :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :param running_cluster: cluster to run this instance
        :param overwrite: overwrite the table, True as default
        :type overwrite: bool
        :param drop_table: drop table if exists, False as default
        :type drop_table: bool
        :param create_table: create the table first if it does not exist, True as default
        :type create_table: bool
        :param drop_partition: drop partition if exists, False as default
        :type drop_partition: bool
        :param create_partition: create partition if not exists, None as default
        :type create_partition: bool
        :param cast: cast all columns' types to those of the existing table, False as default
        :type cast: bool
        :return: :class:`odps.df.DataFrame`

        :Example:

        >>> df = df['name', 'id', 'ds']
        >>> df.persist('odps_new_table')
        >>> df.persist('odps_new_table', partition='pt=test')
        >>> df.persist('odps_new_table', partitions=['ds'])
        """
        if lifecycle is None and options.lifecycle is not None:
            lifecycle = \
                options.lifecycle if not name.startswith(TEMP_TABLE_PREFIX) \
                    else options.temp_lifecycle

        return self._handle_delay_call('persist', self, name, partitions=partitions, partition=partition,
                                       lifecycle=lifecycle, project=project, **kwargs)

    def cache(self, mem=False):
        self._need_cache = True
        self._mem_cache = mem
        self._ban_optimize = True
        return self

    def uncache(self):
        self._need_cache = False
        self._mem_cache = False

        from ..backends.context import context
        context.uncache(self)
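
    # Illustrative usage (not part of the original module): `cache` marks this
    # expression so engines materialize it once and reuse the result; `uncache`
    # removes it from the execution context.
    #
    #   >>> cached = df[df.id < 10].cache()
    #   >>> cached.count().execute()   # the filtered data is materialized here
    #   >>> cached.uncache()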

    def verify(self):
        """
        Verify if this expression can be compiled into ODPS SQL.

        :return: True if compilation succeeds else False
        :rtype: bool
        """

        try:
            self.compile()

            return True
        except:
            return False

    def _repr(self):
        from .formatter import ExprFormatter

        formatter = ExprFormatter(self)
        return formatter()

    def ast(self):
        """
        Return the AST string.

        :return: AST tree
        :rtype: str
        """

        return self._repr()

    @_wrap_method_repr
    def __getattribute__(self, attr):
        try:
            return super(Expr, self).__getattribute__(attr)
        except AttributeError:
            if not attr.startswith('_'):
                new_attr = '_%s' % attr
                if new_attr in object.__getattribute__(self, '_args_indexes'):
                    try:
                        return object.__getattribute__(self, new_attr)
                    except AttributeError:
                        return

            raise

    def _defunc(self, field):
        return field(self) if inspect.isfunction(field) else field

    @property
    def optimize_banned(self):
        return self._ban_optimize

    @optimize_banned.setter
    def optimize_banned(self, val):
        self._ban_optimize = val

    @property
    def args(self):
        if not self._deps:
            return super(Expr, self).args
        return super(Expr, self).args + self.deps

    @property
    def deps(self):
        if self._deps is None:
            return
        return tuple(dep if not isinstance(dep, tuple) else dep[0]
                     for dep in self._deps)

    def add_deps(self, *deps):
        dependencies = []
        if len(deps) == 1 and isinstance(deps[0], Iterable):
            dependencies.extend(d for d in deps[0]
                                if isinstance(d, Expr))
        else:
            dependencies.extend(deps)

        if getattr(self, '_deps', None) is None:
            self._deps = dependencies
        else:
            self._deps.extend(dependencies)

    def substitute(self, old_arg, new_arg, dag=None):
        super(Expr, self).substitute(old_arg, new_arg, dag=dag)

        new_deps = []
        if self._deps is not None:
            for dep in self._deps:
                if isinstance(dep, tuple):
                    node = dep[0]
                    if node is old_arg:
                        new_deps.append((new_arg, ) + dep[1:])
                    else:
                        new_deps.append(dep)
                else:
                    if dep is old_arg:
                        new_deps.append(new_arg)
                    else:
                        new_deps.append(dep)

        self._deps = new_deps

    def rebuild(self):
        # used in the dynamic setting; does nothing extra by default.
        # `rebuild` copies the expression and applies all changes to the new one
        return self.copy()

    def __hash__(self):
        return self._node_id * hash(Expr)

    def __eq__(self, other):
        try:
            return self._eq(other)
        except AttributeError:
            # Due to the current complexity of the parent's __eq__,
            # for now every expression compares unequal unless identical
            return self is other

    def __ne__(self, other):
        try:
            return self._ne(other)
        except AttributeError:
            return not super(Expr, self).__eq__(other)

    def __lt__(self, other):
        return self._lt(other)

    def __le__(self, other):
        return self._le(other)

    def __gt__(self, other):
        return self._gt(other)

    def __ge__(self, other):
        return self._ge(other)

    def __add__(self, other):
        return self._add(other)

    def __radd__(self, other):
        return self._radd(other)

    def __mul__(self, other):
        return self._mul(other)

    def __rmul__(self, other):
        return self._rmul(other)

    def __div__(self, other):
        return self._div(other)

    def __rdiv__(self, other):
        return self._rdiv(other)

    __truediv__ = __div__
    __rtruediv__ = __rdiv__

    def __floordiv__(self, other):
        return self._floordiv(other)

    def __rfloordiv__(self, other):
        return self._rfloordiv(other)

    def __mod__(self, other):
        return self._mod(other)

    def __rmod__(self, other):
        return self._rmod(other)

    def __sub__(self, other):
        return self._sub(other)

    def __rsub__(self, other):
        return self._rsub(other)

    def __pow__(self, power):
        return self._pow(power)

    def __rpow__(self, power):
        return self._rpow(power)

    def __or__(self, other):
        return self._or(other)

    def __ror__(self, other):
        return self._ror(other)

    def __and__(self, other):
        return self._and(other)

    def __rand__(self, other):
        return self._rand(other)

    def __neg__(self):
        return self._neg()

    def __invert__(self):
        return self._invert()

    def __abs__(self):
        return self._abs()


class CollectionExpr(Expr):
    """
    Collection represents two-dimensional data.

    :Example:

    >>> # projection
    >>> df = DataFrame(o.get_table('my_table'))  # DataFrame is actually a CollectionExpr
    >>> df['name', 'id']  # project some columns
    >>> df[[df.name, df.id]]  # projection
    >>> df[df]  # means nothing, but get all the columns
    >>> df[df, df.name.lower().rename('name2')]  # project a new column `name2` besides all the original columns
    >>> df.select(df, name2=df.name.lower())  # project by `select`
    >>> df.exclude('name')  # project all columns but `name`
    >>> df[df.exclude('name'), df.name.lower()]  # `name` will not conflict any more
    >>>
    >>> # filter
    >>> df[(df.id < 3) & (df.name != 'test')]
    >>> df.filter(df.id < 3, df.name != 'test')
    >>>
    >>> # slice
    >>> df[: 10]
    >>> df.limit(10)
    >>>
    >>> # Sequence
    >>> df.name  # an instance of :class:`odps.df.expr.expressions.SequenceExpr`
    >>>
    >>> # schema or dtypes
    >>> df.dtypes
    odps.Schema {
      name    string
      id      int64
    }
    >>> df.schema
    odps.Schema {
      name    string
      id      int64
    }
    """

    __slots__ = (
        '_schema', '_source_data', '_proxy',
        '_ml_fields_cache', '_ml_uplink', '_ml_operations',
    )
    node_name = 'Collection'

    def _init(self, *args, **kwargs):
        self._init_attr('_source_data', None)
        self._init_attr('_proxy', None)

        self._init_attr('_ml_fields_cache', None)
        self._init_attr('_ml_uplink', [])
        self._init_attr('_ml_operations', [])

        super(CollectionExpr, self)._init(*args, **kwargs)

        if hasattr(self, '_schema') and any(it is None for it in self._schema.names):
            raise TypeError('Schema cannot have a field whose name is None')

    def __dir__(self):
        dir_set = set(dir2(self)) | set([c.name for c in self.schema if isvalidattr(c.name)])
        return sorted(dir_set)

    def __getitem__(self, item):
        item = self._defunc(item)
        if isinstance(item, tuple):
            item = list(item)
        if isinstance(item, CollectionExpr):
            item = [item, ]

        if isinstance(item, six.string_types):
            return self._get_field(item)
        elif isinstance(item, slice):
            if item.start is None and item.stop is None and item.step is None:
                return self
            return self._slice(item)
        elif isinstance(item, list):
            return self._project(item)
        else:
            field = self._get_field(item)
            if isinstance(field, BooleanSequenceExpr):
                return self.filter(item)

        if isinstance(item, SequenceExpr):
            raise ExpressionError('No boolean sequence found for filtering, '
                                  'a tuple or list is required for projection')
        raise ExpressionError('Not supported projection: collection[%s]' % repr_obj(item))

    def _set_field(self, value, value_dag=None):
        expr = self.copy() if self._proxy is None else self._proxy._input

        if value_dag is None:
            value_dag = value.to_dag(copy=False, validate=False)
        if value_dag.contains_node(self):
            value_dag.substitute(self, expr)
            value = value_dag.root

        if self._proxy is not None:
            self._proxy._setitem(value, value_dag=value_dag)
            return

        fields = [f if f != value.name else value for f in self._schema.names]
        if value.name not in self._schema:
            fields.append(value)

        # make the isinstance to check the proxy type
        self.__class__ = type(self.__class__.__name__, (self.__class__,), {})
        self._proxy = expr.select(fields)

    def __setitem__(self, key, value):
        if not isinstance(value, Expr) and value is not None:
            value = Scalar(value)

        if not isinstance(key, tuple):
            if value is None:
                raise ValueError('Cannot determine type for column %s with None value.' % key)
            column_name = key
            value = value.rename(column_name)
        else:
            conds = key[:-1]
            conds = [self._defunc(c) for c in conds]
            if not all(isinstance(c.dtype, types.Boolean) for c in conds):
                raise ValueError('Conditions should be boolean expressions or boolean columns')
            if len(conds) == 1:
                cond = conds[0]
            else:
                cond = reduce(operator.and_, conds)

            column_name = key[-1]
            if column_name not in self._schema:
                if value is None:
                    raise ValueError('Cannot determine type for column %s with None value.' % key)
                default_col = Scalar(_value_type=value.dtype)
            else:
                default_col = self[column_name]

            if value is None:
                value = Scalar(_value_type=default_col.dtype)
            value = cond.ifelse(value, default_col).rename(column_name)

        self._set_field(value)

    def __delitem__(self, key):
        if key not in self.schema:
            raise KeyError('Field({0}) does not exist'.format(key))

        if self._proxy is not None:
            return self._proxy._delitem(key)

        fields = [n for n in self._schema.names if n != key]
        # make the instance to check the proxy type
        self.__class__ = type(self.__class__.__name__, (self.__class__,), {})
        self._proxy = self.copy().select(fields)

    def __delattr__(self, item):
        if item in self._schema:
            return self.__delitem__(item)
        super(CollectionExpr, self).__delattr__(item)

    def __setattr__(self, key, value):
        try:
            if key != '_proxy' and object.__getattribute__(self, '_proxy'):
                return setattr(self._proxy, key, value)
        except AttributeError:
            pass
        Expr.__setattr__(self, key, value)

    def __getattribute__(self, attr):
        try:
            proxy = object.__getattribute__(self, '_proxy')
            if attr == '_proxy':
                return proxy
            if proxy:
                # delegate everything to the proxy object
                return getattr(proxy, attr)
        except AttributeError:
            pass

        try:
            if attr in object.__getattribute__(self, '_schema')._name_indexes:
                cls_attr = getattr(type(self), attr, None)
                if cls_attr is None or inspect.ismethod(cls_attr) or inspect.isfunction(cls_attr):
                    return self[attr]
        except AttributeError:
            pass

        return super(CollectionExpr, self).__getattribute__(attr)
    def query(self, expr):
        """
        Query the data with a boolean expression.

        :param expr: the query string; you can use the '@' character to refer to
                     variables in the calling environment.
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        from .query import CollectionVisitor

        if not isinstance(expr, six.string_types):
            raise ValueError('expr must be a string')

        frame = sys._getframe(2).f_locals
        try:
            env = frame.copy()
        finally:
            del frame
        env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr)

        visitor = CollectionVisitor(self, env)
        predicate = visitor.eval(expr)
        return self.filter(predicate)
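
    # Illustrative usage (not part of the original module): `@name` pulls `name`
    # from the caller's local variables, and `max_pt` is injected into the
    # evaluation environment. Column names are hypothetical.
    #
    #   >>> low = 3
    #   >>> df.query('id < @low')         # same as df[df.id < 3]
    #   >>> df.query('ds == max_pt()')    # filter on the latest partition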
    def filter(self, *predicates):
        """
        Filter the data by predicates.

        :param predicates: the conditions to filter
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        predicates = self._get_fields(predicates)

        predicate = reduce(operator.and_, predicates)
        return FilterCollectionExpr(self, predicate, _schema=self._schema)
    @deprecated('The function `filter_partition` is deprecated, please use `filter_parts` instead, '
                'and change the predicate parameter into the `pt1=1,pt2=2/pt1=2,pt2=1` form.')
    @survey
    def filter_partition(self, predicate='', exclude=True):
        if isinstance(predicate, six.string_types):
            part_reprs = '/'.join(','.join(p.split('/')) for p in predicate.split(','))
        else:
            part_reprs = predicate
        return self.filter_parts(part_reprs, exclude)
    def filter_parts(self, predicate='', exclude=True):
        """
        Filter the data by a partition string. A partition string looks like
        `pt1=1,pt2=2/pt1=2,pt2=1`, where the comma (,) denotes 'and', while the
        slash (/) denotes 'or'.

        :param str|Partition predicate: predicate string of the partition filter
        :param bool exclude: True if you want to exclude partition fields, otherwise False. True by default.
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        source = self._source_data
        if source is None:
            raise ExpressionError('Can only filter on data sources.')

        def _parse_partition_predicate(p):
            if '=' not in p:
                raise ExpressionError('Illegal partition predicate.')
            field_name, field_value = [s.strip() for s in p.split('=', 1)]
            if not hasattr(source, 'table_schema'):
                raise ExpressionError('filter_partition can only be applied on ODPS DataFrames')
            if field_name not in source.table_schema:
                raise ExpressionError('Column `%s` not exists in input collection' % field_name)
            if field_name not in source.table_schema._partition_schema:
                raise ExpressionError('`%s` is not a partition column' % field_name)
            part_col = self[field_name]
            if field_value.startswith('\'') or field_value.startswith('\"'):
                encoding = 'string-escape' if six.PY2 else 'unicode-escape'
                field_value = to_binary(field_value.strip('"\'')).decode(encoding)

            if isinstance(part_col.data_type, types.Integer):
                field_value = int(field_value)
            elif isinstance(part_col.data_type, types.Float):
                field_value = float(field_value)
            return part_col == field_value

        from ...models.partition import Partition
        from ...types import PartitionSpec
        if isinstance(predicate, Partition):
            predicate = predicate.partition_spec
        if isinstance(predicate, PartitionSpec):
            predicate = ','.join("%s='%s'" % (k, v) for k, v in six.iteritems(predicate.kv))

        if isinstance(predicate, list):
            predicate = '/'.join(str(s) for s in predicate)
        elif not isinstance(predicate, six.string_types):
            raise ExpressionError('Only accept string predicates.')

        if not predicate:
            predicate_obj = None
        else:
            part_formatter = lambda p: reduce(operator.and_,
                                              map(_parse_partition_predicate, p.split(',')))
            predicate_obj = reduce(operator.or_, map(part_formatter, predicate.split('/')))

        if not source.table_schema.partitions:
            raise ExpressionError('No partition columns in the collection.')

        if exclude:
            columns = [
                c for c in self.schema if c.name not in source.table_schema._partition_schema
            ]
            new_schema = types.TableSchema.from_lists([c.name for c in columns],
                                                      [c.type for c in columns])
            return FilterPartitionCollectionExpr(self, predicate_obj, _schema=new_schema,
                                                 _predicate_string=predicate)
        else:
            return self.filter(predicate_obj)
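
    # Illustrative usage, following the grammar in the docstring above
    # (partition names are hypothetical):
    #
    #   >>> df.filter_parts('pt1=1,pt2=2/pt1=2,pt2=1')     # (pt1=1 AND pt2=2) OR (pt1=2 AND pt2=1)
    #   >>> df.filter_parts('pt=20230101', exclude=False)  # keep partition columns in the result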
    def _validate_field(self, field):
        if not isinstance(field, SequenceExpr):
            return True

        if not field.is_ancestor(self):
            return False

        for path in field.all_path(self):
            if any(isinstance(n, CollectionExpr) for n in path[1: -1]):
                return False

            from .reduction import GroupedSequenceReduction
            if any(isinstance(n, GroupedSequenceReduction) for n in path):
                return False
        return True

    @staticmethod
    def _backtrack_field(field, collection):
        from .window import RankOp

        for col in field.traverse(top_down=True, unique=True,
                                  stop_cond=lambda x: isinstance(x, (Column, RankOp))):
            if isinstance(col, Column):
                changed = is_changed(collection, col)
                if changed or changed is None:
                    return
                elif col.source_name not in collection.schema:
                    return
                else:
                    col.substitute(col._input, collection)
            elif isinstance(col, RankOp) and col._input is not collection:
                col.substitute(col._input, collection)

        return field

    def _get_field(self, field):
        from .reduction import GroupedSequenceReduction

        field = self._defunc(field)

        if isinstance(field, six.string_types):
            if field not in self._schema:
                raise ValueError('Field(%s) does not exist, please check schema' % field)
            cls = Column
            if callable(getattr(type(self), field, None)):
                cls = CallableColumn
            return cls(self, _name=field, _data_type=self._schema[field].type)

        if not self._validate_field(field):
            new_field = None
            if not isinstance(field, GroupedSequenceReduction):
                # the reduction is not allowed
                new_field = self._backtrack_field(field, self)
            if new_field is None:
                raise ExpressionError('Cannot support projection on %s' % repr_obj(field))
            field = new_field
        return field

    def _get_fields(self, fields, ret_raw_fields=False):
        selects = []
        raw_selects = []

        for field in fields:
            field = self._defunc(field)
            if isinstance(field, CollectionExpr):
                if any(c is self for c in field.children()):
                    selects.extend(self._get_fields(field._project_fields))
                else:
                    selects.extend(self._get_fields(field._fetch_fields()))
                raw_selects.append(field)
            else:
                select = self._get_field(field)
                selects.append(select)
                raw_selects.append(select)

        if ret_raw_fields:
            return selects, raw_selects
        return selects

    @classmethod
    def _backtrack_lateral_view(cls, lv, collection):
        if isinstance(lv.input, ProjectCollectionExpr):
            src_inputs = (lv.input, lv.input.input)
        else:
            src_inputs = (lv.input,)

        lv_copy = lv.copy(_id=None, _lateral_view=True)

        cur = collection
        while cur not in src_inputs and isinstance(cur, (ProjectCollectionExpr, FilterCollectionExpr)):
            cur = cur.input
        if cur not in src_inputs:
            raise ExpressionError("Input of 'apply' in lateral views can only be "
                                  "simple column selections")
        if cur is lv.input:
            apply_src = lv_copy
        else:
            apply_src = lv_copy.input

        if collection is apply_src.input:
            return lv_copy

        for f in apply_src._fields:
            cls._backtrack_field(f, collection)
        lv_copy.substitute(apply_src.input, collection)
        return lv_copy

    def _filter_lateral_views(self, fields):
        from .collections import RowAppliedCollectionExpr
        from .merge import JoinFieldMergedCollectionExpr, JoinCollectionExpr, UnionCollectionExpr

        walk_through_collections = (JoinCollectionExpr, UnionCollectionExpr,
                                    JoinFieldMergedCollectionExpr)
        lateral_views = []
        for field in fields:
            if not isinstance(field, RowAppliedCollectionExpr):
                continue
            else:
                is_lv = True
                stop_cond = lambda n: n is not self and isinstance(n, CollectionExpr) \
                    and not isinstance(n, walk_through_collections)
                for coll in self.traverse(unique=True, stop_cond=stop_cond):
                    if coll is field:
                        is_lv = False
                        break
                if is_lv:
                    lateral_views.append(field)
        return lateral_views

    def _project(self, fields):
        field_lvs = self._filter_lateral_views(fields)
        lv_id_set = set(id(f) for f in field_lvs)

        selects = []
        lateral_views = []

        for idx, field in enumerate(fields):
            if id(field) not in lv_id_set:
                s = self._get_fields([field])
                selects.extend(s)
            else:
                lv = self._backtrack_lateral_view(field, self)
                lateral_views.append(lv)
                selects.extend(lv.columns)

        names = [f.name for f in selects]
        typos = [f.dtype for f in selects]

        if not lateral_views and all(isinstance(it, Scalar) for it in selects):
            return self._summary(selects)

        if len(names) != len(set(names)):
            counts = defaultdict(lambda: 0)
            for n in names:
                counts[n] += 1
            raise ExpressionError('Duplicate column names: %s' %
                                  ', '.join(n for n in counts if counts[n] > 1))

        if lateral_views:
            return LateralViewCollectionExpr(self, _fields=selects, _lateral_views=lateral_views,
                                             _schema=types.TableSchema.from_lists(names, typos))
        else:
            return ProjectCollectionExpr(self, _fields=selects,
                                         _schema=types.TableSchema.from_lists(names, typos))
    def select(self, *fields, **kw):
        """
        Project columns. Remember to avoid column name conflicts.

        :param fields: columns to project
        :param kw: columns and their names to project
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        if len(fields) == 1 and isinstance(fields[0], list):
            fields = fields[0]
        else:
            fields = list(fields)
        if kw:
            def handle(it):
                it = self._defunc(it)
                if not isinstance(it, Expr):
                    it = Scalar(it)
                return it
            fields.extend([handle(f).rename(new_name)
                           for new_name, f in six.iteritems(kw)])

        return self._project(fields)
    def exclude(self, *fields):
        """
        Project all columns except those given in `fields`.

        :param fields: field names
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        if len(fields) == 1 and isinstance(fields[0], list):
            exclude_fields = fields[0]
        else:
            exclude_fields = list(fields)

        exclude_fields = [self._defunc(it) for it in exclude_fields]
        exclude_fields = [field.name if not isinstance(field, six.string_types) else field
                          for field in exclude_fields]

        fields = [name for name in self._schema.names
                  if name not in exclude_fields]

        return self._project(fields)
    def _summary(self, fields):
        names = [field if isinstance(field, six.string_types) else field.name
                 for field in fields]
        typos = [self._schema.get_type(field) if isinstance(field, six.string_types)
                 else field.dtype for field in fields]
        if None in names:
            raise ExpressionError('Column does not have a name, '
                                  'please specify one by `rename`')

        return Summary(_input=self, _fields=fields,
                       _schema=types.TableSchema.from_lists(names, typos))

    def _slice(self, slices):
        return SliceCollectionExpr(self, _indexes=slices, _schema=self._schema)

    @property
    def schema(self):
        return self._schema

    @property
    def odps_schema(self):
        from ..backends.odpssql.types import df_schema_to_odps_schema

        return df_schema_to_odps_schema(self.schema)

    @property
    def columns(self):
        """
        :return: columns
        :rtype: list, each element of which is a Column
        """
        return [self[n] for n in self._schema.names]

    def _fetch_fields(self):
        return [self._get_field(name) for name in self._schema.names]

    @property
    def _project_fields(self):
        return self._fetch_fields()

    def _data_source(self):
        if hasattr(self, '_source_data') and self._source_data is not None:
            yield self._source_data

    def __getattr__(self, attr):
        try:
            obj = super(CollectionExpr, self).__getattribute__(attr)
            return obj
        except AttributeError as e:
            if attr in object.__getattribute__(self, '_schema')._name_indexes:
                return self[attr]
            raise e

    def output_type(self):
        return 'collection'
    def limit(self, n):
        """
        Limit the collection to at most n records.

        :param n: number of records
        :return:
        """
        return self[:n]
    def head(self, n=None, **kwargs):
        """
        Return the first n rows. Execute at once.

        :param n:
        :return: result frame
        :rtype: :class:`odps.df.backends.frame.ResultFrame`
        """
        if n is None:
            n = options.display.max_rows
        return self._handle_delay_call('execute', self, head=n, **kwargs)
    def tail(self, n=None, **kwargs):
        """
        Return the last n rows. Execute at once.

        :param n:
        :return: result frame
        :rtype: :class:`odps.df.backends.frame.ResultFrame`
        """
        if n is None:
            n = options.display.max_rows
        return self._handle_delay_call('execute', self, tail=n, **kwargs)
    def to_pandas(self, wrap=False, **kwargs):
        """
        Convert to a pandas DataFrame. Execute at once.

        :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame
        :return: pandas DataFrame
        """
        try:
            import pandas as pd
        except ImportError:
            raise DependencyNotInstalledError(
                'to_pandas requires the `pandas` library')

        def wrapper(result):
            res = result.values
            if wrap:
                from .. import DataFrame
                return DataFrame(res, schema=self.schema)
            return res

        return self.execute(wrapper=wrapper, **kwargs)
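
    # Illustrative usage (not part of the original module): pull results locally
    # as pandas, optionally re-wrapping them as a PyODPS DataFrame.
    #
    #   >>> pd_df = df.to_pandas()             # a pandas DataFrame
    #   >>> odps_df = df.to_pandas(wrap=True)  # a PyODPS DataFrame backed by pandas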
    @property
    def dtypes(self):
        return self.schema
    def view(self):
        """
        Clone the collection. Useful for self-joins.

        :return:
        """
        proxied = get_proxied_expr(self)
        kv = dict((attr, getattr(proxied, attr)) for attr in get_attrs(proxied))
        return type(proxied)(**kv)
    def describe(self):
        from .. import output

        numeric_methods = OrderedDict([
            ('count', lambda f: f.count().rename(col.name + '_count')),
            ('mean', lambda f: f.mean()),
            ('std', lambda f: f.std()),
            ('min', lambda f: f.min()),
            ('quantile_25', lambda f: f.quantile(0.25).rename(col.name + '_quantile_25')),
            ('quantile_50', lambda f: f.quantile(0.5).rename(col.name + '_quantile_50')),
            ('quantile_75', lambda f: f.quantile(0.75).rename(col.name + '_quantile_75')),
            ('max', lambda f: f.max()),
        ])
        string_methods = OrderedDict([
            ('count', lambda f: f.count().rename(col.name + '_count')),
            ('unique', lambda f: f.nunique().rename(col.name + '_unique')),
        ])

        aggs = []
        output_names = []
        output_types = []

        def produce_stat_fields(col, methods):
            output_names.append(col.name)
            fields = []
            tps = []
            for func in six.itervalues(methods):
                field = func(self[col.name])
                fields.append(field)
                tps.append(field.dtype)
            t = highest_precedence_data_type(*tps)
            output_types.append(t)
            aggs.extend([f.astype(t) if t == types.decimal else f for f in fields])

        if any(types.is_number(col.type) for col in self.schema.columns):
            methods = numeric_methods
            for col in self.schema.columns:
                if types.is_number(col.type):
                    produce_stat_fields(col, numeric_methods)
        else:
            methods = string_methods
            for col in self.schema.columns:
                produce_stat_fields(col, string_methods)

        summary = self[aggs]
        methods_names = lkeys(methods)

        @output(['type'] + output_names, ['string'] + output_types)
        def to_methods(row):
            for method in methods_names:
                values = [None] * len(output_names)
                for i, field_name in enumerate(output_names):
                    values[i] = getattr(row, '%s_%s' % (field_name, method.split('(', 1)[0]))
                yield [method, ] + values

        return summary.apply(to_methods, axis=1)

    def accept(self, visitor):
        if self._source_data is not None:
            visitor.visit_source_collection(self)
        else:
            raise NotImplementedError

    def get_cached(self, data):
        try:
            if hasattr(data, 'reload'):
                data.reload()
        except NoSuchObject:
            from ..backends.context import context

            context.uncache(self)
            return None

        coll = CollectionExpr(_source_data=data, _schema=self._schema)
        for attr in CollectionExpr.__slots__:
            if not attr.startswith('_ml_'):
                continue
            if hasattr(self, attr):
                setattr(coll, attr, getattr(self, attr))
        return coll

_cached_typed_expr = dict()


class TypedExpr(Expr):
    __slots__ = '_name', '_source_name'

    @classmethod
    def _get_type(cls, *args, **kwargs):
        # return the data type which is extracted from args and kwargs
        raise NotImplementedError

    @classmethod
    def _typed_classes(cls, *args, **kwargs):
        # return allowed data types
        raise NotImplementedError

    @classmethod
    def _base_class(cls, *args, **kwargs):
        # base class, SequenceExpr or Scalar
        raise NotImplementedError

    @classmethod
    def _new_cls(cls, *args, **kwargs):
        data_type = cls._get_type(*args, **kwargs)
        if data_type:
            base_class = cls._base_class(*args, **kwargs)
            typed_classes = cls._typed_classes(*args, **kwargs)

            data_type = types.validate_data_type(data_type)
            name = data_type.CLASS_NAME + base_class.__name__
            # get the typed class, e.g. Int64SequenceExpr, StringScalar
            typed_cls = globals().get(name)
            if typed_cls is None:
                raise TypeError("Name {} doesn't exist".format(name))

            if issubclass(cls, typed_cls):
                return cls
            elif cls == base_class:
                return typed_cls
            elif cls in typed_classes:
                return typed_cls

            keys = (cls, typed_cls)
            if keys in _cached_typed_expr:
                return _cached_typed_expr[keys]

            mros = inspect.getmro(cls)
            has_data_type = len([sub for sub in mros if sub in typed_classes]) > 0
            if has_data_type:
                mros = mros[1:]
            subs = [sub for sub in mros if sub not in typed_classes]
            subs.insert(1, typed_cls)

            bases = []
            for sub in subs[::-1]:
                for i in range(len(bases)):
                    if bases[i] is None:
                        continue
                    if issubclass(sub, bases[i]):
                        bases[i] = None
                bases.append(sub)
            bases = tuple(base for base in bases if base is not None)

            dic = dict()
            if hasattr(cls, '_args'):
                dic['_args'] = cls._args
            dic['__slots__'] = cls.__slots__ + getattr(cls, '_slots', ())
            dic['_add_args_slots'] = True
            try:
                accept_cls = next(c for c in bases if c.__name__ == cls.__name__)
                if hasattr(accept_cls, 'accept'):
                    dic['accept'] = accept_cls.accept
            except StopIteration:
                pass
            clz = type(cls.__name__, bases, dic)
            _cached_typed_expr[keys] = clz
            return clz
        else:
            return cls

    def __new__(cls, *args, **kwargs):
        clz = cls._new_cls(*args, **kwargs)
        return super(TypedExpr, clz).__new__(clz)

    def __getattribute__(self, item):
        try:
            return super(TypedExpr, self).__getattribute__(item)
        except AttributeError:
            pass

        # need to show the type of the expression when a certain method is missing
        dtype = None
        try:
            dtype = object.__getattribute__(self, "_data_type")
        except AttributeError:  # pragma: no cover
            pass
        try:
            dtype = object.__getattribute__(self, "_value_type")
        except AttributeError:  # pragma: no cover
            pass

        if dtype is not None:
            raise AttributeError(
                "'%s' object has no attribute '%s'. Type of the expression "
                "is %s which may not support this method."
                % (type(self).__name__, item, dtype)
            )
        else:
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (type(self).__name__, item)
            )

    @classmethod
    def _new(cls, *args, **kwargs):
        return cls._new_cls(*args, **kwargs)(*args, **kwargs)

    def is_renamed(self):
        return self._name is not None and self._source_name is not None and \
            self._name != self._source_name

    def rename(self, new_name):
        if new_name == self._name:
            return self

        attr_dict = dict((attr, getattr(self, attr, None)) for attr in get_attrs(self))
        attr_dict['_source_name'] = self._source_name
        attr_dict['_name'] = new_name

        return type(self)(**attr_dict)

    @property
    def name(self):
        return self._name

    @property
    def source_name(self):
        return self._source_name

    def astype(self, data_type):
        raise NotImplementedError

    def cast(self, t):
        return self.astype(t)

    def eval(self, str_expr, rewrite=False):
        from .query import SequenceVisitor

        if not isinstance(str_expr, six.string_types):
            raise ValueError('expr must be a string')

        frame = sys._getframe(2).f_locals
        try:
            env = frame.copy()
        finally:
            del frame
        env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr)

        visitor = SequenceVisitor(self, env)
        return visitor.eval(str_expr, rewrite=rewrite)

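# Illustrative note (not part of the original module): `TypedExpr._new_cls`
# dispatches to a typed subclass named after the inferred data type, e.g.
# Int64SequenceExpr or StringScalar, so constructing a Scalar typically picks
# its typed class from the value (the exact type depends on
# types.validate_value_type):
#
#   >>> type(Scalar(4)).__name__
#   'Int64Scalar'
#   >>> type(Scalar('a')).__name__
#   'StringScalar'
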
class SequenceExpr(TypedExpr):
    """
    Sequence represents one-dimensional data.
    """

    __slots__ = (
        '_data_type', '_source_data_type',
        '_ml_fields_cache', '_ml_uplink', '_ml_operations',
    )

    @classmethod
    def _get_type(cls, *args, **kwargs):
        return types.validate_data_type(kwargs.get('_data_type'))

    @classmethod
    def _typed_classes(cls, *args, **kwargs):
        return _typed_sequence_exprs

    @classmethod
    def _base_class(cls, *args, **kwargs):
        return SequenceExpr

    def _init(self, *args, **kwargs):
        self._init_attr('_name', None)

        self._init_attr('_ml_fields_cache', None)
        self._init_attr('_ml_uplink', [])
        self._init_attr('_ml_operations', [])

        super(SequenceExpr, self)._init(*args, **kwargs)

        if '_data_type' in kwargs:
            self._data_type = types.validate_data_type(kwargs.get('_data_type'))

        if '_source_name' not in kwargs:
            self._source_name = self._name

        if '_source_data_type' in kwargs:
            self._source_data_type = types.validate_data_type(kwargs.get('_source_data_type'))
        else:
            self._source_data_type = self._data_type

    def cache(self, mem=False):
        raise ExpressionError('Cache operation is not supported for sequences.')
    def head(self, n=None, **kwargs):
        """
        Return the first n rows. Execute at once.

        :param n:
        :return: result frame
        :rtype: :class:`odps.df.backends.frame.ResultFrame`
        """
        if n is None:
            n = options.display.max_rows
        return self._handle_delay_call('execute', self, head=n, **kwargs)
    def tail(self, n=None, **kwargs):
        """
        Return the last n rows. Execute at once.

        :param n:
        :return:
        """
        if n is None:
            n = options.display.max_rows
        return self._handle_delay_call('execute', self, tail=n, **kwargs)
    def to_pandas(self, wrap=False, **kwargs):
        """
        Convert to a pandas Series. Execute at once.

        :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame
        :return: pandas Series
        """
        try:
            import pandas as pd
        except ImportError:
            raise DependencyNotInstalledError(
                'to_pandas requires the `pandas` library')

        def wrapper(result):
            df = result.values
            if wrap:
                from .. import DataFrame
                df = DataFrame(df)
            return df[self.name]

        return self.execute(wrapper=wrapper, **kwargs)
    @property
    def data_type(self):
        return self._data_type

    @property
    def source_data_type(self):
        return self._source_data_type

    @property
    def dtype(self):
        """
        Return the data type. Available types:
        int8, int16, int32, int64, float32, float64, boolean, string, decimal, datetime

        :return: the data type
        """
        return self._data_type
    def astype(self, data_type):
        """
        Cast to a new data type.

        :param data_type: the new data type
        :return: casted sequence

        :Example:

        >>> df.id.astype('float')
        """
        data_type = types.validate_data_type(data_type)

        if data_type == self._data_type:
            return self

        attr_dict = dict()
        attr_dict['_data_type'] = data_type
        attr_dict['_source_data_type'] = self._source_data_type
        attr_dict['_input'] = self

        new_sequence = AsTypedSequenceExpr(**attr_dict)

        return new_sequence
    def output_type(self):
        return 'sequence(%s)' % repr(self._data_type)

    def accept(self, visitor):
        visitor.visit_sequence(self)

class BooleanSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(BooleanSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.boolean


class Int8SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Int8SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.int8


class Int16SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Int16SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.int16


class Int32SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Int32SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.int32
class Int64SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Int64SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.int64
class Float32SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Float32SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.float32


class Float64SequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(Float64SequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.float64


class DecimalSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(DecimalSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.decimal
class StringSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(StringSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.string
class DatetimeSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(DatetimeSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.datetime


class BinarySequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(BinarySequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.binary


class DateSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(DateSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.date


class TimestampSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(TimestampSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.timestamp


class JsonSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(JsonSequenceExpr, self)._init(*args, **kwargs)
        self._data_type = types.json


class ListSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(ListSequenceExpr, self)._init(*args, **kwargs)


class DictSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(DictSequenceExpr, self)._init(*args, **kwargs)


class StructSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(StructSequenceExpr, self)._init(*args, **kwargs)


class UnknownSequenceExpr(SequenceExpr):
    def _init(self, *args, **kwargs):
        super(UnknownSequenceExpr, self)._init(*args, **kwargs)
        if not isinstance(self._data_type, types.Unknown):
            self._data_type = types.Unknown()


_typed_sequence_exprs = [globals()[t.__class__.__name__ + SequenceExpr.__name__]
                         for t in types._data_types.values()]

number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__)
                    for t in types.number_types()]

int_number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__)
                        for t in types.number_types() if repr(t).startswith('int')]


class AsTypedSequenceExpr(SequenceExpr):
    _args = '_input',
    node_name = "TypedSequence"

    @property
    def input(self):
        return self._input

    def accept(self, visitor):
        return visitor.visit_cast(self)

    @property
    def name(self):
        return self._name or self._input.name

    @property
    def source_name(self):
        return self._source_name or self._input.source_name

    @property
    def dtype(self):
        return self._data_type or self._input.data_type

    @property
    def source_type(self):
        return self._source_data_type or self._input._source_data_type

    def rebuild(self):
        attr_dict = self._attr_dict()
        tp = self._copy_type()
        if isinstance(attr_dict['_source_data_type'], types.Unknown):
            attr_dict['_source_data_type'] = self._input.dtype
        return tp(**attr_dict)


class Column(SequenceExpr):
    _args = '_input',

    @property
    def input(self):
        return self._input

    def rebuild(self):
        attr_dict = self._attr_dict()
        tp = self._copy_type()
        new_col = self.input[self.source_name]
        for attr in ('_source_data_type', '_data_type'):
            if isinstance(attr_dict[attr], types.Unknown):
                attr_dict[attr] = new_col.dtype
        return tp(**attr_dict)

    def accept(self, visitor):
        return visitor.visit_column(self)


class CallableColumn(Column):
    def __call__(self, *args, **kwargs):
        return getattr(type(self._input), self.source_name)(self._input, *args, **kwargs)

class Scalar(TypedExpr):
    """
    Represents the scalar type.

    :param _value: value of the scalar
    :param _value_type: value type of the scalar

    :Example:

    >>> df[df, Scalar(4).rename('append_const')]
    """

    __slots__ = '_value', '_value_type', '_source_value_type'

    @classmethod
    def _get_type(cls, *args, **kwargs):
        value = args[0] if len(args) > 0 else None
        value_type = args[1] if len(args) > 1 else None

        val = kwargs.get('_value')
        if val is None:
            val = value

        value_type = kwargs.get('_value_type', None) or value_type

        if val is None and value_type is None:
            raise ValueError('Either value or value_type should be provided')

        if val is not None and not isinstance(val, NodeMetaclass):
            return types.validate_value_type(val, value_type)
        else:
            return types.validate_data_type(value_type)

    @classmethod
    def _typed_classes(cls, *args, **kwargs):
        return _typed_scalar_exprs

    @classmethod
    def _base_class(cls, *args, **kwargs):
        return Scalar

    @classmethod
    def _transform(cls, *args, **kwargs):
        value = args[0] if len(args) > 0 else None
        value_type = args[1] if len(args) > 1 else None

        if ('_value' not in kwargs or kwargs['_value'] is None) and \
                value is not None:
            kwargs['_value'] = value
        if ('_value_type' not in kwargs or kwargs['_value_type'] is None) and \
                value_type is not None:
            kwargs['_value_type'] = types.validate_data_type(value_type)

        if kwargs.get('_value') is not None:
            kwargs['_value_type'] = types.validate_value_type(kwargs.get('_value'),
                                                              kwargs.get('_value_type'))

        if '_source_name' not in kwargs:
            kwargs['_source_name'] = kwargs.get('_name')

        if '_source_value_type' in kwargs:
            kwargs['_source_value_type'] = types.validate_data_type(kwargs['_source_value_type'])
        else:
            kwargs['_source_value_type'] = kwargs['_value_type']
        return kwargs

    def __new__(cls, *args, **kwargs):
        kwargs = cls._transform(*args, **kwargs)
        return super(Scalar, cls).__new__(cls, **kwargs)

    def _init(self, *args, **kwargs):
        self._init_attr('_name', None)
        self._init_attr('_value', None)

        kwargs = self._transform(*args, **kwargs)
        super(Scalar, self)._init(**kwargs)

    def equals(self, other):
        return super(Scalar, self).equals(other)

    @property
    def value(self):
        return self._value

    @property
    def value_type(self):
        return self._value_type

    @property
    def dtype(self):
        return self._value_type

    def output_type(self):
        return 'Scalar[%s]' % repr(self._value_type)

    def astype(self, value_type):
        value_type = types.validate_data_type(value_type)

        if value_type == self._value_type:
            return self

        attr_dict = dict()
        attr_dict['_input'] = self
        attr_dict['_value_type'] = value_type
        attr_dict['_source_value_type'] = self._source_value_type

        new_scalar = AsTypedScalar(**attr_dict)

        return new_scalar

    def to_sequence(self):
        if self._value is None:
            attr_values = dict((attr, getattr(self, attr)) for attr in get_attrs(self))

            attr_values['_data_type'] = attr_values.pop('_value_type')
            if '_source_value_type' in attr_values:
                attr_values['_source_data_type'] = attr_values.pop('_source_value_type')
            del attr_values['_value']

            cls = next(c for c in inspect.getmro(type(self))[1:]
                       if c.__name__ == type(self).__name__ and not issubclass(c, Scalar))
            seq = cls._new(**attr_values)
            return seq

        raise ExpressionError('Cannot convert a valued scalar to a sequence')

    def accept(self, visitor):
        visitor.visit_scalar(self)

    def get_cached(self, data):
        return Scalar(_value=data, _value_type=self.dtype)

class AsTypedScalar(Scalar):
    _args = '_input',
    node_name = "TypedScalar"

    def accept(self, visitor):
        return visitor.visit_cast(self)

    @property
    def name(self):
        return self._name or self._input.name

    @property
    def source_name(self):
        return self._source_name or self._input.source_name

    @property
    def value(self):
        return self._value

    @property
    def value_type(self):
        return self._value_type

    @property
    def dtype(self):
        return self._value_type

    def rebuild(self):
        attr_dict = self._attr_dict()
        tp = self._copy_type()
        if isinstance(attr_dict['_source_value_type'], types.Unknown):
            attr_dict['_source_value_type'] = self._input.dtype
        return tp(**attr_dict)

    @property
    def source_type(self):
        return self._source_value_type


class BooleanScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(BooleanScalar, self)._init(*args, **kwargs)
        self._value_type = types.boolean


class Int8Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Int8Scalar, self)._init(*args, **kwargs)
        self._value_type = types.int8


class Int16Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Int16Scalar, self)._init(*args, **kwargs)
        self._value_type = types.int16


class Int32Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Int32Scalar, self)._init(*args, **kwargs)
        self._value_type = types.int32


class Int64Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Int64Scalar, self)._init(*args, **kwargs)
        self._value_type = types.int64


class Float32Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Float32Scalar, self)._init(*args, **kwargs)
        self._value_type = types.float32


class Float64Scalar(Scalar):
    def _init(self, *args, **kwargs):
        super(Float64Scalar, self)._init(*args, **kwargs)
        self._value_type = types.float64


class DecimalScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(DecimalScalar, self)._init(*args, **kwargs)
        self._value_type = types.decimal


class StringScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(StringScalar, self)._init(*args, **kwargs)
        self._value_type = types.string


class DateScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(DateScalar, self)._init(*args, **kwargs)
        self._value_type = types.date


class DatetimeScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(DatetimeScalar, self)._init(*args, **kwargs)
        self._value_type = types.datetime


class TimestampScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(TimestampScalar, self)._init(*args, **kwargs)
        self._value_type = types.timestamp


class BinaryScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(BinaryScalar, self)._init(*args, **kwargs)
        self._value_type = types.binary


class JsonScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(JsonScalar, self)._init(*args, **kwargs)
        self._value_type = types.json


class ListScalar(Scalar):
    pass


class DictScalar(Scalar):
    pass


class StructScalar(Scalar):
    pass


class UnknownScalar(Scalar):
    def _init(self, *args, **kwargs):
        super(UnknownScalar, self)._init(*args, **kwargs)
        if not isinstance(self._value_type, types.Unknown):
            self._value_type = types.Unknown()


_typed_scalar_exprs = [globals()[t.__class__.__name__ + Scalar.__name__]
                       for t in types._data_types.values()]

number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__)
                  for t in types.number_types()]

int_number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__)
                      for t in types.number_types() if repr(t).startswith('int')]


class BuiltinFunction(Scalar):
    __slots__ = '_func_name', '_func_args', '_func_kwargs'

    def __init__(self, name=None, rtype=None, args=(), **kwargs):
        rtype = rtype or kwargs.pop('_value_type', types.string)
        rtype = types.validate_data_type(rtype)
        func_name = name or kwargs.pop('_func_name', None)
        func_args = args or kwargs.pop('_func_args', ())
        super(BuiltinFunction, self).__init__(_func_name=func_name,
                                              _func_args=func_args,
                                              _value_type=rtype,
                                              **kwargs)

    def accept(self, visitor):
        visitor.visit_builtin_function(self)

class RandomScalar(BuiltinFunction):
    """
    Represents the random scalar type.

    :param seed: random seed, None by default

    :Example:

    >>> df[df, RandomScalar().rename('append_random')]
    """

    def __new__(cls, seed=None, **kw):
        args = (seed, ) if seed is not None else kw.get('_func_args', ())
        kw.update(dict(_func_name='rand', _value_type='float', _func_args=args))
        return BuiltinFunction.__new__(cls, **kw)

    def __init__(self, seed=None, **kw):
        args = (seed, ) if seed is not None else kw.get('_func_args', ())
        kw.update(dict(_func_name='rand', _value_type='float', _func_args=args))
        super(RandomScalar, self).__init__(**kw)

class FilterCollectionExpr(CollectionExpr):
    _args = '_input', '_predicate'
    node_name = 'Filter'

    def _init(self, *args, **kwargs):
        super(FilterCollectionExpr, self)._init(*args, **kwargs)

        if self._schema is None:
            self._schema = self._input.schema

    def iter_args(self):
        for it in zip(['collection', 'predicate'], self.args):
            yield it

    @property
    def input(self):
        return self._input

    def rebuild(self):
        rebuilt = super(FilterCollectionExpr, self).rebuild()
        rebuilt._schema = rebuilt.input.schema
        return rebuilt

    def accept(self, visitor):
        visitor.visit_filter_collection(self)


class ProjectCollectionExpr(CollectionExpr):
    __slots__ = '_raw_fields',
    _args = '_input', '_fields'
    _extra_args = '_raw_fields',
    node_name = 'Projection'

    def _init(self, *args, **kwargs):
        fields = kwargs.get('_fields')
        if fields is None and len(args) >= 2:
            fields = args[1]
        for field in fields:
            if field.name is None:
                raise ExpressionError('Column does not have a name, '
                                      'please specify one by `rename`: %s' % repr_obj(field._repr()))

        self._init_attr('_raw_fields', None)
        super(ProjectCollectionExpr, self)._init(*args, **kwargs)

    def _set_field(self, value, value_dag=None):
        from ..backends.context import context
        from .window import Window

        if context.is_cached(self):
            super(ProjectCollectionExpr, self)._set_field(value, value_dag=value_dag)
            return

        if value_dag is None:
            value_dag = value.to_dag(copy=False, validate=False)
        for n in value.traverse(top_down=True, unique=True,
                                stop_cond=lambda x: isinstance(x, Column)):
            if isinstance(n, Column) and (n.input is self or n.input._proxy is self):
                source_name = n.source_name
                idx = self._schema._name_indexes[source_name]
                field = self._fields[idx]
                if field.name != n.name:
                    field = field.rename(n.name)
                value_dag.substitute(n, field)
            elif isinstance(n, Window) and n.input is self:
                # Window objects like rank point to the collection directly instead of through a column
                n._input = self.input
        value = value_dag.root

        fields = [field if field.name != value.name else value
                  for field in self._fields]
        if value.name not in self._schema:
            fields.append(value)

        self._schema = TableSchema.from_lists(
            [f.name for f in fields], [f.dtype for f in fields]
        )
        self._fields = fields

    def _delitem(self, key):
        from ..backends.context import context

        if context.is_cached(self):
            return super(ProjectCollectionExpr, self).__delitem__(key)

        fields = [f for f in self._fields if f.name != key]
        self._schema = TableSchema.from_lists(
            [f.name for f in fields], [f.dtype for f in fields]
        )
        self._fields = fields

    def __delitem__(self, key):
        if key not in self._schema:
            raise KeyError('Field({0}) does not exist'.format(key))
        self._delitem(key)

    @property
    def _project_fields(self):
        return self._fields

    def iter_args(self):
        for it in zip(['collection', 'selections'], self.args):
            yield it

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    def rebuild(self):
        if self._raw_fields:
            return self._input.select(*self._raw_fields)
        rebuilt = super(ProjectCollectionExpr, self).rebuild()
        rebuilt._schema = TableSchema.from_lists(
            [f.name for f in rebuilt._fields], [f.dtype for f in rebuilt._fields]
        )
        return rebuilt

    def accept(self, visitor):
        visitor.visit_project_collection(self)


class LateralViewCollectionExpr(ProjectCollectionExpr):
    _args = '_input', '_fields', '_lateral_views'
    node_name = 'LateralView'

    def _init(self, *args, **kwargs):
        self._init_attr('_lateral_views', None)
        super(LateralViewCollectionExpr, self)._init(*args, **kwargs)
        self._redirect_lateral_views()

    def _redirect_lateral_views(self):
        # dealing with cases like df[df['a', 'b'].apply(..., axis=1), df.c]
        for lv in self.lateral_views:
            if lv.input is self.input:
                continue
            for f, f_o in zip(lv._fields, lv.input._fields):
                lv.substitute(f, f_o)
            lv.substitute(lv.input, self.input)

    def iter_args(self):
        for it in zip(['collection', 'selections', 'lateral_views'], self.args):
            yield it

    @property
    def lateral_views(self):
        return self._lateral_views

    def accept(self, visitor):
        visitor.visit_lateral_view(self)


class FilterPartitionCollectionExpr(CollectionExpr):
    __slots__ = '_predicate_string',
    _args = '_input', '_predicate', '_fields'
    node_name = 'FilterPartition'

    def _init(self, *args, **kwargs):
        super(FilterPartitionCollectionExpr, self)._init(*args, **kwargs)
        self._fields = [self._input[col.name] for col in self._schema.columns]
        self._predicate_string = kwargs.get('_predicate_string')

    @property
    def _project_fields(self):
        return self._fields

    def iter_args(self):
        for it in zip(['collection', 'predicate', 'selections'], self.args):
            yield it

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    @property
    def predicate_string(self):
        return self._predicate_string

    def accept(self, visitor):
        visitor.visit_filter_partition_collection(self)

    @property
    def data_table(self):
        return self.input.data

    @property
    def data(self):
        from ...types import PartitionSpec
        from .arithmetic import Or

        if isinstance(self._predicate, Or):
            raise AttributeError('Cannot get data when predicate contains multiple partition specs.')
        spec = PartitionSpec(self.predicate_string)
        return self.data_table.partitions[spec]


class SliceCollectionExpr(CollectionExpr):
    _args = '_input', '_indexes'
    node_name = 'Slice'

    def _init(self, *args, **kwargs):
        super(SliceCollectionExpr, self)._init(*args, **kwargs)

        if isinstance(self._indexes, slice):
            scalar = lambda v: Scalar(_value=v) if v is not None else None
            self._indexes = scalar(self._indexes.start), \
                scalar(self._indexes.stop), scalar(self._indexes.step)

    @property
    def start(self):
        res = self._indexes[0]
        return res.value if res is not None else None

    @property
    def stop(self):
        res = self._indexes[1]
        return res.value if res is not None else None

    @property
    def step(self):
        res = self._indexes[2]
        return res.value if res is not None else None

    @property
    def input(self):
        return self._input

    def iter_args(self):
        args = [self._input] + list(self._indexes)
        for it in zip(['collection', 'start', 'stop', 'step'], args):
            yield it

    def rebuild(self):
        rebuilt = super(SliceCollectionExpr, self).rebuild()
        rebuilt._schema = self.input.schema
        return rebuilt

    def accept(self, visitor):
        visitor.visit_slice_collection(self)


class Summary(CollectionExpr):
    __slots__ = '_schema',
    _args = '_input', '_fields'

    def _init(self, *args, **kwargs):
        super(Summary, self)._init(*args, **kwargs)
        if hasattr(self, '_schema') and any(it is None for it in self._schema.names):
            raise TypeError('Schema cannot have a field whose name is None')

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    @property
    def schema(self):
        return self._schema

    def iter_args(self):
        for it in zip(['collection', 'fields'], self.args):
            yield it

    def accept(self, visitor):
        visitor.visit_project_collection(self)


from . import element
from . import arithmetic
from . import reduction
from . import groupby
from . import collections
from . import window
from . import math
from . import strings
from . import datetimes
from . import merge
from . import composites
from ..tools import *


# hack for count
def _count(expr, *args, **kwargs):
    if len(args) + len(kwargs) > 0:
        from .strings import _count
        return _count(expr, *args, **kwargs)
    else:
        from .reduction import count
        return count(expr)


StringSequenceExpr.count = _count