列运算

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))

对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。接下来会详细说明。

NULL相关(isnull,notnull,fillna)

DataFrame API提供了几个和NULL相关的内置函数,比如isnull来判断是否某字段是NULL,notnull则相反,fillna是将NULL填充为用户指定的值。

>>> iris.sepallength.isnull().head(5)
   sepallength
0        False
1        False
2        False
3        False
4        False

逻辑判断(ifelse,switch)

ifelse作用于boolean类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。

>>> (iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)
   cmp5
0   gt5
1  lte5
2  lte5
3  lte5
4  lte5

switch用于多条件判断的情况。

>>> iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0
>>> from odps.df import switch
>>> switch(iris.sepallength == 4.9, 'eq4.9', iris.sepallength == 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0

PyODPS 0.7.8 以上版本支持根据条件修改数据集某一列的一部分值,写法为:

>>> iris[iris.sepallength > 5, 'cmp5'] = 'gt5'
>>> iris[iris.sepallength <= 5, 'cmp5'] = 'lte5'
>>> iris.head(5)
   cmp5
0   gt5
1  lte5
2  lte5
3  lte5
4  lte5

数学运算

对于数字类型的字段,支持+,-,*,/等操作,也支持log、sin等数学计算。

>>> (iris.sepallength * 10).log().head(5)
   sepallength
0     3.931826
1     3.891820
2     3.850148
3     3.828641
4     3.912023
>>> fields = [iris.sepallength,
>>>           (iris.sepallength / 2).rename('sepallength除以2'),
>>>           (iris.sepallength ** 2).rename('sepallength的平方')]
>>> iris[fields].head(5)
   sepallength  sepallength除以2  sepallength的平方
0          5.1              2.55             26.01
1          4.9              2.45             24.01
2          4.7              2.35             22.09
3          4.6              2.30             21.16
4          5.0              2.50             25.00

算术运算支持的操作包括:

算术操作 说明
abs 绝对值
sqrt 平方根
sin  
sinh  
cos  
cosh  
tan  
tanh  
arccos  
arccosh  
arcsin  
arcsinh  
arctan  
arctanh  
exp 指数函数
expm1 指数减1
log 传入参数表示底是几
log2  
log10  
log1p log(1+x)
radians 给定角度计算弧度
degrees 给定弧度计算角度
ceil 不小于输入值的最小整数
floor 向下取整,返回比输入值小的整数值
trunc 将输入值截取到指定小数点位置

对于sequence,也支持其于其他sequence或者scalar的比较。

>>> (iris.sepallength < 5).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False

值得主意的是,DataFrame API不支持连续操作,比如3 <= iris.sepallength <= 5,但是提供了between这个函数来进行是否在某个区间的判断。

>>> (iris.sepallength.between(3, 5)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4         True

默认情况下,between包含两边的区间,如果计算开区间,则需要设inclusive=False。

>>> (iris.sepallength.between(3, 5, inclusive=False)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False

String 相关操作

DataFrame API提供了一系列针对string类型的Sequence或者Scalar的操作。

>>> fields = [
>>>     iris.name.upper().rename('upper_name'),
>>>     iris.name.extract('Iris(.*)', group=1)
>>> ]
>>> iris[fields].head(5)
    upper_name     name
0  IRIS-SETOSA  -setosa
1  IRIS-SETOSA  -setosa
2  IRIS-SETOSA  -setosa
3  IRIS-SETOSA  -setosa
4  IRIS-SETOSA  -setosa

string相关操作包括:

string 操作 说明
capitalize  
contains 包含某个字符串,如果 regex 参数为 True,则是包含某个正则表达式,默认为 True
count 指定字符串出现的次数
endswith 以某个字符串结尾
startswith 以某个字符串开头
extract 抽取出某个正则表达式,如果 group 不指定,则返回满足整个 pattern 的子串;否则,返回第几个 group
find 返回第一次出现的子串位置,若不存在则返回-1
rfind 从右查找返回子串第一次出现的位置,不存在则返回-1
replace 将某个 pattern 的子串全部替换成另一个子串, n 参数若指定,则替换n次
get 返回某个位置上的字符串
len 返回字符串的长度
ljust 若未达到指定的 width 的长度,则在右侧填充 fillchar 指定的字符串(默认空格)
rjust 若未达到指定的 width 的长度,则在左侧填充 fillchar 指定的字符串(默认空格)
lower 变为全部小写
upper 变为全部大写
lstrip 在左侧删除空格(包括空行符)
rstrip 在右侧删除空格(包括空行符)
strip 在左右两侧删除空格(包括空行符)
split 将字符串按分隔符拆分为若干个字符串(返回 list<string> 类型)
pad 在指定的位置(left,right 或者 both)用指定填充字符(用 fillchar 指定,默认空格)来对齐
repeat 重复指定 n
slice 切片操作
swapcase 对调大小写
title 同 str.title
zfill 长度没达到指定 width ,则左侧填充0
isalnum 同 str.isalnum
isalpha 同 str.isalpha
isdigit 是否都是数字,同 str.isdigit
isspace 是否都是空格,同 str.isspace
islower 是否都是小写,同 str.islower
isupper 是否都是大写,同 str.isupper
istitle 同 str.istitle
isnumeric 同 str.isnumeric
isdecimal 同 str.isdecimal
todict 将字符串按分隔符拆分为一个 dict,传入的两个参数分别为项目分隔符和 Key-Value 分隔符(返回 dict<string, string> 类型)
strptime 按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化

时间相关操作

对于datetime类型Sequence或者Scalar,可以调用时间相关的内置函数。

>>> df = lens[[lens.unix_timestamp.astype('datetime').rename('dt')]]
>>> df[df.dt,
>>>    df.dt.year.rename('year'),
>>>    df.dt.month.rename('month'),
>>>    df.dt.day.rename('day'),
>>>    df.dt.hour.rename('hour')].head(5)
                    dt  year  month  day  hour
0  1998-04-08 11:02:00  1998      4    8    11
1  1998-04-08 10:57:55  1998      4    8    10
2  1998-04-08 10:45:26  1998      4    8    10
3  1998-04-08 10:25:52  1998      4    8    10
4  1998-04-08 10:44:19  1998      4    8    10

与时间相关的属性包括:

时间相关属性 说明
year  
month  
day  
hour  
minute  
second  
weekofyear 返回日期位于那一年的第几周。周一作为一周的第一天
weekday 返回日期当前周的第几天
dayofweek 同 weekday
strftime 格式化时间,时间格式和 Python 标准库相同,详细参考 Python 时间格式化

PyODPS 也支持时间的加减操作,比如可以通过以下方法得到前3天的日期。两个日期列相减得到相差的毫秒数。

>>> df
                           a                          b
0 2016-12-06 16:43:12.460001 2016-12-06 17:43:12.460018
1 2016-12-06 16:43:12.460012 2016-12-06 17:43:12.460021
2 2016-12-06 16:43:12.460015 2016-12-06 17:43:12.460022
>>> from odps.df import day
>>> df.a - day(3)
                           a
0 2016-12-03 16:43:12.460001
1 2016-12-03 16:43:12.460012
2 2016-12-03 16:43:12.460015
>>> (df.b - df.a).dtype
int64
>>> (df.b - df.a).rename('a')
         a
0  3600000
1  3600000
2  3600000

支持的时间类型包括:

属性 说明
year  
month  
day  
hour  
minute  
second  
millisecond  

集合类型相关操作

PyODPS 支持的集合类型有 List 和 Dict。这两个类型都可以使用下标获取集合中的某个项目,另有 len 方法,可获得集合的大小。

同时,两种集合均有 explode 方法,用于展开集合中的内容。对于 List,explode 默认返回一列,当传入参数 pos 时, 将返回两列,其中一列为值在数组中的编号(类似 Python 的 enumerate 函数)。对于 Dict,explode 会返回两列, 分别表示 keys 及 values。explode 中也可以传入列名,作为最后生成的列。

示例如下:

>>> df
   id         a                            b
0   1  [a1, b1]  {'a2': 0, 'b2': 1, 'c2': 2}
1   2      [c1]           {'d2': 3, 'e2': 4}
>>> df[df.id, df.a[0], df.b['b2']]
   id   a    b
0   1  a1    1
1   2  c1  NaN
>>> df[df.id, df.a.len(), df.b.len()]
   id  a  b
0   1  2  3
1   2  1  2
>>> df.a.explode()
    a
0  a1
1  b1
2  c1
>>> df.a.explode(pos=True)
   a_pos   a
0      0  a1
1      1  b1
2      0  c1
>>> # 指定列名
>>> df.a.explode(['pos', 'value'], pos=True)
   pos value
0    0    a1
1    1    b1
2    0    c1
>>> df.b.explode()
  b_key  b_value
0    a2        0
1    b2        1
2    c2        2
3    d2        3
4    e2        4
>>> # 指定列名
>>> df.b.explode(['key', 'value'])
  key  value
0  a2      0
1  b2      1
2  c2      2
3  d2      3
4  e2      4

explode 也可以和 并列多行输出 结合,以将原有列和 explode 的结果相结合,例子如下:

>>> df[df.id, df.a.explode()]
   id   a
0   1  a1
1   1  b1
2   2  c1
>>> df[df.id, df.a.explode(), df.b.explode()]
   id   a b_key  b_value
0   1  a1    a2        0
1   1  a1    b2        1
2   1  a1    c2        2
3   1  b1    a2        0
4   1  b1    b2        1
5   1  b1    c2        2
6   2  c1    d2        3
7   2  c1    e2        4

除了下标、len 和 explode 两个共有方法以外,List 还支持下列方法:

list 操作 说明
contains(v) 列表是否包含某个元素
sort 返回排序后的列表(返回值为 List)

Dict 还支持下列方法:

dict 操作 说明
keys 获取 Dict keys(返回值为 List)
values 获取 Dict values(返回值为 List)

其他元素操作(isin,notin,cut)

isin给出Sequence里的元素是否在某个集合元素里。notin是相反动作。

>>> iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
   sepallength
0         True
1         True
2        False
3        False
4        False

cut提供离散化的操作,可以将Sequence的数据拆成几个区段。

>>> iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5']).rename('sepallength_cut').head(5)
   sepallength_cut
0             None
1              4-5
2              4-5
3              4-5
4              4-5

include_underinclude_over可以分别包括向下和向上的区间。

>>> labels = ['0-1', '1-2', '2-3', '3-4', '4-5', '5-']
>>> iris.sepallength.cut(range(6), labels=labels, include_over=True).rename('sepallength_cut').head(5)
   sepallength_cut
0               5-
1              4-5
2              4-5
3              4-5
4              4-5

使用自定义函数

DataFrame函数支持对Sequence使用map,它会对它的每个元素调用自定义函数。比如:

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0

警告

目前,受限于 Python UDF,自定义函数无法支持将 list / dict 类型作为输入或输出。

如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

如果在函数中包含闭包,需要注意的是,函数外闭包变量值的变化会引起函数内该变量值的变化。例如,

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

结果为 dfs 中每个 SequenceExpr 均为 df.sepal_length + 9。为解决此问题,可以将函数作为另一函数的返回值,或者使用 partial,如

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))

>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者 Function对象

map传入Python函数的实现使用了ODPS Python UDF,因此,如果用户所在的Project不支持Python UDF,则map函数无法使用。除此以外,所有 Python UDF 的限制在此都适用。

目前,第三方库(包含C)只能使用numpy,第三方库使用参考 使用第三方Python库

除了调用自定义函数,DataFrame还提供了很多内置函数,这些函数中部分使用了map函数来实现,因此,如果用户所在Project未开通Python UDF,则这些函数也就无法使用(注:阿里云公共服务暂不提供Python UDF支持)

警告

由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常 执行。

引用资源

自定义函数也能读取ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。 此时,自定义函数需要写成函数闭包或callable的类。

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources按调用顺序传入
>>>     names = set()
>>>     fileobj = resources[0] # 文件资源是一个file-like的object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # 这里可以通过字段名或者偏移来取
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False

注:分区表资源在读取时不包含分区字段。

使用第三方Python库

现在用户可以把第三方 Python 包作为资源上传到 MaxCompute,支持的格式有 whl、egg、zip 以及 tar.gz。 在全局或者在立即执行的方法时,指定需要使用的包文件。即可以在自定义函数中使用第三方库。

值得注意的是,第三方库的依赖库,也必须指定,否则依然会有导入错误。

下面我们会以 python-dateutil 这个包作为例子。

首先,我们可以使用pip download命令,下载包以及其依赖到某个路径。 这里下载后会出现两个包:six-1.10.0-py2.py3-none-any.whl和python_dateutil-2.5.3-py2.py3-none-any.whl (这里注意需要下载支持linux环境的包)

$ pip download python-dateutil -d /to/path/

然后我们分别把两个文件上传到ODPS资源

>>> # 这里要确保资源名的后缀是正确的文件类型
>>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
>>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))

