Sorting, deduplication, sampling, and data transformation
>>> from odps.df import DataFrame
>>> iris = DataFrame(o.get_table('pyodps_iris'))
Sorting
You can sort only Collection objects. Use the sort or sort_values method to sort data.
>>> iris.sort('sepalwidth').head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.2 2.2 4.5 1.5 Iris-versicolor
2 6.0 2.2 5.0 1.5 Iris-virginica
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 5.5 2.3 4.0 1.3 Iris-versicolor
To sort data in descending order, set the ascending parameter to False.
>>> iris.sort('sepalwidth', ascending=False).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.5 4.2 1.4 0.2 Iris-setosa
2 5.2 4.1 1.5 0.1 Iris-setosa
3 5.8 4.0 1.2 0.2 Iris-setosa
4 5.4 3.9 1.3 0.4 Iris-setosa
You can also sort data in descending order in the following way:
>>> iris.sort(-iris.sepalwidth).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.5 4.2 1.4 0.2 Iris-setosa
2 5.2 4.1 1.5 0.1 Iris-setosa
3 5.8 4.0 1.2 0.2 Iris-setosa
4 5.4 3.9 1.3 0.4 Iris-setosa
You can also sort by multiple fields, as shown in the following code:
>>> iris.sort(['sepalwidth', 'petallength']).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 4.0 1.0 Iris-versicolor
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 5.0 1.5 Iris-virginica
4 4.5 2.3 1.3 0.3 Iris-setosa
To sort multiple fields in a mix of ascending and descending orders, pass a list of Boolean values to the ascending parameter. The length of the list must equal the number of sorted fields.
>>> iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 5.0 1.5 Iris-virginica
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 6.3 2.3 4.4 1.3 Iris-versicolor
To achieve the same effect, the preceding code can be modified as follows:
>>> iris.sort(['sepalwidth', -iris.petallength]).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 5.0 1.5 Iris-virginica
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 6.3 2.3 4.4 1.3 Iris-versicolor
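The mixed-order trick above can be sketched in plain Python (an illustration of the semantics, not PyODPS code): negating a numeric sort key reverses its order, which is exactly what -iris.petallength expresses.

```python
# Rows: (sepallength, sepalwidth, petallength), taken from the example above
rows = [
    (5.0, 2.0, 3.5),
    (6.0, 2.2, 4.0),
    (6.2, 2.2, 4.5),
    (6.0, 2.2, 5.0),
]

# Ascending by sepalwidth, descending by petallength:
# negating the numeric key flips its sort order.
ordered = sorted(rows, key=lambda r: (r[1], -r[2]))
print(ordered[1])  # (6.0, 2.2, 5.0): largest petallength among sepalwidth 2.2
```

Within the sepalwidth 2.2 group, rows come out with petallength descending, matching the PyODPS output above.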
Note
MaxCompute requires a limit on the number of items retrieved from the top of a sorted list. You can use options.df.odps.sort.limit to specify this limit; the default value is 10000. To sort larger volumes of data, set the option to a larger value, but note that this may cause an out-of-memory (OOM) error.
Deduplication
To deduplicate Collection objects, use the distinct method.
>>> iris[['name']].distinct()
name
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
>>> iris.distinct('name')
name
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
>>> iris.distinct('name', 'sepallength').head(3)
name sepallength
0 Iris-setosa 4.3
1 Iris-setosa 4.4
2 Iris-setosa 4.5
To deduplicate Sequence objects, use the unique method. However, a Sequence produced by unique cannot be used in column selection.
>>> iris.name.unique()
name
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
We do not recommend that you use the following code:
>>> iris[iris.name, iris.name.unique()] # this line produces an error
Sampling
To sample data from a Collection object, use the sample method. Python on MaxCompute (PyODPS) supports the following sampling methods:
Warning
All sampling methods except sampling by parts require XFlow to execute on a MaxCompute DataFrame. If your project does not support XFlow, these sampling methods can only be executed on the pandas DataFrame backend.
Sampling by parts
This method divides data into a number of parts and lets you choose which part or parts to sample by part number.
>>> iris.sample(parts=10) # split into 10 parts and take the 0th by default
>>> iris.sample(parts=10, i=0) # split into 10 parts and take the 0th part
>>> iris.sample(parts=10, i=[2, 5]) # split into 10 parts and take the 2nd and 5th part
>>> iris.sample(parts=10, columns=['name', 'sepalwidth']) # sample given values of name and sepalwidth
Sampling by percentage or items
Use this method to specify the number of records or the percentage of data that you want to sample. To enable sampling with replacement, set the replace parameter to True.
>>> iris.sample(n=100) # sample 100 records
>>> iris.sample(frac=0.3) # sample 30% of all records
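For intuition, the same two sampling modes can be sketched with the standard library (plain Python, not the PyODPS implementation; the record list and seed are made up for illustration):

```python
import random

records = list(range(150))   # stand-in for the 150 iris rows
rng = random.Random(42)      # fixed seed so the sketch is reproducible

sample_n = rng.sample(records, 100)                           # like sample(n=100)
sample_frac = rng.sample(records, round(len(records) * 0.3))  # like sample(frac=0.3)

# Sampling *with* replacement (replace=True) allows duplicates:
sample_repl = [rng.choice(records) for _ in range(100)]
```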
Sampling by weight
Use this method to specify a weight column and the number of records or the proportion of data that you want to sample. To enable sampling with replacement, set the replace parameter to True.
>>> iris.sample(n=100, weights='sepallength')
>>> iris.sample(n=100, weights='sepalwidth', replace=True)
Stratified sampling
For stratified sampling, specify the label column on which to stratify, and the sampling proportion (the frac parameter) or the number of items (the n parameter) for each label. Currently, stratified sampling does not support replacement.
>>> iris.sample(strata='category', n={'Iris Setosa': 10, 'Iris Versicolour': 10})
>>> iris.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})
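Stratified sampling amounts to grouping by the label column and sampling each group independently, which can be sketched in plain Python (an illustration only; the labels and counts mirror the example above):

```python
import random
from collections import defaultdict

rows = ([('Iris Setosa', i) for i in range(50)]
        + [('Iris Versicolour', i) for i in range(50)])
rng = random.Random(0)

# Group rows by the strata label, then sample each stratum independently.
strata = defaultdict(list)
for label, value in rows:
    strata[label].append((label, value))

n_per_label = {'Iris Setosa': 10, 'Iris Versicolour': 10}
sampled = []
for label, n in n_per_label.items():
    sampled.extend(rng.sample(strata[label], n))
```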
Data scaling
DataFrame supports data scaling based on the maximum, minimum, mean, and standard deviation. The following data is used in the scaling examples:
name id fid
0 name1 4 5.3
1 name2 2 3.5
2 name2 3 1.5
3 name1 4 4.2
4 name1 3 2.2
5 name1 3 4.1
You can use the min_max_scale method to normalize data:
>>> df.min_max_scale(columns=['fid'])
name id fid
0 name1 4 1.000000
1 name2 2 0.526316
2 name2 3 0.000000
3 name1 4 0.710526
4 name1 3 0.184211
5 name1 3 0.684211
min_max_scale also accepts the feature_range parameter to specify the output range. The following example outputs values in the range (-1, 1):
>>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
name id fid
0 name1 4 1.000000
1 name2 2 0.052632
2 name2 3 -1.000000
3 name1 4 0.421053
4 name1 3 -0.631579
5 name1 3 0.368421
To keep the original values, use the preserve parameter. The scaled data is then added as a new column named after the original column with the suffix "_scaled". To change the suffix, use the suffix parameter, as shown in the following code:
>>> df.min_max_scale(columns=['fid'], preserve=True)
name id fid fid_scaled
0 name1 4 5.3 1.000000
1 name2 2 3.5 0.526316
2 name2 3 1.5 0.000000
3 name1 4 4.2 0.710526
4 name1 3 2.2 0.184211
5 name1 3 4.1 0.684211
min_max_scale can also work with the group parameter to specify one or more grouping columns, so that the minimum and maximum are taken within each group, as shown in the following code:
>>> df.min_max_scale(columns=['fid'], group=['name'])
name id fid
0 name1 4 1.000000
1 name1 4 0.645161
2 name1 3 0.000000
3 name1 3 0.612903
4 name2 2 1.000000
5 name2 3 0.000000
Here, the data in the name1 and name2 groups has been scaled using each group's own minimum and maximum.
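The arithmetic behind min_max_scale can be sketched in plain Python (an illustration, not the PyODPS implementation), reproducing the fid values scaled above:

```python
def min_max_scale(values, feature_range=(0.0, 1.0)):
    """Scale values into feature_range via (x - min) / (max - min)."""
    lo, hi = min(values), max(values)
    a, b = feature_range
    return [a + (x - lo) / (hi - lo) * (b - a) for x in values]

fid = [5.3, 3.5, 1.5, 4.2, 2.2, 4.1]
print(round(min_max_scale(fid)[1], 6))           # 0.526316
print(round(min_max_scale(fid, (-1, 1))[1], 6))  # 0.052632
```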
The std_scale method scales data using the standard score, that is, the value minus the mean, divided by the standard deviation, as shown in the following code:
>>> df.std_scale(columns=['fid'])
name id fid
0 name1 4 1.436467
1 name2 2 0.026118
2 name2 3 -1.540938
3 name1 4 0.574587
4 name1 3 -0.992468
5 name1 3 0.496234
This method also supports the preserve parameter to keep the original column, and the group parameter to group data. For more information about grouping, see min_max_scale.
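The standard-score computation behind std_scale can likewise be sketched in plain Python (an illustration only, using the population standard deviation, which reproduces the values in the table above):

```python
import math

def std_scale(values):
    """Standard score: (x - mean) / population standard deviation."""
    mean = sum(values) / len(values)
    std = math.sqrt(sum((x - mean) ** 2 for x in values) / len(values))
    return [(x - mean) / std for x in values]

fid = [5.3, 3.5, 1.5, 4.2, 2.2, 4.1]
print(round(std_scale(fid)[0], 6))  # 1.436467
```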
Null value processing
DataFrame supports removing or filling null values. The following data is used in the examples:
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
You can use the dropna method to drop rows that contain null values within the specified subset of columns, as follows:
>>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 5 name1 1.0 2.0 3.0 4.0
To drop a row only when all of the specified columns are null, use how='all'; rows that contain any non-null value are kept, as follows:
>>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
Use the thresh parameter to specify the minimum number of non-null values a row must contain to be kept, as shown in the following code:
>>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
You can use fillna to replace null values with a specified constant or values in an existing column. In the following example, null values are replaced with a constant:
>>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 100.0 3.0 4.0
1 1 name1 2.0 100.0 100.0 1.0
2 2 name1 3.0 4.0 1.0 100.0
3 3 name1 100.0 1.0 2.0 3.0
4 4 name1 1.0 100.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 100.0 100.0 100.0 100.0
You can replace null values with values in the specified existing column, as shown in the following code:
>>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 4.0
3 3 name1 1.0 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
DataFrame also provides backward filling and forward filling by using the method parameter, as follows:
Value | Definition
---|---
bfill / backfill | Backward filling
ffill / pad | Forward filling
Example:
>>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 3.0 3.0 4.0
1 1 name1 2.0 1.0 1.0 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 1.0 1.0 2.0 3.0
4 4 name1 1.0 3.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
>>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 1.0 3.0 4.0
1 1 name1 2.0 2.0 2.0 1.0
2 2 name1 3.0 4.0 1.0 1.0
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 1.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
You can use the ffill / bfill methods to simplify the code: ffill is equivalent to fillna(method='ffill'), and bfill is equivalent to fillna(method='bfill').
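The filling logic itself can be sketched in plain Python (an illustration, not the PyODPS implementation; note that fillna with a method fills across the subset columns within each row):

```python
def ffill(values):
    """Forward fill: replace None with the most recent non-null value."""
    out, last = [], None
    for v in values:
        last = v if v is not None else last
        out.append(last)
    return out

def bfill(values):
    """Backward fill: fill each None from the next non-null value."""
    return ffill(values[::-1])[::-1]

row = [1.0, None, None, 4.0, None]   # e.g. the f1..f4 values of one row
print(ffill(row))  # [1.0, 1.0, 1.0, 4.0, 4.0]
print(bfill(row))  # [1.0, 4.0, 4.0, 4.0, None]
```

A leading None under ffill (or a trailing None under bfill) stays unfilled, which is why row 6 in the output above remains all NaN.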
Applying custom functions for all columns and rows
Using custom functions for one row
To apply a custom function to each row, use the apply method with the axis parameter set to 1, indicating that the operation works on rows.
The apply method calls your function once per row. Inside the function, you can retrieve a field of the row by name or by offset.
>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
sepaladd
0 8.6
1 7.9
2 7.9
If reduce is True, a Sequence object is returned; otherwise, a Collection object is returned. Use the names and types parameters to specify the field names and types of the returned Sequence or Collection object. If not specified, the type defaults to string.
If reduce is False, your function can use the yield keyword to return multiple rows.
>>> iris.count()
150
>>>
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
You can also annotate your function with returned field names and types. In this way, you do not need to specify them when calling your function.
>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1).count()
300
>>> iris.apply(handle, axis=1).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
Calling map_reduce without a reducer is equivalent to calling the apply method with axis=1.
>>> iris.map_reduce(mapper=handle).count()
300
>>> iris.map_reduce(mapper=handle).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
To use an existing user-defined table function (UDTF) in MaxCompute, specify the name of the UDTF.
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
When apply is used on rows with reduce set to False, the result can be combined with existing columns through a lateral view, in preparation for aggregation:
>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris[iris.category, iris.apply(handle, axis=1)]
Using custom aggregations for all columns
When the axis parameter is not specified or is 0, apply invokes a custom aggregation to aggregate all Sequence objects (columns).
class Agg(object):
    def buffer(self):
        # initial buffer: [running sum, count]
        return [0.0, 0]

    def __call__(self, buffer, val):
        # accumulate one value into the buffer
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        # combine a partial buffer computed elsewhere
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        # produce the final result: the mean
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
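The buffer/merge/getvalue protocol can be exercised by hand to see how the engine might drive it (a local sketch, not actual PyODPS internals): each partition accumulates into its own buffer via __call__, partial buffers are combined with merge, and getvalue produces the final value.

```python
class MeanAgg(object):
    # Same protocol as the Agg class above, driven by hand below.
    def buffer(self):
        return [0.0, 0]
    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def getvalue(self, buffer):
        return buffer[0] / buffer[1] if buffer[1] else 0.0

agg = MeanAgg()
part1, part2 = [5.1, 4.9], [4.7, 4.6, 5.0]   # two partitions of one column

buf1 = agg.buffer()
for v in part1:
    agg(buf1, v)
buf2 = agg.buffer()
for v in part2:
    agg(buf2, v)

agg.merge(buf1, buf2)        # combine the partial buffers
result = agg.getvalue(buf1)  # mean over all five values: 4.86
```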
Warning
Limited by Python UDFs, custom aggregations cannot use the list or dict type as the input or output type.
Warning
PyODPS DataFrame recognizes all collections and sequences as distributed objects, and does not support referencing these objects inside user-defined functions. Consider using methods like join to reference data in multiple DataFrames, or reference collections as resources, as described in the next section.
Referring to resources
Similar to the resources parameter for map, resources can be tables, files, or Collection objects that you reference in MaxCompute.
If axis is 1 (a row operation), write a function closure or a callable class. For column aggregation, you can read resources in the __init__ method.
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
In this example, stop_words is a local variable, which is referenced as a resource in MaxCompute during execution.
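The closure pattern itself can be tried locally (plain Python, with the resource replaced by an in-memory list of rows):

```python
def filter_stops(resource_rows):
    # Runs once: read the resource rows into a set of stop words.
    stop_words = set(r[0] for r in resource_rows)

    def h(row):
        # Runs once per data row.
        return ' '.join(w for w in row[0].split() if w not in stop_words)

    return h

h = filter_stops([('is',), ('a',), ('I',)])
print(h(('Life is short I use Python',)))  # Life short use Python
```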
Using a third-party Python library
You can upload third-party wheel packages as resources to MaxCompute. Specify the package files globally or in methods that execute DataFrames immediately. Note that you also need to add the dependencies of your third-party libraries, or the import may fail.
If your MaxCompute service supports specifying images when executing SQL statements, you can pass the image argument to execute, persist, or to_pandas to use these images. Similarly, the libraries argument can be passed to execute, persist, or to_pandas to specify resources as third-party libraries. The PyODPS installation provides the pyodps-pack tool for packing third-party libraries. See the related documentation on how to build and use these third-party libraries.
Warning
Due to differences in bytecode definitions, code written with new language features such as yield from in Python 3 raises errors when run in MaxCompute workers under Python 2.7. We therefore recommend that you make sure your code runs normally before moving MapReduce API (application programming interface) operations written for Python 3 into production.
MapReduce API
PyODPS DataFrame supports the MapReduce API. You can write the map and reduce functions separately; map_reduce may include only a mapper or only a reducer. The following code shows a word count example:
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> def reducer(keys):
>>> # we use a list here instead of letting cnt = 0, or cnt in function h will be treated as a local variable whose value cannot be used outside the function
>>> cnt = [0]
>>> def h(row, done): # done == True indicates that all rows with current key are visited
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>>
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>> mapper_output_names=['word', 'cnt'],
>>> mapper_output_types=['string', 'int'],
>>> reducer_output_names=['word', 'cnt'],
>>> reducer_output_types=['string', 'int'])
word cnt
0 hello 2
1 i 1
2 is 1
3 life 1
4 python 2
5 short 1
6 use 1
7 world 1
Use the group parameter to specify the fields to group by before the reduce function runs. If you do not specify them, all fields are used for grouping.
The reducer works differently: it first receives the aggregated keys, then iterates over each row belonging to those keys. The second parameter, done, indicates whether all rows with the current keys have been iterated.
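The done-flag protocol can be simulated locally (a sketch of how rows grouped by key might be fed to the reducer, not actual PyODPS internals):

```python
from itertools import groupby

def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

def reducer(keys):
    cnt = [0]
    def h(row, done):
        cnt[0] += row[1]
        if done:
            yield keys[0], cnt[0]
    return h

rows = [('Hello World',), ('Hello Python',), ('Life is short I use Python',)]

# "Shuffle": sort mapper output so rows with equal keys are adjacent.
mapped = sorted(kv for row in rows for kv in mapper(row))

result = []
for key, group in groupby(mapped, key=lambda kv: kv[0]):
    group = list(group)
    h = reducer((key,))                # a fresh closure per key group
    for i, row in enumerate(group):
        # done is True only on the last row of the current key
        result.extend(h(row, i == len(group) - 1))

print(dict(result)['hello'])  # 2
```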
A function closure is convenient, but a callable class is also allowed.
class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done == True indicates that all rows with current key are visited
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt
You can annotate with output to simplify the code.
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # we use a list here instead of letting cnt = 0, or cnt in function h will be treated as a local variable whose value cannot be used outside the function
>>> cnt = [0]
>>> def h(row, done): # done == True indicates that all rows with current key are visited
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')
word cnt
0 hello 2
1 i 1
2 is 1
3 life 1
4 python 2
5 short 1
6 use 1
7 world 1
If you need the data sorted by certain columns during iteration, use the sort parameter to specify the sort columns, and the ascending parameter to control the sort order. The ascending parameter can be a single Boolean value that applies to all sort fields, or a list whose length equals the number of sort fields.
Specifying the combiner
If a combiner is specified in the map_reduce API, data is aggregated in the mapper stage first. The combiner is used the same way as the reducer, but it cannot reference resources, and the field names and types it outputs must be consistent with those of the mapper.
In the preceding example, you can use the reducer as the combiner to aggregate data in the mapper stage and reduce the amount of shuffled data.
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
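What a combiner buys you can be sketched locally (plain Python, not PyODPS internals): pre-aggregating one mapper's output shrinks the data that must be shuffled to the reducers.

```python
from itertools import groupby

def combine(pairs):
    # Pre-aggregate a single mapper's output before it is shuffled.
    out = []
    for key, grp in groupby(sorted(pairs), key=lambda kv: kv[0]):
        out.append((key, sum(c for _, c in grp)))
    return out

# Output of one mapper before combining: four (word, 1) pairs.
mapper_out = [('hello', 1), ('world', 1), ('hello', 1), ('python', 1)]
combined = combine(mapper_out)
print(combined)  # [('hello', 2), ('python', 1), ('world', 1)]
```

The two 'hello' pairs collapse into one, so only three pairs instead of four cross the shuffle; the reducer then merges these partial counts exactly as before.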
Warning
PyODPS DataFrame recognizes all collections and sequences as distributed objects, and does not support referencing these objects inside user-defined functions. Please consider using methods like join to reference data in multiple DataFrames, or referencing collections as resources.
Referring to resources
In the MapReduce API, you can separately specify the resources referenced by the mapper and the reducer.
In the following example, stop words are filtered in the mapper, and the reducer adds 5 to the count of each whitelisted word.
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>> stop_words = set(r[0].strip() for r in resources[0])
>>> def h(row):
>>> for word in row[0].split():
>>> if word not in stop_words:
>>> yield word, 1
>>> return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>> d = dict()
>>> d['white_list'] = set(word.strip() for word in resources[0])
>>> d['cnt'] = 0
>>> def inner(keys):
>>> d['cnt'] = 0
>>> def h(row, done):
>>> d['cnt'] += row.cnt
>>> if done:
>>> if row.word in d['white_list']:
>>> d['cnt'] += 5
>>> yield keys.word, d['cnt']
>>> return h
>>> return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>> mapper_resources=[stop_words], reducer_resources=[white_list_file])
word cnt
0 hello 2
1 life 1
2 python 7
3 world 6
4 short 1
5 use 1
Reshuffling data
If data is unevenly distributed in a cluster, you can reshuffle it by using the reshuffle API.
>>> df1 = df.reshuffle()
By default, rows are hashed to random values. You can also distribute data by a specified column and sort the reshuffled data in a specified order.
>>> df1.reshuffle('name', sort='id', ascending=False)
Bloom filter
PyODPS DataFrame provides the bloom_filter API for filtering with a Bloom filter.
Given a Collection and one of its column expressions, sequence1, you can compute a Bloom filter from another sequence, sequence2, and apply it to sequence1. Rows whose sequence1 values are definitely not in sequence2 are filtered out, while rows whose values are not in sequence2 may occasionally pass the filter, because a Bloom filter is an approximate structure.
This method can quickly filter out much useless data from the Collection object.
The Bloom filter is particularly suitable for joining a large table with a small one in scenarios where most rows do not match. For example, when joining visit data with transaction data, most user visits do not create transactions, so applying a Bloom filter to the visit data before the join can greatly enhance performance.
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
a b
0 name1 1
1 name2 2
2 name3 3
3 name1 4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
a
0 name1
>>> df1.bloom_filter('a', df2.a) # the 0th argument can also be an expression, for instance, df1.a + '1'
a b
0 name1 1
1 name1 4
The preceding code processes a small volume of data, so the rows with name2 and name3 in column a of df1 are all filtered out. With larger volumes of data, some rows that should be filtered out may remain.
As the join scenario above shows, rows that escape the filter do not affect program correctness; the filter simply improves performance.
You can use the capacity and error_rate arguments to specify the data capacity and error rate, which default to 3000 and 0.01 respectively.
Note
Tuning the capacity value up or the error_rate value down increases memory usage, so select values appropriate to your data.
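For intuition, here is a minimal Bloom filter in plain Python (an illustrative sketch, not the implementation behind bloom_filter): items that were added always pass the membership test, while other items may rarely pass it by accident.

```python
import hashlib

class BloomFilter(object):
    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, item):
        # Derive several bit positions from salted hashes of the item.
        for i in range(self.hashes):
            digest = hashlib.md5(('%d:%s' % (i, item)).encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # False: definitely absent.  True: possibly present.
        return all(self.bits & (1 << pos) for pos in self._positions(item))

bf = BloomFilter()
bf.add('name1')                        # build the filter from the small side
rows = [('name1', 1), ('name2', 2), ('name3', 3), ('name1', 4)]
kept = [r for r in rows if bf.might_contain(r[0])]
```

Both name1 rows are guaranteed to be kept (no false negatives); name2 and name3 are almost certainly dropped, but a false positive is possible, which is why the filter is only a pre-pass before the real join.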
Pivot table
PyODPS DataFrame provides the pivot_table feature. You can use this feature as shown in the following example.
>>> df
A B C D E
0 foo one small 1 3
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 6
4 foo two small 3 4
5 bar one large 4 5
6 bar one small 5 3
7 bar two small 6 2
8 bar two large 7 1
The simplest pivot_table call requires the rows parameter and retrieves the mean of one or more fields.
>>> df['A', 'D', 'E'].pivot_table(rows='A')
A D_mean E_mean
0 bar 5.5 2.75
1 foo 2.2 4.40
You can specify multiple row fields to group by.
>>> df.pivot_table(rows=['A', 'B', 'C'])
A B C D_mean E_mean
0 bar one large 4.0 5.0
1 bar one small 5.0 3.0
2 bar two large 7.0 1.0
3 bar two small 6.0 2.0
4 foo one large 2.0 4.5
5 foo one small 1.0 3.0
6 foo two small 3.0 5.0
Specify the values parameter to select the columns to calculate.
>>> df.pivot_table(rows=['A', 'B'], values='D')
A B D_mean
0 bar one 4.500000
1 bar two 6.500000
2 foo one 1.666667
3 foo two 3.000000
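The grouped means above can be verified with a quick standard-library computation (an illustration of what pivot_table computes here, not its implementation):

```python
from collections import defaultdict

# (A, B, D) triples from the sample table
data = [
    ('foo', 'one', 1), ('foo', 'one', 2), ('foo', 'one', 2),
    ('foo', 'two', 3), ('foo', 'two', 3),
    ('bar', 'one', 4), ('bar', 'one', 5),
    ('bar', 'two', 6), ('bar', 'two', 7),
]

# Group D values by the (A, B) key, then average each group.
groups = defaultdict(list)
for a, b, d in data:
    groups[(a, b)].append(d)

means = {k: sum(v) / len(v) for k, v in groups.items()}
print(round(means[('foo', 'one')], 6))  # 1.666667
print(means[('bar', 'two')])            # 6.5
```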
By default, the system calculates the mean of the columns specified in values. You can also specify one or more aggregate functions.
>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
A B D_mean D_count D_sum
0 bar one 4.500000 2 9
1 bar two 6.500000 2 13
2 foo one 1.666667 3 5
3 foo two 3.000000 2 6
You can also turn the values of a column in the original data into new columns in the result. This is the most powerful feature of pivot_table.
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
A B large_D_mean small_D_mean
0 bar one 4.0 5.0
1 bar two 7.0 6.0
2 foo one 2.0 1.0
3 foo two NaN 3.0
The system provides fill_value to replace null values.
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
A B large_D_mean small_D_mean
0 bar one 4 5
1 bar two 7 6
2 foo one 2 1
3 foo two 0 3
Key-value string transformation
DataFrame can extract key-value pairs stored in one column into separate columns, and can transform standard columns into a key-value column.
The following data is used in the examples:
>>> df
name kv
0 name1 k1=1,k2=3,k5=10
1 name1 k1=7.1,k7=8.2
2 name2 k2=1.2,k3=1.5
3 name2 k9=1.1,k2=1
Use the extract_kv method to extract key-value fields:
>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
In this code, the columns parameter specifies the fields to extract. The kv_delim parameter sets the separator between keys and values, and item_delim sets the separator between key-value pairs; the defaults are a colon and a comma, respectively. Each output field name combines the original field name and the key, connected by an underscore (_). Missing values default to None; use fill_value to specify a replacement. The following example shows the result of processing the preceding df data with fill_value=0.
>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 0.0 10.0 0.0 0.0
1 name1 7.0 0.0 0.0 0.0 8.2 0.0
2 name2 0.0 1.2 1.5 0.0 0.0 0.0
3 name2 0.0 1.0 0.0 0.0 0.0 1.1
The default output type of extract_kv is float. To output other data types, specify the dtype argument. For instance:
>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0, dtype='string')
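The extraction logic can be sketched in plain Python (an illustration of the semantics, not the PyODPS implementation):

```python
def extract_kv(rows, kv_delim='=', item_delim=',', fill_value=None):
    # Parse each kv string into a dict, take the union of keys,
    # then emit one output column per key, prefixed with "kv_".
    parsed = []
    for name, kv in rows:
        d = {}
        for item in kv.split(item_delim):
            k, v = item.split(kv_delim)
            d[k] = float(v)
        parsed.append((name, d))
    keys = sorted(set(k for _, d in parsed for k in d))
    header = ['name'] + ['kv_%s' % k for k in keys]
    table = [[name] + [d.get(k, fill_value) for k in keys]
             for name, d in parsed]
    return header, table

header, table = extract_kv([('name1', 'k1=1,k2=3,k5=10'),
                            ('name2', 'k2=1.2,k3=1.5')])
print(header)    # ['name', 'kv_k1', 'kv_k2', 'kv_k3', 'kv_k5']
print(table[0])  # ['name1', 1.0, 3.0, None, 10.0]
```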
DataFrame can also transform multiple columns to one key-value column. The following code is an example for transformation:
>>> df
name k1 k2 k3 k5 k7 k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
You can use the to_kv method to transform data to the key-value format:
>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
name kv
0 name1 k1=1,k2=3,k5=10
1 name1 k1=7.1,k7=8.2
2 name2 k2=1.2,k3=1.5
3 name2 k9=1.1,k2=1
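The reverse transformation can be sketched the same way (a plain Python illustration; the %g formatting mirrors how whole numbers appear without a trailing .0 in the sample output):

```python
def to_kv(columns, row, kv_delim='='):
    # Join non-null columns back into "key=value" pairs, skipping None.
    return ','.join('%s%s%g' % (col, kv_delim, val)
                    for col, val in zip(columns, row) if val is not None)

print(to_kv(['k1', 'k2', 'k3'], [7.1, None, 1.5]))  # k1=7.1,k3=1.5
print(to_kv(['k1', 'k2', 'k5'], [1.0, 3.0, 10.0]))  # k1=1,k2=3,k5=10
```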