列运算
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))
对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。接下来会详细说明。
NULL相关(isnull,notnull,fillna)
DataFrame API提供了几个和NULL相关的内置函数,比如isnull来判断是否某字段是NULL,notnull则相反,fillna是将NULL填充为用户指定的值。
>>> iris.sepallength.isnull().head(5)
sepallength
0 False
1 False
2 False
3 False
4 False
逻辑判断(ifelse,switch)
ifelse
作用于boolean类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。
>>> (iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)
cmp5
0 gt5
1 lte5
2 lte5
3 lte5
4 lte5
switch用于多条件判断的情况。
>>> iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
equalness
0 noeq
1 eq4.9
2 noeq
3 noeq
4 eq5.0
>>> from odps.df import switch
>>> switch(iris.sepallength == 4.9, 'eq4.9', iris.sepallength == 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
equalness
0 noeq
1 eq4.9
2 noeq
3 noeq
4 eq5.0
PyODPS 0.7.8 以上版本支持根据条件修改数据集某一列的一部分值,写法为:
>>> iris[iris.sepallength > 5, 'cmp5'] = 'gt5'
>>> iris[iris.sepallength <= 5, 'cmp5'] = 'lte5'
>>> iris.head(5)
cmp5
0 gt5
1 lte5
2 lte5
3 lte5
4 lte5
数学运算
对于数字类型的字段,支持+,-,*,/等操作,也支持log、sin等数学计算。
>>> (iris.sepallength * 10).log().head(5)
sepallength
0 3.931826
1 3.891820
2 3.850148
3 3.828641
4 3.912023
>>> fields = [iris.sepallength,
>>> (iris.sepallength / 2).rename('sepallength除以2'),
>>> (iris.sepallength ** 2).rename('sepallength的平方')]
>>> iris[fields].head(5)
sepallength sepallength除以2 sepallength的平方
0 5.1 2.55 26.01
1 4.9 2.45 24.01
2 4.7 2.35 22.09
3 4.6 2.30 21.16
4 5.0 2.50 25.00
算术运算支持的操作包括:
算术操作 |
说明 |
---|---|
abs |
绝对值 |
sqrt |
平方根 |
sin |
|
sinh |
|
cos |
|
cosh |
|
tan |
|
tanh |
|
arccos |
|
arccosh |
|
arcsin |
|
arcsinh |
|
arctan |
|
arctanh |
|
exp |
指数函数 |
expm1 |
指数减1 |
log |
传入参数表示底是几 |
log2 |
|
log10 |
|
log1p |
log(1+x) |
radians |
给定角度计算弧度 |
degrees |
给定弧度计算角度 |
ceil |
不小于输入值的最小整数 |
floor |
向下取整,返回比输入值小的整数值 |
trunc |
将输入值截取到指定小数点位置 |
对于sequence,也支持其于其他sequence或者scalar的比较。
>>> (iris.sepallength < 5).head(5)
sepallength
0 False
1 True
2 True
3 True
4 False
值得主意的是,DataFrame
API不支持连续操作,比如3 <= iris.sepallength <= 5
,但是提供了between这个函数来进行是否在某个区间的判断。
>>> (iris.sepallength.between(3, 5)).head(5)
sepallength
0 False
1 True
2 True
3 True
4 True
默认情况下,between包含两边的区间,如果计算开区间,则需要设inclusive=False。
>>> (iris.sepallength.between(3, 5, inclusive=False)).head(5)
sepallength
0 False
1 True
2 True
3 True
4 False
String 相关操作
DataFrame API提供了一系列针对string类型的Sequence或者Scalar的操作。
>>> fields = [
>>> iris.name.upper().rename('upper_name'),
>>> iris.name.extract('Iris(.*)', group=1)
>>> ]
>>> iris[fields].head(5)
upper_name name
0 IRIS-SETOSA -setosa
1 IRIS-SETOSA -setosa
2 IRIS-SETOSA -setosa
3 IRIS-SETOSA -setosa
4 IRIS-SETOSA -setosa
string相关操作包括:
string 操作 |
说明 |
---|---|
capitalize |
|
contains |
包含某个字符串,如果 regex 参数为 True,则是包含某个正则表达式,默认为 True |
count |
指定字符串出现的次数 |
endswith |
以某个字符串结尾 |
startswith |
以某个字符串开头 |
extract |
抽取出某个正则表达式,如果 group 不指定,则返回满足整个 pattern 的子串;否则,返回第几个 group |
find |
返回第一次出现的子串位置,若不存在则返回-1 |
rfind |
从右查找返回子串第一次出现的位置,不存在则返回-1 |
replace |
将某个 pattern 的子串全部替换成另一个子串, |
get |
返回某个位置上的字符串 |
len |
返回字符串的长度 |
ljust |
若未达到指定的 |
rjust |
若未达到指定的 |
lower |
变为全部小写 |
upper |
变为全部大写 |
lstrip |
在左侧删除空格(包括空行符) |
rstrip |
在右侧删除空格(包括空行符) |
strip |
在左右两侧删除空格(包括空行符) |
split |
将字符串按分隔符拆分为若干个字符串(返回 list<string> 类型) |
pad |
在指定的位置(left,right 或者 both)用指定填充字符(用 |
repeat |
重复指定 |
slice |
切片操作 |
swapcase |
对调大小写 |
title |
同 str.title |
zfill |
长度没达到指定 |
isalnum |
同 str.isalnum |
isalpha |
同 str.isalpha |
isdigit |
是否都是数字,同 str.isdigit |
isspace |
是否都是空格,同 str.isspace |
islower |
是否都是小写,同 str.islower |
isupper |
是否都是大写,同 str.isupper |
istitle |
同 str.istitle |
isnumeric |
同 str.isnumeric |
isdecimal |
同 str.isdecimal |
todict |
将字符串按分隔符拆分为一个 dict,传入的两个参数分别为项目分隔符和 Key-Value 分隔符(返回 dict<string, string> 类型) |
strptime |
按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化 |
时间相关操作
对于datetime类型Sequence或者Scalar,可以调用时间相关的内置函数。
>>> df = lens[[lens.unix_timestamp.astype('datetime').rename('dt')]]
>>> df[df.dt,
>>> df.dt.year.rename('year'),
>>> df.dt.month.rename('month'),
>>> df.dt.day.rename('day'),
>>> df.dt.hour.rename('hour')].head(5)
dt year month day hour
0 1998-04-08 11:02:00 1998 4 8 11
1 1998-04-08 10:57:55 1998 4 8 10
2 1998-04-08 10:45:26 1998 4 8 10
3 1998-04-08 10:25:52 1998 4 8 10
4 1998-04-08 10:44:19 1998 4 8 10
与时间相关的属性包括:
时间相关属性 |
说明 |
---|---|
year |
|
month |
|
day |
|
hour |
|
minute |
|
second |
|
weekofyear |
返回日期位于那一年的第几周。周一作为一周的第一天 |
weekday |
返回日期当前周的第几天 |
dayofweek |
同 weekday |
strftime |
格式化时间,时间格式和 Python 标准库相同,详细参考 Python 时间格式化 |
PyODPS 也支持时间的加减操作,比如可以通过以下方法得到前3天的日期。两个日期列相减得到相差的毫秒数。
>>> df
a b
0 2016-12-06 16:43:12.460001 2016-12-06 17:43:12.460018
1 2016-12-06 16:43:12.460012 2016-12-06 17:43:12.460021
2 2016-12-06 16:43:12.460015 2016-12-06 17:43:12.460022
>>> from odps.df import day
>>> df.a - day(3)
a
0 2016-12-03 16:43:12.460001
1 2016-12-03 16:43:12.460012
2 2016-12-03 16:43:12.460015
>>> (df.b - df.a).dtype
int64
>>> (df.b - df.a).rename('a')
a
0 3600000
1 3600000
2 3600000
支持的时间类型包括:
属性 |
说明 |
---|---|
year |
|
month |
|
day |
|
hour |
|
minute |
|
second |
|
millisecond |
集合类型相关操作
PyODPS 支持的集合类型有 List 和 Dict。这两个类型都可以使用下标获取集合中的某个项目,另有 len 方法,可获得集合的大小。
同时,两种集合均有 explode 方法,用于展开集合中的内容。对于 List,explode 默认返回一列,当传入参数 pos 时, 将返回两列,其中一列为值在数组中的编号(类似 Python 的 enumerate 函数)。对于 Dict,explode 会返回两列, 分别表示 keys 及 values。explode 中也可以传入列名,作为最后生成的列。
示例如下:
>>> df
id a b
0 1 [a1, b1] {'a2': 0, 'b2': 1, 'c2': 2}
1 2 [c1] {'d2': 3, 'e2': 4}
>>> df[df.id, df.a[0], df.b['b2']]
id a b
0 1 a1 1
1 2 c1 NaN
>>> df[df.id, df.a.len(), df.b.len()]
id a b
0 1 2 3
1 2 1 2
>>> df.a.explode()
a
0 a1
1 b1
2 c1
>>> df.a.explode(pos=True)
a_pos a
0 0 a1
1 1 b1
2 0 c1
>>> # 指定列名
>>> df.a.explode(['pos', 'value'], pos=True)
pos value
0 0 a1
1 1 b1
2 0 c1
>>> df.b.explode()
b_key b_value
0 a2 0
1 b2 1
2 c2 2
3 d2 3
4 e2 4
>>> # 指定列名
>>> df.b.explode(['key', 'value'])
key value
0 a2 0
1 b2 1
2 c2 2
3 d2 3
4 e2 4
explode 也可以和 并列多行输出 结合,以将原有列和 explode 的结果相结合,例子如下:
>>> df[df.id, df.a.explode()]
id a
0 1 a1
1 1 b1
2 2 c1
>>> df[df.id, df.a.explode(), df.b.explode()]
id a b_key b_value
0 1 a1 a2 0
1 1 a1 b2 1
2 1 a1 c2 2
3 1 b1 a2 0
4 1 b1 b2 1
5 1 b1 c2 2
6 2 c1 d2 3
7 2 c1 e2 4
除了下标、len 和 explode 两个共有方法以外,List 还支持下列方法:
list 操作 |
说明 |
---|---|
contains(v) |
列表是否包含某个元素 |
sort |
返回排序后的列表(返回值为 List) |
Dict 还支持下列方法:
dict 操作 |
说明 |
---|---|
keys |
获取 Dict keys(返回值为 List) |
values |
获取 Dict values(返回值为 List) |
其他元素操作(isin,notin,cut)
isin
给出Sequence里的元素是否在某个集合元素里。notin
是相反动作。
>>> iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
sepallength
0 True
1 True
2 False
3 False
4 False
cut提供离散化的操作,可以将Sequence的数据拆成几个区段。
>>> iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5']).rename('sepallength_cut').head(5)
sepallength_cut
0 None
1 4-5
2 4-5
3 4-5
4 4-5
include_under
和include_over
可以分别包括向下和向上的区间。
>>> labels = ['0-1', '1-2', '2-3', '3-4', '4-5', '5-']
>>> iris.sepallength.cut(range(6), labels=labels, include_over=True).rename('sepallength_cut').head(5)
sepallength_cut
0 5-
1 4-5
2 4-5
3 4-5
4 4-5
使用自定义函数
DataFrame函数支持对Sequence使用map,它会对它的每个元素调用自定义函数。比如:
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
警告
目前,受限于 Python UDF,自定义函数无法支持将 list / dict 类型作为输入或输出。
如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。
>>> iris.sepallength.map(lambda x: 't' + str(x), rtype='string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
如果在函数中包含闭包,需要注意的是,函数外闭包变量值的变化会引起函数内该变量值的变化。例如,
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))
结果为 dfs 中每个 SequenceExpr 均为 df.sepal_length + 9
。为解决此问题,可以将函数作为另一函数的返回值,或者使用
partial,如
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))
或
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者 Function对象 。
map传入Python函数的实现使用了ODPS Python UDF,因此,如果用户所在的Project不支持Python UDF,则map函数无法使用。除此以外,所有 Python UDF 的限制在此都适用。
目前,第三方库(包含C)只能使用numpy
,第三方库使用参考 使用第三方Python库。
警告
由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from
)时,代码在使用 Python 2.7 的 ODPS
Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常
执行。
警告
由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。 请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源,如下文所述。
引用资源
自定义函数也能读取ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。 此时,自定义函数需要写成函数闭包或callable的类。
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources): # resources按调用顺序传入
>>> names = set()
>>> fileobj = resources[0] # 文件资源是一个file-like的object
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # 这里可以通过字段名或者偏移来取
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
注:分区表资源在读取时不包含分区字段。
使用第三方Python库
现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件, 即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。
如果你的 MaxCompute 服务支持在执行 SQL 时使用镜像,可以在 execute / persist / to_pandas 方法指定 image
参数以使用镜像。与此同时,DataFrame 的 execute / persist / to_pandas 方法支持增加 libraries
参数以将资源作为三方包。
PyODPS 提供了 pyodps-pack
工具,可在安装完 PyODPS 后打包三方包及其依赖。如何制作及使用三方包的说明请参考
这里。
警告
由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from
)时,代码在使用 Python 2.7 的 ODPS
Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常
执行。
使用计数器
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)
logview 的 JSONSummary 中即可找到计数器值。
调用ODPS内建或者已定义函数
要想调用 ODPS 上的内建或者已定义函数,来生成列,我们可以使用 func
接口,该接口默认函数返回值为 String,
可以用 rtype 参数指定返回值。
>>> from odps.df import func
>>>
>>> iris[iris.name, func.rand(rtype='float').rename('rand')][:4]
>>> iris[iris.name, func.rand(10, rtype='float').rename('rand')][:4]
>>> # 调用 ODPS 上定义的 UDF,列名无法确定时需要手动指定
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float').rename('new_col')]
>>> # 从其他 Project 调用 UDF,也可通过 name 参数指定列名
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float', project='udf_project',
>>> name='new_col')]
备注
注意:在使用 Pandas 后端时,不支持执行带有 func
的表达式。