现在我们有个DataFrame,只有一个string类型字段。

>>> df
               datestr
0  2016-08-26 14:03:29
1  2015-08-26 14:03:29

全局配置使用到的三方库:

>>> from odps import options
>>>
>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>>
>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
>>> df.datestr.map(get_year)
   datestr
0     2016
1     2015

或者,通过立即运行方法的 libraries 参数指定:

>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>>
>>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
   datestr
0     2016
1     2015

PyODPS 默认支持执行纯 Python 且不含文件操作的第三方库。在较新版本的 MaxCompute 服务下,PyODPS 也支持执行带有二进制代码或带有文件操作的 Python 库。这些库的后缀必须是 cp27-cp27m-manylinux1_x86_64, 以 archive 格式上传,whl 后缀的包需要重命名为 zip。同时,作业需要开启 odps.isolation.session.enable 选项,或者在 Project 级别开启 Isolation。下面的例子展示了如何上传并使用 scipy 中的特殊函数:

>>> # 对于含有二进制代码的包,必须使用 Archive 方式上传资源,whl 后缀需要改为 zip
>>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
>>>
>>> # 如果 Project 开启了 Isolation,下面的选项不是必需的
>>> options.sql.settings = { 'odps.isolation.session.enable': True }
>>>
>>> def psi(value):
>>>     # 建议在函数内部 import 第三方库,以防止不同操作系统下二进制包结构差异造成执行错误
>>>     from scipy.special import psi
>>>     return float(psi(value))
>>>
>>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

对于只提供源码的二进制包,可以在 Linux Shell 中打包成 Wheel 再上传,Mac 和 Windows 中生成的 Wheel 包无法在 MaxCompute 中使用:

python setup.py bdist_wheel

使用计数器

from odps.udf import get_execution_context

def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1

df.field.map(h)

logview 的 JSONSummary 中即可找到计数器值。

调用ODPS内建或者已定义函数

要想调用 ODPS 上的内建或者已定义函数,来生成列,我们可以使用 func 接口,该接口默认函数返回值为 String, 可以用 rtype 参数指定返回值。

>>> from odps.df import func
>>>
>>> iris[iris.name, func.rand(rtype='float').rename('rand')][:4]
>>> iris[iris.name, func.rand(10, rtype='float').rename('rand')][:4]
>>> # 调用 ODPS 上定义的 UDF,列名无法确定时需要手动指定
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float').rename('new_col')]
>>> # 从其他 Project 调用 UDF,也可通过 name 参数指定列名
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float', project='udf_project',
>>>                               name='new_col')]

注解

注意:在使用 Pandas 后端时,不支持执行带有 func 的表达式。