排序、去重、采样、数据变换

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

排序

排序操作只能作用于Collection。我们只需要调用sort或者sort_values方法。

>>> iris.sort('sepalwidth').head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.2         2.2          4.5         1.5  Iris-versicolor
2          6.0         2.2          5.0         1.5   Iris-virginica
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          5.5         2.3          4.0         1.3  Iris-versicolor

如果想要降序排列,则可以使用参数ascending,并设为False。

>>> iris.sort('sepalwidth', ascending=False).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

也可以这样调用,来进行降序排列:

>>> iris.sort(-iris.sepalwidth).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

多字段排序也很简单:

>>> iris.sort(['sepalwidth', 'petallength']).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          4.0         1.0  Iris-versicolor
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          5.0         1.5   Iris-virginica
4          4.5         2.3          1.3         0.3      Iris-setosa

多字段排序时,如果是升序降序不同,ascending参数可以传入一个列表,长度必须等同于排序的字段,它们的值都是boolean类型

>>> iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

下面效果是一样的:

>>> iris.sort(['sepalwidth', -iris.petallength]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

备注

由于 ODPS 要求排序必须指定个数,所以在 ODPS 后端执行时, 会通过 options.df.odps.sort.limit 指定排序个数,这个值默认是 10000, 如果要排序尽量多的数据,可以把这个值设到较大的值。不过注意,此时可能会导致 OOM。

去重

去重在Collection上,用户可以调用distinct方法。

>>> iris[['name']].distinct()
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name')
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name', 'sepallength').head(3)
          name  sepallength
0  Iris-setosa          4.3
1  Iris-setosa          4.4
2  Iris-setosa          4.5

在Sequence上,用户可以调用unique,但是记住,调用unique的Sequence不能用在列选择中。

>>> iris.name.unique()
              name
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica

下面的代码是错误的用法。

>>> iris[iris.name, iris.name.unique()]  # 错误的

采样

要对一个 collection 的数据采样,可以调用 sample 方法。PyODPS 支持四种采样方式。

警告

除了按份数采样外,其余方法如果要在 ODPS DataFrame 上执行,需要 Project 支持 XFlow,否则,这些方法只能在 Pandas DataFrame 后端上执行。

  • 按份数采样

在这种采样方式下,数据被分为 parts 份,可选择选取的份数序号。

>>> iris.sample(parts=10)  # 分成10份,默认取第0份
>>> iris.sample(parts=10, i=0)  # 手动指定取第0份
>>> iris.sample(parts=10, i=[2, 5])   # 分成10份,取第2和第5份
>>> iris.sample(parts=10, columns=['name', 'sepalwidth'])  # 根据name和sepalwidth的值做采样
  • 按比例 / 条数采样

在这种采样方式下,用户指定需要采样的数据条数或采样比例。指定 replace 参数为 True 可启用放回采样。

>>> iris.sample(n=100)  # 选取100条数据
>>> iris.sample(frac=0.3)  # 采样30%的数据
  • 按权重列采样

在这种采样方式下,用户指定权重列和数据条数 / 采样比例。指定 replace 参数为 True 可启用放回采样。

>>> iris.sample(n=100, weights='sepal_length')
>>> iris.sample(n=100, weights='sepal_width', replace=True)
  • 分层采样

在这种采样方式下,用户指定用于分层的标签列,同时为需要采样的每个标签指定采样比例( frac 参数)或条数 ( n 参数)。暂不支持放回采样。

>>> iris.sample(strata='category', n={'Iris Setosa': 10, 'Iris Versicolour': 10})
>>> iris.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})

数据缩放

DataFrame 支持通过最大/最小值或平均值/标准差对数据进行缩放。例如,对数据

    name  id  fid
0  name1   4  5.3
1  name2   2  3.5
2  name2   3  1.5
3  name1   4  4.2
4  name1   3  2.2
5  name1   3  4.1

使用 min_max_scale 方法进行归一化:

>>> df.min_max_scale(columns=['fid'])
    name  id       fid
0  name1   4  1.000000
1  name2   2  0.526316
2  name2   3  0.000000
3  name1   4  0.710526
4  name1   3  0.184211
5  name1   3  0.684211

min_max_scale 还支持使用 feature_range 参数指定输出值的范围,例如,如果我们需要使输出值在 (-1, 1) 范围内,可使用

>>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
    name  id       fid
0  name1   4  1.000000
1  name2   2  0.052632
2  name2   3 -1.000000
3  name1   4  0.421053
4  name1   3 -0.631579
5  name1   3  0.368421

如果需要保留原始值,可以使用 preserve 参数。此时,缩放后的数据将会以新增列的形式追加到数据中, 列名默认为原列名追加“_scaled”后缀,该后缀可使用 suffix 参数更改。例如,

