Sorting, deduplication, sampling, and data transformation

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

Sorting

Sorting can be performed only on Collection objects. Use the sort or sort_values method to sort data.

>>> iris.sort('sepalwidth').head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.2         2.2          4.5         1.5  Iris-versicolor
2          6.0         2.2          5.0         1.5   Iris-virginica
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          5.5         2.3          4.0         1.3  Iris-versicolor

To sort data in descending order, set the ascending parameter to False.

>>> iris.sort('sepalwidth', ascending=False).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

You can also sort data in a descending order in the following way:

>>> iris.sort(-iris.sepalwidth).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

You can also sort by multiple fields, as shown in the following code:

>>> iris.sort(['sepalwidth', 'petallength']).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          4.0         1.0  Iris-versicolor
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          5.0         1.5   Iris-virginica
4          4.5         2.3          1.3         0.3      Iris-setosa

To sort multiple fields in a mix of ascending and descending orders, pass a list of Boolean values to the ascending parameter. The length of the list must equal the number of sorted fields.

>>> iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

To achieve the same effect, the preceding code can be modified as follows:

>>> iris.sort(['sepalwidth', -iris.petallength]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

Note

Sorting in MaxCompute requires a limit on the number of records picked from the top of the sorted result. Use options.df.odps.sort.limit to specify this limit. The default value is 10000. To sort larger volumes of data, set the option to a larger value. Note, however, that a larger limit may cause an out-of-memory (OOM) error.
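
For example, a minimal sketch of raising the limit before sorting a larger table, assuming the larger sorted result is acceptable for your workload:

>>> from odps import options
>>> options.df.odps.sort.limit = 100000  # allow up to 100000 records in the sorted result
>>> iris.sort('sepalwidth').head(5)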

Deduplication

To deduplicate Collection objects, use the distinct method.

>>> iris[['name']].distinct()
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name')
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name', 'sepallength').head(3)
          name  sepallength
0  Iris-setosa          4.3
1  Iris-setosa          4.4
2  Iris-setosa          4.5

To deduplicate Sequence objects, use the unique method. However, a Sequence produced by unique cannot be used in column selection.

>>> iris.name.unique()
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica

We do not recommend that you use the following code:

>>> iris[iris.name, iris.name.unique()]  # this line produces an error

Sampling

To sample data from a Collection object, use the sample method. Python on MaxCompute (PyODPS) supports the following sampling methods:

Warning

Except for sampling by parts, all other sampling methods require XFlow to execute on a MaxCompute DataFrame. If XFlow is not supported, these sampling methods can only be executed on the pandas DataFrame backend.

  • Sampling by parts

This method divides data into parts and lets you select the part numbers that you want to sample.

>>> iris.sample(parts=10)  # split into 10 parts and take the 0th by default
>>> iris.sample(parts=10, i=0)  # split into 10 parts and take the 0th part
>>> iris.sample(parts=10, i=[2, 5])   # split into 10 parts and take the 2nd and 5th part
>>> iris.sample(parts=10, columns=['name', 'sepalwidth'])  # sample given values of name and sepalwidth
  • Sampling by percentage or items

This method requires you to specify the number of records or the percentage of data that you want to sample. To enable sampling with replacement, set the replace parameter to True (see the sketch after this list).

>>> iris.sample(n=100)  # sample 100 records
>>> iris.sample(frac=0.3)  # sample 30% of all records
  • Sampling by weight

This method requires you to specify the weight column and the number or proportion of records that you want to sample. To enable sampling with replacement, set the replace parameter to True.

>>> iris.sample(n=100, weights='sepallength')
>>> iris.sample(n=100, weights='sepalwidth', replace=True)
  • Stratified sampling

This method requires you to specify the label column for stratification, and the sampling proportion (the frac parameter) or the number of records (the n parameter) for each label. Stratified sampling does not currently support replacement.

>>> iris.sample(strata='category', n={'Iris Setosa': 10, 'Iris Versicolour': 10})
>>> iris.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})
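
The following is a minimal sketch of sampling with replacement, assuming the XFlow-backed sampling described above is available:

>>> iris.sample(n=100, replace=True)  # sample 100 records, with replacement
>>> iris.sample(frac=0.3, weights='sepalwidth', replace=True)  # weighted sampling with replacement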

Data scaling

DataFrame supports data scaling based on the maximum, minimum, mean, and standard deviation. The following data is used in the scaling examples:

    name  id  fid
0  name1   4  5.3
1  name2   2  3.5
2  name2   3  1.5
3  name1   4  4.2
4  name1   3  2.2
5  name1   3  4.1

You can use the min_max_scale method to normalize data:

>>> df.min_max_scale(columns=['fid'])
    name  id       fid
0  name1   4  1.000000
1  name2   2  0.526316
2  name2   3  0.000000
3  name1   4  0.710526
4  name1   3  0.184211
5  name1   3  0.684211

min_max_scale can work with the feature_range parameter to specify the output range. The following is an example of how to output values in the range of (-1, 1):

>>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
    name  id       fid
0  name1   4  1.000000
1  name2   2  0.052632
2  name2   3 -1.000000
3  name1   4  0.421053
4  name1   3 -0.631579
5  name1   3  0.368421

To keep the original values, use the preserve parameter. The scaled data is then added as a new column named after the original column with the suffix "_scaled". To change the suffix, use the suffix parameter. The following code shows the preserve parameter:

>>> df.min_max_scale(columns=['fid'], preserve=True)
    name  id  fid  fid_scaled
0  name1   4  5.3    1.000000
1  name2   2  3.5    0.526316
2  name2   3  1.5    0.000000
3  name1   4  4.2    0.710526
4  name1   3  2.2    0.184211
5  name1   3  4.1    0.684211
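
A minimal sketch of the suffix parameter, assuming the same df as above; the new column would then be named fid_norm instead of fid_scaled:

>>> df.min_max_scale(columns=['fid'], preserve=True, suffix='_norm')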

min_max_scale can also work with the group parameter to specify one or more grouping columns. The minimum and maximum are then taken within each group to scale the data, as shown in the following code:

>>> df.min_max_scale(columns=['fid'], group=['name'])
    name  id       fid
0  name1   4  1.000000
1  name1   4  0.645161
2  name1   3  0.000000
3  name1   3  0.612903
4  name2   2  1.000000
5  name2   3  0.000000

Data in the name1 and name2 groups has been scaled by using the minimum and maximum of each group.

The std_scale method standardizes data by using the mean and standard deviation, as shown in the following code:

>>> df.std_scale(columns=['fid'])
    name  id       fid
0  name1   4  1.436467
1  name2   2  0.026118
2  name2   3 -1.540938
3  name1   4  0.574587
4  name1   3 -0.992468
5  name1   3  0.496234

This method also supports the preserve parameter to keep the original column, and the group parameter to group data. For more information about grouping, see min_max_scale.
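
A minimal sketch combining both parameters, assuming the same df as above:

>>> df.std_scale(columns=['fid'], preserve=True, group=['name'])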

Null value processing

DataFrame supports removing or filling null values. The following data is used in the examples:

   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

You can use the dropna method to drop the rows that contain null values in the columns specified by subset:

>>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   5  name1  1.0  2.0  3.0  4.0

To drop only the rows whose specified columns are all null, and keep rows that contain at least one non-null value, use how='all':

>>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0

To keep only the rows that contain at least a minimum number of non-null values, use the thresh parameter, as shown in the following code:

>>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0

You can use fillna to replace null values with a specified constant or values in an existing column. In the following example, null values are replaced with a constant:

>>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
   id   name     f1     f2     f3     f4
0   0  name1    1.0  100.0    3.0    4.0
1   1  name1    2.0  100.0  100.0    1.0
2   2  name1    3.0    4.0    1.0  100.0
3   3  name1  100.0    1.0    2.0    3.0
4   4  name1    1.0  100.0    3.0    4.0
5   5  name1    1.0    2.0    3.0    4.0
6   6  name1  100.0  100.0  100.0  100.0

You can replace null values with values in the specified existing column, as shown in the following code:

>>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  4.0
3   3  name1  1.0  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

DataFrame also provides backward filling and forward filling by using the method parameter, as follows:

Value               Definition
bfill / backfill    Backward filling
ffill / pad         Forward filling

Example:

>>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  3.0  3.0  4.0
1   1  name1  2.0  1.0  1.0  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  1.0  1.0  2.0  3.0
4   4  name1  1.0  3.0  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN
>>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  1.0  3.0  4.0
1   1  name1  2.0  2.0  2.0  1.0
2   2  name1  3.0  4.0  1.0  1.0
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  1.0  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

You can use the ffill / bfill methods to simplify the code: ffill is equivalent to fillna(method='ffill'), and bfill is equivalent to fillna(method='bfill').
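
For example, a minimal sketch assuming the shorthand methods accept the same subset argument as fillna:

>>> df.ffill(subset=['f1', 'f2', 'f3', 'f4'])  # forward filling
>>> df.bfill(subset=['f1', 'f2', 'f3', 'f4'])  # backward filling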

Applying custom functions for all columns and rows

Using custom functions for one row

To use custom functions on rows, use the apply method. The axis parameter must be set to 1 to indicate that the operation works on rows.

The apply method calls your function on each row of the preceding Collection object. Inside the function, you can access a field of the row by name or by offset.

>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

If reduce is True, a Sequence object is returned; otherwise, a Collection object is returned. Use the names and types parameters to specify the field names and types of the returned Sequence or Collection object. If the type is not specified, it defaults to string.

If reduce is False, you can use the yield keyword in your function to return multiple rows.

>>> iris.count()
150
>>>
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

You can also annotate your function with returned field names and types. In this way, you do not need to specify them when calling your function.

>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1).count()
300
>>> iris.apply(handle, axis=1).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

Calling map_reduce without a reducer is equivalent to calling the apply method with axis=1.

>>> iris.map_reduce(mapper=handle).count()
300
>>> iris.map_reduce(mapper=handle).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

To use an existing user-defined table function (UDTF) in MaxCompute, specify the name of the UDTF.

>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

When you apply a function to rows with reduce set to False, you can combine the result with existing columns through a lateral view, which is useful for subsequent aggregation.

>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris[iris.name, iris.apply(handle, axis=1)]

Using custom aggregations for all columns

When you call the apply method without specifying axis, or with axis set to 0, a custom aggregation is applied to aggregate every column (Sequence object).

class Agg(object):

    def buffer(self):
        # initialize the aggregation buffer: [running sum, running count]
        return [0.0, 0]

    def __call__(self, buffer, val):
        # accumulate a single value into the buffer
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        # merge a partial buffer computed on another worker
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        # return the mean, guarding against an empty buffer
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
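
A custom aggregation can also be applied to a single column. The following is a minimal sketch, assuming the Sequence agg method accepts the aggregation class:

>>> iris.sepalwidth.agg(Agg)  # mean of a single column via the custom aggregation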

Warning

Due to limitations of Python UDFs, custom aggregations cannot use list or dict as input or output types.

Warning

PyODPS DataFrame treats all collections and sequences as distributed objects, and does not support referencing these objects inside user-defined functions. Consider using methods such as join to reference data in multiple DataFrames, or referencing collections as resources, as described in the next section.

Referring to resources

Similar to the resources parameter for map, resources can be tables, files, or Collection objects in MaxCompute that you reference.

For row operations (axis=1), write a function closure or a callable class. For column aggregation, read resources in the __init__ method.

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python

In this example, stop_words is a local variable which is referenced as a resource in MaxCompute during execution.

Using a third-party Python library

You can upload third-party wheel packages as resources to MaxCompute. You need to specify the package files globally or in the methods that execute a DataFrame immediately. Note that you also need to add the dependencies of your third-party libraries; otherwise, the import may fail.

If your MaxCompute service supports specifying images when executing SQL statements, you can pass the image argument to execute, persist, or to_pandas to use these images. Similarly, the libraries argument of execute, persist, or to_pandas specifies the resources to be used as third-party libraries. The PyODPS installation provides the pyodps-pack tool for packing third-party libraries; see its documentation for details on how to build and use these libraries.
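
A minimal sketch of both approaches; the resource name your_dateutil.zip is hypothetical and stands for an archive built with pyodps-pack and uploaded as a MaxCompute resource:

>>> from odps import options
>>> # global setting: every DataFrame execution uses these libraries
>>> options.df.libraries = ['your_dateutil.zip']  # hypothetical resource name
>>> # or specify libraries for a single execution:
>>> iris.apply(handle, axis=1).to_pandas(libraries=['your_dateutil.zip'])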

Warning

Due to differences in bytecode definitions, code that uses new language features of Python 3, such as yield from, fails when it runs on MaxCompute workers that use Python 2.7. Therefore, before you use the MapReduce API of Python 3 in production, we recommend that you make sure that your code runs normally.

MapReduce API

PyODPS DataFrame supports the MapReduce API. You can write the map and reduce functions separately; a map_reduce call may include only the mapper or only the reducer stage. The following code is a wordcount example:

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # use a list instead of cnt = 0; otherwise assigning to cnt inside h would make it a local variable of h
>>>     cnt = [0]
>>>     def h(row, done):  # done == True indicates that all rows with current key are visited
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>>                     mapper_output_names=['word', 'cnt'],
>>>                     mapper_output_types=['string', 'int'],
>>>                     reducer_output_names=['word', 'cnt'],
>>>                     reducer_output_types=['string', 'int'])
     word  cnt
0   hello    2
1       i    1
2      is    1
3    life    1
4  python    2
5   short    1
6     use    1
7   world    1

Use the group parameter to specify the fields that you want to group with the reduce function. If you do not specify the fields, all fields are grouped.

The reducer works differently: it receives the grouped keys as its initial input, and then processes each row that belongs to those keys. The second parameter, done, indicates whether all the rows for the current keys have been iterated.

Using a function closure is convenient, but a callable class is also allowed.

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done == True indicates that all rows with current key are visited
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

You can annotate your function with output to simplify the code.

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # use a list instead of cnt = 0; otherwise assigning to cnt inside h would make it a local variable of h
>>>     cnt = [0]
>>>     def h(row, done):  # done == True indicates that all rows with current key are visited
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')
     word  cnt
0   hello    2
1       i    1
2      is    1
3    life    1
4  python    2
5   short    1
6     use    1
7   world    1

If you need the rows within each group to be sorted during iteration, use the sort parameter to specify the columns to sort by, and the ascending parameter to choose ascending or descending order. The ascending parameter can be a single Boolean value that applies to all sort fields, or a list whose length equals the number of sort fields.
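
For example, a minimal sketch that feeds the rows of each group to the reducer in descending cnt order, assuming the annotated mapper and reducer defined above:

>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     sort='cnt', ascending=False)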

Specifying the combiner

If a combiner is specified in the map_reduce API, data is first aggregated on the mapper side. The combiner is used in the same way as the reducer, but it cannot reference resources. The field names and types that the combiner outputs must be consistent with those of the mapper.

In the preceding example, you can use the reducer as the combiner to aggregate data on the mapper side and reduce the amount of shuffled data.

>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

Using a third-party Python library

You can upload third-party wheel packages as resources to MaxCompute. You need to specify the package files globally or in the methods that execute a DataFrame immediately. Note that you also need to add the dependencies of your third-party libraries; otherwise, the import may fail.

If your MaxCompute service supports specifying images when executing SQL statements, you can pass the image argument to execute, persist, or to_pandas to use these images. Similarly, the libraries argument of execute, persist, or to_pandas specifies the resources to be used as third-party libraries. The PyODPS installation provides the pyodps-pack tool for packing third-party libraries; see its documentation for details on how to build and use these libraries.

Warning

Due to differences in bytecode definitions, code that uses new language features of Python 3, such as yield from, fails when it runs on MaxCompute workers that use Python 2.7. Therefore, before you use the MapReduce API of Python 3 in production, we recommend that you make sure that your code runs normally.

Warning

PyODPS DataFrame treats all collections and sequences as distributed objects, and does not support referencing these objects inside user-defined functions. Consider using methods such as join to reference data in multiple DataFrames, or referencing collections as resources.

Referring to resources

In the MapReduce API, you can separately specify the resources referenced by the mapper and by the reducer.

In the following example, stop words are filtered in the mapper, and in the reducer the count of each whitelisted word is increased by 5.

>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

Reshuffling data

If data is unevenly distributed in a cluster, you need to reshuffle the data by using the reshuffle API.

>>> df1 = df.reshuffle()

By default, data is hashed by random numbers. You can also distribute the data by specified columns and sort the reshuffled data in a specified order.

>>> df1.reshuffle('name', sort='id', ascending=False)

Bloom filter

PyODPS DataFrame provides the bloom_filter method for filtering data with a Bloom filter.

Given a Collection object and one of its column expressions (sequence1), you can use another sequence (sequence2) to apply a Bloom filter. Values of sequence1 that appear in sequence2 are always kept, whereas values that do not appear in sequence2 are mostly, but not necessarily completely, filtered out. This is an approximate method.

This method can quickly filter out some useless data from the Collection object.

The Bloom filter is particularly suitable for join operations between a large table and a small one, where most rows of the large table are not joined. For example, when you join visit data with transaction data, most user visits do not produce transactions. Applying the Bloom filter to the visit data before the join can therefore significantly improve performance.
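
A minimal sketch of this scenario; visits and trades are hypothetical DataFrames that share a user_id column:

>>> filtered = visits.bloom_filter('user_id', trades.user_id)  # hypothetical DataFrames
>>> filtered.join(trades, on='user_id')

The following small example demonstrates the filtering behavior: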

>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # the 0th argument can also be an expression, for instance, df1.a + '1'
       a  b
0  name1  1
1  name1  4

The preceding example processes a small volume of data, so the rows whose column a contains name2 or name3 are filtered out of df1. When processing large volumes of data, however, the filter may fail to remove some rows that are not in df2.a.

In the join scenario described above, such unfiltered rows do not affect the correctness of the result; the Bloom filter only serves to improve performance.

You can use the capacity and error_rate parameters to specify the data capacity and error rate, which default to 3000 and 0.01, respectively.

Note

Increasing capacity or decreasing error_rate increases memory usage, so select values that are appropriate for your data.
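
For example, a minimal sketch with a larger capacity and a lower error rate:

>>> df1.bloom_filter('a', df2.a, capacity=10000, error_rate=0.001)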

Pivot table

PyODPS DataFrame provides the pivot_table feature. You can use this feature as shown in the following example.

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1

In its simplest form, pivot_table only requires the rows parameter, and the mean of the remaining fields is computed for each group specified by rows.

>>> df['A', 'D', 'E'].pivot_table(rows='A')
     A  D_mean  E_mean
0  bar     5.5    2.75
1  foo     2.2    4.40

You can specify multiple fields in rows to group by all of them.

>>> df.pivot_table(rows=['A', 'B', 'C'])
     A    B      C  D_mean  E_mean
0  bar  one  large     4.0     5.0
1  bar  one  small     5.0     3.0
2  bar  two  large     7.0     1.0
3  bar  two  small     6.0     2.0
4  foo  one  large     2.0     4.5
5  foo  one  small     1.0     3.0
6  foo  two  small     3.0     5.0

Specify the values parameter to select the columns that you want to aggregate.

>>> df.pivot_table(rows=['A', 'B'], values='D')
     A    B    D_mean
0  bar  one  4.500000
1  bar  two  6.500000
2  foo  one  1.666667
3  foo  two  3.000000

By default, the system calculates the mean of the column specified in values. You can specify one or more aggregate functions.

>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
     A    B    D_mean  D_count  D_sum
0  bar  one  4.500000        2      9
1  bar  two  6.500000        2     13
2  foo  one  1.666667        3      5
3  foo  two  3.000000        2      6

You can also use the values of a column in the original data as new columns of the output Collection. This is the most powerful feature of pivot_table.

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
     A    B  large_D_mean  small_D_mean
0  bar  one           4.0           5.0
1  bar  two           7.0           6.0
2  foo  one           2.0           1.0
3  foo  two           NaN           3.0

The system provides fill_value to replace null values.

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
     A    B  large_D_mean  small_D_mean
0  bar  one             4             5
1  bar  two             7             6
2  foo  one             2             1
3  foo  two             0             3

Key-value string transformation

DataFrame can extract key-value pairs from a column into separate columns, and transform regular columns into a key-value column.

The following data is used in the example:

>>> df
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1

Use the extract_kv method to extract key-value fields:

>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    NaN   10.0    NaN    NaN
1  name1    7.1    NaN    NaN    NaN    8.2    NaN
2  name2    NaN    1.2    1.5    NaN    NaN    NaN
3  name2    NaN    1.0    NaN    NaN    NaN    1.1

In this code, the columns parameter specifies the fields that you want to extract. The kv_delim parameter specifies the separator between keys and values, and item_delim specifies the separator between key-value pairs; the defaults are a colon and a comma, respectively. Each output field name combines the original field name and the key, connected by an underscore (_). Missing keys are filled with None by default; use fill_value to specify a replacement value. The following example shows the result of processing the preceding df data with fill_value=0:

>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    0.0   10.0    0.0    0.0
1  name1    7.1    0.0    0.0    0.0    8.2    0.0
2  name2    0.0    1.2    1.5    0.0    0.0    0.0
3  name2    0.0    1.0    0.0    0.0    0.0    1.1

The default output type of extract_kv is float. To output other data types, specify the dtype argument. For example:

df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0, dtype='string')

DataFrame can also transform multiple columns into one key-value column. The following data is used in the example:

>>> df
   name    k1   k2   k3    k5   k7   k9
0  name1  1.0  3.0  NaN  10.0  NaN  NaN
1  name1  7.1  NaN  NaN   NaN  8.2  NaN
2  name2  NaN  1.2  1.5   NaN  NaN  NaN
3  name2  NaN  1.0  NaN   NaN  NaN  1.1

You can use the to_kv method to transform data to the key-value format:

>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1