#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import functools
import operator
import sys
from collections import defaultdict, OrderedDict
from collections.abc import Iterable
from concurrent.futures import Future
from functools import reduce
from .core import Node, NodeMetaclass
from .errors import ExpressionError
from .utils import get_attrs, is_called_by_inspector, highest_precedence_data_type, new_id, \
is_changed, get_proxied_expr
from .. import types
from ...config import options
from ...errors import NoSuchObject, DependencyNotInstalledError
from ...utils import TEMP_TABLE_PREFIX, to_binary, to_lower_str, deprecated, survey, isvalidattr
from ...models import TableSchema
class ReprWrapper:
    """
    Callable proxy whose ``repr()`` is produced by a user-supplied function.

    :param func: the callable to wrap; calls are forwarded to it unchanged
    :param repr: one-argument callable that receives ``func`` and returns the
                 text shown by ``repr(wrapper)``
    """

    def __init__(self, func, repr):
        self._func, self._repr = func, repr
        # copy __name__, __doc__ and the other wrapper metadata from ``func``
        functools.update_wrapper(self, func)

    def __repr__(self):
        render = self._repr
        return render(self._func)

    def __call__(self, *args, **kwargs):
        target = self._func
        return target(*args, **kwargs)
def _wrap_method_repr(func):
def inner(*args, **kwargs):
obj = func(*args, **kwargs)
if inspect.ismethod(obj):
def _repr(x):
instance = getattr(x, 'im_self', getattr(x, '__self__', None))
method = 'bound method' if instance is not None else 'unbound method'
if instance is not None:
return '<%(method)s %(instance)s.%(name)s>' % {
'method': method,
'instance': getattr(instance, 'node_name', instance.__class__.__name__),
'name': x.__name__
}
else:
return '<function __main__.%s>' % x.__name__
return ReprWrapper(obj, _repr)
return obj
return inner
def make_max_pt_function(expr=None):
    """
    Build a ``max_pt`` helper bound to ``expr``.

    The returned function forwards to ``func.max_pt``. When no table name is
    given and ``expr.data`` is a ``Table``, the name of that table is used.

    :param expr: expression whose ``data`` attribute may supply the default table name
    :return: function ``max_pt(table_name=None)``
    """
    from ...models import Table
    from .. import func

    def max_pt(table_name=None):
        source = getattr(expr, 'data', None)
        if isinstance(source, Table) and not table_name:
            table_name = expr.data.name
        return func.max_pt(table_name)

    return max_pt
def repr_obj(obj):
    """
    Produce a value suitable for embedding in an error message.

    * Objects exposing ``_repr`` are rendered through it; if that call fails,
      the plain ``object.__repr__`` is used instead.
    * Tuples and lists are rendered element by element and joined with commas.
    * Anything else is returned unchanged.

    :param obj: object to describe
    :return: a string for ``_repr`` objects and sequences, otherwise ``obj`` itself
    """
    if hasattr(obj, '_repr'):
        try:
            return obj._repr()
        except Exception:
            # best effort: never let a broken formatter hide the real error
            return object.__repr__(obj)
    if isinstance(obj, (tuple, list)):
        # elements may be returned as-is (e.g. ints), so coerce before joining
        return ','.join(str(repr_obj(it)) for it in obj)
    return obj
class Expr(Node):
    """
    Base expression node.

    On top of ``Node`` it carries execution helpers (``execute``, ``persist``,
    ``compile``), cache flags, extra dependencies (``_deps``) and operator
    overloads that forward to underscore-prefixed implementations such as
    ``_add`` or ``_eq``.
    """
    # '__execution' is name-mangled, i.e. stored as '_Expr__execution'
    __slots__ = '_deps', '_ban_optimize', '_engine', '_need_cache', '_mem_cache', '__execution', '_id'
def _init(self, *args, **kwargs):
    """
    Register attribute defaults, make sure an ``_id`` is present, then
    delegate to the parent ``_init``.

    ``_deps`` holds common dependencies: when an expression depends on other
    expressions without being calculated from them, those expressions are
    listed in ``_deps``.
    """
    defaults = (
        ('_deps', None),
        ('_ban_optimize', False),
        ('_engine', None),
        ('_Expr__execution', None),  # mangled name of ``__execution``
        ('_need_cache', False),
        ('_mem_cache', False),
    )
    for slot, default in defaults:
        self._init_attr(slot, default)

    if '_id' not in kwargs:
        kwargs['_id'] = new_id()
    super(Expr, self)._init(*args, **kwargs)
def __repr__(self):
    """
    Return the AST text, or in interactive mode the repr of the executed result.

    In interactive mode the result is cached in ``__execution``. A cached
    exception is discarded so execution is retried; a new failure is cached
    and re-raised.
    """
    show_ast = (not options.interactive) or is_called_by_inspector()
    if show_ast:
        return self._repr()

    if isinstance(self.__execution, Exception):
        # forget the failure of a previous attempt
        self.__execution = None
    if self.__execution is None:
        try:
            self.__execution = self.execute()
        except Exception as exc:
            self.__execution = exc
            raise
    return self.__execution.__repr__()
def _repr_html_(self):
    """
    HTML representation hook.

    Outside interactive mode the plain repr is wrapped in ``<code>`` tags.
    In interactive mode the expression is executed (once) and the result's
    own ``_repr_html_`` is used when available, otherwise its repr. If the
    cached execution is an exception, nothing is rendered and the cache is
    cleared.
    """
    if not options.interactive:
        return '<code>' + repr(self) + '</code>'

    if self.__execution is None:
        self.__execution = self.execute()
    elif isinstance(self.__execution, Exception):
        # cached failure: show nothing now, retry on the next call
        self.__execution = None
        return None

    outcome = self.__execution
    if hasattr(outcome, '_repr_html_'):
        return outcome._repr_html_()
    return repr(outcome)
def _handle_delay_call(self, method, *args, **kwargs):
    """
    Run ``method`` on the default engine, or register it on a ``delay`` object.

    :param method: name of the engine method to call (e.g. ``'execute'``)
    :param delay: optional keyword; when given, the call is registered via
                  ``delay.register_item`` and its return value is returned
    :param wrapper: optional keyword; callable applied to the engine result
    :param async_: optional keyword (``async`` is also read); when truthy and a
                   ``wrapper`` is given, the engine result is treated as a future
                   and a new future resolving to ``wrapper(result)`` is returned
    :return: the engine result, the wrapped result, or a future
    """
    delay = kwargs.pop('delay', None)
    if delay is not None:
        return delay.register_item(method, *args, **kwargs)

    from ..engines import get_default_engine

    engine = get_default_engine(self)
    wrapper = kwargs.pop('wrapper', None)
    result = getattr(engine, method)(*args, **kwargs)
    if wrapper is None:
        return result

    if not kwargs.get('async_', kwargs.get('async', False)):
        return wrapper(result)

    user_future = Future()

    def _relay(done):
        try:
            user_future.set_result(wrapper(done.result()))
        except BaseException as exc:
            # Forward the exception actually raised. It may come from the
            # engine future or from ``wrapper`` itself; ``done.exception()``
            # would be None in the latter case and silently drop the error.
            user_future.set_exception(exc)

    result.add_done_callback(_relay)
    return user_future
def visualize(self):
    """Delegate to the ``visualize`` method of the default engine for this expression."""
    from ..engines import get_default_engine

    return get_default_engine(self).visualize(self)
def execute(self, **kwargs):
    """
    Execute the expression.

    :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
    :type hints: dict
    :param priority: instance priority, 9 as default
    :type priority: int
    :param running_cluster: cluster to run this instance
    :return: execution result
    :rtype: :class:`odps.df.backends.frame.ResultFrame`
    """
    user_wrapper = kwargs.pop('wrapper', None)

    def wrapper(result):
        # remember the latest result; __repr__ / _repr_html_ read it back
        self.__execution = result
        return result if user_wrapper is None else user_wrapper(result)

    return self._handle_delay_call('execute', self, wrapper=wrapper, **kwargs)
def compile(self):
    """
    Compile this expression with the default engine.

    :return: compiled DAG
    :rtype: str
    """
    from ..engines import get_default_engine

    return get_default_engine(self).compile(self)
def persist(self, name, partitions=None, partition=None, lifecycle=None, project=None, **kwargs):
    """
    Persist the execution result into a table.

    Without `partitions`, a non-partitioned table is created when it does not
    exist yet and the result is inserted into it. With `partitions`, the given
    fields become the partition fields of the new table. With `partition`,
    the data is inserted into exactly that partition of the table.

    :param name: table name
    :param partitions: list of string, the partition fields
    :type partitions: list
    :param partition: persist to a specified partition
    :type partition: string or PartitionSpec
    :param lifecycle: table lifecycle. If absent, `options.lifecycle` will be used
                      (`options.temp_lifecycle` for names starting with the temp table prefix).
    :type lifecycle: int
    :param project: project name, if not provided, will be the default project
    :param hints: settings for SQL, e.g. `odps.sql.mapper.split.size`
    :type hints: dict
    :param priority: instance priority, 9 as default
    :type priority: int
    :param running_cluster: cluster to run this instance
    :param overwrite: overwrite the table, True as default
    :type overwrite: bool
    :param drop_table: drop table if exists, False as default
    :type drop_table: bool
    :param create_table: create table first if not exists, True as default
    :type create_table: bool
    :param drop_partition: drop partition if exists, False as default
    :type drop_partition: bool
    :param create_partition: create partition if not exists, None as default
    :type create_partition: bool
    :param cast: cast all columns' types as the existed table, False as default
    :type cast: bool
    :return: :class:`odps.df.DataFrame`

    :Example:

    >>> df = df['name', 'id', 'ds']
    >>> df.persist('odps_new_table')
    >>> df.persist('odps_new_table', partition='pt=test')
    >>> df.persist('odps_new_table', partitions=['ds'])
    """
    use_default = lifecycle is None and options.lifecycle is not None
    if use_default:
        if name.startswith(TEMP_TABLE_PREFIX):
            lifecycle = options.temp_lifecycle
        else:
            lifecycle = options.lifecycle

    return self._handle_delay_call(
        'persist', self, name,
        partitions=partitions, partition=partition,
        lifecycle=lifecycle, project=project, **kwargs
    )
def cache(self, mem=False):
    """
    Flag this expression as cached and ban optimization on it.

    :param mem: value stored in ``_mem_cache``
    :return: this expression, to allow chaining
    """
    self._need_cache, self._mem_cache, self._ban_optimize = True, mem, True
    return self
