基本概念

在使用 DataFrame 时,你需要了解三个对象上的操作:Collection(DataFrame) ,SequenceScalar。 这三个对象分别表示表结构(或者二维结构)、列(一维结构)、标量。需要注意的是,这些对象仅在使用 Pandas 数据创建后会包含实际数据, 而在 ODPS 表上创建的对象中并不包含实际的数据,而仅仅包含对这些数据的操作,实质的存储和计算会在 ODPS 中进行。

创建 DataFrame

通常情况下,你唯一需要直接创建的 Collection 对象是 DataFrame,这一对象用于引用数据源,可能是一个 ODPS 表, ODPS 分区,Pandas DataFrame或sqlalchemy.Table(数据库表)。 使用这几种数据源时,相关的操作相同,这意味着你可以不更改数据处理的代码,仅仅修改输入/输出的指向, 便可以简单地将小数据量上本地测试运行的代码迁移到 ODPS 上,而迁移的正确性由 PyODPS 来保证。

创建 DataFrame 非常简单,只需将 Table 对象、 pandas DataFrame 对象或者 sqlalchemy Table 对象传入即可。

>>> from odps.df import DataFrame
>>>
>>> # 从 ODPS 表创建
>>> iris = DataFrame(o.get_table('pyodps_iris'))
>>> iris2 = o.get_table('pyodps_iris').to_df()  # 使用表的to_df方法
>>>
>>> # 从 ODPS 分区创建
>>> pt_df = DataFrame(o.get_table('partitioned_table').get_partition('pt=20171111'))
>>> pt_df2 = o.get_table('partitioned_table').get_partition('pt=20171111').to_df()  # 使用分区的to_df方法
>>>
>>> # 从 Pandas DataFrame 创建
>>> import pandas as pd
>>> import numpy as np
>>> df = DataFrame(pd.DataFrame(np.arange(9).reshape(3, 3), columns=list('abc')))
>>>
>>> # 从 sqlalchemy Table 创建
>>> engine = sqlalchemy.create_engine('mysql://root:123456@localhost/movielens')
>>> metadata = sqlalchemy.MetaData(bind=engine) # 需要绑定到engine
>>> table = sqlalchemy.Table('top_users', metadata, extend_existing=True, autoload=True)
>>> users = DataFrame(table)

在用 pandas DataFrame 初始化时,对于 numpy object 类型或者 string 类型,PyODPS DataFrame 会尝试推断类型, 如果一整列都为空,则会报错。这时,用户可以指定 unknown_as_string 为True,会将这些列指定为string类型。 用户也可以指定 as_type 参数。若类型为基本类型,会在创建 PyODPS DataFrame 时进行强制类型转换。 如果 Pandas DataFrame 中包含 list 或者 dict 列,该列的类型不会被推断,必须手动使用 as_type 指定。 as_type 参数类型必须是dict。

>>> df2 = DataFrame(df, unknown_as_string=True, as_type={'null_col2': 'float'})
>>> df2.dtypes
odps.Schema {
  sepallength           float64
  sepalwidth            float64
  petallength           float64
  petalwidth            float64
  name                  string
  null_col1             string   # 无法识别,通过unknown_as_string设置成string类型
  null_col2             float64  # 强制转换成float类型
}
>>> df4 = DataFrame(df3, as_type={'list_col': 'list<int64>'})
>>> df4.dtypes
odps.Schema {
  id        int64
  list_col  list<int64>  # 无法识别且无法自动转换,通过 as_type 设置
}

Sequence

SequenceExpr 代表了二维数据集中的一列。你不应当手动创建 SequenceExpr,而应当从一个 Collection 中获取。

获取列

你可以使用 collection.column_name 取出一列,例如

>>> iris.sepallength.head(5)
   sepallength
0          5.1
1          4.9
2          4.7
3          4.6
4          5.0

如果列名存储在一个字符串变量中,除了使用 getattr(df, ‘column_name’) 达到相同的效果外,也可以使用 df[column_name] 的形式,例如

>>> iris['sepallength'].head(5)
   sepallength
0          5.1
1          4.9
2          4.7
3          4.6
4          5.0

列类型

DataFrame包括自己的类型系统,在使用Table初始化的时候,ODPS的类型会被进行转换。这样做的好处是,能支持更多的计算后端。 目前,DataFrame的执行后端支持ODPS SQL、pandas以及数据库(MySQL和Postgres)。

