from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

聚合操作

首先,我们可以使用describe函数,来查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差是多少。

>>> print(iris.describe())
    type  sepal_length  sepal_width  petal_length  petal_width
0  count    150.000000   150.000000    150.000000   150.000000
1   mean      5.843333     3.054000      3.758667     1.198667
2    std      0.828066     0.433594      1.764420     0.763161
3    min      4.300000     2.000000      1.000000     0.100000
4    max      7.900000     4.400000      6.900000     2.500000

我们可以使用单列来执行聚合操作:

>>> iris.sepallength.max()
7.9

如果要在消除重复后的列上进行聚合,可以先调用 unique 方法,再调用相应的聚合函数:

>>> iris.name.unique().cat(sep=',')
u'Iris-setosa,Iris-versicolor,Iris-virginica'

如果所有列支持同一种聚合操作,也可以直接在整个 DataFrame 上执行聚合操作:

>>> iris.exclude('category').mean()
   sepal_length  sepal_width  petal_length  petal_width
1      5.843333     3.054000      3.758667     1.198667

需要注意的是,在 DataFrame 上执行 count 获取的是 DataFrame 的总行数:

>>> iris.count()
150

PyODPS 支持的聚合操作包括:

聚合操作

说明

count(或size)

数量

nunique

不重复值数量

min

最小值

max

最大值

sum

求和

mean

均值

median

中位数

quantile(p)

p分位数,仅在整数值下可取得准确值

var

方差

std

标准差

moment

n 阶中心矩(或 n 阶矩)

skew

样本偏度(无偏估计)

kurtosis

样本峰度(无偏估计)

cat

按sep做字符串连接操作

tolist

组合为 list

需要注意的是,与 Pandas 不同,对于列上的聚合操作,不论是在 ODPS 还是 Pandas 后端下,PyODPS DataFrame 都会忽略空值。这一逻辑与 SQL 类似。

分组聚合

DataFrame API提供了groupby来执行分组操作,分组后的一个主要操作就是通过调用agg或者aggregate方法,来执行聚合操作。

>>> iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
              name  sepallength_max  smin
0      Iris-setosa              5.8   4.3
1  Iris-versicolor              7.0   4.9
2   Iris-virginica              7.9   4.9

最终的结果列中会包含分组的列,以及聚合的列。

DataFrame 提供了一个value_counts操作,能返回按某列分组后,每个组的个数从大到小排列的操作。

我们使用 groupby 表达式可以写成:

>>> iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

使用value_counts就很简单了:

>>> iris['name'].value_counts().head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

需要注意的是,该方法返回的行数大小受到 ODPS 排序返回结果大小的限制,默认为 10000 行,可通过 options.df.odps.sort.limit 配置,详见 配置选项

对于聚合后的单列操作,我们也可以直接取出列名。但此时只能使用聚合函数。

>>> iris.groupby('name').petallength.sum()
   petallength_sum
0             73.2
1            213.0
2            277.6
>>> iris.groupby('name').agg(iris.petallength.notnull().sum())
              name  petallength_sum
0      Iris-setosa               50
1  Iris-versicolor               50
2   Iris-virginica               50

分组时也支持对常量进行分组,但是需要使用Scalar初始化。

>>> from odps.df import Scalar
>>> iris.groupby(Scalar(1)).petallength.sum()
   petallength_sum
0            563.8

编写自定义聚合

对字段调用agg或者aggregate方法来调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个mutable的object(比如 list、dict),buffer大小不应随数据而递增。

  • __call__(buffer, *val):将值聚合到中间 buffer。

  • merge(buffer, pbuffer):将 pbuffer 聚合到 buffer 中。

  • getvalue(buffer):返回最终值。

让我们看一个计算平均值的例子。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.sepalwidth.agg(Agg)
3.0540000000000007

如果最终类型和输入类型发生了变化,则需要指定类型。

>>> iris.sepalwidth.agg(Agg, 'float')

自定义聚合也可以用在分组聚合中。

>>> iris.groupby('name').sepalwidth.agg(Agg)
   petallength_aggregation
0                    3.418
1                    2.770
2                    2.974

当对多列调用自定义聚合,可以使用agg方法。

class Agg(object):

    def buffer(self):
        return [0.0, 0.0]

    def __call__(self, buffer, val1, val2):
        buffer[0] += val1
        buffer[1] += val2

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> from odps.df import agg
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 对两列调用自定义聚合
>>> iris.groupby('name').agg(val=to_agg)
              name       val
0      Iris-setosa  0.682781
1  Iris-versicolor  0.466644
2   Iris-virginica  0.451427

要调用 ODPS 上已经存在的 UDAF,指定函数名即可。

>>> iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 对单列聚合
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
>>> iris.groupby('name').agg(to_agg.rename('val'))  # 对多列聚合

警告

目前,受限于 Python UDF,自定义聚合无法支持将 list / dict 类型作为初始输入或最终输出结果。

HyperLogLog 计数

DataFrame 提供了对列进行 HyperLogLog 计数的接口 hll_count,这个接口是个近似的估计接口, 当数据量很大时,能较快的对数据的唯一个数进行估计。

这个接口在对比如海量用户UV进行计算时,能很快得出估计值。

>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
63270
>>> df.a.nunique()
63250

提供 splitter 参数会对每个字段进行分隔,再计算唯一数。