def uncache(self):
    """Clear both cache flags and remove this expression from the backend context cache."""
    self._need_cache = self._mem_cache = False

    from ..backends.context import context

    context.uncache(self)
def verify(self):
    """
    Verify if this expression can be compiled into ODPS SQL.

    :return: True if compilation succeeds, else False
    :rtype: bool
    """
    try:
        self.compile()
    except Exception:
        # any compilation error means "not compilable"; interpreter-level
        # signals such as KeyboardInterrupt are deliberately not swallowed
        return False
    return True
def _repr(self):
    """Render this expression with ``ExprFormatter`` and return the resulting text."""
    from .formatter import ExprFormatter

    return ExprFormatter(self)()
def ast(self):
    """
    Return the AST string of this expression (same text as ``_repr()``).

    :return: AST tree
    :rtype: str
    """
    return self._repr()
@_wrap_method_repr
def __getattribute__(self, attr):
    """
    Attribute lookup with a fallback for public aliases of argument slots.

    If the normal lookup fails and ``attr`` has no leading underscore, the
    name ``'_' + attr`` is tried when it is listed in ``_args_indexes``; an
    unset slot yields ``None``. Otherwise the original ``AttributeError`` is
    re-raised.
    """
    try:
        return super(Expr, self).__getattribute__(attr)
    except AttributeError:
        if attr.startswith('_'):
            raise
        private_name = '_%s' % attr
        if private_name not in object.__getattribute__(self, '_args_indexes'):
            raise
        try:
            return object.__getattribute__(self, private_name)
        except AttributeError:
            return None
def _defunc(self, field):
    """Call ``field`` with this expression when it is a plain function; otherwise return it as is."""
    if inspect.isfunction(field):
        return field(self)
    return field
@property
def optimize_banned(self):
    """Value of the ``_ban_optimize`` flag (also set to True by ``cache()``)."""
    return self._ban_optimize


@optimize_banned.setter
def optimize_banned(self, val):
    # plain assignment, no validation
    self._ban_optimize = val
@property
def args(self):
    """Arguments of the parent node, followed by the extra dependencies when any are set."""
    base_args = super(Expr, self).args
    return base_args + self.deps if self._deps else base_args
@property
def deps(self):
    """
    Extra dependencies as a tuple, or ``None`` when ``_deps`` is unset.

    Entries stored as tuples contribute only their first element.
    """
    stored = self._deps
    if stored is None:
        return None
    return tuple(entry[0] if isinstance(entry, tuple) else entry
                 for entry in stored)
def add_deps(self, *deps):
    """
    Register extra dependencies of this expression.

    :param deps: either the dependencies themselves, or a single iterable of
                 them; in the latter case only ``Expr`` items are kept
    """
    if len(deps) == 1 and isinstance(deps[0], Iterable):
        # unpack the iterable that was passed in (the local accumulator is
        # empty at this point, so it must not be the thing iterated)
        dependencies = [d for d in deps[0] if isinstance(d, Expr)]
    else:
        dependencies = list(deps)

    if getattr(self, '_deps', None) is None:
        self._deps = dependencies
    else:
        self._deps.extend(dependencies)
def substitute(self, old_arg, new_arg, dag=None):
    """
    Replace ``old_arg`` by ``new_arg`` in the node arguments and in ``_deps``.

    Dependencies are matched by identity; for tuple entries only the first
    element is compared and replaced. ``_deps`` is a list afterwards, even
    when it was ``None`` before.
    """
    super(Expr, self).substitute(old_arg, new_arg, dag=dag)

    def _swap(dep):
        if isinstance(dep, tuple):
            return (new_arg, ) + dep[1:] if dep[0] is old_arg else dep
        return new_arg if dep is old_arg else dep

    current = self._deps
    self._deps = [] if current is None else [_swap(dep) for dep in current]
def rebuild(self):
    """Return a copy of this expression; the base implementation applies no changes."""
    # used in the dynamic setting, do nothing by default
    # `rebuild` will copy itself, and apply all changes to the new one
    return self.copy()
def __hash__(self):
    """Hash derived from ``_node_id``, scaled by the hash of the ``Expr`` class."""
    return self._node_id * hash(Expr)
def __eq__(self, other):
    """
    Forward ``==`` to ``_eq``; fall back to identity when ``_eq`` is unavailable.

    An ``AttributeError``, or a ``TypeError`` while ``_eq`` is ``None``,
    triggers the identity fallback; any other ``TypeError`` propagates.
    """
    try:
        return self._eq(other)
    except (AttributeError, TypeError) as exc:
        # Due to current complexity of parent's eq,
        # by now, every expression is unequal
        if isinstance(exc, TypeError) and self._eq is not None:
            raise
        return self is other
def __ne__(self, other):
    """
    Forward ``!=`` to ``_ne``; fall back to the negated parent ``__eq__``.

    An ``AttributeError``, or a ``TypeError`` while ``_ne`` is ``None``,
    triggers the fallback; any other ``TypeError`` propagates.
    """
    try:
        return self._ne(other)
    except (AttributeError, TypeError) as exc:
        if isinstance(exc, TypeError) and self._ne is not None:
            raise
        return not super(Expr, self).__eq__(other)
# Ordering comparisons: each operator forwards to the underscore-prefixed
# method of the same name, which is not defined in this class.
def __lt__(self, other):
    """``self < other`` via ``_lt``."""
    return self._lt(other)


def __le__(self, other):
    """``self <= other`` via ``_le``."""
    return self._le(other)


def __gt__(self, other):
    """``self > other`` via ``_gt``."""
    return self._gt(other)


def __ge__(self, other):
    """``self >= other`` via ``_ge``."""
    return self._ge(other)
# Arithmetic operators: each one (and its reflected ``__r*__`` variant)
# forwards to the underscore-prefixed method of the same name.
def __add__(self, other):
    """``self + other`` via ``_add``."""
    return self._add(other)


def __radd__(self, other):
    """``other + self`` via ``_radd``."""
    return self._radd(other)


def __mul__(self, other):
    """``self * other`` via ``_mul``."""
    return self._mul(other)


def __rmul__(self, other):
    """``other * self`` via ``_rmul``."""
    return self._rmul(other)


def __div__(self, other):
    """``self / other`` via ``_div``."""
    return self._div(other)


def __rdiv__(self, other):
    """``other / self`` via ``_rdiv``."""
    return self._rdiv(other)


# true division reuses the ``__div__`` / ``__rdiv__`` implementations
__truediv__ = __div__
__rtruediv__ = __rdiv__


def __floordiv__(self, other):
    """``self // other`` via ``_floordiv``."""
    return self._floordiv(other)


def __rfloordiv__(self, other):
    """``other // self`` via ``_rfloordiv``."""
    return self._rfloordiv(other)


def __mod__(self, other):
    """``self % other`` via ``_mod``."""
    return self._mod(other)


def __rmod__(self, other):
    """``other % self`` via ``_rmod``."""
    return self._rmod(other)


def __sub__(self, other):
    """``self - other`` via ``_sub``."""
    return self._sub(other)


def __rsub__(self, other):
    """``other - self`` via ``_rsub``."""
    return self._rsub(other)


def __pow__(self, power):
    """``self ** power`` via ``_pow``."""
    return self._pow(power)


def __rpow__(self, power):
    """``power ** self`` via ``_rpow``."""
    return self._rpow(power)
# ``|``, ``&`` and the unary operators forward to the underscore-prefixed
# method of the same name.
def __or__(self, other):
    """``self | other`` via ``_or``."""
    return self._or(other)


def __ror__(self, other):
    """``other | self`` via ``_ror``."""
    return self._ror(other)


def __and__(self, other):
    """``self & other`` via ``_and``."""
    return self._and(other)


def __rand__(self, other):
    """``other & self`` via ``_rand``."""
    return self._rand(other)


def __neg__(self):
    """``-self`` via ``_neg``."""
    return self._neg()


def __invert__(self):
    """``~self`` via ``_invert``."""
    return self._invert()


def __abs__(self):
    """``abs(self)`` via ``_abs``."""
    return self._abs()
# [docs]
class CollectionExpr(Expr):
    """
    A collection represents two-dimensional data.

    :Example:

    >>> # projection
    >>> df = DataFrame(o.get_table('my_table')) # DataFrame is actually a CollectionExpr
    >>> df['name', 'id']  # project some columns
    >>> df[[df.name, df.id]]  # projection
    >>> df[df]  # changes nothing, selects all the columns
    >>> df[df, df.name.lower().rename('name2')]  # project a new column `name2` besides all the original columns
    >>> df.select(df, name2=df.name.lower())  # projection by `select`
    >>> df.exclude('name')  # project all columns but `name`
    >>> df[df.exclude('name'), df.name.lower()]  # `name` will not conflict any more
    >>>
    >>> # filter
    >>> df[(df.id < 3) & (df.name != 'test')]
    >>> df.filter(df.id < 3, df.name != 'test')
    >>>
    >>> # slice
    >>> df[: 10]
    >>> df.limit(10)
    >>>
    >>> # Sequence
    >>> df.name # an instance of :class:`odps.df.expr.expressions.SequenceExpr`
    >>>
    >>> # schema or dtypes
    >>> df.dtypes
    odps.Schema {
      name    string
      id      int64
    }
    >>> df.schema
    odps.Schema {
      name    string
      id      int64
    }
    """
    __slots__ = (
        '_schema', '_source_data', '_proxy',
        '_ml_fields_cache', '_ml_uplink', '_ml_operations',
    )
    # name used in representations (see the ``node_name`` lookup in ``_wrap_method_repr``)
    node_name = 'Collection'
def _init(self, *args, **kwargs):
    """
    Register collection-specific attribute defaults, run the parent ``_init``
    and reject schemas that contain a field named ``None``.

    :raises TypeError: if any name in ``_schema.names`` is ``None``
    """
    for slot in ('_source_data', '_proxy', '_ml_fields_cache'):
        self._init_attr(slot, None)
    # fresh list objects for every instance
    self._init_attr('_ml_uplink', [])
    self._init_attr('_ml_operations', [])

    super(CollectionExpr, self)._init(*args, **kwargs)

    if hasattr(self, '_schema'):
        if any(name is None for name in self._schema.names):
            raise TypeError('Schema cannot has field whose name is None')
def __dir__(self):
    """List regular attributes plus every column name accepted by ``isvalidattr``."""
    names = set(object.__dir__(self))
    names.update(c.name for c in self.schema if isvalidattr(c.name))
    return sorted(names)