PyODPS DataFrame 包括以下类型:

int8int16int32int64float32float64booleanstringdecimaldatetimelistdict

ODPS的字段和DataFrame的类型映射关系如下:

ODPS类型

DataFrame类型

bigint

int64

double

float64

string

string

datetime

datetime

boolean

boolean

decimal

decimal

array<value_type>

list<value_type>

map<key_type, value_type>

dict<key_type, value_type>

list 和 dict 必须填写其包含值的类型,否则会报错。目前 DataFrame 暂不支持 MaxCompute 2.0 中新增的 Timestamp 及 Struct 类型,未来的版本会支持。

在 Sequence 中可以通过 sequence.dtype 获取数据类型:

>>> iris.sepallength.dtype
float64

如果要修改一列的类型,可以使用 astype 方法。该方法输入一个类型,并返回类型转换后的 Sequence。例如,

>>> iris.sepallength.astype('int')
   sepallength
0            5
1            4
2            4
3            4
4            5

列名

在 DataFrame 的计算过程中,一个 Sequence 必须要有列名。在很多情况下,DataFrame 会起一个名字。比如:

>>> iris.groupby('name').sepalwidth.max()
   sepalwidth_max
0             4.4
1             3.4
2             3.8

可以看到,sepalwidth取最大值后被命名为sepalwidth_max。还有一些操作,比如一个 Sequence 做加法,加上一个 Scalar,这时,会被命名为这个 Sequence 的名字。其它情况下,需要用户去自己命名。

Sequence 提供 rename 方法为一列设置名字,用法示例如下:

>>> iris.sepalwidth.rename('sepal_width').head(5)
   sepal_width
0          3.5
1          3.0
2          3.2
3          3.1
4          3.6

需要注意的是,rename 操作并不是就地重命名。如果要在 Collection 上应用新的列名,需要重新做列选择,例如

>>> iris[iris.exclude("sepalwidth"), iris.sepalwidth.rename('sepal_width')].head(5)
   sepallength  petallength  petalwidth         name  sepal_width
0          5.1          1.4         0.2  Iris-setosa          3.5
1          4.9          1.4         0.2  Iris-setosa          3.0
2          4.7          1.3         0.2  Iris-setosa          3.2
3          4.6          1.5         0.2  Iris-setosa          3.1
4          5.0          1.4         0.2  Iris-setosa          3.6

简单的列变换

你可以对一个 Sequence 进行运算,返回一个新的 Sequence,正如对简单的 Python 变量进行运算一样。对数值列, Sequence 支持四则运算,而对字符串则支持字符串相加等操作。例如,

>>> (iris.sepallength + 5).head(5)
   sepallength
0         10.1
1          9.9
2          9.7
3          9.6
4         10.0

