#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..models import Table
from ..models.partition import Partition
from ..compat import six, izip
from ..types import CompositeMixin
from .expr.utils import get_attrs
from .expr.expressions import CollectionExpr
from .types import validate_data_type
from .backends.odpssql.types import odps_schema_to_df_schema
from .backends.pd.types import pd_to_df_schema, df_type_to_np_type, cast_composite_sequence
from .backends.sqlalchemy.types import sqlalchemy_to_df_schema
try:
    import pandas as pd
    has_pandas = True
except ImportError:
    has_pandas = False

try:
    import sqlalchemy
    has_sqlalchemy = True
except ImportError:
    has_sqlalchemy = False
class DataFrame(CollectionExpr):
"""
Main entrance of PyODPS DataFrame.
Users can initial a DataFrame by :class:`odps.models.Table`.
:param data: ODPS table or pandas DataFrame
:type data: :class:`odps.models.Table` or pandas DataFrame
:Example:
    >>> df = DataFrame(o.get_table('my_example_table'))
    >>> df.dtypes
    odps.Schema {
      movie_id              int64
      title                 string
      release_date          string
      video_release_date    string
      imdb_url              string
      user_id               int64
      rating                int64
      unix_timestamp        int64
      age                   int64
      sex                   string
      occupation            string
      zip_code              string
    }
    >>> df.count()
    100000
    >>>
    >>> # Group by `title`, aggregate `movie_id` by count, then sort the count in descending order.
    >>> # Finally, take the top 25 results.
    >>> df.groupby('title').agg(count=df.movie_id.count()).sort('count', ascending=False)[:25]
    >>>
    >>> # `value_counts` reaches the same goal more concisely.
    >>> df.movie_id.value_counts()[:25]
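    >>>
    >>> # A DataFrame can also wrap a local pandas DataFrame. The
    >>> # ``unknown_as_string`` and ``as_type`` arguments control type
    >>> # inference; the column names and values below are hypothetical.
    >>> import pandas as pd
    >>> pd_df = pd.DataFrame({'movie_id': [1, 2, 3], 'title': ['a', 'b', 'c']})
    >>> df2 = DataFrame(pd_df, unknown_as_string=True, as_type={'movie_id': 'int64'})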
"""
    __slots__ = ()

    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            data = args[0]
        else:
            data = kwargs.pop('_source_data', None)

        if data is None:
            raise ValueError('ODPS Table or pandas DataFrame should be provided.')
        if isinstance(data, Table):
            if '_schema' not in kwargs:
                kwargs['_schema'] = odps_schema_to_df_schema(data.table_schema)
            super(DataFrame, self).__init__(_source_data=data, **kwargs)
        elif isinstance(data, Partition):
            if '_schema' not in kwargs:
                kwargs['_schema'] = odps_schema_to_df_schema(data.table.table_schema)
            # a partition is bound to its parent table as the source data,
            # with rows restricted to the partition via a filtered proxy
            super(DataFrame, self).__init__(_source_data=data.parent.parent, **kwargs)
            self._proxy = self.copy().filter_parts(data)
        elif has_pandas and isinstance(data, pd.DataFrame):
            if 'schema' in kwargs and kwargs['schema']:
                schema = kwargs.pop('schema')
            elif '_schema' in kwargs:
                schema = kwargs.pop('_schema')
            else:
                unknown_as_string = kwargs.pop('unknown_as_string', False)
                as_type = kwargs.pop('as_type', None)
                if as_type:
                    if not isinstance(as_type, dict):
                        raise TypeError('as_type must be a dict')
                    data = data.copy()
                    data.is_copy = False
                    as_type = dict((k, validate_data_type(v)) for k, v in six.iteritems(as_type))
                    for col_name, df_type in six.iteritems(as_type):
                        if col_name not in data:
                            raise ValueError('col(%s) does not exist in pd.DataFrame' % col_name)
                        try:
                            if isinstance(df_type, CompositeMixin):
                                data[col_name] = cast_composite_sequence(data[col_name], df_type)
                            else:
                                pd_type = df_type_to_np_type(df_type)
                                # cast only the non-null values so nulls survive the conversion
                                data[col_name] = data[col_name][data[col_name].notnull()].astype(pd_type)
                        except TypeError:
                            raise TypeError('Cannot cast col(%s) to data type: %s' % (col_name, df_type))
                schema = pd_to_df_schema(data, as_type=as_type,
                                         unknown_as_string=unknown_as_string)
            super(DataFrame, self).__init__(_source_data=data, _schema=schema, **kwargs)
        elif has_sqlalchemy and isinstance(data, sqlalchemy.Table):
            if '_schema' not in kwargs:
                kwargs['_schema'] = sqlalchemy_to_df_schema(data.c)
            super(DataFrame, self).__init__(_source_data=data, **kwargs)
        else:
            raise ValueError('Unknown type: %s' % type(data))
    def __setstate__(self, state):
        kv = dict(state)
        source_data = kv.pop('_source_data')
        kv.pop('_schema', None)
        self.__init__(source_data, **kv)
    def view(self):
        # rebuild a fresh expr of the same type from the current
        # attributes and source data
        kv = dict((attr, getattr(self, attr)) for attr in get_attrs(self))
        data = kv.pop('_source_data')
        kv.pop('_schema', None)
        return type(self)(data, **kv)
    @staticmethod
    def batch_persist(dfs, tables, *args, **kwargs):
        """
        Persist multiple DataFrames into ODPS.

        :param dfs: DataFrames to persist
        :param tables: table names to persist to; use a (table, partition) tuple to store into a table partition
        :param args: args for Expr.persist
        :param kwargs: kwargs for Expr.persist

        :Examples:

        >>> DataFrame.batch_persist([df1, df2], ['table_name1', ('table_name2', 'partition_name2')], lifecycle=1)
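        >>>
        >>> # Execution-control keywords (e.g. ``n_parallel``) are forwarded to the
        >>> # delayed execution rather than to persist; the table names and the
        >>> # partition spec below are hypothetical.
        >>> DataFrame.batch_persist([df1, df2], ['t1', ('t2', 'pt=20220101')], n_parallel=2)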
"""
        from .delay import Delay

        if 'async' in kwargs:
            # 'async' became a reserved word in Python 3.7; accept it for
            # compatibility but forward it as 'async_'
            kwargs['async_'] = kwargs.pop('async')

        # split kwargs between Delay.execute and Expr.persist
        execute_keys = ('ui', 'async_', 'n_parallel', 'timeout', 'close_and_notify')
        execute_kw = dict((k, v) for k, v in six.iteritems(kwargs) if k in execute_keys)
        persist_kw = dict((k, v) for k, v in six.iteritems(kwargs) if k not in execute_keys)

        delay = Delay()
        persist_kw['delay'] = delay
        for df, table in izip(dfs, tables):
            if isinstance(table, tuple):
                table, partition = table
            else:
                partition = None
            df.persist(table, partition=partition, *args, **persist_kw)
        return delay.execute(**execute_kw)
    @property
    def data(self):
        return self._source_data
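

# Usage sketch: construct a DataFrame from local pandas data, forcing a column
# type via ``as_type``; this exercises the constructor's pandas branch above.
# The column names and values here are hypothetical, for illustration only.
if __name__ == '__main__':
    if has_pandas:
        pd_df = pd.DataFrame({'movie_id': [1, 2, 3],
                              'title': ['a', 'b', 'c']})
        df = DataFrame(pd_df, unknown_as_string=True,
                       as_type={'movie_id': 'int64'})
        print(df.dtypes)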