def __getitem__(self, item):
    """
    Index the collection.

    * ``str`` -> a single column
    * ``slice`` -> a sliced collection (``[:]`` returns the collection itself)
    * ``list`` / ``tuple`` / another collection -> projection
    * anything resolving to a boolean sequence -> filter

    :raises ExpressionError: for any other kind of item
    """
    item = self._defunc(item)
    if isinstance(item, tuple):
        item = list(item)
    if isinstance(item, CollectionExpr):
        item = [item]

    if isinstance(item, str):
        return self._get_field(item)
    if isinstance(item, slice):
        is_full = item.start is None and item.stop is None and item.step is None
        return self if is_full else self._slice(item)
    if isinstance(item, list):
        return self._project(item)

    field = self._get_field(item)
    if isinstance(field, BooleanSequenceExpr):
        return self.filter(item)

    if isinstance(item, SequenceExpr):
        raise ExpressionError('No boolean sequence found for filtering, '
                              'a tuple or list is required for projection')
    raise ExpressionError('Not supported projection: collection[%s]' % repr_obj(item))
def _set_field(self, value, value_dag=None):
    """
    Add or replace the column named ``value.name``, in place.

    The collection is turned into a proxy: ``_proxy`` is set to a projection
    of a copy of this collection. When a proxy already exists, the change is
    handed to ``_proxy._setitem`` instead.

    :param value: the named expression to store as a column
    :param value_dag: DAG of ``value``; built with ``value.to_dag`` when omitted
    """
    # NOTE(review): nesting was reconstructed from flattened source; the
    # ``contains_node`` check is assumed to be independent of the
    # ``value_dag is None`` branch — confirm against the original file.
    expr = self.copy() if self._proxy is None else self._proxy._input
    if value_dag is None:
        value_dag = value.to_dag(copy=False, validate=False)
    if value_dag.contains_node(self):
        # re-point the value at ``expr`` instead of this object
        value_dag.substitute(self, expr)
        value = value_dag.root
    if self._proxy is not None:
        self._proxy._setitem(value, value_dag=value_dag)
        return
    # replace the column of the same name, or append a new one
    fields = [f if f != value.name else value for f in self._schema.names]
    if value.name not in self._schema:
        fields.append(value)
    # make the isinstance to check the proxy type
    self.__class__ = type(self.__class__.__name__, (self.__class__,), {})
    self._proxy = expr.select(fields)
def __setitem__(self, key, value):
    """
    Assign a column: ``df['col'] = value`` or ``df[cond1, cond2, 'col'] = value``.

    With a tuple key, every element but the last is a boolean condition
    (combined with ``&``) and the last element is the column name. Rows not
    matching the conditions keep the existing column value, or get a typed
    null ``Scalar`` when the column is new.

    :param key: column name, or tuple of conditions followed by the column name
    :param value: expression or plain value (wrapped into ``Scalar``); ``None``
                  is only allowed for conditional assignment to an existing column
    :raises ValueError: if the column type cannot be determined from ``None``,
                        or a condition is not boolean
    """
    if value is not None and not isinstance(value, Expr):
        value = Scalar(value)

    if not isinstance(key, tuple):
        if value is None:
            raise ValueError('Cannot determine type for column %s with None value.' % key)
        column_name = key
        value = value.rename(column_name)
    else:
        conds = [self._defunc(c) for c in key[:-1]]
        if not all(isinstance(c.dtype, types.Boolean) for c in conds):
            raise ValueError('Conditions should be boolean expressions or boolean columns')
        cond = conds[0] if len(conds) == 1 else reduce(operator.and_, conds)

        column_name = key[-1]
        if column_name not in self._schema:
            if value is None:
                # ``key`` is a tuple here; formatting it with ``%s`` would raise
                # TypeError and hide this error, so report the column name
                raise ValueError(
                    'Cannot determine type for column %s with None value.' % column_name)
            default_col = Scalar(_value_type=value.dtype)
        else:
            default_col = self[column_name]
            if value is None:
                value = Scalar(_value_type=default_col.dtype)
        value = cond.ifelse(value, default_col).rename(column_name)

    self._set_field(value)
def __delitem__(self, key):
    """
    Remove column ``key`` in place by turning this collection into a proxy
    of a projection without that column.

    :raises KeyError: if ``key`` is not in the schema
    """
    if key not in self.schema:
        raise KeyError('Field({0}) does not exist'.format(key))

    if self._proxy is not None:
        return self._proxy._delitem(key)

    remaining = [name for name in self._schema.names if name != key]
    # make the instance to check the proxy type
    current_cls = self.__class__
    self.__class__ = type(current_cls.__name__, (current_cls,), {})
    self._proxy = self.copy().select(remaining)
def __delattr__(self, item):
    """Delete a column when ``item`` is a schema field, otherwise a regular attribute."""
    if item not in self._schema:
        super(CollectionExpr, self).__delattr__(item)
        return None
    return self.__delitem__(item)
def __setattr__(self, key, value):
    """
    Set an attribute, redirecting to the proxy object when one is installed.

    ``_proxy`` itself is always stored on this object. If ``_proxy`` is not
    available yet, or the redirected assignment raises ``AttributeError``,
    the attribute is set on this object through ``Expr.__setattr__``.
    """
    try:
        # object.__getattribute__ bypasses this class' own attribute hooks
        if key != '_proxy' and object.__getattribute__(self, '_proxy'):
            return setattr(self._proxy, key, value)
    except AttributeError:
        pass
    Expr.__setattr__(self, key, value)
def __getattribute__(self, attr):
    """
    Attribute lookup, tried in three steps:

    1. ``_proxy`` is returned directly; when a proxy is installed, every other
       attribute is read from it.
    2. A name matching a schema field (compared via ``to_lower_str``) yields
       that column, unless the class defines a non-callable attribute of the
       same name.
    3. Otherwise the parent lookup is used.
    """
    try:
        proxy = object.__getattribute__(self, '_proxy')
        if attr == '_proxy':
            return proxy
        if proxy:
            # delegate everything to proxy object
            return getattr(proxy, attr)
    except AttributeError:
        # ``_proxy`` not available, or the proxy lacks ``attr``: fall through
        pass
    try:
        if to_lower_str(attr) in object.__getattribute__(self, '_schema')._name_indexes:
            cls_attr = getattr(type(self), attr, None)
            # columns win over methods/functions of the same name, but not
            # over other class attributes
            if cls_attr is None or inspect.ismethod(cls_attr) or inspect.isfunction(cls_attr):
                return self[attr]
    except AttributeError:
        pass
    return super(CollectionExpr, self).__getattribute__(attr)
# [docs]
def query(self, expr):
    """
    Query the data with a boolean expression.

    :param expr: the query string, you can use '@' character refer to environment variables.
    :return: new collection
    :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
    :raises ValueError: if ``expr`` is not a string
    """
    from .query import CollectionVisitor

    if not isinstance(expr, str):
        raise ValueError('expr must be a string')

    # Local variables of the frame two levels up become the evaluation
    # environment. NOTE(review): depth 2 presumably accounts for the
    # ReprWrapper.__call__ frame that bound methods are invoked through.
    caller_locals = sys._getframe(2).f_locals
    try:
        env = caller_locals.copy()
    finally:
        del caller_locals

    env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr)
    predicate = CollectionVisitor(self, env).eval(expr)
    return self.filter(predicate)
# [docs]
def filter(self, *predicates):
    """
    Filter the data by predicates; several predicates are combined with ``&``.

    :param predicates: the conditions to filter
    :return: new collection
    :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
    """
    conditions = self._get_fields(predicates)
    combined = reduce(operator.and_, conditions)
    return FilterCollectionExpr(self, combined, _schema=self._schema)
@deprecated('The function `filter_partition` is deprecated, please use `filter_parts` instead '
            "and change the predicate parameter into `pt1=1,pt2=2/pt1=2,pt2=1` form.")
@survey
def filter_partition(self, predicate='', exclude=True):
    """
    Deprecated variant of :meth:`filter_parts`.

    A string predicate is converted from the old format, where ``/`` and ``,``
    have swapped roles, into the ``filter_parts`` format; any other predicate
    is passed through unchanged.

    :param predicate: partition predicate in the old format
    :param exclude: forwarded to ``filter_parts``
    :return: new collection
    """
    if isinstance(predicate, str):
        # old 'a=1/b=2,a=3/b=4' becomes new 'a=1,b=2/a=3,b=4'
        part_reprs = '/'.join(','.join(p.split('/')) for p in predicate.split(','))
    else:
        part_reprs = predicate
    return self.filter_parts(part_reprs, exclude)
# [docs]
def filter_parts(self, predicate='', exclude=True):
    """
    Filter the data by partition string. A partition string looks like `pt1=1,pt2=2/pt1=2,pt2=1`, where
    comma (,) denotes 'and', while (/) denotes 'or'.

    :param str|Partition predicate: predicate string of partition filter
    :param bool exclude: True if you want to exclude partition fields, otherwise False. True for default.
    :return: new collection
    :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
    :raises ExpressionError: if the collection has no data source, the predicate is
                             malformed or refers to a non-partition column, or the
                             source has no partition columns
    """
    source = self._source_data
    if source is None:
        raise ExpressionError('Can only filter on data sources.')

    def _parse_partition_predicate(p):
        # turn a single ``name=value`` term into ``column == value``
        if '=' not in p:
            raise ExpressionError('Illegal partition predicate.')
        field_name, field_value = [s.strip() for s in p.split('=', 1)]

        if not hasattr(source, 'table_schema'):
            raise ExpressionError('filter_partition can only be applied on ODPS DataFrames')
        if field_name not in source.table_schema:
            raise ExpressionError('Column `%s` not exists in input collection' % field_name)
        if field_name not in source.table_schema._partition_schema:
            raise ExpressionError('`%s` is not a partition column' % field_name)

        part_col = self[field_name]
        if field_value.startswith(('\'', '\"')):
            # quoted literal: drop the quotes and resolve escape sequences
            field_value = to_binary(field_value.strip('"\'')).decode('unicode-escape')

        col_type = part_col.data_type
        if isinstance(col_type, types.Integer):
            field_value = int(field_value)
        elif isinstance(col_type, types.Float):
            field_value = float(field_value)
        return part_col == field_value

    def _conjunction(group):
        # comma-separated terms are AND-ed together
        return reduce(operator.and_, map(_parse_partition_predicate, group.split(',')))

    from ...models.partition import Partition
    from ...types import PartitionSpec

    # normalize every accepted predicate type to a string
    if isinstance(predicate, Partition):
        predicate = predicate.partition_spec
    if isinstance(predicate, PartitionSpec):
        predicate = ','.join("%s='%s'" % (k, v) for k, v in predicate.kv.items())
    if isinstance(predicate, list):
        predicate = '/'.join(str(s) for s in predicate)
    elif not isinstance(predicate, str):
        raise ExpressionError('Only accept string predicates.')

    if predicate:
        # slash-separated groups are OR-ed together
        predicate_obj = reduce(operator.or_, map(_conjunction, predicate.split('/')))
    else:
        predicate_obj = None

    if not source.table_schema.partitions:
        raise ExpressionError('No partition columns in the collection.')

    if not exclude:
        return self.filter(predicate_obj)

    columns = [
        c for c in self.schema if c.name not in source.table_schema._partition_schema
    ]
    new_schema = types.TableSchema.from_lists([c.name for c in columns], [c.type for c in columns])
    return FilterPartitionCollectionExpr(self, predicate_obj, _schema=new_schema, _predicate_string=predicate)