>>> (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
   sum_sepal
0        8.6
1        7.9
2        7.9
3        7.7
4        8.6

注意到两列参与运算,因而 PyODPS 无法确定最终显示的列名,需要手动指定。详细的列变换说明,请参见 列运算

Collection

DataFrame 中所有二维数据集上的操作都属于 CollectionExpr,可视为一张 ODPS 表或一张电子表单,DataFrame 对象也是 CollectionExpr 的特例。CollectionExpr 中包含针对二维数据集的列操作、筛选、变换等大量操作。

获取类型

dtypes可以用来获取 CollectionExpr 中所有列的类型。dtypes 返回的是 Schema类型

>>> iris.dtypes
odps.Schema {
  sepallength           float64
  sepalwidth            float64
  petallength           float64
  petalwidth            float64
  name                  string
}

列选择和增删

如果要从一个 CollectionExpr 中选取部分列,产生新的数据集,可以使用 expr[columns] 语法。例如,

>>> iris['name', 'sepallength'].head(5)
          name  sepallength
0  Iris-setosa          5.1
1  Iris-setosa          4.9
2  Iris-setosa          4.7
3  Iris-setosa          4.6
4  Iris-setosa          5.0

备注

注意:如果需要选择的列只有一列,需要在 columns 后加上逗号或者显示标记为列表,例如 df[df.sepal_length, ] 或 df[[df.sepal_length]],否则返回的将是一个 Sequence 对象,而不是 Collection。

如果想要在新的数据集中排除已有数据集的某些列,可使用 exclude 方法:

>>> iris.exclude('sepallength', 'petallength')[:5]
   sepalwidth  petalwidth         name
0         3.5         0.2  Iris-setosa
1         3.0         0.2  Iris-setosa
2         3.2         0.2  Iris-setosa
3         3.1         0.2  Iris-setosa
4         3.6         0.2  Iris-setosa

0.7.2 以后的 PyODPS 支持另一种写法,即在数据集上直接排除相应的列:

>>> del iris['sepallength']
>>> del iris['petallength']
>>> iris[:5]
   sepalwidth  petalwidth         name
0         3.5         0.2  Iris-setosa
1         3.0         0.2  Iris-setosa
2         3.2         0.2  Iris-setosa
3         3.1         0.2  Iris-setosa
4         3.6         0.2  Iris-setosa

如果我们需要在已有 collection 中添加某一列变换的结果,也可以使用 expr[expr, new_sequence] 语法, 新增的列会作为新 collection 的一部分。

下面的例子将 iris 中的 sepalwidth 列加一后重命名为 sepalwidthplus1 并追加到数据集末尾,形成新的数据集:

>>> iris[iris, (iris.sepalwidth + 1).rename('sepalwidthplus1')].head(5)
   sepallength  sepalwidth  petallength  petalwidth         name  \
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa

   sepalwidthplus1
0              4.5
1              4.0
2              4.2
3              4.1
4              4.6

使用 df[df, new_sequence] 需要注意的是,变换后的列名与原列名可能相同,如果需要与原 collection 合并, 请将该列重命名。

0.7.2 以后版本的 PyODPS 支持直接在当前数据集中追加,写法为

>>> iris['sepalwidthplus1'] = iris.sepalwidth + 1
>>> iris.head(5)
   sepallength  sepalwidth  petallength  petalwidth         name  \
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa

   sepalwidthplus1
0              4.5
1              4.0
2              4.2
3              4.1
4              4.6

我们也可以先将原列通过 exclude 方法进行排除,再将变换后的新列并入,而不必担心重名。

>>> iris[iris.exclude('sepalwidth'), iris.sepalwidth * 2].head(5)
   sepallength  petallength  petalwidth         name  sepalwidth
0          5.1          1.4         0.2  Iris-setosa         7.0
1          4.9          1.4         0.2  Iris-setosa         6.0
2          4.7          1.3         0.2  Iris-setosa         6.4
3          4.6          1.5         0.2  Iris-setosa         6.2
4          5.0          1.4         0.2  Iris-setosa         7.2

对于 0.7.2 以后版本的 PyODPS,如果想在当前数据集上直接覆盖,则可以写

>>> iris['sepalwidth'] = iris.sepalwidth * 2
>>> iris.head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         7.0          1.4         0.2  Iris-setosa
1          4.9         6.0          1.4         0.2  Iris-setosa
2          4.7         6.4          1.3         0.2  Iris-setosa
3          4.6         6.2          1.5         0.2  Iris-setosa
4          5.0         7.2          1.4         0.2  Iris-setosa

增删列以创建新 collection 的另一种方法是调用 select 方法,将需要选择的列作为参数输入。如果需要重命名,使用 keyword 参数输入,并将新的列名作为参数名即可。

>>> iris.select('name', sepalwidthminus1=iris.sepalwidth - 1).head(5)
          name  sepalwidthminus1
0  Iris-setosa               2.5
1  Iris-setosa               2.0
2  Iris-setosa               2.2
3  Iris-setosa               2.1
4  Iris-setosa               2.6

此外,我们也可以传入一个 lambda 表达式,它接收一个参数,接收上一步的结果。在执行时,PyODPS 会检查这些 lambda 表达式,传入上一步生成的 collection 并将其替换为正确的列。

>>> iris['name', 'petallength'][[lambda x: x.name]].head(5)
          name
0  Iris-setosa
1  Iris-setosa
2  Iris-setosa
3  Iris-setosa
4  Iris-setosa

此外,在 0.7.2 以后版本的 PyODPS 中,支持对数据进行条件赋值,例如

>>> iris[iris.sepallength > 5.0, 'sepalwidth'] = iris.sepalwidth * 2
>>> iris.head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1        14.0          1.4         0.2  Iris-setosa
1          4.9         6.0          1.4         0.2  Iris-setosa
2          4.7         6.4          1.3         0.2  Iris-setosa
3          4.6         6.2          1.5         0.2  Iris-setosa
4          5.0         7.2          1.4         0.2  Iris-setosa

引入常数和随机数

DataFrame 支持在 collection 中追加一列常数。追加常数需要使用 Scalar,引入时需要手动指定列名,如

>>> from odps.df import Scalar
>>> iris[iris, Scalar(1).rename('id')][:5]
   sepallength  sepalwidth  petallength  petalwidth         name  id
0          5.1         3.5          1.4         0.2  Iris-setosa   1
1          4.9         3.0          1.4         0.2  Iris-setosa   1
2          4.7         3.2          1.3         0.2  Iris-setosa   1
3          4.6         3.1          1.5         0.2  Iris-setosa   1
4          5.0         3.6          1.4         0.2  Iris-setosa   1

如果需要指定一个空值列,可以使用 NullScalar,需要提供字段类型。

>>> from odps.df import NullScalar
>>> iris[iris, NullScalar('float').rename('fid')][:5]
   sepal_length  sepal_width  petal_length  petal_width     category   fid
0           5.1          3.5           1.4          0.2  Iris-setosa  None
1           4.9          3.0           1.4          0.2  Iris-setosa  None
2           4.7          3.2           1.3          0.2  Iris-setosa  None
3           4.6          3.1           1.5          0.2  Iris-setosa  None
4           5.0          3.6           1.4          0.2  Iris-setosa  None

在 PyODPS 0.7.12 及以后版本中,引入了简化写法:

>>> iris['id'] = 1
>>> iris
   sepallength  sepalwidth  petallength  petalwidth         name  id
0          5.1         3.5          1.4         0.2  Iris-setosa   1
1          4.9         3.0          1.4         0.2  Iris-setosa   1
2          4.7         3.2          1.3         0.2  Iris-setosa   1
3          4.6         3.1          1.5         0.2  Iris-setosa   1
4          5.0         3.6          1.4         0.2  Iris-setosa   1

需要注意的是,这种写法无法自动识别空值的类型,所以在增加空值列时,仍然要使用

>>> iris['null_col'] = NullScalar('float')
>>> iris
   sepallength  sepalwidth  petallength  petalwidth         name  null_col
0          5.1         3.5          1.4         0.2  Iris-setosa      None
1          4.9         3.0          1.4         0.2  Iris-setosa      None
2          4.7         3.2          1.3         0.2  Iris-setosa      None
3          4.6         3.1          1.5         0.2  Iris-setosa      None
4          5.0         3.6          1.4         0.2  Iris-setosa      None

DataFrame 也支持在 collection 中增加一列随机数列,该列类型为 float,范围为 0 - 1,每行数值均不同。 追加随机数列需要使用 RandomScalar,参数为随机数种子,可省略。

>>> from odps.df import RandomScalar
>>> iris[iris, RandomScalar().rename('rand_val')][:5]
   sepallength  sepalwidth  petallength  petalwidth         name  rand_val
0          5.1         3.5          1.4         0.2  Iris-setosa  0.000471
1          4.9         3.0          1.4         0.2  Iris-setosa  0.799520
2          4.7         3.2          1.3         0.2  Iris-setosa  0.834609
3          4.6         3.1          1.5         0.2  Iris-setosa  0.106921
4          5.0         3.6          1.4         0.2  Iris-setosa  0.763442

过滤数据

Collection 提供了数据过滤的功能,

我们试着查询sepallength大于5的几条数据。

>>> iris[iris.sepallength > 5].head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          5.4         3.9          1.7         0.4  Iris-setosa
2          5.4         3.7          1.5         0.2  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.7         4.4          1.5         0.4  Iris-setosa

多个查询条件:

>>> iris[(iris.sepallength < 5) & (iris['petallength'] > 1.5)].head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.8         3.4          1.6         0.2      Iris-setosa
1          4.8         3.4          1.9         0.2      Iris-setosa
2          4.7         3.2          1.6         0.2      Iris-setosa
3          4.8         3.1          1.6         0.2      Iris-setosa
4          4.9         2.4          3.3         1.0  Iris-versicolor

或条件:

>>> iris[(iris.sepalwidth < 2.5) | (iris.sepalwidth > 4)].head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.7         4.4          1.5         0.4      Iris-setosa
1          5.2         4.1          1.5         0.1      Iris-setosa
2          5.5         4.2          1.4         0.2      Iris-setosa
3          4.5         2.3          1.3         0.3      Iris-setosa
4          5.5         2.3          4.0         1.3  Iris-versicolor

备注

记住,与和或条件必须使用&和|,不能使用and和or。

非条件:

>>> iris[~(iris.sepalwidth > 3)].head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.4         2.9          1.4         0.2  Iris-setosa
2          4.8         3.0          1.4         0.1  Iris-setosa
3          4.3         3.0          1.1         0.1  Iris-setosa
4          5.0         3.0          1.6         0.2  Iris-setosa

我们也可以显式调用filter方法,提供多个与条件

>>> iris.filter(iris.sepalwidth > 3.5, iris.sepalwidth < 4).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.0         3.6          1.4         0.2  Iris-setosa
1          5.4         3.9          1.7         0.4  Iris-setosa
2          5.4         3.7          1.5         0.2  Iris-setosa
3          5.4         3.9          1.3         0.4  Iris-setosa
4          5.7         3.8          1.7         0.3  Iris-setosa

同样对于连续的操作,我们可以使用lambda表达式

>>> iris[iris.sepalwidth > 3.8]['name', lambda x: x.sepallength + 1]
          name  sepallength
0  Iris-setosa          6.4
1  Iris-setosa          6.8
2  Iris-setosa          6.7
3  Iris-setosa          6.4
4  Iris-setosa          6.2
5  Iris-setosa          6.5

对于Collection,如果它包含一个列是boolean类型,则可以直接使用该列作为过滤条件。

>>> df.dtypes
odps.Schema {
  a boolean
  b int64
}
>>> df[df.a]
      a  b
0  True  1
1  True  3

因此,记住对Collection取单个squence的操作时,只有boolean列是合法的,即对Collection作过滤操作。

>>> df[df.a, ]       # 取列操作
>>> df[[df.a]]       # 取列操作
>>> df.select(df.a)  # 显式取列
>>> df[df.a]         # a列是boolean列,执行过滤操作
>>> df.a             # 取单列
>>> df['a']          # 取单列

同时,我们也支持Pandas中的query方法,用查询语句来做数据的筛选,在表达式中直接使用列名如sepallength进行操作, 另外在查询语句中&and都表示与操作,|or都表示或操作。

>>> iris.query("(sepallength < 5) and (petallength > 1.5)").head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.8         3.4          1.6         0.2      Iris-setosa
1          4.8         3.4          1.9         0.2      Iris-setosa
2          4.7         3.2          1.6         0.2      Iris-setosa
3          4.8         3.1          1.6         0.2      Iris-setosa
4          4.9         2.4          3.3         1.0  Iris-versicolor

当表达式中需要使用到本地变量时,需要在该变量前加一个@前缀。

>>> var = 4
>>> iris.query("(sepalwidth < 2.5) | (sepalwidth > @var)").head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.7         4.4          1.5         0.4      Iris-setosa
1          5.2         4.1          1.5         0.1      Iris-setosa
2          5.5         4.2          1.4         0.2      Iris-setosa
3          4.5         2.3          1.3         0.3      Iris-setosa
4          5.5         2.3          4.0         1.3  Iris-versicolor

目前query支持的语法包括:

语法

说明

name

没有 @前缀的都当做列名处理,有前缀的会获取本地变量

operator

支持部分运算符:+-*///%**, ==!=<<=>>=innot in

bool

与或非操作,其中 &and表示与,|or表示或

attribute

取对象属性

index, slice, Subscript

切片操作

并列多行输出

对于 list 及 map 类型的列,explode 方法会将该列转换为多行输出。使用 apply 方法也可以输出多行。 为了进行聚合等操作,常常需要将这些输出和原表中的列合并。此时可以使用 DataFrame 提供的并列多行输出功能, 写法为将多行输出函数生成的集合与原集合中的列名一起映射。

并列多行输出的例子如下:

>>> df
   id         a             b
0   1  [a1, b1]  [a2, b2, c2]
1   2      [c1]      [d2, e2]
>>> df[df.id, df.a.explode(), df.b]
   id   a             b
0   1  a1  [a2, b2, c2]
1   1  b1  [a2, b2, c2]
2   2  c1      [d2, e2]
>>> df[df.id, df.a.explode(), df.b.explode()]
   id   a   b
0   1  a1  a2
1   1  a1  b2
2   1  a1  c2
3   1  b1  a2
4   1  b1  b2
5   1  b1  c2
6   2  c1  d2
7   2  c1  e2

如果多行输出方法对某个输入不产生任何输出,默认输入行将不在最终结果中出现。如果需要在结果中出现该行,可以设置 keep_nulls=True。此时,与该行并列的值将输出为空值:

>>> df
   id         a
0   1  [a1, b1]
1   2        []
>>> df[df.id, df.a.explode()]
   id   a
0   1  a1
1   1  b1
>>> df[df.id, df.a.explode(keep_nulls=True)]
   id     a
0   1    a1
1   1    b1
2   2  None

关于 explode 使用并列输出的具体文档可参考 集合类型相关操作,对于 apply 方法使用并列输出的例子可参考 对一行数据使用自定义函数

限制条数

>>> iris[:3]
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa

值得注意的是,目前切片对于ODPS SQL后端不支持start和step。我们也可以使用limit方法

>>> iris.limit(3)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa

备注

另外,切片操作只能作用在collection上,不能作用于sequence。

执行

延迟执行

DataFrame上的所有操作并不会立即执行,只有当用户显式调用execute方法,或者一些立即执行的方法时(内部调用的就是execute),才会真正去执行。

这些立即执行的方法包括:

方法

说明

返回值

persist

将执行结果保存到ODPS表

PyODPS DataFrame

execute

执行并返回全部结果

ResultFrame

head

查看开头N行数据,这个方法会执行所有结果,并取开头N行数据

ResultFrame

tail

查看结尾N行数据,这个方法会执行所有结果,并取结尾N行数据

ResultFrame

to_pandas

转化为 Pandas DataFrame 或者 Series,wrap 参数为 True 的时候,返回 PyODPS DataFrame 对象

wrap为True返回PyODPS DataFrame,False(默认)返回pandas DataFrame

plot,hist,boxplot

画图有关

备注

注意:在交互式环境下,PyODPS DataFrame会在打印或者repr的时候,调用execute方法,这样省去了用户手动去调用execute。

>>> iris[iris.sepallength < 5][:5]
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

如果想关闭自动调用执行,则需要手动设置

>>> from odps import options
>>> options.interactive = False
>>>
>>> iris[iris.sepallength < 5][:5]
Collection: ref_0
  odps.Table
    name: odps_test_sqltask_finance.`pyodps_iris`
    schema:
      sepallength           : double
      sepalwidth            : double
      petallength           : double
      petalwidth            : double
      name                  : string

Collection: ref_1
  Filter[collection]
    collection: ref_0
    predicate:
      Less[sequence(boolean)]
        sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
        Scalar[int8]
          5

Slice[collection]
  collection: ref_1
  stop:
    Scalar[int8]
      5

此时打印或者 repr 对象,会显示整棵抽象语法树。如果需要执行,则必须手动调用 execute 方法:

>>> iris[iris.sepallength < 5][:5].execute()
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

读取执行结果

executehead 函数输出的结果为 ResultFrame 类型,可从中读取结果。

备注

ResultFrame 是结果集合,不能参与后续计算。

ResultFrame 可以迭代取出每条记录:

>>> result = iris.head(3)
>>> for r in result:
>>>     print(list(r))
[5.0999999999999996, 3.5, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.9000000000000004, 3.0, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.7000000000000002, 3.2000000000000002, 1.3, 0.20000000000000001, u'Iris-setosa']

ResultFrame 也支持在安装有 pandas 的前提下转换为 pandas DataFrame 或使用 pandas 后端的 PyODPS DataFrame:

>>> pd_df = iris.head(3).to_pandas()  # 返回 pandas DataFrame
>>> wrapped_df = iris.head(3).to_pandas(wrap=True)  # 返回使用 Pandas 后端的 PyODPS DataFrame

关于如何使用 pandas,请参考 pandas 文档 。pandas 为开源库,ODPS 不对其结果负责。

保存执行结果为 ODPS 表

对 Collection,我们可以调用persist方法,参数为表名。返回一个新的DataFrame对象

>>> iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2')
>>> iris2.head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.5         2.3          1.3         0.3      Iris-setosa
1          5.5         2.3          4.0         1.3  Iris-versicolor
2          4.9         2.4          3.3         1.0  Iris-versicolor
3          5.0         2.0          3.5         1.0  Iris-versicolor
4          6.0         2.2          4.0         1.0  Iris-versicolor

persist可以传入 partitions 参数。加入该参数后,会创建一个分区表,它的分区字段为 partitions 列出的字段, DataFrame 中相应字段的值决定该行将被写入的分区。例如,当 partitions 为 [‘name’] 且某行 name 的值为 test, 那么该行将被写入分区 name=test。这适用于当分区需要通过计算获取的情形。

>>> iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris3', partitions=['name'])
>>> iris3.data
odps.Table
  name: odps_test_sqltask_finance.`pyodps_iris3`
  schema:
    sepallength           : double
    sepalwidth            : double
    petallength           : double
    petalwidth            : double
  partitions:
    name                  : string

如果想写入已经存在的表的某个分区,persist可以传入 partition 参数,指明写入表的哪个分区(如ds=******)。 这时要注意,该DataFrame的每个字段都必须在该表存在,且类型相同。drop_partition和create_partition参数只有在此时有效, 分别表示是否要删除(如果分区存在)或创建(如果分区不存在)该分区。

>>> iris[iris.sepalwidth < 2.5].persist('pyodps_iris4', partition='ds=test', drop_partition=True, create_partition=True)

persist 时,默认会覆盖原有数据。例如,当 persist 到一张分区表,对应分区的数据将会被重写。如果写入一张非分区表,整张表的数据都将被重写。如果你想要追加数据,可以使用参数 overwrite=False

写入表时,还可以指定表的生命周期,如下列语句将表的生命周期指定为10天:

>>> iris[iris.sepalwidth < 2.5].persist('pyodps_iris5', lifecycle=10)

如果数据源中没有 ODPS 对象,例如数据源仅为 Pandas,在 persist 时需要手动指定 ODPS 入口对象, 或者将需要的入口对象标明为全局对象,如:

>>> # 假设入口对象为 o
>>> # 指定入口对象
>>> df.persist('table_name', odps=o)
>>> # 或者可将入口对象标记为全局
>>> o.to_global()
>>> df.persist('table_name')

保存执行结果为 Pandas DataFrame

我们可以使用 to_pandas方法,如果wrap参数为True,将返回PyODPS DataFrame对象。

>>> type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
>>> type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame

备注

to_pandas 返回的 pandas DataFrame 与直接通过 pandas 创建的 DataFrame 没有任何区别, 数据的存储和计算均在本地。如果 wrap=True,生成的即便是 PyODPS DataFrame,数据依然在本地。 如果你的数据很大,或者运行环境的内存限制较为严格,请谨慎使用 to_pandas

立即运行设置运行参数

对于立即执行的方法,比如 executepersistto_pandas 等,可以设置运行时参数(仅对ODPS SQL后端有效 )。

一种方法是设置全局参数。详细参考 SQL设置运行参数

也可以在这些立即执行的方法上,使用 hints 参数。这样,这些参数只会作用于当前的计算过程。

>>> iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})

