列运算

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))

对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。接下来会详细说明。

NULL相关(isnull,notnull,fillna)

DataFrame API提供了几个和NULL相关的内置函数,比如isnull来判断是否某字段是NULL,notnull则相反,fillna是将NULL填充为用户指定的值。

>>> iris.sepallength.isnull().head(5)
   sepallength
0        False
1        False
2        False
3        False
4        False

逻辑判断(ifelse,switch)

ifelse作用于boolean类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。

>>> (iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)
   cmp5
0   gt5
1  lte5
2  lte5
3  lte5
4  lte5

switch用于多条件判断的情况。

>>> iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0
>>> from odps.df import switch
>>> switch(iris.sepallength == 4.9, 'eq4.9', iris.sepallength == 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0

数学运算

对于数字类型的字段,支持+,-,*,/等操作,也支持log、sin等数学计算。

>>> (iris.sepallength * 10).log().head(5)
   sepallength
0     3.931826
1     3.891820
2     3.850148
3     3.828641
4     3.912023
>>> fields = [iris.sepallength,
>>>           (iris.sepallength / 2).rename('sepallength除以2'),
>>>           (iris.sepallength ** 2).rename('sepallength的平方')]
>>> iris[fields].head(5)
   sepallength  sepallength除以2  sepallength的平方
0          5.1              2.55             26.01
1          4.9              2.45             24.01
2          4.7              2.35             22.09
3          4.6              2.30             21.16
4          5.0              2.50             25.00

算术运算支持的操作包括:

算术操作 说明
abs 绝对值
sqrt 平方根
sin  
sinh  
cos  
cosh  
tan  
tanh  
arccos  
arccosh  
arcsin  
arcsinh  
arctan  
arctanh  
exp 指数函数
expm1 指数减1
log 传入参数表示底是几
log2  
log10  
log1p log(1+x)
radians 给定角度计算弧度
degrees 给定弧度计算角度
ceil 不小于输入值的最小整数
floor 向下取整,返回比输入值小的整数值
trunc 将输入值截取到指定小数点位置

对于sequence,也支持其于其他sequence或者scalar的比较。

>>> (iris.sepallength < 5).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False

值得主意的是,DataFrame API不支持连续操作,比如3 <= iris.sepallength <= 5,但是提供了between这个函数来进行是否在某个区间的判断。

>>> (iris.sepallength.between(3, 5)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4         True

默认情况下,between包含两边的区间,如果计算开区间,则需要设inclusive=False。

>>> (iris.sepallength.between(3, 5, inclusive=False)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False

String相关操作

DataFrame API提供了一系列针对string类型的Sequence或者Scalar的操作。

>>> fields = [
>>>     iris.name.upper().rename('upper_name'),
>>>     iris.name.extract('Iris(.*)', group=1)
>>> ]
>>> iris[fields].head(5)
    upper_name     name
0  IRIS-SETOSA  -setosa
1  IRIS-SETOSA  -setosa
2  IRIS-SETOSA  -setosa
3  IRIS-SETOSA  -setosa
4  IRIS-SETOSA  -setosa

string相关操作包括:

string 操作 算法类型
capitalize  
contains 包含某个字符串,如果 regex 参数为 True,则是包含某个正则表达式,默认为 True
count 指定字符串出现的次数
endswith 以某个字符串结尾
startswith 以某个字符串开头
extract 抽取出某个正则表达式,如果 group 不指定,则返回满足整个 pattern 的子串;否则,返回第几个 group
find 返回第一次出现的子串位置,若不存在则返回-1
rfind 从右查找返回子串第一次出现的位置,不存在则返回-1
replace 将某个 pattern 的子串全部替换成另一个子串, n 参数若指定,则替换n次
get 返回某个位置上的字符串
len 返回字符串的长度
ljust 若未达到指定的 width 的长度,则在右侧填充 fillchar 指定的字符串(默认空格)
rjust 若未达到指定的 width 的长度,则在左侧填充 fillchar 指定的字符串(默认空格)
lower 变为全部小写
upper 变为全部大写
lstrip 在左侧删除空格(包括空行符)
rstrip 在右侧删除空格(包括空行符)
strip 在左右两侧删除空格(包括空行符)
pad 在指定的位置(left,right 或者 both)用指定填充字符(用 fillchar 指定,默认空格)来对齐
repeat 重复指定 n
slice 切片操作
swapcase 对调大小写
title 同 str.title
zfill 长度没达到指定 width ,则左侧填充0
isalnum 同 str.isalnum
isalpha 同 str.isalpha
isdigit 是否都是数字,同 str.isdigit
isspace 是否都是空格,同 str.isspace
islower 是否都是小写,同 str.islower
isupper 是否都是大写,同 str.isupper
istitle 同 str.istitle
isnumeric 同 str.isnumeric
isdecimal 同 str.isdecimal
strptime 按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化

时间相关操作

对于datetime类型Sequence或者Scalar,可以调用时间相关的内置函数。

>>> df = lens[[lens.unix_timestamp.astype('datetime').rename('dt')]]
>>> df[df.dt,
>>>    df.dt.year.rename('year'),
>>>    df.dt.month.rename('month'),
>>>    df.dt.day.rename('day'),
>>>    df.dt.hour.rename('hour')].head(5)
                    dt  year  month  day  hour
0  1998-04-08 11:02:00  1998      4    8    11
1  1998-04-08 10:57:55  1998      4    8    10
2  1998-04-08 10:45:26  1998      4    8    10
3  1998-04-08 10:25:52  1998      4    8    10
4  1998-04-08 10:44:19  1998      4    8    10

与时间相关的属性包括:

时间相关属性 说明
year  
month  
day  
hour  
minute  
second  
weekofyear 返回日期位于那一年的第几周。周一作为一周的第一天
weekday 返回日期当前周的第几天
dayofweek 同 weekday
strftime 格式化时间,时间格式和 Python 标准库相同,详细参考 Python 时间格式化

PyODPS 也支持时间的加减操作,比如可以通过以下方法得到前3天的日期。两个日期列相减得到相差的毫秒数。

>>> df
                           a                          b
0 2016-12-06 16:43:12.460001 2016-12-06 17:43:12.460018
1 2016-12-06 16:43:12.460012 2016-12-06 17:43:12.460021
2 2016-12-06 16:43:12.460015 2016-12-06 17:43:12.460022
>>> from odps.df import day
>>> df.a - day(3)
                           a
0 2016-12-03 16:43:12.460001
1 2016-12-03 16:43:12.460012
2 2016-12-03 16:43:12.460015
>>> (df.b - df.a).dtype
int64
>>> (df.b - df.a).rename('a')
         a
0  3600000
1  3600000
2  3600000

支持的时间类型包括:

属性 说明
year  
month  
day  
hour  
minute  
second  
millisecond  

其他元素操作(isin,notin,cut)

isin给出Sequence里的元素是否在某个集合元素里。notin是相反动作。

>>> iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
   sepallength
0         True
1         True
2        False
3        False
4        False

cut提供离散化的操作,可以将Sequence的数据拆成几个区段。

>>> iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5']).rename('sepallength_cut').head(5)
   sepallength_cut
0             None
1              4-5
2              4-5
3              4-5
4              4-5

include_underinclude_over可以分别包括向下和向上的区间。

>>> labels = ['0-1', '1-2', '2-3', '3-4', '4-5', '5-']
>>> iris.sepallength.cut(range(6), labels=labels, include_over=True).rename('sepallength_cut').head(5)
   sepallength_cut
0               5-
1              4-5
2              4-5
3              4-5
4              4-5

使用自定义函数

DataFrame函数支持对Sequence使用map,它会对它的每个元素调用自定义函数。比如:

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0

如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

如果在函数中包含闭包,需要注意的是,函数外闭包变量值的变化会引起函数内该变量值的变化。例如,

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

结果为 dfs 中每个 SequenceExpr 均为 df.sepal_length + 9。为解决此问题,可以将函数作为另一函数的返回值,或者使用 partial,如

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))

>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者 Function对象

map传入Python函数的实现使用了ODPS Python UDF,因此,如果用户所在的Project不支持Python UDF,则map函数无法使用。除此以外,所有 Python UDF 的限制在此都适用。

目前,第三方库(包含C)只能使用numpy,纯Python库使用参考 使用第三方纯Python库

除了调用自定义函数,DataFrame还提供了很多内置函数,这些函数中部分使用了map函数来实现,因此,如果用户所在Project未开通Python UDF,则这些函数也就无法使用(注:阿里云公共服务暂不提供Python UDF支持)

警告

由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常 执行。

引用资源

自定义函数也能读取ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。 此时,自定义函数需要写成函数闭包或callable的类。

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources按调用顺序传入
>>>     names = set()
>>>     fileobj = resources[0] # 文件资源是一个file-like的object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # 这里可以通过字段名或者偏移来取
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False

使用第三方纯Python库

现在用户可以把第三方Python包作为资源上传到ODPS,支持的格式有whl、egg、zip以及tar.gz。 在全局或者在立即执行的方法时,指定需要使用的包文件。即可以在自定义函数中使用第三方库。

值得注意的是,第三方库的依赖库,也必须指定,否则依然会有导入错误。

下面我们会以 python-dateutil 这个包作为例子。

首先,我们可以使用pip download命令,下载包以及其依赖到某个路径。 这里下载后会出现两个包:six-1.10.0-py2.py3-none-any.whl和python_dateutil-2.5.3-py2.py3-none-any.whl (这里注意需要下载支持linux环境的包)

$ pip download python-dateutil -d /to/path/

然后我们分别把两个文件上传到ODPS资源

>>> # 这里要确保资源名的后缀是正确的文件类型
>>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl'))
>>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl'))

现在我们有个DataFrame,只有一个string类型字段。

>>> df
               datestr
0  2016-08-26 14:03:29
1  2015-08-26 14:03:29

全局配置使用到的三方库:

>>> from odps import options
>>>
>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>>
>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
>>> df.datestr.map(get_year)
   datestr
0     2016
1     2015

或者,通过立即运行方法的 libraries 参数指定:

>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>>
>>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
   datestr
0     2016
1     2015

使用计数器

from odps.udf import get_execution_context

def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1

df.field.map(h)

logview 的 JSONSummary 中即可找到计数器值。

要想调用ODPS上的无参或者常数参的内建函数,我们可以使用 BuiltinFunction 类来完成。

>>> from odps.df import BuiltinFunction
>>>
>>> iris[iris.name, BuiltinFunction('rand', rtype='float').rename('rand')][:4]
>>> iris[iris.name, BuiltinFunction('rand', rtype='float', args=(10, )).rename('rand')][:4]