def _validate_field(self, field):
    """
    Check whether ``field`` can be used directly on this collection.

    Non-sequence values are always accepted. A sequence is accepted only if
    this collection is its ancestor and no path between them passes through
    another collection or a ``GroupedSequenceReduction``.

    :return: True if the field is usable as is, else False
    """
    if not isinstance(field, SequenceExpr):
        return True
    if not field.is_ancestor(self):
        return False

    for path in field.all_path(self):
        inner_nodes = path[1: -1]
        if any(isinstance(node, CollectionExpr) for node in inner_nodes):
            return False

        from .reduction import GroupedSequenceReduction

        if any(isinstance(node, GroupedSequenceReduction) for node in path):
            return False
    return True
@staticmethod
def _backtrack_field(field, collection):
    """
    Try to re-bind ``field`` onto ``collection`` in place.

    Every ``Column`` / ``RankOp`` reached from ``field`` has its input
    substituted by ``collection``. The attempt is abandoned when
    ``is_changed`` reports the column as changed (or returns ``None``), or
    when the column's source name is missing from the collection schema.

    :return: ``field`` on success, ``None`` when it cannot be re-bound
    """
    from .window import RankOp

    def _stop(node):
        return isinstance(node, (Column, RankOp))

    for node in field.traverse(top_down=True, unique=True, stop_cond=_stop):
        if isinstance(node, Column):
            changed = is_changed(collection, node)
            if changed or changed is None:
                return None
            if node.source_name not in collection.schema:
                return None
            node.substitute(node._input, collection)
        elif isinstance(node, RankOp) and node._input is not collection:
            node.substitute(node._input, collection)
    return field
def _get_field(self, field):
    """
    Resolve ``field`` into an expression usable on this collection.

    A string becomes a ``Column`` (``CallableColumn`` when the class has a
    callable attribute of the same name). Other values are validated and, if
    necessary, re-bound onto this collection.

    :raises ValueError: if a column name is not in the schema
    :raises ExpressionError: if the expression cannot be projected on this collection
    """
    from .reduction import GroupedSequenceReduction

    field = self._defunc(field)

    if isinstance(field, str):
        if field not in self._schema:
            raise ValueError('Field(%s) does not exist, please check schema' % field)
        shadows_callable = callable(getattr(type(self), field, None))
        column_cls = CallableColumn if shadows_callable else Column
        return column_cls(self, _name=field, _data_type=self._schema[field].type)

    if self._validate_field(field):
        return field

    rebound = None
    if not isinstance(field, GroupedSequenceReduction):
        # the reduction is not allowed
        rebound = self._backtrack_field(field, self)
    if rebound is None:
        raise ExpressionError('Cannot support projection on %s' % repr_obj(field))
    return rebound
def _get_fields(self, fields, ret_raw_fields=False):
    """
    Resolve several fields, expanding collections into their columns.

    :param fields: iterable of column names, expressions, functions or collections
    :param ret_raw_fields: when True, also return the list in which collections
                           are kept unexpanded
    :return: the resolved fields, or ``(resolved, raw)`` when ``ret_raw_fields`` is True
    """
    selects, raw_selects = [], []
    for field in fields:
        field = self._defunc(field)
        if not isinstance(field, CollectionExpr):
            resolved = self._get_field(field)
            selects.append(resolved)
            raw_selects.append(resolved)
            continue

        if any(child is self for child in field.children()):
            expanded = field._project_fields
        else:
            expanded = field._fetch_fields()
        selects.extend(self._get_fields(expanded))
        raw_selects.append(field)

    return (selects, raw_selects) if ret_raw_fields else selects
@classmethod
def _backtrack_lateral_view(cls, lv, collection):
    """
    Copy lateral view ``lv`` and re-bind the copy onto ``collection``.

    :param lv: the lateral view expression
    :param collection: the collection the view is being projected on
    :return: a copy of ``lv`` flagged with ``_lateral_view=True``
    :raises ExpressionError: if ``collection`` cannot be traced back to the
                             input of ``lv`` through projections and filters
    """
    # the view may sit directly on its source or on a projection of it
    if isinstance(lv.input, ProjectCollectionExpr):
        src_inputs = (lv.input, lv.input.input)
    else:
        src_inputs = (lv.input,)
    lv_copy = lv.copy(_id=None, _lateral_view=True)
    # walk up from ``collection`` until one of the accepted sources is reached
    cur = collection
    while cur not in src_inputs and isinstance(cur, (ProjectCollectionExpr, FilterCollectionExpr)):
        cur = cur.input
    if cur not in src_inputs:
        raise ExpressionError("Input of 'apply' in lateral views can only be "
                              "simple column selections")
    # pick the node whose input has to be replaced by ``collection``
    if cur is lv.input:
        apply_src = lv_copy
    else:
        apply_src = lv_copy.input
    if collection is apply_src.input:
        # already bound to the target collection
        return lv_copy
    for f in apply_src._fields:
        cls._backtrack_field(f, collection)
    lv_copy.substitute(apply_src.input, collection)
    return lv_copy
def _filter_lateral_views(self, fields):
    """
    Pick the fields that must be treated as lateral views.

    A ``RowAppliedCollectionExpr`` counts as a lateral view unless it is found
    while traversing this collection (the traversal passes through join and
    union collections but stops at any other collection).

    :param fields: candidate fields
    :return: list of the fields that are lateral views
    """
    from .collections import RowAppliedCollectionExpr
    from .merge import JoinFieldMergedCollectionExpr, JoinCollectionExpr, UnionCollectionExpr

    walk_through_collections = (JoinCollectionExpr, UnionCollectionExpr,
                                JoinFieldMergedCollectionExpr)

    def _stop(node):
        return (node is not self and isinstance(node, CollectionExpr)
                and not isinstance(node, walk_through_collections))

    lateral_views = []
    for field in fields:
        if not isinstance(field, RowAppliedCollectionExpr):
            continue
        found_upstream = any(
            coll is field for coll in self.traverse(unique=True, stop_cond=_stop)
        )
        if not found_upstream:
            lateral_views.append(field)
    return lateral_views
def _project(self, fields):
    """
    Build the projection of ``fields`` on this collection.

    :param fields: list of fields to project
    :return: a ``Summary`` when there are no lateral views and every selected
             field is a ``Scalar``; a ``LateralViewCollectionExpr`` when lateral
             views are involved; otherwise a ``ProjectCollectionExpr``
    :raises ExpressionError: on duplicate column names
    """
    lv_ids = {id(f) for f in self._filter_lateral_views(fields)}

    selects, lateral_views = [], []
    for field in fields:
        if id(field) in lv_ids:
            lv = self._backtrack_lateral_view(field, self)
            lateral_views.append(lv)
            selects.extend(lv.columns)
        else:
            selects.extend(self._get_fields([field]))

    names = [f.name for f in selects]
    typos = [f.dtype for f in selects]

    if not lateral_views and all(isinstance(it, Scalar) for it in selects):
        return self._summary(selects)

    if len(set(names)) != len(names):
        counts = defaultdict(int)
        for n in names:
            counts[n] += 1
        duplicated = ', '.join(n for n in counts if counts[n] > 1)
        raise ExpressionError('Duplicate column names: %s' % duplicated)

    schema = types.TableSchema.from_lists(names, typos)
    if lateral_views:
        return LateralViewCollectionExpr(self, _fields=selects, _lateral_views=lateral_views,
                                         _schema=schema)
    return ProjectCollectionExpr(self, _fields=selects, _schema=schema)
# [docs]
def select(self, *fields, **kw):
    """
    Project columns. Remember to avoid column name conflicts.

    :param fields: columns to project, given separately or as one list
    :param kw: columns and their names to project; plain values are wrapped
               into ``Scalar`` and functions are called with this collection
    :return: new collection
    :rtype: :class:`odps.df.expr.expression.CollectionExpr`
    """
    if len(fields) == 1 and isinstance(fields[0], list):
        # copy, so that keyword columns appended below do not leak into
        # the list object owned by the caller
        fields = list(fields[0])
    else:
        fields = list(fields)

    if kw:
        def handle(it):
            it = self._defunc(it)
            if not isinstance(it, Expr):
                it = Scalar(it)
            return it

        fields.extend([handle(f).rename(new_name)
                       for new_name, f in kw.items()])

    return self._project(fields)
# [docs]
def exclude(self, *fields):
    """
    Project every column except the given ones.

    :param fields: field names (or expressions / functions whose ``name`` is
                   used), given separately or as one list
    :return: new collection
    :rtype: :class:`odps.df.expr.expression.CollectionExpr`
    """
    single_list = len(fields) == 1 and isinstance(fields[0], list)
    candidates = fields[0] if single_list else list(fields)

    resolved = list(map(self._defunc, candidates))
    excluded_names = [it if isinstance(it, str) else it.name for it in resolved]

    kept = [name for name in self._schema.names
            if name not in excluded_names]
    return self._project(kept)
def _summary(self, fields):
    """
    Build a ``Summary`` expression over ``fields``.

    :param fields: column names or named expressions
    :raises ExpressionError: if any field has no name
    """
    def _name_of(field):
        return field if isinstance(field, str) else field.name

    def _type_of(field):
        if isinstance(field, str):
            return self._schema.get_type(field)
        return field.dtype

    names = list(map(_name_of, fields))
    typos = list(map(_type_of, fields))
    if None in names:
        raise ExpressionError('Column does not have a name, '
                              'please specify one by `rename`')
    return Summary(_input=self, _fields=fields,
                   _schema=types.TableSchema.from_lists(names, typos))
