>>> from odps.df import DataFrame
>>> iris = DataFrame(o.get_table('pyodps_iris'))  # o is an existing ODPS entry object

Aggregation

First, use the describe function to view the count, mean, standard deviation, minimum, and maximum of the numeric columns in a DataFrame.

>>> print(iris.describe())
    type  sepallength  sepalwidth  petallength  petalwidth
0  count   150.000000  150.000000   150.000000  150.000000
1   mean     5.843333    3.054000     3.758667    1.198667
2    std     0.828066    0.433594     1.764420    0.763161
3    min     4.300000    2.000000     1.000000    0.100000
4    max     7.900000    4.400000     6.900000    2.500000
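As a rough local reference for what each describe row means, the sketch below computes the same five statistics for one column with Python's standard statistics module. It assumes that std is the sample standard deviation (n - 1 denominator), as in pandas; the helper name describe_column is hypothetical.

```python
import statistics


def describe_column(values):
    """Return the five statistics shown by describe() for one numeric column."""
    return {
        "count": float(len(values)),
        "mean": statistics.mean(values),
        "std": statistics.stdev(values),  # sample standard deviation (n - 1)
        "min": min(values),
        "max": max(values),
    }


print(describe_column([4.3, 5.0, 5.8, 7.9]))
```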

You can apply an aggregation to a single column:

>>> iris.sepallength.max()
7.9

To aggregate over the distinct values of a sequence, call the unique method on the sequence before calling the aggregation method.

>>> iris.name.unique().cat(sep=',')
u'Iris-setosa,Iris-versicolor,Iris-virginica'
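In plain Python terms, unique followed by cat(sep=',') amounts to de-duplicating the values and joining them with the separator. The sketch below keeps first-occurrence order; that ordering is an assumption of this example, not a guarantee about the order PyODPS returns.

```python
def unique_cat(values, sep=","):
    """De-duplicate values (keeping first occurrence) and join them with sep."""
    seen = dict.fromkeys(values)  # dicts preserve insertion order (Python 3.7+)
    return sep.join(seen)


names = ["Iris-setosa", "Iris-setosa", "Iris-versicolor", "Iris-virginica"]
print(unique_cat(names))  # Iris-setosa,Iris-versicolor,Iris-virginica
```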

If all columns support the same aggregation, you can apply it to the entire DataFrame:

>>> iris.exclude('name').mean()
   sepallength  sepalwidth  petallength  petalwidth
1     5.843333    3.054000     3.758667    1.198667

To get the total number of rows in a DataFrame, call count on the DataFrame itself:

>>> iris.count()
150

Python on MaxCompute (PyODPS) includes the following aggregation operations:

Aggregation        Description
count (or size)    Number of values
nunique            Number of distinct values
min                Minimum
max                Maximum
sum                Sum
mean               Mean
median             Median
quantile(p)        p-quantile; exact only for integer values
var                Variance
std                Standard deviation
moment             n-th central moment
skew               Sample skewness (unbiased estimate)
kurtosis           Sample kurtosis (unbiased estimate)
cat                Concatenation of strings with a separator
tolist             Aggregation of a column into a list
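The table describes skew and kurtosis as unbiased (sample) estimates. One common choice, used by pandas, is the adjusted Fisher-Pearson skewness G1 and the bias-corrected excess kurtosis G2. The sketch below implements those two formulas; it assumes, without confirmation from this page, that PyODPS uses the same definitions.

```python
def _central_moment(xs, k):
    """k-th central moment with a 1/n denominator."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** k for x in xs) / len(xs)


def sample_skew(xs):
    """Adjusted Fisher-Pearson skewness G1 (requires n >= 3)."""
    n = len(xs)
    g1 = _central_moment(xs, 3) / _central_moment(xs, 2) ** 1.5
    return (n * (n - 1)) ** 0.5 / (n - 2) * g1


def sample_kurtosis(xs):
    """Bias-corrected excess kurtosis G2 (requires n >= 4)."""
    n = len(xs)
    g2 = _central_moment(xs, 4) / _central_moment(xs, 2) ** 2 - 3
    return (n - 1) / ((n - 2) * (n - 3)) * ((n + 1) * g2 + 6)


data = [1, 2, 3, 4, 5]
print(sample_skew(data), sample_kurtosis(data))  # approximately 0.0 and -1.2
```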

Note that, unlike pandas, aggregations in PyODPS DataFrame ignore null values regardless of whether the MaxCompute or the pandas backend is used. This is consistent with SQL.
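As a small local illustration of this rule, the sketch below represents nulls as None and drops them before aggregating, so count and mean only see non-null values (assuming count behaves like SQL COUNT(column)). The helper names are hypothetical.

```python
def non_null(values):
    """Drop None values, mirroring how aggregations skip nulls."""
    return [v for v in values if v is not None]


def agg_count(values):
    return len(non_null(values))


def agg_mean(values):
    vals = non_null(values)
    return sum(vals) / len(vals) if vals else None


col = [1.0, None, 3.0, None]
print(agg_count(col), agg_mean(col))  # 2 2.0
```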

Group and aggregate

The DataFrame application programming interface (API) provides the groupby method for grouping. The most common operation after grouping is aggregation with the agg or aggregate method.

>>> iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
              name  sepallength_max  smin
0      Iris-setosa              5.8   4.3
1  Iris-versicolor              7.0   4.9
2   Iris-virginica              7.9   4.9

The result contains the group column and the aggregated columns.
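Conceptually, groupby followed by agg buckets rows by the group key and applies each aggregation to each bucket. The pure-Python sketch below reproduces the sepallength_max and smin columns of the example on a few made-up rows; it illustrates the semantics only and says nothing about how PyODPS executes the query. The helper group_agg is hypothetical.

```python
from collections import defaultdict


def group_agg(rows, key, col, funcs):
    """Group dict rows by `key` and apply each named function to `col` per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[col])
    return {
        k: {name: f(vals) for name, f in funcs.items()}
        for k, vals in sorted(groups.items())
    }


rows = [
    {"name": "Iris-setosa", "sepallength": 5.1},
    {"name": "Iris-setosa", "sepallength": 4.3},
    {"name": "Iris-virginica", "sepallength": 7.9},
    {"name": "Iris-virginica", "sepallength": 6.3},
]
result = group_agg(rows, "name", "sepallength", {"sepallength_max": max, "smin": min})
print(result)
```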

DataFrame provides the value_counts operation, which groups by a column and sorts the groups by their number of rows in descending order.

The equivalent groupby expression is:

>>> iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

value_counts produces the same result with simpler code:

>>> iris['name'].value_counts().head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

Note that, because of limits on the ORDER BY clause in MaxCompute SQL, value_counts returns a limited number of rows. The default limit is 10,000 and can be changed with options.df.odps.sort.limit. For details, see the configuration section.
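The semantics of value_counts, including the row limit, can be mimicked locally with collections.Counter. In the sketch below, the limit argument stands in for options.df.odps.sort.limit; it is a parameter of this hypothetical helper, not a PyODPS API.

```python
from collections import Counter


def value_counts(values, limit=10000):
    """Count each distinct value, sort by count descending, keep at most `limit` rows."""
    return Counter(values).most_common(limit)


names = ["a", "b", "a", "c", "a", "b"]
print(value_counts(names))           # [('a', 3), ('b', 2), ('c', 1)]
print(value_counts(names, limit=2))  # [('a', 3), ('b', 2)]
```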

To aggregate a single column per group, select the column by name after groupby. Only aggregation functions can be called on the selected column.

>>> iris.groupby('name').petallength.sum()
   petallength_sum
0             73.2
1            213.0
2            277.6
>>> iris.groupby('name').agg(iris.petallength.notnull().sum())
              name  petallength_sum
0      Iris-setosa               50
1  Iris-versicolor               50
2   Iris-virginica               50

Grouping by a constant is also supported. Wrap the constant in a Scalar:

>>> from odps.df import Scalar
>>> iris.groupby(Scalar(1)).petallength.sum()
   petallength_sum
0            563.8

Write custom aggregations

To use a custom aggregation, call the agg or aggregate method on a column and pass a class that provides the following methods:

  • buffer(): returns a mutable object, such as a list or dict, that holds the intermediate state. The size of the buffer should not grow as more data is processed.

  • __call__(buffer, *val): adds the input values to buffer.

  • merge(buffer, pbuffer): merges the partial buffer pbuffer into buffer.

  • getvalue(buffer): returns the final value computed from buffer.
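To see why merge is needed and why the buffer must stay fixed in size, the sketch below runs an aggregation class through a hypothetical local driver, run_locally, that gives each data partition its own buffer and then merges the partial buffers, roughly as a distributed engine might. The example class computes a sample variance with a three-slot buffer [count, mean, M2], using Welford's update in __call__ and Chan et al.'s pairwise formula in merge. Neither the driver nor the class is part of PyODPS.

```python
class VarAgg(object):
    """Sample variance with a fixed-size buffer: [count, mean, M2]."""

    def buffer(self):
        return [0, 0.0, 0.0]

    def __call__(self, buffer, val):
        # Welford's online update for a single value.
        buffer[0] += 1
        delta = val - buffer[1]
        buffer[1] += delta / buffer[0]
        buffer[2] += delta * (val - buffer[1])

    def merge(self, buffer, pbuffer):
        # Chan et al.'s formula for combining two partial results.
        n_a, n_b = buffer[0], pbuffer[0]
        if n_b == 0:
            return
        n = n_a + n_b
        delta = pbuffer[1] - buffer[1]
        buffer[1] += delta * n_b / n
        buffer[2] += pbuffer[2] + delta * delta * n_a * n_b / n
        buffer[0] = n

    def getvalue(self, buffer):
        if buffer[0] < 2:
            return 0.0
        return buffer[2] / (buffer[0] - 1)


def run_locally(agg_cls, partitions):
    """Hypothetical driver: one buffer per partition, then merge them all."""
    agg = agg_cls()
    partials = []
    for part in partitions:
        buf = agg.buffer()
        for val in part:
            agg(buf, val)
        partials.append(buf)
    result = agg.buffer()
    for buf in partials:
        agg.merge(result, buf)
    return agg.getvalue(result)


print(run_locally(VarAgg, [[1.0, 2.0], [3.0, 4.0, 5.0]]))  # 2.5
```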

Calculate a mean in the following example:

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.sepalwidth.agg(Agg)
3.0540000000000007

If the output type differs from the input type, specify the output type explicitly:

>>> iris.sepalwidth.agg(Agg, 'float')

Custom aggregations can also be used after grouping:

>>> iris.groupby('name').sepalwidth.agg(Agg)
   sepalwidth_aggregation
0                    3.418
1                    2.770
2                    2.974

To apply a custom aggregation to multiple columns, use the agg function from odps.df. In this case, __call__ receives one value per column.

class Agg(object):

    def buffer(self):
        return [0.0, 0.0]

    def __call__(self, buffer, val1, val2):
        buffer[0] += val1
        buffer[1] += val2

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> from odps.df import agg
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # call custom aggregation on two columns
>>> iris.groupby('name').agg(val=to_agg)
              name       val
0      Iris-setosa  0.682781
1  Iris-versicolor  0.466644
2   Iris-virginica  0.451427
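As an additional illustration that is not taken from the original text, the class below follows the same four-method protocol but computes a weighted mean from a value column and a weight column. A short local loop feeds it (value, weight) pairs, assuming this is how agg([col1, col2], ...) passes one value per column to __call__.

```python
class WeightedMean(object):
    """Weighted mean over two columns: value and weight."""

    def buffer(self):
        return [0.0, 0.0]  # [sum(value * weight), sum(weight)]

    def __call__(self, buffer, value, weight):
        buffer[0] += value * weight
        buffer[1] += weight

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]


agg = WeightedMean()
buf = agg.buffer()
for value, weight in [(1.0, 1.0), (3.0, 3.0)]:
    agg(buf, value, weight)
print(agg.getvalue(buf))  # 2.5
```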

To use an existing MaxCompute UDAF, pass its name as a string.

>>> iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # aggregate one column
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
>>> iris.groupby('name').agg(to_agg.rename('val'))  # aggregate multiple columns

Warning

Because of limitations of Python user-defined functions (UDFs), custom aggregations cannot use list or dict as input or output types.

HyperLogLog counting

DataFrame provides the hll_count API, which uses HyperLogLog to count the distinct values in a column. It returns an estimate rather than an exact count, but it computes that estimate quickly even over large volumes of data.

For example, it can quickly estimate the number of unique visitors (UVs):

>>> import numpy as np
>>> import pandas as pd
>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
63270
>>> df.a.nunique()
63250
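HyperLogLog keeps only a small array of registers, which is why hll_count is fast but approximate (compare 63270 with the exact 63250 above). The following is a minimal, textbook-style sketch of the algorithm in pure Python. The register count (2**14), the hash function (SHA-1), and the class name are assumptions for illustration; they do not describe how PyODPS or MaxCompute implement hll_count.

```python
import hashlib
import math


class HyperLogLog:
    """Minimal HyperLogLog sketch (illustrative, not PyODPS's implementation)."""

    def __init__(self, p=14):
        self.p = p
        self.m = 1 << p                 # number of registers
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)  # bias constant for m >= 128

    def add(self, value):
        # 64-bit hash of the value's string form.
        h = int.from_bytes(hashlib.sha1(str(value).encode()).digest()[:8], "big")
        width = 64 - self.p
        idx = h >> width                # top p bits select a register
        rest = h & ((1 << width) - 1)   # remaining bits
        rank = width - rest.bit_length() + 1  # leading zeros + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self):
        raw = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction: fall back to linear counting.
            return round(self.m * math.log(self.m / zeros))
        return round(raw)


hll = HyperLogLog()
for v in range(10000):
    hll.add(v)
print(hll.count())  # approximately 10000
```

Adding a value twice leaves the registers unchanged, which is why duplicates do not inflate the estimate.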

Use the splitter parameter to split each field value by a separator before counting distinct values.
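Assuming splitter works like Python's str.split, hll_count with splitter=',' on a column of comma-separated strings estimates the number of distinct tokens. An exact, non-approximate equivalent in plain Python looks like this; the helper name is hypothetical.

```python
def distinct_tokens(values, splitter=","):
    """Exact number of distinct tokens after splitting each string by `splitter`."""
    return len({tok for value in values for tok in value.split(splitter)})


tags = ["a,b", "b,c", "c"]
print(distinct_tokens(tags))  # 3
```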