>>> df.min_max_scale(columns=['fid'], preserve=True)
    name  id  fid  fid_scaled
0  name1   4  5.3    1.000000
1  name2   2  3.5    0.526316
2  name2   3  1.5    0.000000
3  name1   4  4.2    0.710526
4  name1   3  2.2    0.184211
5  name1   3  4.1    0.684211

min_max_scale 也支持使用 group 参数指定一个或多个分组列,在分组列中分别取最值进行缩放。例如,

>>> df.min_max_scale(columns=['fid'], group=['name'])
    name  id       fid
0  name1   4  1.000000
1  name1   4  0.645161
2  name1   3  0.000000
3  name1   3  0.612903
4  name2   2  1.000000
5  name2   3  0.000000

可见结果中,name1 和 name2 两组均按组中的最值进行了缩放。

std_scale 可依照标准正态分布对数据进行调整。例如,

>>> df.std_scale(columns=['fid'])
    name  id       fid
0  name1   4  1.436467
1  name2   2  0.026118
2  name2   3 -1.540938
3  name1   4  0.574587
4  name1   3 -0.992468
5  name1   3  0.496234

std_scale 同样支持 preserve 参数保留原始列以及使用 group 进行分组,具体请参考 min_max_scale,此处不再赘述。

空值处理

DataFrame 支持筛去空值以及填充空值的功能。例如,对数据

   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

使用 dropna 可删除 subset 中包含空值的行:

>>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   5  name1  1.0  2.0  3.0  4.0

如果行中包含非空值则不删除,可以使用 how=’all’:

>>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0

你也可以使用 thresh 参数来指定行中至少要有多少个非空值。例如:

>>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0

使用 fillna 可使用常数或已有的列填充未知值。下面给出了使用常数填充的例子:

>>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
   id   name     f1     f2     f3     f4
0   0  name1    1.0  100.0    3.0    4.0
1   1  name1    2.0  100.0  100.0    1.0
2   2  name1    3.0    4.0    1.0  100.0
3   3  name1  100.0    1.0    2.0    3.0
4   4  name1    1.0  100.0    3.0    4.0
5   5  name1    1.0    2.0    3.0    4.0
6   6  name1  100.0  100.0  100.0  100.0

你也可以使用一个已有的列来填充未知值。例如:

>>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  NaN  3.0  4.0
1   1  name1  2.0  NaN  NaN  1.0
2   2  name1  3.0  4.0  1.0  4.0
3   3  name1  1.0  1.0  2.0  3.0
4   4  name1  1.0  NaN  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

特别地,DataFrame 提供了向前 / 向后填充的功能。通过指定 method 参数为下列值可以达到目的:

取值

含义

bfill / backfill

向前填充

ffill / pad

向后填充

例如:

>>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  3.0  3.0  4.0
1   1  name1  2.0  1.0  1.0  1.0
2   2  name1  3.0  4.0  1.0  NaN
3   3  name1  1.0  1.0  2.0  3.0
4   4  name1  1.0  3.0  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN
>>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
   id   name   f1   f2   f3   f4
0   0  name1  1.0  1.0  3.0  4.0
1   1  name1  2.0  2.0  2.0  1.0
2   2  name1  3.0  4.0  1.0  1.0
3   3  name1  NaN  1.0  2.0  3.0
4   4  name1  1.0  1.0  3.0  4.0
5   5  name1  1.0  2.0  3.0  4.0
6   6  name1  NaN  NaN  NaN  NaN

你也可以使用 ffill / bfill 函数来简化代码。ffill 等价于 fillna(method=’ffill’), bfill 等价于 fillna(method=’bfill’)

对所有行/列调用自定义函数

对一行数据使用自定义函数

要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。

apply 的自定义函数接收一个参数,为上一步 Collection 的一行数据,用户可以通过属性、或者偏移取得一个字段的数据。

>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

reduce为 True 时,表示返回结果为Sequence,否则返回结果为Collection。 namestypes参数分别指定返回的Sequence或Collection的字段名和类型。 如果类型不指定,将会默认为string类型。

在 apply 的自定义函数中,reduce 为 False 时,也可以使用 yield关键字来返回多行结果。

>>> iris.count()
150
>>>
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

我们也可以在函数上来注释返回的字段和类型,这样就不需要在函数调用时再指定。

>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1).count()
300
>>> iris.apply(handle, axis=1).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

也可以使用 map-only 的 map_reduce,和 axis=1 的apply操作是等价的。

>>> iris.map_reduce(mapper=handle).count()
300
>>> iris.map_reduce(mapper=handle).head(5)
   iris_add  iris_sub
0       1.6       8.6
1       1.2       1.6
2       1.9       7.9
3       1.2       1.6
4       1.5       7.9