运行时显示详细信息

有时,用户需要查看运行时instance的logview时,需要修改全局配置:

>>> from odps import options
>>> options.verbose = True
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview

   sepalwidth  petallength  petalwidth         name
0         3.0          1.4         0.2  Iris-setosa
1         3.2          1.3         0.2  Iris-setosa
2         3.1          1.5         0.2  Iris-setosa
3         3.4          1.4         0.3  Iris-setosa
4         2.9          1.4         0.2  Iris-setosa

用户可以指定自己的日志记录函数,比如像这样:

>>> my_logs = []
>>> def my_logger(x):
>>>     my_logs.append(x)
>>>
>>> options.verbose_log = my_logger
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
   sepalwidth  petallength  petalwidth         name
0         3.0          1.4         0.2  Iris-setosa
1         3.2          1.3         0.2  Iris-setosa
2         3.1          1.5         0.2  Iris-setosa
3         3.4          1.4         0.3  Iris-setosa
4         2.9          1.4         0.2  Iris-setosa

>>> print(my_logs)
['Sql compiled:', 'SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'logview:', u'http://logview']

缓存中间Collection计算结果

DataFrame的计算过程中,一些Collection被多处使用,或者用户需要查看中间过程的执行结果, 这时用户可以使用 cache标记某个collection需要被优先计算。

备注

值得注意的是,cache延迟执行,调用cache不会触发立即计算。

>>> cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
>>> df = cached.head(3)
>>> df
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa
>>> cached.head(3)  # 由于cached已经被计算,所以能立刻取到计算结果
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa

异步和并行执行

DataFrame 支持异步操作,对于立即执行的方法,包括 executepersistheadtailto_pandas (其他方法不支持), 传入 async_ 参数,即可以将一个操作异步执行,timeout 参数指定超时时间, 异步返回的是 Future 对象。

>>> future = iris[iris.sepal_width < 10].head(10, async_=True)
>>> future.result()
   sepal_length  sepal_width  petal_length  petal_width     category
0           5.1          3.5           1.4          0.2  Iris-setosa
1           4.9          3.0           1.4          0.2  Iris-setosa
2           4.7          3.2           1.3          0.2  Iris-setosa
3           4.6          3.1           1.5          0.2  Iris-setosa
4           5.0          3.6           1.4          0.2  Iris-setosa
5           5.4          3.9           1.7          0.4  Iris-setosa
6           4.6          3.4           1.4          0.3  Iris-setosa
7           5.0          3.4           1.5          0.2  Iris-setosa
8           4.4          2.9           1.4          0.2  Iris-setosa
9           4.9          3.1           1.5          0.1  Iris-setosa

DataFrame 的并行执行可以使用多线程来并行,单个 expr 的执行可以通过 n_parallel 参数来指定并发度。 比如,当一个 DataFrame 的执行依赖的多个 cache 的 DataFrame 能够并行执行时,该参数就会生效。

>>> expr1 = iris.groupby('category').agg(value=iris.sepal_width.sum()).cache()
>>> expr2 = iris.groupby('category').agg(value=iris.sepal_length.mean()).cache()
>>> expr3 = iris.groupby('category').agg(value=iris.petal_length.min()).cache()
>>> expr = expr1.union(expr2).union(expr3)
>>> future = expr.execute(n_parallel=3, async_=True, timeout=2)  # 并行和异步执行,2秒超时,返回Future对象
>>> future.result()
          category    value
0      Iris-setosa    5.006
1  Iris-versicolor    5.936
2   Iris-virginica    6.588
3      Iris-setosa  170.900
4  Iris-versicolor  138.500
5   Iris-virginica  148.700
6      Iris-setosa    1.000
7  Iris-versicolor    3.000
8   Iris-virginica    4.500

当同时执行多个 expr 时,我们可以用多线程执行,但会面临一个问题, 比如两个 DataFrame 有共同的依赖,这个依赖将会被执行两遍。

现在我们提供了新的 Delay API, 来将立即执行的操作(包括 executepersistheadtailto_pandas,其他方法不支持)变成延迟操作, 并返回 Future 对象。 当用户触发delay执行的时候,会去寻找共同依赖,按用户给定的并发度执行,并支持异步执行。

>>> from odps.df import Delay
>>> delay = Delay()  # 创建Delay对象
>>>
>>> df = iris[iris.sepal_width < 5].cache()  # 有一个共同的依赖
>>> future1 = df.sepal_width.sum().execute(delay=delay)  # 立即返回future对象,此时并没有执行
>>> future2 = df.sepal_width.mean().execute(delay=delay)
>>> future3 = df.sepal_length.max().execute(delay=delay)
>>> delay.execute(n_parallel=3)  # 并发度是3,此时才真正执行。
|==========================================|   1 /  1  (100.00%)        21s
>>> future1.result()
458.10000000000014
>>> future2.result()
3.0540000000000007

可以看到上面的例子里,共同依赖的对象会先执行,然后再以并发度为3分别执行future1到future3。 当 n_parallel 为1时,执行时间会达到37s。

delay.execute 也接受 async_ 操作来指定是否异步执行,当异步的时候,也可以指定 timeout 参数来指定超时时间。