odps.df.expr.expressions 源代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import functools
import operator
import sys
from collections import defaultdict, OrderedDict
from collections.abc import Iterable
from concurrent.futures import Future
from functools import reduce

from .core import Node, NodeMetaclass
from .errors import ExpressionError
from .utils import get_attrs, is_called_by_inspector, highest_precedence_data_type, new_id, \
    is_changed, get_proxied_expr
from .. import types
from ...config import options
from ...errors import NoSuchObject, DependencyNotInstalledError
from ...utils import TEMP_TABLE_PREFIX, to_binary, to_lower_str, deprecated, survey, isvalidattr
from ...models import TableSchema


class ReprWrapper:
    def __init__(self, func, repr):
        self._func = func
        self._repr = repr
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        return self._func(*args, **kwargs)

    def __repr__(self):
        return self._repr(self._func)


def _wrap_method_repr(func):
    def inner(*args, **kwargs):
        obj = func(*args, **kwargs)
        if inspect.ismethod(obj):
            def _repr(x):
                instance = getattr(x, 'im_self', getattr(x, '__self__', None))
                method = 'bound method' if instance is not None else 'unbound method'
                if instance is not None:
                    return '<%(method)s %(instance)s.%(name)s>' % {
                        'method': method,
                        'instance': getattr(instance, 'node_name', instance.__class__.__name__),
                        'name': x.__name__
                    }
                else:
                    return '<function __main__.%s>' % x.__name__

            return ReprWrapper(obj, _repr)
        return obj

    return inner


def make_max_pt_function(expr=None):
    from ...models import Table
    from .. import func

    def max_pt(table_name=None):
        if isinstance(getattr(expr, 'data', None), Table):
            table_name = table_name or expr.data.name
        return func.max_pt(table_name)

    return max_pt


def repr_obj(obj):
    if hasattr(obj, '_repr'):
        try:
            return obj._repr()
        except:
            return object.__repr__(obj)
    elif isinstance(obj, (tuple, list)):
        return ','.join(repr_obj(it) for it in obj)

    return obj


class Expr(Node):
    __slots__ = '_deps', '_ban_optimize', '_engine', '_need_cache', '_mem_cache', '__execution', '_id'

    def _init(self, *args, **kwargs):
        """
        _deps is used for common dependencies.
        When a expr depend on other exprs, and the expr is not calculated from the others,
        the _deps are specified to identify the dependencies.
        """
        self._init_attr('_deps', None)

        self._init_attr('_ban_optimize', False)
        self._init_attr('_engine', None)
        self._init_attr('_Expr__execution', None)

        self._init_attr('_need_cache', False)
        self._init_attr('_mem_cache', False)

        if '_id' not in kwargs:
            kwargs['_id'] = new_id()

        super(Expr, self)._init(*args, **kwargs)

    def __repr__(self):
        if not options.interactive or is_called_by_inspector():
            return self._repr()
        else:
            if isinstance(self.__execution, Exception):
                self.__execution = None
            if self.__execution is None:
                try:
                    self.__execution = self.execute()
                except Exception as e:
                    self.__execution = e
                    raise
            return self.__execution.__repr__()

    def _repr_html_(self):
        if not options.interactive:
            return '<code>' + repr(self) + '</code>'
        else:
            if self.__execution is None:
                self.__execution = self.execute()
            else:
                if isinstance(self.__execution, Exception):
                    try:
                        return
                    finally:
                        self.__execution = None
            if hasattr(self.__execution, '_repr_html_'):
                return self.__execution._repr_html_()
            return repr(self.__execution)

    def _handle_delay_call(self, method, *args, **kwargs):
        delay = kwargs.pop('delay', None)
        if delay is not None:
            future = delay.register_item(method, *args, **kwargs)
            return future
        else:
            from ..engines import get_default_engine
            engine = get_default_engine(self)

            wrapper = kwargs.pop('wrapper', None)
            result = getattr(engine, method)(*args, **kwargs)
            if wrapper is None:
                return result
            async_ = kwargs.get('async_', kwargs.get('async', False))
            if async_:
                user_future = Future()

                def _relay(f):
                    try:
                        user_future.set_result(wrapper(f.result()))
                    except:
                        user_future.set_exception(f.exception())

                result.add_done_callback(_relay)
                return user_future
            else:
                return wrapper(result)

    def visualize(self):
        from ..engines import get_default_engine

        engine = get_default_engine(self)
        return engine.visualize(self)

    def execute(self, **kwargs):
        """
        :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :param running_cluster: cluster to run this instance
        :return: execution result
        :rtype: :class:`odps.df.backends.frame.ResultFrame`
        """
        _wrapper = kwargs.pop('wrapper', None)

        def wrapper(result):
            self.__execution = result
            if _wrapper is not None:
                return _wrapper(result)
            else:
                return result

        return self._handle_delay_call('execute', self, wrapper=wrapper, **kwargs)

    def compile(self):
        """
        Compile this expression into an ODPS SQL

        :return: compiled DAG
        :rtype: str
        """

        from ..engines import get_default_engine

        engine = get_default_engine(self)
        return engine.compile(self)

    def persist(self, name, partitions=None, partition=None, lifecycle=None, project=None, **kwargs):
        """
        Persist the execution into a new table. If `partitions` not specified,
        will create a new table without partitions if the table does not exist,
        and insert the SQL result into it.
        If `partitions` are specified, they will be the partition fields of the new table.
        If `partition` is specified, the data will be inserted into the exact partition of the table.

        :param name: table name
        :param partitions: list of string, the partition fields
        :type partitions: list
        :param partition: persist to a specified partition
        :type partition: string or PartitionSpec
        :param lifecycle: table lifecycle. If absent, `options.lifecycle` will be used.
        :type lifecycle: int
        :param project: project name, if not provided, will be the default project
        :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
        :type hints: dict
        :param priority: instance priority, 9 as default
        :type priority: int
        :param running_cluster: cluster to run this instance
        :param overwrite: overwrite the table, True as default
        :type overwrite: bool
        :param drop_table: drop table if exists, False as default
        :type drop_table: bool
        :param create_table: create table first if not exits, True as default
        :type create_table: bool
        :param drop_partition: drop partition if exists, False as default
        :type drop_partition: bool
        :param create_partition: create partition if not exists, None as default
        :type create_partition: bool
        :param cast: cast all columns' types as the existed table, False as default
        :type cast: bool
        :return: :class:`odps.df.DataFrame`

        :Example:

        >>> df = df['name', 'id', 'ds']
        >>> df.persist('odps_new_table')
        >>> df.persist('odps_new_table', partition='pt=test')
        >>> df.persist('odps_new_table', partitions=['ds'])
        """
        if lifecycle is None and options.lifecycle is not None:
            lifecycle = \
                options.lifecycle if not name.startswith(TEMP_TABLE_PREFIX) \
                    else options.temp_lifecycle

        return self._handle_delay_call('persist', self, name, partitions=partitions, partition=partition,
                                       lifecycle=lifecycle, project=project, **kwargs)

    def cache(self, mem=False):
        self._need_cache = True
        self._mem_cache = mem
        self._ban_optimize = True
        return self

    def uncache(self):
        self._need_cache = False
        self._mem_cache = False

        from ..backends.context import context
        context.uncache(self)

    def verify(self):
        """
        Verify if this expression can be compiled into ODPS SQL.

        :return: True if compilation succeed else False
        :rtype: bool
        """

        try:
            self.compile()

            return True
        except:
            return False

    def _repr(self):
        from .formatter import ExprFormatter

        formatter = ExprFormatter(self)
        return formatter()

    def ast(self):
        """
        Return the AST string.

        :return: AST tree
        :rtype: str
        """

        return self._repr()

    @_wrap_method_repr
    def __getattribute__(self, attr):
        try:
            return super(Expr, self).__getattribute__(attr)
        except AttributeError:
            if not attr.startswith('_'):
                new_attr = '_%s' % attr
                if new_attr in object.__getattribute__(self, '_args_indexes'):
                    try:
                        return object.__getattribute__(self, new_attr)
                    except AttributeError:
                        return

            raise

    def _defunc(self, field):
        return field(self) if inspect.isfunction(field) else field

    @property
    def optimize_banned(self):
        return self._ban_optimize

    @optimize_banned.setter
    def optimize_banned(self, val):
        self._ban_optimize = val

    @property
    def args(self):
        if not self._deps:
            return super(Expr, self).args
        return super(Expr, self).args + self.deps

    @property
    def deps(self):
        if self._deps is None:
            return
        return tuple(dep if not isinstance(dep, tuple) else dep[0]
                     for dep in self._deps)

    def add_deps(self, *deps):
        dependencies = []
        if len(deps) == 1 and isinstance(deps[0], Iterable):
            dependencies.append([d for d in dependencies
                                 if isinstance(d, Expr)])
        else:
            dependencies.extend(deps)

        if getattr(self, '_deps', None) is None:
            self._deps = dependencies
        else:
            self._deps.extend(dependencies)

    def substitute(self, old_arg, new_arg, dag=None):
        super(Expr, self).substitute(old_arg, new_arg, dag=dag)

        new_deps = []
        if self._deps is not None:
            for dep in self._deps:
                if isinstance(dep, tuple):
                    node = dep[0]
                    if node is old_arg:
                        new_deps.append((new_arg, ) + dep[1:])
                    else:
                        new_deps.append(dep)
                else:
                    if dep is old_arg:
                        new_deps.append(new_arg)
                    else:
                        new_deps.append(dep)

        self._deps = new_deps

    def rebuild(self):
        # used in the dynamic setting, do nothing by default
        # `rebuild` will copy itself, and apply all changes to the new one
        return self.copy()

    def __hash__(self):
        return self._node_id * hash(Expr)

    def __eq__(self, other):
        try:
            return self._eq(other)
        except AttributeError:
            # Due to current complexity of parent's eq,
            # by now, every expression is unequal
            return self is other
        except TypeError:
            if self._eq is None:
                return self is other
            raise

    def __ne__(self, other):
        try:
            return self._ne(other)
        except AttributeError:
            return not super(Expr, self).__eq__(other)
        except TypeError:
            if self._ne is None:
                return not super(Expr, self).__eq__(other)
            raise

    def __lt__(self, other):
        return self._lt(other)

    def __le__(self, other):
        return self._le(other)

    def __gt__(self, other):
        return self._gt(other)

    def __ge__(self, other):
        return self._ge(other)

    def __add__(self, other):
        return self._add(other)

    def __radd__(self, other):
        return self._radd(other)

    def __mul__(self, other):
        return self._mul(other)

    def __rmul__(self, other):
        return self._rmul(other)

    def __div__(self, other):
        return self._div(other)

    def __rdiv__(self, other):
        return self._rdiv(other)

    __truediv__ = __div__
    __rtruediv__ = __rdiv__

    def __floordiv__(self, other):
        return self._floordiv(other)

    def __rfloordiv__(self, other):
        return self._rfloordiv(other)

    def __mod__(self, other):
        return self._mod(other)

    def __rmod__(self, other):
        return self._rmod(other)

    def __sub__(self, other):
        return self._sub(other)

    def __rsub__(self, other):
        return self._rsub(other)

    def __pow__(self, power):
        return self._pow(power)

    def __rpow__(self, power):
        return self._rpow(power)

    def __or__(self, other):
        return self._or(other)

    def __ror__(self, other):
        return self._ror(other)

    def __and__(self, other):
        return self._and(other)

    def __rand__(self, other):
        return self._rand(other)

    def __neg__(self):
        return self._neg()

    def __invert__(self):
        return self._invert()

    def __abs__(self):
        return self._abs()