如果想调用 ODPS 上已经存在的 UDTF,则函数指定为函数名即可。

>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

使用 apply 对行操作,且 reduce为 False 时,可以使用 并列多行输出 与已有的行结合,用于后续聚合等操作。

>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris[iris.category, iris.apply(handle, axis=1)]

对所有列调用自定义聚合

调用apply方法,当我们不指定axis,或者axis为0的时候,我们可以通过传入一个自定义聚合类来对所有sequence进行聚合操作。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667

警告

目前,受限于 Python UDF,自定义函数无法支持将 list / dict 类型作为初始输入或最终输出结果。

警告

由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。 请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源,如下文所述。

引用资源

类似于对 map 方法的resources参数,每个resource可以是ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。

对于axis为1,也就是在行上操作,我们需要写一个函数闭包或者callable的类。 而对于列上的聚合操作,我们只需在 __init__ 函数里读取资源即可。

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python

可以看到这里的stop_words是存放于本地,但在真正执行时会被上传到ODPS作为资源引用。

使用第三方Python库

现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件, 即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。

如果你的 MaxCompute 服务支持在执行 SQL 时使用镜像,可以在 execute / persist / to_pandas 方法指定 image 参数以使用镜像。与此同时,DataFrame 的 execute / persist / to_pandas 方法支持增加 libraries 参数以将资源作为三方包。 PyODPS 提供了 pyodps-pack 工具,可在安装完 PyODPS 后打包三方包及其依赖。如何制作及使用三方包的说明请参考 这里

警告

由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常 执行。

MapReduce API

PyODPS DataFrame也支持MapReduce API,用户可以分别编写map和reduce函数(map_reduce可以只有mapper或者reducer过程)。 我们来看个简单的wordcount的例子。

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>>                     mapper_output_names=['word', 'cnt'],
>>>                     mapper_output_types=['string', 'int'],
>>>                     reducer_output_names=['word', 'cnt'],
>>>                     reducer_output_types=['string', 'int'])
     word  cnt
0   hello    2
1       i    1
2      is    1
3    life    1
4  python    2
5   short    1
6     use    1
7   world    1

group参数用来指定reduce按哪些字段做分组,如果不指定,会按全部字段做分组。

其中对于reducer来说,会稍微有些不同。它需要接收聚合的keys初始化,并能继续处理按这些keys聚合的每行数据。 第2个参数表示这些keys相关的所有行是不是都迭代完成。

这里写成函数闭包的方式,主要为了方便,当然我们也能写成callable的类。

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done表示这个key已经迭代结束
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

使用 output来注释会让代码更简单些。

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')
     word  cnt
0   hello    2
1       i    1
2      is    1
3    life    1
4  python    2
5   short    1
6     use    1
7   world    1

有时候我们在迭代的时候需要按某些列排序,则可以使用 sort参数,来指定按哪些列排序,升序降序则通过 ascending参数指定。 ascending 参数可以是一个bool值,表示所有的 sort字段是相同升序或降序, 也可以是一个列表,长度必须和 sort字段长度相同。

指定combiner

combiner表示在map_reduce API里表示在mapper端,就先对数据进行聚合操作,它的用法和reducer是完全一致的,但不能引用资源。 并且,combiner的输出的字段名和字段类型必须和mapper完全一致。

上面的例子,我们就可以使用reducer作为combiner来先在mapper端对数据做初步的聚合,减少shuffle出去的数据量。

>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

使用第三方Python库

现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件, 即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。

如果你的 MaxCompute 服务支持在执行 SQL 时使用镜像,可以在 execute / persist / to_pandas 方法指定 image 参数以使用镜像。与此同时,DataFrame 的 execute / persist / to_pandas 方法支持增加 libraries 参数以将资源作为三方包。 PyODPS 提供了 pyodps-pack 工具,可在安装完 PyODPS 后打包三方包及其依赖。如何制作及使用三方包的说明请参考 这里

警告

由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常 执行。

警告

由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。 请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源。

引用资源

在MapReduce API里,我们能分别指定mapper和reducer所要引用的资源。

如下面的例子,我们对mapper里的单词做停词过滤,在reducer里对白名单的单词数量加5。

>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

重排数据

有时候我们的数据在集群上分布可能是不均匀的,我们需要对数据重排。调用 reshuffle 接口即可。

>>> df1 = df.reshuffle()

默认会按随机数做哈希来分布。也可以指定按那些列做分布,且可以指定重排后的排序顺序。

>>> df1.reshuffle('name', sort='id', ascending=False)

布隆过滤器

PyODPS DataFrame提供了 bloom_filter 接口来进行布隆过滤器的计算。

给定某个collection,和它的某个列计算的sequence1,我们能对另外一个sequence2进行布隆过滤,sequence1不在sequence2中的一定会过滤, 但可能不能完全过滤掉不存在于sequence2中的数据,这也是一种近似的方法。