def _slice(self, slices):
    """Wrap this collection in a ``SliceCollectionExpr`` for ``slices``, keeping the schema."""
    return SliceCollectionExpr(self, _indexes=slices, _schema=self._schema)


@property
def schema(self):
    """The schema object stored in ``_schema``."""
    return self._schema


@property
def odps_schema(self):
    """The schema converted with ``df_schema_to_odps_schema``."""
    from ..backends.odpssql.types import df_schema_to_odps_schema
    return df_schema_to_odps_schema(self.schema)
@property
def columns(self):
    """
    :return: columns, one per schema name, obtained through ``self[name]``
    :rtype: list in which each element is a Column
    """
    return [self[n] for n in self._schema.names]


def _fetch_fields(self):
    """Resolve every schema name through ``_get_field`` and return the list."""
    return [self._get_field(name) for name in self._schema.names]


@property
def _project_fields(self):
    """Fields used when this collection is projected; here the same as ``_fetch_fields()``."""
    return self._fetch_fields()


def _data_source(self):
    """Generator yielding ``_source_data`` when it is set; yields nothing otherwise."""
    if hasattr(self, '_source_data') and self._source_data is not None:
        yield self._source_data
def __getattr__(self, attr):
    """
    Fallback lookup: retry the parent ``__getattribute__``, then treat ``attr``
    as a column name (compared via ``to_lower_str``) before giving up.
    """
    try:
        return super(CollectionExpr, self).__getattribute__(attr)
    except AttributeError as e:
        lowered = to_lower_str(attr)
        if lowered not in object.__getattribute__(self, '_schema')._name_indexes:
            raise e
        return self[attr]
def output_type(self):
    """Return the constant string ``'collection'``."""
    return 'collection'
# [docs]
def limit(self, n):
    """
    Limit the collection to n records; same as ``collection[:n]``.

    :param n: n records
    :return: the sliced collection
    """
    return self[slice(None, n)]
# [docs]
def head(self, n=None, **kwargs):
    """
    Return the first n rows. Execute at once.

    :param n: number of rows; ``options.display.max_rows`` when omitted
    :return: result frame
    :rtype: :class:`odps.df.backends.frame.ResultFrame`
    """
    rows = options.display.max_rows if n is None else n
    return self._handle_delay_call('execute', self, head=rows, **kwargs)
# [docs]
def tail(self, n=None, **kwargs):
    """
    Return the last n rows. Execute at once.

    :param n: number of rows; ``options.display.max_rows`` when omitted
    :return: result frame
    :rtype: :class:`odps.df.backends.frame.ResultFrame`
    """
    rows = options.display.max_rows if n is None else n
    return self._handle_delay_call('execute', self, tail=rows, **kwargs)
# [docs]
def to_pandas(self, wrap=False, **kwargs):
    """
    Convert to pandas DataFrame. Execute at once.

    :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame
    :return: pandas DataFrame
    :raises DependencyNotInstalledError: if pandas cannot be imported
    """
    try:
        # imported only to verify that pandas is usable
        import pandas as pd
    except (ImportError, ValueError):
        raise DependencyNotInstalledError('to_pandas requires `pandas` library')

    def wrapper(result):
        frame = result.values
        if not wrap:
            return frame
        from .. import DataFrame
        return DataFrame(frame, schema=self.schema)

    return self.execute(wrapper=wrapper, **kwargs)
@property
def dtypes(self):
    """Alias of ``schema``."""
    return self.schema
# [docs]
def view(self):
    """
    Clone this collection into a new object of the same type; useful for self-join.

    :return: the cloned collection
    """
    proxied = get_proxied_expr(self)
    attr_values = {}
    for attr in get_attrs(proxied):
        attr_values[attr] = getattr(proxied, attr)
    return type(proxied)(**attr_values)
def describe(self):
    """
    Build an expression holding descriptive statistics of the columns.

    If the schema has at least one numeric column, only numeric columns are
    described (count, mean, std, min, quantiles, max); otherwise every column
    is described with count and unique count. The result has one row per
    statistic, a leading ``type`` column and one column per described field.

    :return: the expression produced by ``summary.apply(..., axis=1)``
    """
    from .. import output
    # NOTE: the lambdas below read ``col`` from this method's scope, i.e. the
    # loop variable of the ``for col in ...`` loops further down. They are only
    # called while such a loop is running, so ``col`` is the current column.
    numeric_methods = OrderedDict([
        ('count', lambda f: f.count().rename(col.name + '_count')),
        ('mean', lambda f: f.mean()),
        ('std', lambda f: f.std()),
        ('min', lambda f: f.min()),
        ('quantile_25', lambda f: f.quantile(0.25).rename(col.name + '_quantile_25')),
        ('quantile_50', lambda f: f.quantile(0.5).rename(col.name + '_quantile_50')),
        ('quantile_75', lambda f: f.quantile(0.75).rename(col.name + '_quantile_75')),
        ('max', lambda f: f.max()),
    ])
    string_methods = OrderedDict([
        ('count', lambda f: f.count().rename(col.name + '_count')),
        ('unique', lambda f: f.nunique().rename(col.name + '_unique')),
    ])
    aggs = []
    output_names = []
    output_types = []

    def produce_stat_fields(col, methods):
        # apply every statistic to one column and record its output name/type
        output_names.append(col.name)
        fields = []
        tps = []
        for func in methods.values():
            field = func(self[col.name])
            fields.append(field)
            tps.append(field.dtype)
        # all statistics of a column share one output column, hence one type
        t = highest_precedence_data_type(*tps)
        output_types.append(t)
        aggs.extend([f.astype(t) if t == types.decimal else f for f in fields])

    if any(types.is_number(col.type) for col in self.schema.columns):
        methods = numeric_methods
        for col in self.schema.columns:
            if types.is_number(col.type):
                produce_stat_fields(col, numeric_methods)
    else:
        methods = string_methods
        for col in self.schema.columns:
            produce_stat_fields(col, string_methods)
    summary = self[aggs]
    methods_names = list(methods.keys())

    @output(['type'] + output_names, ['string'] + output_types)
    def to_methods(row):
        # pivot the single summary row into one output row per statistic;
        # values are read from attributes named '<field>_<statistic>'
        for method in methods_names:
            values = [None] * len(output_names)
            for i, field_name in enumerate(output_names):
                values[i] = getattr(row, '%s_%s' % (field_name, method.split('(', 1)[0]))
            yield [method, ] + values

    return summary.apply(to_methods, axis=1)
def accept(self, visitor):
    """
    Dispatch to ``visitor.visit_source_collection`` for collections backed by source data.

    :raises NotImplementedError: when ``_source_data`` is ``None``
    """
    if self._source_data is None:
        raise NotImplementedError
    visitor.visit_source_collection(self)
def get_cached(self, data):
    """
    Build a collection over cached ``data``.

    ``data.reload()`` is called when available. If that raises ``NoSuchObject``,
    this expression is removed from the context cache and ``None`` is
    returned. Attributes whose slot name starts with ``_ml_`` are copied to
    the new collection.

    :param data: the cached data object
    :return: a new ``CollectionExpr`` with the same schema, or ``None``
    """
    try:
        if hasattr(data, 'reload'):
            data.reload()
    except NoSuchObject:
        from ..backends.context import context
        context.uncache(self)
        return None

    coll = CollectionExpr(_source_data=data, _schema=self._schema)
    ml_slots = [slot for slot in CollectionExpr.__slots__ if slot.startswith('_ml_')]
    for slot in ml_slots:
        if hasattr(self, slot):
            setattr(coll, slot, getattr(self, slot))
    return coll
# Cache of dynamically created typed classes, keyed by ``(cls, typed_cls)``;
# filled and read by ``TypedExpr._new_cls``.
_cached_typed_expr = dict()
class TypedExpr(Expr):
__slots__ = '_name', '_source_name'
# The three hooks below must be overridden by subclasses; they receive the
# constructor arguments and are consumed by ``_new_cls``.
@classmethod
def _get_type(cls, *args, **kwargs):
    """Return the data type extracted from ``args`` and ``kwargs``."""
    # return the data type which extracted from args and kwargs
    raise NotImplementedError


@classmethod
def _typed_classes(cls, *args, **kwargs):
    """Return the allowed typed classes."""
    # return allowed data types
    raise NotImplementedError


@classmethod
def _base_class(cls, *args, **kwargs):
    """Return the untyped base class (``SequenceExpr`` or ``Scalar``)."""
    # base class, SequenceExpr or Scalar
    raise NotImplementedError