[文档] class CollectionExpr(Expr): """ Collection represents for the two-dimensions data. :Example: >>> # projection >>> df = DataFrame(o.get_table('my_table')) # DataFrame is actually a CollectionExpr >>> df['name', 'id'] # projection some columns >>> df[[df.name, df.id]] # projection >>> df[df] # means nothing, but get all the columns >>> df[df, df.name.lower().rename('name2')] # projection a new columns `name2` besides all the original columns >>> df.select(df, name2=df.name.lower()) # projection by `select` >>> df.exclude('name') # projection all columns but `name` >>> df[df.exclude('name'), df.name.lower()] # `name` will not conflict any more >>> >>> # filter >>> df[(df.id < 3) & (df.name != 'test')] >>> df.filter(df.id < 3, df.name != 'test') >>> >>> # slice >>> df[: 10] >>> df.limit(10) >>> >>> # Sequence >>> df.name # an instance of :class:`odps.df.expr.expressions.SequenceExpr` >>> >>> # schema or dtypes >>> df.dtypes odps.Schema { name string id int64 } >>> df.schema odps.Schema { name string id int64 } """ __slots__ = ( '_schema', '_source_data', '_proxy', '_ml_fields_cache', '_ml_uplink', '_ml_operations', ) node_name = 'Collection' def _init(self, *args, **kwargs): self._init_attr('_source_data', None) self._init_attr('_proxy', None) self._init_attr('_ml_fields_cache', None) self._init_attr('_ml_uplink', []) self._init_attr('_ml_operations', []) super(CollectionExpr, self)._init(*args, **kwargs) if hasattr(self, '_schema') and any(it is None for it in self._schema.names): raise TypeError('Schema cannot has field whose name is None') def __dir__(self): cur_dir = object.__dir__(self) dir_set = set(cur_dir) | set([c.name for c in self.schema if isvalidattr(c.name)]) return sorted(dir_set) def __getitem__(self, item): item = self._defunc(item) if isinstance(item, tuple): item = list(item) if isinstance(item, CollectionExpr): item = [item, ] if isinstance(item, str): return self._get_field(item) elif isinstance(item, slice): if item.start is None and item.stop is None and item.step is None: return self return self._slice(item) elif isinstance(item, list): return self._project(item) else: field = self._get_field(item) if isinstance(field, BooleanSequenceExpr): return self.filter(item) if isinstance(item, SequenceExpr): raise ExpressionError('No boolean sequence found for filtering, ' 'a tuple or list is required for projection') raise ExpressionError('Not supported projection: collection[%s]' % repr_obj(item)) def _set_field(self, value, value_dag=None): expr = self.copy() if self._proxy is None else self._proxy._input if value_dag is None: value_dag = value.to_dag(copy=False, validate=False) if value_dag.contains_node(self): value_dag.substitute(self, expr) value = value_dag.root if self._proxy is not None: self._proxy._setitem(value, value_dag=value_dag) return fields = [f if f != value.name else value for f in self._schema.names] if value.name not in self._schema: fields.append(value) # make the isinstance to check the proxy type self.__class__ = type(self.__class__.__name__, (self.__class__,), {}) self._proxy = expr.select(fields) def __setitem__(self, key, value): if not isinstance(value, Expr) and value is not None: value = Scalar(value) if not isinstance(key, tuple): if value is None: raise ValueError('Cannot determine type for column %s with None value.' % key) column_name = key value = value.rename(column_name) else: conds = key[:-1] conds = [self._defunc(c) for c in conds] if not all(isinstance(c.dtype, types.Boolean) for c in conds): raise ValueError('Conditions should be boolean expressions or boolean columns') if len(conds) == 1: cond = conds[0] else: cond = reduce(operator.and_, conds) column_name = key[-1] if column_name not in self._schema: if value is None: raise ValueError('Cannot determine type for column %s with None value.' % key) default_col = Scalar(_value_type=value.dtype) else: default_col = self[column_name] if value is None: value = Scalar(_value_type=default_col.dtype) value = cond.ifelse(value, default_col).rename(column_name) self._set_field(value) def __delitem__(self, key): if key not in self.schema: raise KeyError('Field({0}) does not exist'.format(key)) if self._proxy is not None: return self._proxy._delitem(key) fields = [n for n in self._schema.names if n != key] # make the instance to check the proxy type self.__class__ = type(self.__class__.__name__, (self.__class__,), {}) self._proxy = self.copy().select(fields) def __delattr__(self, item): if item in self._schema: return self.__delitem__(item) super(CollectionExpr, self).__delattr__(item) def __setattr__(self, key, value): try: if key != '_proxy' and object.__getattribute__(self, '_proxy'): return setattr(self._proxy, key, value) except AttributeError: pass Expr.__setattr__(self, key, value) def __getattribute__(self, attr): try: proxy = object.__getattribute__(self, '_proxy') if attr == '_proxy': return proxy if proxy: # delegate everything to proxy object return getattr(proxy, attr) except AttributeError: pass try: if to_lower_str(attr) in object.__getattribute__(self, '_schema')._name_indexes: cls_attr = getattr(type(self), attr, None) if cls_attr is None or inspect.ismethod(cls_attr) or inspect.isfunction(cls_attr): return self[attr] except AttributeError: pass return super(CollectionExpr, self).__getattribute__(attr)
[文档] def query(self, expr): """ Query the data with a boolean expression. :param expr: the query string, you can use '@' character refer to environment variables. :return: new collection :rtype: :class:`odps.df.expr.expressions.CollectionExpr` """ from .query import CollectionVisitor if not isinstance(expr, str): raise ValueError('expr must be a string') frame = sys._getframe(2).f_locals try: env = frame.copy() finally: del frame env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr) visitor = CollectionVisitor(self, env) predicate = visitor.eval(expr) return self.filter(predicate)
[文档] def filter(self, *predicates): """ Filter the data by predicates :param predicates: the conditions to filter :return: new collection :rtype: :class:`odps.df.expr.expressions.CollectionExpr` """ predicates = self._get_fields(predicates) predicate = reduce(operator.and_, predicates) return FilterCollectionExpr(self, predicate, _schema=self._schema)
@deprecated('The function `filter_partition` is deprecated, please use `filter_parts` instead' "and change the predicate parameter into `pt1=1,pt2=2/pt1=2,pt2=1` form.") @survey def filter_partition(self, predicate='', exclude=True): if isinstance(predicate, str): part_reprs = '/'.join(','.join(p.split('/')) for p in predicate.split(',')) else: part_reprs = predicate return self.filter_parts(part_reprs, exclude)
[文档] def filter_parts(self, predicate='', exclude=True): """ Filter the data by partition string. A partition string looks like `pt1=1,pt2=2/pt1=2,pt2=1`, where comma (,) denotes 'and', while (/) denotes 'or'. :param str|Partition predicate: predicate string of partition filter :param bool exclude: True if you want to exclude partition fields, otherwise False. True for default. :return: new collection :rtype: :class:`odps.df.expr.expressions.CollectionExpr` """ source = self._source_data if source is None: raise ExpressionError('Can only filter on data sources.') def _parse_partition_predicate(p): if '=' not in p: raise ExpressionError('Illegal partition predicate.') field_name, field_value = [s.strip() for s in p.split('=', 1)] if not hasattr(source, 'table_schema'): raise ExpressionError('filter_partition can only be applied on ODPS DataFrames') if field_name not in source.table_schema: raise ExpressionError('Column `%s` not exists in input collection' % field_name) if field_name not in source.table_schema._partition_schema: raise ExpressionError('`%s` is not a partition column' % field_name) part_col = self[field_name] if field_value.startswith('\'') or field_value.startswith('\"'): encoding = 'unicode-escape' field_value = to_binary(field_value.strip('"\'')).decode(encoding) if isinstance(part_col.data_type, types.Integer): field_value = int(field_value) elif isinstance(part_col.data_type, types.Float): field_value = float(field_value) return part_col == field_value from ...models.partition import Partition from ...types import PartitionSpec if isinstance(predicate, Partition): predicate = predicate.partition_spec if isinstance(predicate, PartitionSpec): predicate = ','.join("%s='%s'" % (k, v) for k, v in predicate.kv.items()) if isinstance(predicate, list): predicate = '/'.join(str(s) for s in predicate) elif not isinstance(predicate, str): raise ExpressionError('Only accept string predicates.') if not predicate: predicate_obj = None else: part_formatter = lambda p: reduce(operator.and_, map(_parse_partition_predicate, p.split(','))) predicate_obj = reduce(operator.or_, map(part_formatter, predicate.split('/'))) if not source.table_schema.partitions: raise ExpressionError('No partition columns in the collection.') if exclude: columns = [ c for c in self.schema if c.name not in source.table_schema._partition_schema ] new_schema = types.TableSchema.from_lists([c.name for c in columns], [c.type for c in columns]) return FilterPartitionCollectionExpr(self, predicate_obj, _schema=new_schema, _predicate_string=predicate) else: return self.filter(predicate_obj)
def _validate_field(self, field): if not isinstance(field, SequenceExpr): return True if not field.is_ancestor(self): return False for path in field.all_path(self): if any(isinstance(n, CollectionExpr) for n in path[1: -1]): return False from .reduction import GroupedSequenceReduction if any(isinstance(n, GroupedSequenceReduction) for n in path): return False return True @staticmethod def _backtrack_field(field, collection): from .window import RankOp for col in field.traverse(top_down=True, unique=True, stop_cond=lambda x: isinstance(x, (Column, RankOp))): if isinstance(col, Column): changed = is_changed(collection, col) if changed or changed is None: return elif col.source_name not in collection.schema: return else: col.substitute(col._input, collection) elif isinstance(col, RankOp) and col._input is not collection: col.substitute(col._input, collection) return field def _get_field(self, field): from .reduction import GroupedSequenceReduction field = self._defunc(field) if isinstance(field, str): if field not in self._schema: raise ValueError('Field(%s) does not exist, please check schema' % field) cls = Column if callable(getattr(type(self), field, None)): cls = CallableColumn return cls(self, _name=field, _data_type=self._schema[field].type) if not self._validate_field(field): new_field = None if not isinstance(field, GroupedSequenceReduction): # the reduction is not allowed new_field = self._backtrack_field(field, self) if new_field is None: raise ExpressionError('Cannot support projection on %s' % repr_obj(field)) field = new_field return field def _get_fields(self, fields, ret_raw_fields=False): selects = [] raw_selects = [] for field in fields: field = self._defunc(field) if isinstance(field, CollectionExpr): if any(c is self for c in field.children()): selects.extend(self._get_fields(field._project_fields)) else: selects.extend(self._get_fields(field._fetch_fields())) raw_selects.append(field) else: select = self._get_field(field) selects.append(select) raw_selects.append(select) if ret_raw_fields: return selects, raw_selects return selects @classmethod def _backtrack_lateral_view(cls, lv, collection): if isinstance(lv.input, ProjectCollectionExpr): src_inputs = (lv.input, lv.input.input) else: src_inputs = (lv.input,) lv_copy = lv.copy(_id=None, _lateral_view=True) cur = collection while cur not in src_inputs and isinstance(cur, (ProjectCollectionExpr, FilterCollectionExpr)): cur = cur.input if cur not in src_inputs: raise ExpressionError("Input of 'apply' in lateral views can only be " "simple column selections") if cur is lv.input: apply_src = lv_copy else: apply_src = lv_copy.input if collection is apply_src.input: return lv_copy for f in apply_src._fields: cls._backtrack_field(f, collection) lv_copy.substitute(apply_src.input, collection) return lv_copy def _filter_lateral_views(self, fields): from .collections import RowAppliedCollectionExpr from .merge import JoinFieldMergedCollectionExpr, JoinCollectionExpr, UnionCollectionExpr walk_through_collections = (JoinCollectionExpr, UnionCollectionExpr, JoinFieldMergedCollectionExpr) lateral_views = [] for field in fields: if not isinstance(field, RowAppliedCollectionExpr): continue else: is_lv = True stop_cond = lambda n: n is not self and isinstance(n, CollectionExpr) \ and not isinstance(n, walk_through_collections) for coll in self.traverse(unique=True, stop_cond=stop_cond): if coll is field: is_lv = False break if is_lv: lateral_views.append(field) return lateral_views def _project(self, fields): field_lvs = self._filter_lateral_views(fields) lv_id_set = set(id(f) for f in field_lvs) selects = [] lateral_views = [] for idx, field in enumerate(fields): if id(field) not in lv_id_set: s = self._get_fields([field]) selects.extend(s) else: lv = self._backtrack_lateral_view(field, self) lateral_views.append(lv) selects.extend(lv.columns) names = [f.name for f in selects] typos = [f.dtype for f in selects] if not lateral_views and all(isinstance(it, Scalar) for it in selects): return self._summary(selects) if len(names) != len(set(names)): counts = defaultdict(lambda: 0) for n in names: counts[n] += 1 raise ExpressionError('Duplicate column names: %s' % ', '.join(n for n in counts if counts[n] > 1)) if lateral_views: return LateralViewCollectionExpr(self, _fields=selects, _lateral_views=lateral_views, _schema=types.TableSchema.from_lists(names, typos)) else: return ProjectCollectionExpr(self, _fields=selects, _schema=types.TableSchema.from_lists(names, typos))
[文档] def select(self, *fields, **kw): """ Projection columns. Remember to avoid column names' conflict. :param fields: columns to project :param kw: columns and their names to project :return: new collection :rtype: :class:`odps.df.expr.expression.CollectionExpr` """ if len(fields) == 1 and isinstance(fields[0], list): fields = fields[0] else: fields = list(fields) if kw: def handle(it): it = self._defunc(it) if not isinstance(it, Expr): it = Scalar(it) return it fields.extend([handle(f).rename(new_name) for new_name, f in kw.items()]) return self._project(fields)
[文档] def exclude(self, *fields): """ Projection columns which not included in the fields :param fields: field names :return: new collection :rtype: :class:`odps.df.expr.expression.CollectionExpr` """ if len(fields) == 1 and isinstance(fields[0], list): exclude_fields = fields[0] else: exclude_fields = list(fields) exclude_fields = [self._defunc(it) for it in exclude_fields] exclude_fields = [field.name if not isinstance(field, str) else field for field in exclude_fields] fields = [name for name in self._schema.names if name not in exclude_fields] return self._project(fields)
def _summary(self, fields): names = [field if isinstance(field, str) else field.name for field in fields] typos = [self._schema.get_type(field) if isinstance(field, str) else field.dtype for field in fields] if None in names: raise ExpressionError('Column does not have a name, ' 'please specify one by `rename`') return Summary(_input=self, _fields=fields, _schema=types.TableSchema.from_lists(names, typos)) def _slice(self, slices): return SliceCollectionExpr(self, _indexes=slices, _schema=self._schema) @property def schema(self): return self._schema @property def odps_schema(self): from ..backends.odpssql.types import df_schema_to_odps_schema return df_schema_to_odps_schema(self.schema) @property def columns(self): """ :return: columns :rtype: list which each element is a Column """ return [self[n] for n in self._schema.names] def _fetch_fields(self): return [self._get_field(name) for name in self._schema.names] @property def _project_fields(self): return self._fetch_fields() def _data_source(self): if hasattr(self, '_source_data') and self._source_data is not None: yield self._source_data def __getattr__(self, attr): try: obj = super(CollectionExpr, self).__getattribute__(attr) return obj except AttributeError as e: if to_lower_str(attr) in object.__getattribute__(self, '_schema')._name_indexes: return self[attr] raise e def output_type(self): return 'collection'
[文档] def limit(self, n): """ limit n records :param n: n records :return: """ return self[:n]
[文档] def head(self, n=None, **kwargs): """ Return the first n rows. Execute at once. :param n: :return: result frame :rtype: :class:`odps.df.backends.frame.ResultFrame` """ if n is None: n = options.display.max_rows return self._handle_delay_call('execute', self, head=n, **kwargs)
[文档] def tail(self, n=None, **kwargs): """ Return the last n rows. Execute at once. :param n: :return: result frame :rtype: :class:`odps.df.backends.frame.ResultFrame` """ if n is None: n = options.display.max_rows return self._handle_delay_call('execute', self, tail=n, **kwargs)
[文档] def to_pandas(self, wrap=False, **kwargs): """ Convert to pandas DataFrame. Execute at once. :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame :return: pandas DataFrame """ try: import pandas as pd except (ImportError, ValueError): raise DependencyNotInstalledError('to_pandas requires `pandas` library') def wrapper(result): res = result.values if wrap: from .. import DataFrame return DataFrame(res, schema=self.schema) return res return self.execute(wrapper=wrapper, **kwargs)
@property def dtypes(self): return self.schema
[文档] def view(self): """ Clone a same collection. useful for self-join. :return: """ proxied = get_proxied_expr(self) kv = dict((attr, getattr(proxied, attr)) for attr in get_attrs(proxied)) return type(proxied)(**kv)
def describe(self): from .. import output numeric_methods = OrderedDict([ ('count', lambda f: f.count().rename(col.name + '_count')), ('mean', lambda f: f.mean()), ('std', lambda f: f.std()), ('min', lambda f: f.min()), ('quantile_25', lambda f: f.quantile(0.25).rename(col.name + '_quantile_25')), ('quantile_50', lambda f: f.quantile(0.5).rename(col.name + '_quantile_50')), ('quantile_75', lambda f: f.quantile(0.75).rename(col.name + '_quantile_75')), ('max', lambda f: f.max()), ]) string_methods = OrderedDict([ ('count', lambda f: f.count().rename(col.name + '_count')), ('unique', lambda f: f.nunique().rename(col.name + '_unique')), ]) aggs = [] output_names = [] output_types = [] def produce_stat_fields(col, methods): output_names.append(col.name) fields = [] tps = [] for func in methods.values(): field = func(self[col.name]) fields.append(field) tps.append(field.dtype) t = highest_precedence_data_type(*tps) output_types.append(t) aggs.extend([f.astype(t) if t == types.decimal else f for f in fields]) if any(types.is_number(col.type) for col in self.schema.columns): methods = numeric_methods for col in self.schema.columns: if types.is_number(col.type): produce_stat_fields(col, numeric_methods) else: methods = string_methods for col in self.schema.columns: produce_stat_fields(col, string_methods) summary = self[aggs] methods_names = list(methods.keys()) @output(['type'] + output_names, ['string'] + output_types) def to_methods(row): for method in methods_names: values = [None] * len(output_names) for i, field_name in enumerate(output_names): values[i] = getattr(row, '%s_%s' % (field_name, method.split('(', 1)[0])) yield [method, ] + values return summary.apply(to_methods, axis=1) def accept(self, visitor): if self._source_data is not None: visitor.visit_source_collection(self) else: raise NotImplementedError def get_cached(self, data): try: if hasattr(data, 'reload'): data.reload() except NoSuchObject: from ..backends.context import context context.uncache(self) return None coll = CollectionExpr(_source_data=data, _schema=self._schema) for attr in CollectionExpr.__slots__: if not attr.startswith('_ml_'): continue if hasattr(self, attr): setattr(coll, attr, getattr(self, attr)) return coll
_cached_typed_expr = dict() class TypedExpr(Expr): __slots__ = '_name', '_source_name' @classmethod def _get_type(cls, *args, **kwargs): # return the data type which extracted from args and kwargs raise NotImplementedError @classmethod def _typed_classes(cls, *args, **kwargs): # return allowed data types raise NotImplementedError @classmethod def _base_class(cls, *args, **kwargs): # base class, SequenceExpr or Scalar raise NotImplementedError @classmethod def _new_cls(cls, *args, **kwargs): data_type = cls._get_type(*args, **kwargs) if data_type: base_class = cls._base_class(*args, **kwargs) typed_classes = cls._typed_classes(*args, **kwargs) data_type = types.validate_data_type(data_type) name = data_type.CLASS_NAME + base_class.__name__ # get the typed class, e.g. Int64SequenceExpr, StringScalar typed_cls = globals().get(name) if typed_cls is None: raise TypeError("Name {} doesn't exist".format(name)) if issubclass(cls, typed_cls): return cls elif cls == base_class: return typed_cls elif cls in typed_classes: return typed_cls keys = (cls, typed_cls) if keys in _cached_typed_expr: return _cached_typed_expr[keys] mros = inspect.getmro(cls) has_data_type = len([sub for sub in mros if sub in typed_classes]) > 0 if has_data_type: mros = mros[1:] subs = [sub for sub in mros if sub not in typed_classes] subs.insert(1, typed_cls) bases = [] for sub in subs[::-1]: for i in range(len(bases)): if bases[i] is None: continue if issubclass(sub, bases[i]): bases[i] = None bases.append(sub) bases = tuple(base for base in bases if base is not None) dic = dict() if hasattr(cls, '_args'): dic['_args'] = cls._args dic['__slots__'] = cls.__slots__ + getattr(cls, '_slots', ()) dic['_add_args_slots'] = True try: accept_cls = next(c for c in bases if c.__name__ == cls.__name__) if hasattr(accept_cls, 'accept'): dic['accept'] = accept_cls.accept except StopIteration: pass clz = type(cls.__name__, bases, dic) _cached_typed_expr[keys] = clz return clz else: return cls def __new__(cls, *args, **kwargs): clz = cls._new_cls(*args, **kwargs) return super(TypedExpr, clz).__new__(clz) def __getattribute__(self, item): try: return super(TypedExpr, self).__getattribute__(item) except AttributeError: pass # need to show type of expression when certain method missing dtype = None try: dtype = object.__getattribute__(self, "_data_type") except AttributeError: # pragma: no cover pass try: dtype = object.__getattribute__(self, "_value_type") except AttributeError: # pragma: no cover pass if dtype is not None: raise AttributeError( "'%s' object has no attribute '%s'. Type of the expression " "is %s which may not support this method." % (type(self).__name__, item, dtype) ) else: raise AttributeError( "'%s' object has no attribute '%s'" % (type(self).__name__, item) ) @classmethod def _new(cls, *args, **kwargs): return cls._new_cls(*args, **kwargs)(*args, **kwargs) def is_renamed(self): return self._name is not None and self._source_name is not None and \ self._name != self._source_name def rename(self, new_name): if new_name == self._name: return self attr_dict = dict((attr, getattr(self, attr, None)) for attr in get_attrs(self)) attr_dict['_source_name'] = self._source_name attr_dict['_name'] = new_name return type(self)(**attr_dict) @property def name(self): return self._name @property def source_name(self): return self._source_name def astype(self, data_type): raise NotImplementedError def cast(self, t): return self.astype(t) def eval(self, str_expr, rewrite=False): from .query import SequenceVisitor if not isinstance(str_expr, str): raise ValueError('expr must be a string') frame = sys._getframe(2).f_locals try: env = frame.copy() finally: del frame env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr) visitor = SequenceVisitor(self, env) return visitor.eval(str_expr, rewrite=rewrite)
[文档] class SequenceExpr(TypedExpr): """ Sequence represents for 1-dimension data. """ __slots__ = ( '_data_type', '_source_data_type', '_ml_fields_cache', '_ml_uplink', '_ml_operations', ) @classmethod def _get_type(cls, *args, **kwargs): return types.validate_data_type(kwargs.get('_data_type')) @classmethod def _typed_classes(cls, *args, **kwargs): return _typed_sequence_exprs @classmethod def _base_class(cls, *args, **kwargs): return SequenceExpr def _init(self, *args, **kwargs): self._init_attr('_name', None) self._init_attr('_ml_fields_cache', None) self._init_attr('_ml_uplink', []) self._init_attr('_ml_operations', []) super(SequenceExpr, self)._init(*args, **kwargs) if '_data_type' in kwargs: self._data_type = types.validate_data_type(kwargs.get('_data_type')) if '_source_name' not in kwargs: self._source_name = self._name if '_source_data_type' in kwargs: self._source_data_type = types.validate_data_type(kwargs.get('_source_data_type')) else: self._source_data_type = self._data_type def cache(self, mem=False): raise ExpressionError('Cache operation does not support for sequence.')
[文档] def head(self, n=None, **kwargs): """ Return first n rows. Execute at once. :param n: :return: result frame :rtype: :class:`odps.df.expr.expressions.CollectionExpr` """ if n is None: n = options.display.max_rows return self._handle_delay_call('execute', self, head=n, **kwargs)
[文档] def tail(self, n=None, **kwargs): """ Return the last n rows. Execute at once. :param n: :return: """ if n is None: n = options.display.max_rows return self._handle_delay_call('execute', self, tail=n, **kwargs)
[文档] def to_pandas(self, wrap=False, **kwargs): """ Convert to pandas Series. Execute at once. :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame :return: pandas Series """ try: import pandas as pd except (ImportError, ValueError): raise DependencyNotInstalledError('to_pandas requires for `pandas` library') def wrapper(result): df = result.values if wrap: from .. import DataFrame df = DataFrame(df) return df[self.name] return self.execute(wrapper=wrapper, **kwargs)
@property def data_type(self): return self._data_type @property def source_data_type(self): return self._source_data_type @property def dtype(self): """ Return the data type. Available types: int8, int16, int32, int64, float32, float64, boolean, string, decimal, datetime :return: the data type """ return self._data_type
[文档] def astype(self, data_type): """ Cast to a new data type. :param data_type: the new data type :return: casted sequence :Example: >>> df.id.astype('float') """ data_type = types.validate_data_type(data_type) if data_type == self._data_type: return self attr_dict = dict() attr_dict['_data_type'] = data_type attr_dict['_source_data_type'] = self._source_data_type attr_dict['_input'] = self new_sequence = AsTypedSequenceExpr(**attr_dict) return new_sequence
def output_type(self): return 'sequence(%s)' % repr(self._data_type) def accept(self, visitor): visitor.visit_sequence(self)
class BooleanSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(BooleanSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.boolean class Int8SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Int8SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.int8 class Int16SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Int16SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.int16 class Int32SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Int32SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.int32
[文档] class Int64SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Int64SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.int64
class Float32SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Float32SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.float32 class Float64SequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(Float64SequenceExpr, self)._init(*args, **kwargs) self._data_type = types.float64 class DecimalSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(DecimalSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.decimal
[文档] class StringSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(StringSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.string
class DatetimeSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(DatetimeSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.datetime class BinarySequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(BinarySequenceExpr, self)._init(*args, **kwargs) self._data_type = types.binary class DateSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(DateSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.date class TimestampSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(TimestampSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.timestamp class JsonSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(JsonSequenceExpr, self)._init(*args, **kwargs) self._data_type = types.json class ListSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(ListSequenceExpr, self)._init(*args, **kwargs) class DictSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(DictSequenceExpr, self)._init(*args, **kwargs) class StructSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(StructSequenceExpr, self)._init(*args, **kwargs) class UnknownSequenceExpr(SequenceExpr): def _init(self, *args, **kwargs): super(UnknownSequenceExpr, self)._init(*args, **kwargs) if not isinstance(self._data_type, types.Unknown): self._data_type = types.Unknown() _typed_sequence_exprs = [globals()[t.__class__.__name__ + SequenceExpr.__name__] for t in types._data_types.values()] number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__) for t in types.number_types()] int_number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__) for t in types.number_types() if repr(t).startswith('int')] class AsTypedSequenceExpr(SequenceExpr): _args = '_input', node_name = "TypedSequence" @property def input(self): return self._input def accept(self, visitor): return visitor.visit_cast(self) @property def name(self): return self._name or self._input.name @property def source_name(self): return self._source_name or self._input.source_name @property def dtype(self): return self._data_type or self._input.data_type @property def source_type(self): return self._source_data_type or self._input._source_data_type def rebuild(self): attr_dict = self._attr_dict() tp = self._copy_type() if isinstance(attr_dict['_source_data_type'], types.Unknown): attr_dict['_source_data_type'] = self._input.dtype return tp(**attr_dict) class Column(SequenceExpr): _args = '_input', @property def input(self): return self._input def rebuild(self): attr_dict = self._attr_dict() tp = self._copy_type() new_col = self.input[self.source_name] for attr in ('_source_data_type', '_data_type'): if isinstance(attr_dict[attr], types.Unknown): attr_dict[attr] = new_col.dtype return tp(**attr_dict) def accept(self, visitor): return visitor.visit_column(self) class CallableColumn(Column): def __call__(self, *args, **kwargs): return getattr(type(self._input), self.source_name)(self._input, *args, **kwargs)
[文档] class Scalar(TypedExpr): """ Represent for the scalar type. :param _value: value of the scalar :param _value_type: value type of the scalar :Example: >>> df[df, Scalar(4).rename('append_const')] """ __slots__ = '_value', '_value_type', '_source_value_type' @classmethod def _get_type(cls, *args, **kwargs): value = args[0] if len(args) > 0 else None value_type = args[1] if len(args) > 1 else None val = kwargs.get('_value') if val is None: val = value value_type = kwargs.get('_value_type', None) or value_type if val is None and value_type is None: raise ValueError('Either value or value_type should be provided') if val is not None and not isinstance(val, NodeMetaclass): return types.validate_value_type(val, value_type) else: return types.validate_data_type(value_type) @classmethod def _typed_classes(cls, *args, **kwargs): return _typed_scalar_exprs @classmethod def _base_class(cls, *args, **kwargs): return Scalar @classmethod def _transform(cls, *args, **kwargs): value = args[0] if len(args) > 0 else None value_type = args[1] if len(args) > 1 else None if ('_value' not in kwargs or kwargs['_value'] is None) and \ value is not None: kwargs['_value'] = value if ('_value_type' not in kwargs or kwargs['_value_type'] is None) and \ value_type is not None: kwargs['_value_type'] = types.validate_data_type(value_type) if kwargs.get('_value') is not None: kwargs['_value_type'] = types.validate_value_type(kwargs.get('_value'), kwargs.get('_value_type')) if '_source_name' not in kwargs: kwargs['_source_name'] = kwargs.get('_name') if '_source_value_type' in kwargs: kwargs['_source_value_type'] = types.validate_data_type(kwargs['_source_value_type']) else: kwargs['_source_value_type'] = kwargs['_value_type'] return kwargs def __new__(cls, *args, **kwargs): kwargs = cls._transform(*args, **kwargs) return super(Scalar, cls).__new__(cls, **kwargs) def _init(self, *args, **kwargs): self._init_attr('_name', None) self._init_attr('_value', None) kwargs = self._transform(*args, **kwargs) super(Scalar, self)._init(**kwargs) def equals(self, other): return super(Scalar, self).equals(other) @property def value(self): return self._value @property def value_type(self): return self._value_type @property def dtype(self): return self._value_type def output_type(self): return 'Scalar[%s]' % repr(self._value_type) def astype(self, value_type): value_type = types.validate_data_type(value_type) if value_type == self._value_type: return self attr_dict = dict() attr_dict['_input'] = self attr_dict['_value_type'] = value_type attr_dict['_source_value_type'] = self._source_value_type new_scalar = AsTypedScalar(**attr_dict) return new_scalar def to_sequence(self): if self._value is None: attr_values = dict((attr, getattr(self, attr)) for attr in get_attrs(self)) attr_values['_data_type'] = attr_values.pop('_value_type') if '_source_value_type' in attr_values: attr_values['_source_data_type'] = attr_values.pop('_source_value_type') del attr_values['_value'] cls = next(c for c in inspect.getmro(type(self))[1:] if c.__name__ == type(self).__name__ and not issubclass(c, Scalar)) seq = cls._new(**attr_values) return seq raise ExpressionError('Cannot convert valued scalar to sequence') def accept(self, visitor): visitor.visit_scalar(self) def get_cached(self, data): return Scalar(_value=data, _value_type=self.dtype)
class AsTypedScalar(Scalar): _args = '_input', node_name = "TypedScalar" def accept(self, visitor): return visitor.visit_cast(self) @property def name(self): return self._name or self._input.name @property def source_name(self): return self._source_name or self._input.source_name @property def value(self): return self._value @property def value_type(self): return self._value_type @property def dtype(self): return self._value_type def rebuild(self): attr_dict = self._attr_dict() tp = self._copy_type() if isinstance(attr_dict['_source_value_type'], types.Unknown): attr_dict['_source_value_type'] = self._input.dtype return tp(**attr_dict) @property def source_type(self): return self._source_value_type class BooleanScalar(Scalar): def _init(self, *args, **kwargs): super(BooleanScalar, self)._init(*args, **kwargs) self._value_type = types.boolean class Int8Scalar(Scalar): def _init(self, *args, **kwargs): super(Int8Scalar, self)._init(*args, **kwargs) self._value_type = types.int8 class Int16Scalar(Scalar): def _init(self, *args, **kwargs): super(Int16Scalar, self)._init(*args, **kwargs) self._value_type = types.int16 class Int32Scalar(Scalar): def _init(self, *args, **kwargs): super(Int32Scalar, self)._init(*args, **kwargs) self._value_type = types.int32 class Int64Scalar(Scalar): def _init(self, *args, **kwargs): super(Int64Scalar, self)._init(*args, **kwargs) self._value_type = types.int64 class Float32Scalar(Scalar): def _init(self, *args, **kwargs): super(Float32Scalar, self)._init(*args, **kwargs) self._value_type = types.float32 class Float64Scalar(Scalar): def _init(self, *args, **kwargs): super(Float64Scalar, self)._init(*args, **kwargs) self._value_type = types.float64 class DecimalScalar(Scalar): def _init(self, *args, **kwargs): super(DecimalScalar, self)._init(*args, **kwargs) self._value_type = types.decimal class StringScalar(Scalar): def _init(self, *args, **kwargs): super(StringScalar, self)._init(*args, **kwargs) self._value_type = types.string class DateScalar(Scalar): def _init(self, *args, **kwargs): super(DateScalar, self)._init(*args, **kwargs) self._value_type = types.date class DatetimeScalar(Scalar): def _init(self, *args, **kwargs): super(DatetimeScalar, self)._init(*args, **kwargs) self._value_type = types.datetime class TimestampScalar(Scalar): def _init(self, *args, **kwargs): super(TimestampScalar, self)._init(*args, **kwargs) self._value_type = types.timestamp class BinaryScalar(Scalar): def _init(self, *args, **kwargs): super(BinaryScalar, self)._init(*args, **kwargs) self._value_type = types.binary class JsonScalar(Scalar): def _init(self, *args, **kwargs): super(JsonScalar, self)._init(*args, **kwargs) self._value_type = types.json class ListScalar(Scalar): pass class DictScalar(Scalar): pass class StructScalar(Scalar): pass class UnknownScalar(Scalar): def _init(self, *args, **kwargs): super(UnknownScalar, self)._init(*args, **kwargs) if not isinstance(self._value_type, types.Unknown): self._value_type = types.Unknown() _typed_scalar_exprs = [globals()[t.__class__.__name__ + Scalar.__name__] for t in types._data_types.values()] number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__) for t in types.number_types()] int_number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__) for t in types.number_types() if repr(t).startswith('int')] class BuiltinFunction(Scalar): __slots__ = '_func_name', '_func_args', '_func_kwargs' def __init__(self, name=None, rtype=None, args=(), **kwargs): rtype = rtype or kwargs.pop('_value_type', types.string) rtype = types.validate_data_type(rtype) func_name = name or kwargs.pop('_func_name', None) func_args = args or kwargs.pop('_func_args', ()) super(BuiltinFunction, self).__init__(_func_name=func_name, _func_args=func_args, _value_type=rtype, **kwargs) def accept(self, visitor): visitor.visit_builtin_function(self)
[文档] class RandomScalar(BuiltinFunction): """ Represent for the random scalar type. :param seed: random seed, None by default :Example: >>> df[df, RandomScalar().rename('append_random')] """ def __new__(cls, seed=None, **kw): args = (seed, ) if seed is not None else kw.get('_func_args', ()) kw.update(dict(_func_name='rand', _value_type='float', _func_args=args)) return BuiltinFunction.__new__(cls, **kw) def __init__(self, seed=None, **kw): args = (seed, ) if seed is not None else kw.get('_func_args', ()) kw.update(dict(_func_name='rand', _value_type='float', _func_args=args)) super(RandomScalar, self).__init__(**kw)
class FilterCollectionExpr(CollectionExpr): _args = '_input', '_predicate' node_name = 'Filter' def _init(self, *args, **kwargs): super(FilterCollectionExpr, self)._init(*args, **kwargs) if self._schema is None: self._schema = self._input.schema def iter_args(self): for it in zip(['collection', 'predicate'], self.args): yield it @property def input(self): return self._input def rebuild(self): rebuilt = super(FilterCollectionExpr, self).rebuild() rebuilt._schema = rebuilt.input.schema return rebuilt def accept(self, visitor): visitor.visit_filter_collection(self) class ProjectCollectionExpr(CollectionExpr): __slots__ = '_raw_fields', _args = '_input', '_fields' _extra_args = '_raw_fields', node_name = 'Projection' def _init(self, *args, **kwargs): fields = kwargs.get('_fields') if fields is None and len(args) >= 2: fields = args[1] for field in fields: if field.name is None: raise ExpressionError('Column does not have a name, ' 'please specify one by `rename`: %s' % repr_obj(field._repr())) self._init_attr('_raw_fields', None) super(ProjectCollectionExpr, self)._init(*args, **kwargs) def _set_field(self, value, value_dag=None): from ..backends.context import context from .window import Window if context.is_cached(self): super(ProjectCollectionExpr, self)._set_field(value, value_dag=value_dag) return if value_dag is None: value_dag = value.to_dag(copy=False, validate=False) for n in value.traverse(top_down=True, unique=True, stop_cond=lambda x: isinstance(x, Column)): if isinstance(n, Column) and (n.input is self or n.input._proxy is self): source_name = n.source_name idx = self._schema._name_indexes[to_lower_str(source_name)] field = self._fields[idx] if field.name != n.name: field = field.rename(n.name) value_dag.substitute(n, field) elif isinstance(n, Window) and n.input is self: # Window object like rank will point to collection directly instead of through column n._input = self.input value = value_dag.root fields = [field if field.name != value.name else value for field in self._fields] if value.name not in self._schema: fields.append(value) self._schema = TableSchema.from_lists( [f.name for f in fields], [f.dtype for f in fields] ) self._fields = fields def _delitem(self, key): from ..backends.context import context if context.is_cached(self): return super(ProjectCollectionExpr, self).__delitem__(key) fields = [f for f in self._fields if f.name != key] self._schema = TableSchema.from_lists( [f.name for f in fields], [f.dtype for f in fields] ) self._fields = fields def __delitem__(self, key): if key not in self._schema: raise KeyError('Field({0}) does not exist'.format(key)) self._delitem(key) @property def _project_fields(self): return self._fields def iter_args(self): for it in zip(['collection', 'selections'], self.args): yield it @property def input(self): return self._input @property def fields(self): return self._fields def rebuild(self): if self._raw_fields: return self._input.select(*self._raw_fields) rebuilt = super(ProjectCollectionExpr, self).rebuild() rebuilt._schema = TableSchema.from_lists( [f.name for f in rebuilt._fields], [f.dtype for f in rebuilt._fields] ) return rebuilt def accept(self, visitor): visitor.visit_project_collection(self) class LateralViewCollectionExpr(ProjectCollectionExpr): _args = '_input', '_fields', '_lateral_views' node_name = 'LateralView' def _init(self, *args, **kwargs): self._init_attr('_lateral_views', None) super(LateralViewCollectionExpr, self)._init(*args, **kwargs) self._redirect_lateral_views() def _redirect_lateral_views(self): # dealing with cases like df[df['a', 'b'].apply(..., axis=1), df.c] for lv in self.lateral_views: if lv.input is self.input: continue for f, f_o in zip(lv._fields, lv.input._fields): lv.substitute(f, f_o) lv.substitute(lv.input, self.input) def iter_args(self): for it in zip(['collection', 'selections', 'lateral_views'], self.args): yield it @property def lateral_views(self): return self._lateral_views def accept(self, visitor): visitor.visit_lateral_view(self) class FilterPartitionCollectionExpr(CollectionExpr): __slots__ = '_predicate_string', _args = '_input', '_predicate', '_fields' node_name = 'FilterPartition' def _init(self, *args, **kwargs): super(FilterPartitionCollectionExpr, self)._init(*args, **kwargs) self._fields = [self._input[col.name] for col in self._schema.columns] self._predicate_string = kwargs.get('_predicate_string') @property def _project_fields(self): return self._fields def iter_args(self): for it in zip(['collection', 'predicate', 'selections'], self.args): yield it @property def input(self): return self._input @property def fields(self): return self._fields @property def predicate_string(self): return self._predicate_string def accept(self, visitor): visitor.visit_filter_partition_collection(self) @property def data_table(self): return self.input.data @property def data(self): from ...types import PartitionSpec from .arithmetic import Or if isinstance(self._predicate, Or): raise AttributeError('Cannot get data when predicate contains multiple partition specs.') spec = PartitionSpec(self.predicate_string) return self.data_table.partitions[spec] class SliceCollectionExpr(CollectionExpr): _args = '_input', '_indexes' node_name = 'Slice' def _init(self, *args, **kwargs): super(SliceCollectionExpr, self)._init(*args, **kwargs) if isinstance(self._indexes, slice): scalar = lambda v: Scalar(_value=v) if v is not None else None self._indexes = scalar(self._indexes.start), \ scalar(self._indexes.stop), scalar(self._indexes.step) @property def start(self): res = self._indexes[0] return res.value if res is not None else None @property def stop(self): res = self._indexes[1] return res.value if res is not None else None @property def step(self): res = self._indexes[2] return res.value if res is not None else None @property def input(self): return self._input def iter_args(self): args = [self._input] + list(self._indexes) for it in zip(['collection', 'start', 'stop', 'step'], args): yield it def rebuild(self): rebuilt = super(SliceCollectionExpr, self).rebuild() rebuilt._schema = self.input.schema return rebuilt def accept(self, visitor): visitor.visit_slice_collection(self) class Summary(CollectionExpr): __slots__ = '_schema', _args = '_input', '_fields' def _init(self, *args, **kwargs): super(Summary, self)._init(*args, **kwargs) if hasattr(self, '_schema') and any(it is None for it in self._schema.names): raise TypeError('Schema cannot has field which name is None') @property def input(self): return self._input @property def fields(self): return self._fields @property def schema(self): return self._schema def iter_args(self): for it in zip(['collection', 'fields'], self.args): yield it def accept(self, visitor): visitor.visit_project_collection(self) from . import element from . import arithmetic from . import reduction from . import groupby from . import collections from . import window from . import math from . import strings from . import datetimes from . import merge from . import composites from ..tools import * # hack for count def _count(expr, *args, **kwargs): if len(args) + len(kwargs) > 0: from .strings import _count return _count(expr, *args, **kwargs) else: from .reduction import count return count(expr) StringSequenceExpr.count = _count