这样的好处是能快速对collection进行快速过滤一些无用数据。

这在大规模join的时候,一边数据量远大过另一边数据,而大部分并不会join上的场景很有用。 比如,我们在join用户的浏览数据和交易数据时,用户的浏览大部分不会带来交易,我们可以利用交易数据先对浏览数据进行布隆过滤, 然后再join能很好提升性能。

>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'
       a  b
0  name1  1
1  name1  4

这里由于数据量很小,df1中的a为name2和name3的行都被正确过滤掉了,当数据量很大的时候,可能会有一定的数据不能被过滤。

如之前提的join场景中,少量不能过滤并不能并不会影响正确性,但能较大提升join的性能。

我们可以传入 capacityerror_rate 来设置数据的量以及错误率,默认值是 30000.01

备注

要注意,调大 capacity 或者减小 error_rate 会增加内存的使用,所以应当根据实际情况选择一个合理的值。

透视表(pivot_table)

PyODPS DataFrame提供透视表的功能。我们通过几个例子来看使用。

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1

最简单的透视表必须提供一个 rows 参数,表示按一个或者多个字段做取平均值的操作。

>>> df['A', 'D', 'E'].pivot_table(rows='A')
     A  D_mean  E_mean
0  bar     5.5    2.75
1  foo     2.2    4.40

rows可以提供多个,表示按多个字段做聚合。

>>> df.pivot_table(rows=['A', 'B', 'C'])
     A    B      C  D_mean  E_mean
0  bar  one  large     4.0     5.0
1  bar  one  small     5.0     3.0
2  bar  two  large     7.0     1.0
3  bar  two  small     6.0     2.0
4  foo  one  large     2.0     4.5
5  foo  one  small     1.0     3.0
6  foo  two  small     3.0     5.0

我们可以指定 values 来显示指定要计算的列。

>>> df.pivot_table(rows=['A', 'B'], values='D')
     A    B    D_mean
0  bar  one  4.500000
1  bar  two  6.500000
2  foo  one  1.666667
3  foo  two  3.000000

计算值列时,默认会计算平均值,用户可以指定一个或者多个聚合函数。

>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
     A    B    D_mean  D_count  D_sum
0  bar  one  4.500000        2      9
1  bar  two  6.500000        2     13
2  foo  one  1.666667        3      5
3  foo  two  3.000000        2      6

我们也可以把原始数据的某一列的值,作为新的collection的列。 这也是透视表最强大的地方。

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
     A    B  large_D_mean  small_D_mean
0  bar  one           4.0           5.0
1  bar  two           7.0           6.0
2  foo  one           2.0           1.0
3  foo  two           NaN           3.0

我们可以提供 fill_value 来填充空值。

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
     A    B  large_D_mean  small_D_mean
0  bar  one             4             5
1  bar  two             7             6
2  foo  one             2             1
3  foo  two             0             3

Key-Value 字符串转换

DataFrame 提供了将 Key-Value 对展开为列,以及将普通列转换为 Key-Value 列的功能。

我们的数据为

>>> df
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1

可以通过 extract_kv 方法将 Key-Value 字段展开:

>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    NaN   10.0    NaN    NaN
1  name1    7.0    NaN    NaN    NaN    8.2    NaN
2  name2    NaN    1.2    1.5    NaN    NaN    NaN
3  name2    NaN    1.0    NaN    NaN    NaN    1.1

其中,需要展开的字段名由 columns 指定,Key 和 Value 之间的分隔符,以及 Key-Value 对之间的分隔符分别由 kv_delim 和 item_delim 这两个参数指定,默认分别为半角冒号和半角逗号。输出的字段名为原字段名和 Key 值的组合,通过“_”相连。缺失值默认为 None,可通过 fill_value 选择需要填充的值。例如,相同的 df,

>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    0.0   10.0    0.0    0.0
1  name1    7.0    0.0    0.0    0.0    8.2    0.0
2  name2    0.0    1.2    1.5    0.0    0.0    0.0
3  name2    0.0    1.0    0.0    0.0    0.0    1.1

extract_kv 默认输出类型为 float。如果需要输出其他类型,可以指定 dtype 参数,例如

df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0, dtype='string')

DataFrame 也支持将多列数据转换为一个 Key-Value 列。例如,

>>> df
   name    k1   k2   k3    k5   k7   k9
0  name1  1.0  3.0  NaN  10.0  NaN  NaN
1  name1  7.0  NaN  NaN   NaN  8.2  NaN
2  name2  NaN  1.2  1.5   NaN  NaN  NaN
3  name2  NaN  1.0  NaN   NaN  NaN  1.1

可通过 to_kv 方法转换为 Key-Value 表示的格式:

>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1