@classmethod
def _new_cls(cls, *args, **kwargs):
data_type = cls._get_type(*args, **kwargs)
if data_type:
base_class = cls._base_class(*args, **kwargs)
typed_classes = cls._typed_classes(*args, **kwargs)
data_type = types.validate_data_type(data_type)
name = data_type.CLASS_NAME + base_class.__name__
# get the typed class, e.g. Int64SequenceExpr, StringScalar
typed_cls = globals().get(name)
if typed_cls is None:
raise TypeError("Name {} doesn't exist".format(name))
if issubclass(cls, typed_cls):
return cls
elif cls == base_class:
return typed_cls
elif cls in typed_classes:
return typed_cls
keys = (cls, typed_cls)
if keys in _cached_typed_expr:
return _cached_typed_expr[keys]
mros = inspect.getmro(cls)
has_data_type = len([sub for sub in mros if sub in typed_classes]) > 0
if has_data_type:
mros = mros[1:]
subs = [sub for sub in mros if sub not in typed_classes]
subs.insert(1, typed_cls)
bases = []
for sub in subs[::-1]:
for i in range(len(bases)):
if bases[i] is None:
continue
if issubclass(sub, bases[i]):
bases[i] = None
bases.append(sub)
bases = tuple(base for base in bases if base is not None)
dic = dict()
if hasattr(cls, '_args'):
dic['_args'] = cls._args
dic['__slots__'] = cls.__slots__ + getattr(cls, '_slots', ())
dic['_add_args_slots'] = True
try:
accept_cls = next(c for c in bases if c.__name__ == cls.__name__)
if hasattr(accept_cls, 'accept'):
dic['accept'] = accept_cls.accept
except StopIteration:
pass
clz = type(cls.__name__, bases, dic)
_cached_typed_expr[keys] = clz
return clz
else:
return cls
def __new__(cls, *args, **kwargs):
clz = cls._new_cls(*args, **kwargs)
return super(TypedExpr, clz).__new__(clz)
def __getattribute__(self, item):
try:
return super(TypedExpr, self).__getattribute__(item)
except AttributeError:
pass
# need to show type of expression when certain method missing
dtype = None
try:
dtype = object.__getattribute__(self, "_data_type")
except AttributeError: # pragma: no cover
pass
try:
dtype = object.__getattribute__(self, "_value_type")
except AttributeError: # pragma: no cover
pass
if dtype is not None:
raise AttributeError(
"'%s' object has no attribute '%s'. Type of the expression "
"is %s which may not support this method." % (type(self).__name__, item, dtype)
)
else:
raise AttributeError(
"'%s' object has no attribute '%s'" % (type(self).__name__, item)
)
@classmethod
def _new(cls, *args, **kwargs):
return cls._new_cls(*args, **kwargs)(*args, **kwargs)
def is_renamed(self):
return self._name is not None and self._source_name is not None and \
self._name != self._source_name
def rename(self, new_name):
if new_name == self._name:
return self
attr_dict = dict((attr, getattr(self, attr, None)) for attr in get_attrs(self))
attr_dict['_source_name'] = self._source_name
attr_dict['_name'] = new_name
return type(self)(**attr_dict)
@property
def name(self):
return self._name
@property
def source_name(self):
return self._source_name
def astype(self, data_type):
raise NotImplementedError
def cast(self, t):
return self.astype(t)
def eval(self, str_expr, rewrite=False):
from .query import SequenceVisitor
if not isinstance(str_expr, str):
raise ValueError('expr must be a string')
frame = sys._getframe(2).f_locals
try:
env = frame.copy()
finally:
del frame
env['max_pt'] = ReprWrapper(make_max_pt_function(self), repr)
visitor = SequenceVisitor(self, env)
return visitor.eval(str_expr, rewrite=rewrite)
# [docs]
class SequenceExpr(TypedExpr):
    """
    Sequence represents 1-dimension data.
    """

    __slots__ = (
        '_data_type', '_source_data_type',
        '_ml_fields_cache', '_ml_uplink', '_ml_operations',
    )

    @classmethod
    def _get_type(cls, *args, **kwargs):
        return types.validate_data_type(kwargs.get('_data_type'))

    @classmethod
    def _typed_classes(cls, *args, **kwargs):
        return _typed_sequence_exprs

    @classmethod
    def _base_class(cls, *args, **kwargs):
        return SequenceExpr

    def _init(self, *args, **kwargs):
        for attr, default in (
            ('_name', None),
            ('_ml_fields_cache', None),
            ('_ml_uplink', []),
            ('_ml_operations', []),
        ):
            self._init_attr(attr, default)

        super(SequenceExpr, self)._init(*args, **kwargs)

        if '_data_type' in kwargs:
            self._data_type = types.validate_data_type(kwargs['_data_type'])

        if '_source_name' not in kwargs:
            self._source_name = self._name

        # The source data type falls back to the (validated) data type.
        if '_source_data_type' in kwargs:
            self._source_data_type = types.validate_data_type(kwargs['_source_data_type'])
        else:
            self._source_data_type = self._data_type

    def cache(self, mem=False):
        raise ExpressionError('Cache operation does not support for sequence.')

    def head(self, n=None, **kwargs):
        """
        Return the first n rows. Execute at once.

        :param n: number of rows, ``options.display.max_rows`` when None
        :return: result frame
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        rows = options.display.max_rows if n is None else n
        return self._handle_delay_call('execute', self, head=rows, **kwargs)

    def tail(self, n=None, **kwargs):
        """
        Return the last n rows. Execute at once.

        :param n: number of rows, ``options.display.max_rows`` when None
        :return: result of the delayed ``execute`` call
        """
        rows = options.display.max_rows if n is None else n
        return self._handle_delay_call('execute', self, tail=rows, **kwargs)

    def to_pandas(self, wrap=False, **kwargs):
        """
        Convert to pandas Series. Execute at once.

        :param wrap: if True, wrap the pandas DataFrame into a PyODPS DataFrame
                     before selecting this column
        :return: pandas Series
        :raises DependencyNotInstalledError: when pandas cannot be imported
        """
        try:
            # imported only to verify the dependency is available
            import pandas as pd
        except (ImportError, ValueError):
            raise DependencyNotInstalledError('to_pandas requires for `pandas` library')

        def wrapper(result):
            frame = result.values
            if wrap:
                from .. import DataFrame

                frame = DataFrame(frame)
            return frame[self.name]

        return self.execute(wrapper=wrapper, **kwargs)

    @property
    def data_type(self):
        return self._data_type

    @property
    def source_data_type(self):
        return self._source_data_type

    @property
    def dtype(self):
        """
        Return the data type. Available types:
        int8, int16, int32, int64, float32, float64, boolean, string, decimal, datetime

        :return: the data type
        """
        return self._data_type

    def astype(self, data_type):
        """
        Cast to a new data type.

        :param data_type: the new data type
        :return: casted sequence, or ``self`` when the type is unchanged

        :Example:

        >>> df.id.astype('float')
        """
        data_type = types.validate_data_type(data_type)
        if data_type == self._data_type:
            return self

        return AsTypedSequenceExpr(
            _data_type=data_type,
            _source_data_type=self._source_data_type,
            _input=self,
        )

    def output_type(self):
        return 'sequence(%s)' % repr(self._data_type)

    def accept(self, visitor):
        visitor.visit_sequence(self)
class BooleanSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.boolean``."""

    def _init(self, *args, **kwargs):
        super(BooleanSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.boolean
class Int8SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.int8``."""

    def _init(self, *args, **kwargs):
        super(Int8SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.int8
class Int16SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.int16``."""

    def _init(self, *args, **kwargs):
        super(Int16SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.int16
class Int32SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.int32``."""

    def _init(self, *args, **kwargs):
        super(Int32SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.int32
# [docs]
class Int64SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.int64``."""

    def _init(self, *args, **kwargs):
        super(Int64SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.int64
class Float32SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.float32``."""

    def _init(self, *args, **kwargs):
        super(Float32SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.float32
class Float64SequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.float64``."""

    def _init(self, *args, **kwargs):
        super(Float64SequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.float64
class DecimalSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.decimal``."""

    def _init(self, *args, **kwargs):
        super(DecimalSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.decimal
# [docs]
class StringSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.string``."""

    def _init(self, *args, **kwargs):
        super(StringSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.string
class DatetimeSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.datetime``."""

    def _init(self, *args, **kwargs):
        super(DatetimeSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.datetime
class BinarySequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.binary``."""

    def _init(self, *args, **kwargs):
        super(BinarySequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.binary
class DateSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.date``."""

    def _init(self, *args, **kwargs):
        super(DateSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.date
class TimestampSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.timestamp``."""

    def _init(self, *args, **kwargs):
        super(TimestampSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.timestamp
class JsonSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is always set to ``types.json``."""

    def _init(self, *args, **kwargs):
        super(JsonSequenceExpr, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._data_type = types.json
class ListSequenceExpr(SequenceExpr):
    """List-typed sequence; the data type set by the parent ``_init`` is kept."""

    def _init(self, *args, **kwargs):
        # No type override here: initialisation is delegated unchanged.
        super(ListSequenceExpr, self)._init(*args, **kwargs)
class DictSequenceExpr(SequenceExpr):
    """Dict-typed sequence; the data type set by the parent ``_init`` is kept."""

    def _init(self, *args, **kwargs):
        # No type override here: initialisation is delegated unchanged.
        super(DictSequenceExpr, self)._init(*args, **kwargs)
class StructSequenceExpr(SequenceExpr):
    """Struct-typed sequence; the data type set by the parent ``_init`` is kept."""

    def _init(self, *args, **kwargs):
        # No type override here: initialisation is delegated unchanged.
        super(StructSequenceExpr, self)._init(*args, **kwargs)
class UnknownSequenceExpr(SequenceExpr):
    """Sequence whose ``_data_type`` is guaranteed to be a ``types.Unknown``."""

    def _init(self, *args, **kwargs):
        super(UnknownSequenceExpr, self)._init(*args, **kwargs)
        # An existing ``Unknown`` instance is kept; anything else is replaced.
        already_unknown = isinstance(self._data_type, types.Unknown)
        if not already_unknown:
            self._data_type = types.Unknown()
# Typed sequence classes, one per value of ``types._data_types``, looked up in
# this module's globals as ``<type class name>SequenceExpr``. Plain indexing is
# used, so a missing class raises KeyError at import time.
_typed_sequence_exprs = [globals()[t.__class__.__name__ + SequenceExpr.__name__]
for t in types._data_types.values()]
# Sequence classes for every type in ``types.number_types()``, named after the
# capitalized ``repr`` of the type; ``.get`` yields None when no class matches.
number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__)
for t in types.number_types()]
# Same lookup restricted to number types whose ``repr`` starts with ``int``.
int_number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__)
for t in types.number_types() if repr(t).startswith('int')]
class AsTypedSequenceExpr(SequenceExpr):
    """
    Sequence node produced by ``SequenceExpr.astype``; wraps ``_input`` and
    is dispatched to ``visit_cast``. Name and type accessors fall back to the
    input's when the own value is falsy.
    """

    _args = '_input',
    node_name = "TypedSequence"

    @property
    def input(self):
        return self._input

    def accept(self, visitor):
        return visitor.visit_cast(self)

    @property
    def name(self):
        own = self._name
        return own or self._input.name

    @property
    def source_name(self):
        own = self._source_name
        return own or self._input.source_name

    @property
    def dtype(self):
        own = self._data_type
        return own or self._input.data_type

    @property
    def source_type(self):
        own = self._source_data_type
        return own or self._input._source_data_type

    def rebuild(self):
        """Re-create the node, resolving an ``Unknown`` source type from the input."""
        attrs = self._attr_dict()
        node_cls = self._copy_type()
        if isinstance(attrs['_source_data_type'], types.Unknown):
            attrs['_source_data_type'] = self._input.dtype
        return node_cls(**attrs)
class Column(SequenceExpr):
    """Sequence that refers to a column of its ``_input`` expression."""

    _args = '_input',

    @property
    def input(self):
        return self._input

    def rebuild(self):
        """
        Re-create the column; ``Unknown`` data types are replaced by the dtype
        of the column freshly fetched from the input by source name.
        """
        attrs = self._attr_dict()
        node_cls = self._copy_type()
        fresh = self.input[self.source_name]
        unknown_attrs = [
            attr for attr in ('_source_data_type', '_data_type')
            if isinstance(attrs[attr], types.Unknown)
        ]
        for attr in unknown_attrs:
            attrs[attr] = fresh.dtype
        return node_cls(**attrs)

    def accept(self, visitor):
        return visitor.visit_column(self)
class CallableColumn(Column):
    """
    Column that can also be called: the call is forwarded to the attribute of
    the input's class named like this column's source name.
    """

    def __call__(self, *args, **kwargs):
        owner = self._input
        # Looked up on the class, so the input is passed explicitly as first argument.
        target = getattr(type(owner), self.source_name)
        return target(owner, *args, **kwargs)
# [docs]
class Scalar(TypedExpr):
    """
    Represents a scalar value.

    :param _value: value of the scalar
    :param _value_type: value type of the scalar

    :Example:

    >>> df[df, Scalar(4).rename('append_const')]
    """

    __slots__ = '_value', '_value_type', '_source_value_type'

    @classmethod
    def _get_type(cls, *args, **kwargs):
        """
        Work out the value type from positional ``(value, value_type)`` and/or
        the ``_value`` / ``_value_type`` keywords.

        :raises ValueError: when neither a value nor a type is given
        """
        pos_value = args[0] if args else None
        pos_type = args[1] if len(args) > 1 else None

        val = kwargs.get('_value')
        if val is None:
            val = pos_value
        value_type = kwargs.get('_value_type', None) or pos_type

        if val is None and value_type is None:
            raise ValueError('Either value or value_type should be provided')

        if val is None or isinstance(val, NodeMetaclass):
            return types.validate_data_type(value_type)
        return types.validate_value_type(val, value_type)

    @classmethod
    def _typed_classes(cls, *args, **kwargs):
        return _typed_scalar_exprs

    @classmethod
    def _base_class(cls, *args, **kwargs):
        return Scalar

    @classmethod
    def _transform(cls, *args, **kwargs):
        """
        Fold positional ``(value, value_type)`` into keyword form and fill in
        the derived ``_value_type``, ``_source_name`` and ``_source_value_type``.

        :return: the normalised keyword dict
        """
        pos_value = args[0] if args else None
        pos_type = args[1] if len(args) > 1 else None

        if kwargs.get('_value') is None and pos_value is not None:
            kwargs['_value'] = pos_value
        if kwargs.get('_value_type') is None and pos_type is not None:
            kwargs['_value_type'] = types.validate_data_type(pos_type)

        current_value = kwargs.get('_value')
        if current_value is not None:
            kwargs['_value_type'] = types.validate_value_type(
                current_value, kwargs.get('_value_type'))

        if '_source_name' not in kwargs:
            kwargs['_source_name'] = kwargs.get('_name')

        if '_source_value_type' in kwargs:
            kwargs['_source_value_type'] = types.validate_data_type(
                kwargs['_source_value_type'])
        else:
            kwargs['_source_value_type'] = kwargs['_value_type']
        return kwargs

    def __new__(cls, *args, **kwargs):
        # Positional arguments are folded into keywords before delegating.
        normalised = cls._transform(*args, **kwargs)
        return super(Scalar, cls).__new__(cls, **normalised)

    def _init(self, *args, **kwargs):
        self._init_attr('_name', None)
        self._init_attr('_value', None)
        normalised = self._transform(*args, **kwargs)
        super(Scalar, self)._init(**normalised)

    def equals(self, other):
        return super(Scalar, self).equals(other)

    @property
    def value(self):
        return self._value

    @property
    def value_type(self):
        return self._value_type

    @property
    def dtype(self):
        return self._value_type

    def output_type(self):
        return 'Scalar[%s]' % repr(self._value_type)

    def astype(self, value_type):
        """
        Cast to a new value type.

        :param value_type: the new value type
        :return: ``self`` when the type is unchanged, else an ``AsTypedScalar``
        """
        value_type = types.validate_data_type(value_type)
        if value_type == self._value_type:
            return self

        return AsTypedScalar(
            _input=self,
            _value_type=value_type,
            _source_value_type=self._source_value_type,
        )

    def to_sequence(self):
        """
        Convert a value-less scalar into the sequence class of the same name.

        :raises ExpressionError: when the scalar holds a value
        """
        if self._value is not None:
            raise ExpressionError('Cannot convert valued scalar to sequence')

        attr_values = {attr: getattr(self, attr) for attr in get_attrs(self)}
        attr_values['_data_type'] = attr_values.pop('_value_type')
        if '_source_value_type' in attr_values:
            attr_values['_source_data_type'] = attr_values.pop('_source_value_type')
        del attr_values['_value']

        # First ancestor that shares this class's name but is not a Scalar.
        own_name = type(self).__name__
        ancestors = inspect.getmro(type(self))[1:]
        seq_cls = next(c for c in ancestors
                       if c.__name__ == own_name and not issubclass(c, Scalar))
        return seq_cls._new(**attr_values)

    def accept(self, visitor):
        visitor.visit_scalar(self)

    def get_cached(self, data):
        return Scalar(_value=data, _value_type=self.dtype)
class AsTypedScalar(Scalar):
    """
    Scalar node produced by ``Scalar.astype``; wraps ``_input`` and is
    dispatched to ``visit_cast``.
    """

    _args = '_input',
    node_name = "TypedScalar"

    def accept(self, visitor):
        return visitor.visit_cast(self)

    @property
    def name(self):
        # Falls back to the input's name when no own name is set.
        own = self._name
        return own or self._input.name

    @property
    def source_name(self):
        own = self._source_name
        return own or self._input.source_name

    @property
    def value(self):
        return self._value

    @property
    def value_type(self):
        return self._value_type

    @property
    def dtype(self):
        return self._value_type

    def rebuild(self):
        """Re-create the node, resolving an ``Unknown`` source type from the input."""
        attrs = self._attr_dict()
        node_cls = self._copy_type()
        if isinstance(attrs['_source_value_type'], types.Unknown):
            attrs['_source_value_type'] = self._input.dtype
        return node_cls(**attrs)

    @property
    def source_type(self):
        return self._source_value_type
class BooleanScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.boolean``."""

    def _init(self, *args, **kwargs):
        super(BooleanScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.boolean
class Int8Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.int8``."""

    def _init(self, *args, **kwargs):
        super(Int8Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.int8
class Int16Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.int16``."""

    def _init(self, *args, **kwargs):
        super(Int16Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.int16
class Int32Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.int32``."""

    def _init(self, *args, **kwargs):
        super(Int32Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.int32
class Int64Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.int64``."""

    def _init(self, *args, **kwargs):
        super(Int64Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.int64
class Float32Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.float32``."""

    def _init(self, *args, **kwargs):
        super(Float32Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.float32
class Float64Scalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.float64``."""

    def _init(self, *args, **kwargs):
        super(Float64Scalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.float64
class DecimalScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.decimal``."""

    def _init(self, *args, **kwargs):
        super(DecimalScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.decimal
class StringScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.string``."""

    def _init(self, *args, **kwargs):
        super(StringScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.string
class DateScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.date``."""

    def _init(self, *args, **kwargs):
        super(DateScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.date
class DatetimeScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.datetime``."""

    def _init(self, *args, **kwargs):
        super(DatetimeScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.datetime
class TimestampScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.timestamp``."""

    def _init(self, *args, **kwargs):
        super(TimestampScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.timestamp
class BinaryScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.binary``."""

    def _init(self, *args, **kwargs):
        super(BinaryScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.binary
class JsonScalar(Scalar):
    """Scalar whose ``_value_type`` is always set to ``types.json``."""

    def _init(self, *args, **kwargs):
        super(JsonScalar, self)._init(*args, **kwargs)
        # Pin the type after the parent initialisation.
        self._value_type = types.json
class ListScalar(Scalar):
    """List-typed scalar; adds nothing on top of ``Scalar``."""
class DictScalar(Scalar):
    """Dict-typed scalar; adds nothing on top of ``Scalar``."""
class StructScalar(Scalar):
    """Struct-typed scalar; adds nothing on top of ``Scalar``."""
class UnknownScalar(Scalar):
    """Scalar whose ``_value_type`` is guaranteed to be a ``types.Unknown``."""

    def _init(self, *args, **kwargs):
        super(UnknownScalar, self)._init(*args, **kwargs)
        # An existing ``Unknown`` instance is kept; anything else is replaced.
        already_unknown = isinstance(self._value_type, types.Unknown)
        if not already_unknown:
            self._value_type = types.Unknown()
# Typed scalar classes, one per value of ``types._data_types``, looked up in
# this module's globals as ``<type class name>Scalar``. Plain indexing is used,
# so a missing class raises KeyError at import time.
_typed_scalar_exprs = [globals()[t.__class__.__name__ + Scalar.__name__]
for t in types._data_types.values()]
# Scalar classes for every type in ``types.number_types()``, named after the
# capitalized ``repr`` of the type; ``.get`` yields None when no class matches.
number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__)
for t in types.number_types()]
# Same lookup restricted to number types whose ``repr`` starts with ``int``.
int_number_scalars = [globals().get(repr(t).capitalize() + Scalar.__name__)
for t in types.number_types() if repr(t).startswith('int')]
class BuiltinFunction(Scalar):
    """
    Scalar standing for a call to a named builtin function.

    :param name: function name; ``_func_name`` keyword is used when falsy
    :param rtype: return type; ``_value_type`` keyword is used when falsy,
                  defaulting to ``types.string``
    :param args: function arguments; ``_func_args`` keyword is used when falsy
    """

    __slots__ = '_func_name', '_func_args', '_func_kwargs'

    def __init__(self, name=None, rtype=None, args=(), **kwargs):
        # Always remove the keyword spellings from ``kwargs``. With the former
        # ``x or kwargs.pop(...)`` form the pop was skipped whenever the named
        # parameter was given, so the leftover keyword was forwarded a second
        # time below and raised a duplicate-keyword TypeError.
        kw_rtype = kwargs.pop('_value_type', types.string)
        kw_name = kwargs.pop('_func_name', None)
        kw_args = kwargs.pop('_func_args', ())

        # The explicit parameters keep precedence over the keyword spellings.
        rtype = types.validate_data_type(rtype or kw_rtype)
        func_name = name or kw_name
        func_args = args or kw_args

        super(BuiltinFunction, self).__init__(_func_name=func_name,
                                              _func_args=func_args,
                                              _value_type=rtype,
                                              **kwargs)

    def accept(self, visitor):
        visitor.visit_builtin_function(self)
# [docs]
class RandomScalar(BuiltinFunction):
    """
    Represents a random scalar, built on the ``rand`` function with a
    ``float`` value type.

    :param seed: random seed, None by default

    :Example:

    >>> df[df, RandomScalar().rename('append_random')]
    """

    def __new__(cls, seed=None, **kw):
        # An explicit seed wins over any ``_func_args`` passed by keyword.
        if seed is not None:
            func_args = (seed, )
        else:
            func_args = kw.get('_func_args', ())
        kw.update(_func_name='rand', _value_type='float', _func_args=func_args)
        return BuiltinFunction.__new__(cls, **kw)

    def __init__(self, seed=None, **kw):
        # Same normalisation as in ``__new__``.
        if seed is not None:
            func_args = (seed, )
        else:
            func_args = kw.get('_func_args', ())
        kw.update(_func_name='rand', _value_type='float', _func_args=func_args)
        super(RandomScalar, self).__init__(**kw)
class FilterCollectionExpr(CollectionExpr):
    """Collection node holding an input and a predicate; keeps the input's schema."""

    _args = '_input', '_predicate'
    node_name = 'Filter'

    def _init(self, *args, **kwargs):
        super(FilterCollectionExpr, self)._init(*args, **kwargs)
        # Inherit the input's schema when none was supplied.
        if self._schema is None:
            self._schema = self._input.schema

    def iter_args(self):
        labels = ['collection', 'predicate']
        for label, arg in zip(labels, self.args):
            yield label, arg

    @property
    def input(self):
        return self._input

    def rebuild(self):
        node = super(FilterCollectionExpr, self).rebuild()
        node._schema = node.input.schema
        return node

    def accept(self, visitor):
        visitor.visit_filter_collection(self)
class ProjectCollectionExpr(CollectionExpr):
    """Collection node that selects a list of named fields from its input."""

    __slots__ = '_raw_fields',

    _args = '_input', '_fields'
    _extra_args = '_raw_fields',
    node_name = 'Projection'

    def _init(self, *args, **kwargs):
        """
        :raises ExpressionError: when any selected field has no name
        """
        fields = kwargs.get('_fields')
        if fields is None and len(args) >= 2:
            fields = args[1]

        for fld in fields:
            if fld.name is None:
                raise ExpressionError('Column does not have a name, '
                                      'please specify one by `rename`: %s' % repr_obj(fld._repr()))

        self._init_attr('_raw_fields', None)
        super(ProjectCollectionExpr, self)._init(*args, **kwargs)

    def _set_field(self, value, value_dag=None):
        """
        Replace the field named like ``value`` (or append it) and rebuild the
        schema. Cached collections delegate to the parent implementation.
        """
        from ..backends.context import context
        from .window import Window

        if context.is_cached(self):
            super(ProjectCollectionExpr, self)._set_field(value, value_dag=value_dag)
            return

        if value_dag is None:
            value_dag = value.to_dag(copy=False, validate=False)

        def _is_column(node):
            return isinstance(node, Column)

        for node in value.traverse(top_down=True, unique=True, stop_cond=_is_column):
            if isinstance(node, Column) and (node.input is self or node.input._proxy is self):
                # Columns pointing at this projection are swapped for the
                # projected field they refer to, renamed if necessary.
                idx = self._schema._name_indexes[to_lower_str(node.source_name)]
                replacement = self._fields[idx]
                if replacement.name != node.name:
                    replacement = replacement.rename(node.name)
                value_dag.substitute(node, replacement)
            elif isinstance(node, Window) and node.input is self:
                # Window object like rank will point to collection directly instead of through column
                node._input = self.input

        value = value_dag.root
        fields = [value if existing.name == value.name else existing
                  for existing in self._fields]
        if value.name not in self._schema:
            fields.append(value)

        self._schema = TableSchema.from_lists(
            [f.name for f in fields], [f.dtype for f in fields]
        )
        self._fields = fields

    def _delitem(self, key):
        from ..backends.context import context

        if context.is_cached(self):
            return super(ProjectCollectionExpr, self).__delitem__(key)

        remaining = [f for f in self._fields if f.name != key]
        self._schema = TableSchema.from_lists(
            [f.name for f in remaining], [f.dtype for f in remaining]
        )
        self._fields = remaining

    def __delitem__(self, key):
        """
        :raises KeyError: when ``key`` is not in the schema
        """
        if key not in self._schema:
            raise KeyError('Field({0}) does not exist'.format(key))
        self._delitem(key)

    @property
    def _project_fields(self):
        return self._fields

    def iter_args(self):
        labels = ['collection', 'selections']
        for label, arg in zip(labels, self.args):
            yield label, arg

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    def rebuild(self):
        # When raw fields were recorded, the projection is redone from them.
        if self._raw_fields:
            return self._input.select(*self._raw_fields)

        node = super(ProjectCollectionExpr, self).rebuild()
        names = [f.name for f in node._fields]
        dtypes = [f.dtype for f in node._fields]
        node._schema = TableSchema.from_lists(names, dtypes)
        return node

    def accept(self, visitor):
        visitor.visit_project_collection(self)
class LateralViewCollectionExpr(ProjectCollectionExpr):
    """Projection that additionally carries a list of lateral views."""

    _args = '_input', '_fields', '_lateral_views'
    node_name = 'LateralView'

    def _init(self, *args, **kwargs):
        self._init_attr('_lateral_views', None)
        super(LateralViewCollectionExpr, self)._init(*args, **kwargs)
        self._redirect_lateral_views()

    def _redirect_lateral_views(self):
        """Re-point lateral views whose input differs from this node's input."""
        # dealing with cases like df[df['a', 'b'].apply(..., axis=1), df.c]
        # ``_lateral_views`` is defaulted to None in ``_init``; treat that as
        # "no lateral views" instead of failing when iterating None.
        for lv in self.lateral_views or ():
            if lv.input is self.input:
                continue
            for f, f_o in zip(lv._fields, lv.input._fields):
                lv.substitute(f, f_o)
            lv.substitute(lv.input, self.input)

    def iter_args(self):
        for it in zip(['collection', 'selections', 'lateral_views'], self.args):
            yield it

    @property
    def lateral_views(self):
        return self._lateral_views

    def accept(self, visitor):
        visitor.visit_lateral_view(self)
class FilterPartitionCollectionExpr(CollectionExpr):
    """Collection node that filters its input by a partition predicate."""

    __slots__ = '_predicate_string',

    _args = '_input', '_predicate', '_fields'
    node_name = 'FilterPartition'

    def _init(self, *args, **kwargs):
        super(FilterPartitionCollectionExpr, self)._init(*args, **kwargs)
        # One input column per schema column, in schema order.
        column_names = [col.name for col in self._schema.columns]
        self._fields = [self._input[col_name] for col_name in column_names]
        self._predicate_string = kwargs.get('_predicate_string')

    @property
    def _project_fields(self):
        return self._fields

    def iter_args(self):
        labels = ['collection', 'predicate', 'selections']
        for label, arg in zip(labels, self.args):
            yield label, arg

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    @property
    def predicate_string(self):
        return self._predicate_string

    def accept(self, visitor):
        visitor.visit_filter_partition_collection(self)

    @property
    def data_table(self):
        return self.input.data

    @property
    def data(self):
        """
        Partition of the underlying table selected by the predicate string.

        :raises AttributeError: when the predicate is an ``Or`` expression
        """
        from ...types import PartitionSpec
        from .arithmetic import Or

        if isinstance(self._predicate, Or):
            raise AttributeError('Cannot get data when predicate contains multiple partition specs.')

        partition_spec = PartitionSpec(self.predicate_string)
        return self.data_table.partitions[partition_spec]
class SliceCollectionExpr(CollectionExpr):
    """Collection node holding an input and slice indexes."""

    _args = '_input', '_indexes'
    node_name = 'Slice'

    def _init(self, *args, **kwargs):
        super(SliceCollectionExpr, self)._init(*args, **kwargs)

        if isinstance(self._indexes, slice):
            def _as_scalar(bound):
                # Missing bounds stay None; present ones become Scalar nodes.
                if bound is None:
                    return None
                return Scalar(_value=bound)

            slc = self._indexes
            self._indexes = (_as_scalar(slc.start), _as_scalar(slc.stop),
                             _as_scalar(slc.step))

    @property
    def start(self):
        bound = self._indexes[0]
        if bound is None:
            return None
        return bound.value

    @property
    def stop(self):
        bound = self._indexes[1]
        if bound is None:
            return None
        return bound.value

    @property
    def step(self):
        bound = self._indexes[2]
        if bound is None:
            return None
        return bound.value

    @property
    def input(self):
        return self._input

    def iter_args(self):
        labels = ['collection', 'start', 'stop', 'step']
        values = [self._input] + list(self._indexes)
        for label, arg in zip(labels, values):
            yield label, arg

    def rebuild(self):
        node = super(SliceCollectionExpr, self).rebuild()
        node._schema = self.input.schema
        return node

    def accept(self, visitor):
        visitor.visit_slice_collection(self)
class Summary(CollectionExpr):
    """
    Collection node over an input and a list of fields; it is dispatched to
    ``visit_project_collection``.
    """

    __slots__ = '_schema',

    _args = '_input', '_fields'

    def _init(self, *args, **kwargs):
        """
        :raises TypeError: when the schema has a field whose name is None
        """
        super(Summary, self)._init(*args, **kwargs)
        if hasattr(self, '_schema'):
            has_unnamed = any(field_name is None for field_name in self._schema.names)
            if has_unnamed:
                raise TypeError('Schema cannot has field which name is None')

    @property
    def input(self):
        return self._input

    @property
    def fields(self):
        return self._fields

    @property
    def schema(self):
        return self._schema

    def iter_args(self):
        labels = ['collection', 'fields']
        for label, arg in zip(labels, self.args):
            yield label, arg

    def accept(self, visitor):
        visitor.visit_project_collection(self)
from . import element
from . import arithmetic
from . import reduction
from . import groupby
from . import collections
from . import window
from . import math
from . import strings
from . import datetimes
from . import merge
from . import composites
from ..tools import *
# hack for count
def _count(expr, *args, **kwargs):
    """
    Dispatch ``count`` on a string sequence.

    With any extra argument the ``_count`` of the ``strings`` module is used;
    without arguments the ``count`` of the ``reduction`` module is used.
    """
    if args or kwargs:
        from .strings import _count as _string_count

        return _string_count(expr, *args, **kwargs)

    from .reduction import count as _reduce_count

    return _reduce_count(expr)


StringSequenceExpr.count = _count