Concepts
When using DataFrame, you need to know the operations on the following objects: Collection (DataFrame), Sequence, and Scalar. These three objects represent table structures (two-dimensional structures), columns (one-dimensional structures), and scalars respectively, and are also known as expr objects in PyODPS. Note that for objects created from MaxCompute tables, all storage and computation are performed in MaxCompute.
Create a DataFrame object
The only Collection object you need to create directly is DataFrame. This object references the data source, which may be a MaxCompute table, a MaxCompute table partition, a pandas DataFrame, or a sqlalchemy.Table (database table). The data processing code can be exactly the same across these data sources, which means you can migrate your local test code to MaxCompute by changing only the data source used to create the DataFrame object.
Creating a DataFrame object is as simple as passing in a Table object, a pandas DataFrame object, or a sqlalchemy Table object.
>>> from odps.df import DataFrame
>>>
>>> # create from a MaxCompute table
>>> iris = DataFrame(o.get_table('pyodps_iris'))
>>> iris2 = o.get_table('pyodps_iris').to_df() # use to_df() method of a table
>>>
>>> # create from a MaxCompute table partition
>>> pt_df = DataFrame(o.get_table('partitioned_table').get_partition('pt=20171111'))
>>> pt_df2 = o.get_table('partitioned_table').get_partition('pt=20171111').to_df() # use to_df() method of a partition
>>>
>>> # create from a Pandas DataFrame
>>> import pandas as pd
>>> import numpy as np
>>> df = DataFrame(pd.DataFrame(np.arange(9).reshape(3, 3), columns=list('abc')))
>>>
>>> # create from a sqlalchemy table
>>> import sqlalchemy
>>> engine = sqlalchemy.create_engine('mysql://root:123456@localhost/movielens')
>>> metadata = sqlalchemy.MetaData(bind=engine) # need to bind an engine
>>> table = sqlalchemy.Table('top_users', metadata, extend_existing=True, autoload=True)
>>> users = DataFrame(table)
When initializing a DataFrame object from a pandas DataFrame, PyODPS attempts to infer the types of columns whose dtype is numpy object or which contain strings. If an entire column is null, an error occurs unless you set unknown_as_string to True, in which case such columns are treated as string type. You can also specify the as_type parameter, which must be a dict. If the specified type is a primitive type, type casting is performed when the PyODPS DataFrame object is created. If the pandas DataFrame contains a list or dict column, its type is not inferred and must be specified manually with as_type.
>>> df2 = DataFrame(df, unknown_as_string=True, as_type={'null_col2': 'float'})
>>> df2.dtypes
odps.Schema {
sepallength float64
sepalwidth float64
petallength float64
petalwidth float64
name string
null_col1 string # cannot be identified, configured as string via unknown_as_string argument
null_col2 float64 # forcefully converted to float
}
>>> df4 = DataFrame(df3, as_type={'list_col': 'list<int64>'})
>>> df4.dtypes
odps.Schema {
id int64
list_col list<int64> # cannot be identified or converted, configured via as_type argument
}
Sequence
SequenceExpr represents a column in a two-dimensional data set. You are not allowed to create SequenceExpr objects manually. Instead, retrieve one from a Collection object.
Retrieve a column
You can use collection.column_name to retrieve a column. For example:
>>> iris.sepallength.head(5)
sepallength
0 5.1
1 4.9
2 4.7
3 4.6
4 5.0
If the column name is stored in a string variable, apart from using getattr(df, 'column_name'), df[column_name] can also be used to achieve the same effect. For example:
>>> iris['sepallength'].head(5)
sepallength
0 5.1
1 4.9
2 4.7
3 4.6
4 5.0
Column type
DataFrame has its own type system. When a DataFrame is initialized from a MaxCompute table, the MaxCompute types are cast to DataFrame types. This design provides support for more execution backends. Currently, DataFrame's execution backends include MaxCompute SQL, pandas, and databases (MySQL and PostgreSQL).
PyODPS DataFrame includes the following types: int8, int16, int32, int64, float32, float64, boolean, string, decimal, datetime, list, and dict.
The correspondence between MaxCompute and DataFrame types is as follows:
MaxCompute type | DataFrame type
---|---
bigint | int64
double | float64
string | string
datetime | datetime
boolean | boolean
decimal | decimal
array&lt;value_type&gt; | list&lt;value_type&gt;
map&lt;key_type, value_type&gt; | dict&lt;key_type, value_type&gt;
For list and dict types, the types of their containing values must be specified, or an error occurs. Currently, DataFrame does not support the Timestamp type and Struct type that have been newly introduced in MaxCompute 2.0. Support may be added in future releases.
You can use sequence.dtype to retrieve the data types of Sequence objects:
>>> iris.sepallength.dtype
float64
You can use the astype method to change the type of an entire column. This method requires a type as input and returns the converted Sequence object as output. For example:
>>> iris.sepallength.astype('int')
sepallength
0 5
1 4
2 4
3 4
4 5
Column name
In DataFrame calculations, a Sequence object must have a column name. In most cases, DataFrame generates names automatically. For example:
>>> iris.groupby('name').sepalwidth.max()
sepalwidth_max
0 4.4
1 3.4
2 3.8
As you can see, sepalwidth was renamed to sepalwidth_max after taking the maximum value. Other operations, such as adding a Sequence to a Scalar, name the result after the original Sequence. In other cases, you need to name the column yourself.
Sequence provides a rename method to reset column names. For example:
>>> iris.sepalwidth.rename('sepal_width').head(5)
sepal_width
0 3.5
1 3.0
2 3.2
3 3.1
4 3.6
Note that the rename method does not rename the column in place. If you want to apply the new name in your collection, you need to select the columns again. For example:
>>> iris[iris.exclude("sepalwidth"), iris.sepalwidth.rename('sepal_width')].head(5)
sepallength petallength petalwidth name sepal_width
0 5.1 1.4 0.2 Iris-setosa 3.5
1 4.9 1.4 0.2 Iris-setosa 3.0
2 4.7 1.3 0.2 Iris-setosa 3.2
3 4.6 1.5 0.2 Iris-setosa 3.1
4 5.0 1.4 0.2 Iris-setosa 3.6
Simple column transformations
You can perform operations on a Sequence object to get a new Sequence, just as you do with a simple Python variable. For numeric columns, arithmetic operations are supported. For string columns, operations such as string concatenations are supported. For example:
>>> (iris.sepallength + 5).head(5)
sepallength
0 10.1
1 9.9
2 9.7
3 9.6
4 10.0
Adding two columns:
>>> (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
sum_sepal
0 8.6
1 7.9
2 7.9
3 7.7
4 8.6
Note that when two columns are involved in operations, you need to manually specify the name of the result column. For details about column transformations, see Column operations.
Collection
CollectionExpr supports all operations on DataFrame two-dimensional datasets. It can be seen as a MaxCompute table or a spreadsheet. DataFrame objects are also CollectionExpr objects. Many column operations, filter operations, and transform operations on two-dimensional datasets are supported by CollectionExpr.
Retrieve types
You can use dtypes to retrieve the types of all columns in a CollectionExpr object. dtypes returns a Schema object.
>>> iris.dtypes
odps.Schema {
sepallength float64
sepalwidth float64
petallength float64
petalwidth float64
name string
}
Select, add, and delete columns
You can use the expr[columns] syntax to select certain columns from a CollectionExpr object and form a new dataset. For example:
>>> iris['name', 'sepallength'].head(5)
name sepallength
0 Iris-setosa 5.1
1 Iris-setosa 4.9
2 Iris-setosa 4.7
3 Iris-setosa 4.6
4 Iris-setosa 5.0
Note
If only one column is needed, you need to add a comma (,) after the column name or explicitly wrap the column in a list, for example df[df.sepal_length, ] or df[[df.sepal_length]]. Otherwise, a Sequence object, instead of a Collection object, is returned.
You can use the exclude method to exclude certain columns from the new dataset:
>>> iris.exclude('sepallength', 'petallength')[:5]
sepalwidth petalwidth name
0 3.5 0.2 Iris-setosa
1 3.0 0.2 Iris-setosa
2 3.2 0.2 Iris-setosa
3 3.1 0.2 Iris-setosa
4 3.6 0.2 Iris-setosa
In PyODPS versions 0.7.2 and higher, you can delete columns from the dataset directly:
>>> del iris['sepallength']
>>> del iris['petallength']
>>> iris[:5]
sepalwidth petalwidth name
0 3.5 0.2 Iris-setosa
1 3.0 0.2 Iris-setosa
2 3.2 0.2 Iris-setosa
3 3.1 0.2 Iris-setosa
4 3.6 0.2 Iris-setosa
You can also use the expr[expr, new_sequence] syntax to add a new sequence transformation to the original collection. The new sequence is part of the new collection.
In the following example, we create a new sequence by adding one to each element of the original sepalwidth column of iris, rename it to sepalwidthplus1, and append it to the iris collection as a new column.
>>> iris[iris, (iris.sepalwidth + 1).rename('sepalwidthplus1')].head(5)
sepallength sepalwidth petallength petalwidth name \
0 5.1 3.5 1.4 0.2 Iris-setosa
1 4.9 3.0 1.4 0.2 Iris-setosa
2 4.7 3.2 1.3 0.2 Iris-setosa
3 4.6 3.1 1.5 0.2 Iris-setosa
4 5.0 3.6 1.4 0.2 Iris-setosa
sepalwidthplus1
0 4.5
1 4.0
2 4.2
3 4.1
4 4.6
When using df[df, new_sequence], note that the transformed column may have the same name as the original column. Rename the new column if you want to append it to the original collection.
In PyODPS versions 0.7.2 and higher, you can append a column to the current dataset directly:
>>> iris['sepalwidthplus1'] = iris.sepalwidth + 1
>>> iris.head(5)
sepallength sepalwidth petallength petalwidth name \
0 5.1 3.5 1.4 0.2 Iris-setosa
1 4.9 3.0 1.4 0.2 Iris-setosa
2 4.7 3.2 1.3 0.2 Iris-setosa
3 4.6 3.1 1.5 0.2 Iris-setosa
4 5.0 3.6 1.4 0.2 Iris-setosa
sepalwidthplus1
0 4.5
1 4.0
2 4.2
3 4.1
4 4.6
You can also use the exclude method to exclude the original column before appending the new one to the dataset.
>>> iris[iris.exclude('sepalwidth'), iris.sepalwidth * 2].head(5)
sepallength petallength petalwidth name sepalwidth
0 5.1 1.4 0.2 Iris-setosa 7.0
1 4.9 1.4 0.2 Iris-setosa 6.0
2 4.7 1.3 0.2 Iris-setosa 6.4
3 4.6 1.5 0.2 Iris-setosa 6.2
4 5.0 1.4 0.2 Iris-setosa 7.2
In PyODPS versions 0.7.2 and higher, you can also overwrite an existing column:
>>> iris['sepalwidth'] = iris.sepalwidth * 2
>>> iris.head(5)
sepallength sepalwidth petallength petalwidth name
0 5.1 7.0 1.4 0.2 Iris-setosa
1 4.9 6.0 1.4 0.2 Iris-setosa
2 4.7 6.4 1.3 0.2 Iris-setosa
3 4.6 6.2 1.5 0.2 Iris-setosa
4 5.0 7.2 1.4 0.2 Iris-setosa
The select method provides another way to create a new collection. Keyword arguments rename the provided sequences to the given keywords.
>>> iris.select('name', sepalwidthminus1=iris.sepalwidth - 1).head(5)
name sepalwidthminus1
0 Iris-setosa 2.5
1 Iris-setosa 2.0
2 Iris-setosa 2.2
3 Iris-setosa 2.1
4 Iris-setosa 2.6
You can also pass in a lambda expression, which takes the result from the previous operation as a parameter. When executed, PyODPS checks the lambda expression and passes in the Collection object generated from the previous operation to get new columns.
>>> iris['name', 'petallength'][[lambda x: x.name]].head(5)
name
0 Iris-setosa
1 Iris-setosa
2 Iris-setosa
3 Iris-setosa
4 Iris-setosa
In PyODPS versions 0.7.2 and higher, conditional assignments are supported.
>>> iris[iris.sepallength > 5.0, 'sepalwidth'] = iris.sepalwidth * 2
>>> iris.head(5)
sepallength sepalwidth petallength petalwidth name
0 5.1 14.0 1.4 0.2 Iris-setosa
1 4.9 6.0 1.4 0.2 Iris-setosa
2 4.7 6.4 1.3 0.2 Iris-setosa
3 4.6 6.2 1.5 0.2 Iris-setosa
4 5.0 7.2 1.4 0.2 Iris-setosa
Constants and random numbers
DataFrame allows you to append a column of constants to a Collection object. Scalar is required and you need to manually specify the column name. For example:
>>> from odps.df import Scalar
>>> iris[iris, Scalar(1).rename('id')][:5]
sepallength sepalwidth petallength petalwidth name id
0 5.1 3.5 1.4 0.2 Iris-setosa 1
1 4.9 3.0 1.4 0.2 Iris-setosa 1
2 4.7 3.2 1.3 0.2 Iris-setosa 1
3 4.6 3.1 1.5 0.2 Iris-setosa 1
4 5.0 3.6 1.4 0.2 Iris-setosa 1
You can use NullScalar to create a null column. The column type must be specified.
>>> from odps.df import NullScalar
>>> iris[iris, NullScalar('float').rename('fid')][:5]
sepal_length sepal_width petal_length petal_width category fid
0 5.1 3.5 1.4 0.2 Iris-setosa None
1 4.9 3.0 1.4 0.2 Iris-setosa None
2 4.7 3.2 1.3 0.2 Iris-setosa None
3 4.6 3.1 1.5 0.2 Iris-setosa None
4 5.0 3.6 1.4 0.2 Iris-setosa None
In PyODPS versions 0.7.12 and higher, a simpler method has been introduced:
>>> iris['id'] = 1
>>> iris
sepallength sepalwidth petallength petalwidth name id
0 5.1 3.5 1.4 0.2 Iris-setosa 1
1 4.9 3.0 1.4 0.2 Iris-setosa 1
2 4.7 3.2 1.3 0.2 Iris-setosa 1
3 4.6 3.1 1.5 0.2 Iris-setosa 1
4 5.0 3.6 1.4 0.2 Iris-setosa 1
Note that this method cannot automatically recognize the type of null values. Add null columns as follows:
>>> iris['null_col'] = NullScalar('float')
>>> iris
sepallength sepalwidth petallength petalwidth name null_col
0 5.1 3.5 1.4 0.2 Iris-setosa None
1 4.9 3.0 1.4 0.2 Iris-setosa None
2 4.7 3.2 1.3 0.2 Iris-setosa None
3 4.6 3.1 1.5 0.2 Iris-setosa None
4 5.0 3.6 1.4 0.2 Iris-setosa None
DataFrame also allows you to append a column of random numbers to a Collection object. The column type is float and values range from 0 to 1, with each row receiving a different value. Use RandomScalar, whose parameter is an optional random seed.
>>> from odps.df import RandomScalar
>>> iris[iris, RandomScalar().rename('rand_val')][:5]
sepallength sepalwidth petallength petalwidth name rand_val
0 5.1 3.5 1.4 0.2 Iris-setosa 0.000471
1 4.9 3.0 1.4 0.2 Iris-setosa 0.799520
2 4.7 3.2 1.3 0.2 Iris-setosa 0.834609
3 4.6 3.1 1.5 0.2 Iris-setosa 0.106921
4 5.0 3.6 1.4 0.2 Iris-setosa 0.763442
Filter data
Collection provides different ways to filter data.
The following example finds records where sepallength is greater than 5.
>>> iris[iris.sepallength > 5].head(5)
sepallength sepalwidth petallength petalwidth name
0 5.1 3.5 1.4 0.2 Iris-setosa
1 5.4 3.9 1.7 0.4 Iris-setosa
2 5.4 3.7 1.5 0.2 Iris-setosa
3 5.8 4.0 1.2 0.2 Iris-setosa
4 5.7 4.4 1.5 0.4 Iris-setosa
Multiple filter conditions:
>>> iris[(iris.sepallength < 5) & (iris['petallength'] > 1.5)].head(5)
sepallength sepalwidth petallength petalwidth name
0 4.8 3.4 1.6 0.2 Iris-setosa
1 4.8 3.4 1.9 0.2 Iris-setosa
2 4.7 3.2 1.6 0.2 Iris-setosa
3 4.8 3.1 1.6 0.2 Iris-setosa
4 4.9 2.4 3.3 1.0 Iris-versicolor
OR operator:
>>> iris[(iris.sepalwidth < 2.5) | (iris.sepalwidth > 4)].head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.2 4.1 1.5 0.1 Iris-setosa
2 5.5 4.2 1.4 0.2 Iris-setosa
3 4.5 2.3 1.3 0.3 Iris-setosa
4 5.5 2.3 4.0 1.3 Iris-versicolor
Note
You must use ampersands (&) and vertical bars (|) to represent the AND and OR operators respectively.
NOT operator:
>>> iris[~(iris.sepalwidth > 3)].head(5)
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.4 2.9 1.4 0.2 Iris-setosa
2 4.8 3.0 1.4 0.1 Iris-setosa
3 4.3 3.0 1.1 0.1 Iris-setosa
4 5.0 3.0 1.6 0.2 Iris-setosa
You can also explicitly call the filter method to specify multiple conditions.
>>> iris.filter(iris.sepalwidth > 3.5, iris.sepalwidth < 4).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 3.6 1.4 0.2 Iris-setosa
1 5.4 3.9 1.7 0.4 Iris-setosa
2 5.4 3.7 1.5 0.2 Iris-setosa
3 5.4 3.9 1.3 0.4 Iris-setosa
4 5.7 3.8 1.7 0.3 Iris-setosa
You can use lambda expressions to perform sequential operations.
>>> iris[iris.sepalwidth > 3.8]['name', lambda x: x.sepallength + 1]
name sepallength
0 Iris-setosa 6.4
1 Iris-setosa 6.8
2 Iris-setosa 6.7
3 Iris-setosa 6.4
4 Iris-setosa 6.2
5 Iris-setosa 6.5
If a Collection object contains a boolean column, you can use it directly as a filter condition.
>>> df.dtypes
odps.Schema {
a boolean
b int64
}
>>> df[df.a]
a b
0 True 1
1 True 3
When a single item is passed to a Collection object, only a boolean column is interpreted as a filter condition. Other forms retrieve columns or one-column collections:
>>> df[df.a, ] # obtain a one-column collection via item syntax
>>> df[[df.a]] # obtain a one-column collection via item syntax
>>> df.select(df.a) # obtain a one-column collection explicitly with select method
>>> df[df.a] # filter with a binary-typed column
>>> df.a # obtain a sequence from a collection
>>> df['a'] # obtain a sequence from a collection
The query method from pandas is also supported. You can write query statements to filter data. Column names such as sepallength can be used directly in the statement. Both ampersands (&) and the keyword and represent the AND operator; both vertical bars (|) and the keyword or represent the OR operator.
>>> iris.query("(sepallength < 5) and (petallength > 1.5)").head(5)
sepallength sepalwidth petallength petalwidth name
0 4.8 3.4 1.6 0.2 Iris-setosa
1 4.8 3.4 1.9 0.2 Iris-setosa
2 4.7 3.2 1.6 0.2 Iris-setosa
3 4.8 3.1 1.6 0.2 Iris-setosa
4 4.9 2.4 3.3 1.0 Iris-versicolor
When local variables are needed in the expressions, add an at sign (@) before the variable name.
>>> var = 4
>>> iris.query("(sepalwidth < 2.5) | (sepalwidth > @var)").head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.2 4.1 1.5 0.1 Iris-setosa
2 5.5 4.2 1.4 0.2 Iris-setosa
3 4.5 2.3 1.3 0.3 Iris-setosa
4 5.5 2.3 4.0 1.3 Iris-versicolor
The query method currently supports the following syntaxes.
Syntax | Description
---|---
name | Names without an at sign (@) are processed as column names; otherwise, they are processed as local variables.
operator | Arithmetic operators (+, -, *, /, //, %, **) and comparison operators (==, !=, &lt;, &lt;=, &gt;, &gt;=, in, not in) are supported.
bool | Ampersands (&) and the keyword and represent the AND operator; vertical bars and the keyword or represent the OR operator.
attribute | Attributes of objects are supported.
index, slice, Subscript | Slice operations are supported.
Lateral view
For list and map columns, the explode method converts a column into multiple rows of output. Functions passed to the apply method often generate multiple rows, too. It is often necessary to merge these outputs with columns of the original table for purposes such as aggregation. In databases, this kind of operation is called a lateral view. In PyODPS, you can achieve this as follows:
To generate the lateral view:
>>> df
id a b
0 1 [a1, b1] [a2, b2, c2]
1 2 [c1] [d2, e2]
>>> df[df.id, df.a.explode(), df.b]
id a b
0 1 a1 [a2, b2, c2]
1 1 b1 [a2, b2, c2]
2 2 c1 [d2, e2]
>>> df[df.id, df.a.explode(), df.b.explode()]
id a b
0 1 a1 a2
1 1 a1 b2
2 1 a1 c2
3 1 b1 a2
4 1 b1 b2
5 1 b1 c2
6 2 c1 d2
7 2 c1 e2
When the method does not produce output for some rows, these rows will not appear in the output, which is often not what you expect. To preserve these rows, add keep_nulls=True to explode or apply calls. In this case, columns produced by explode or apply are filled with None for these rows.
>>> df
id a
0 1 [a1, b1]
1 2 []
>>> df[df.id, df.a.explode()]
id a
0 1 a1
1 1 b1
>>> df[df.id, df.a.explode(keep_nulls=True)]
id a
0 1 a1
1 1 b1
2 2 None
For details about the explode method, see Collection type related operations. For examples of the apply method, see Using custom functions for one row.
Limit output records
>>> iris[:3]
sepallength sepalwidth petallength petalwidth name
0 5.1 3.5 1.4 0.2 Iris-setosa
1 4.9 3.0 1.4 0.2 Iris-setosa
2 4.7 3.2 1.3 0.2 Iris-setosa
Note that for the MaxCompute SQL backend, start and step are not supported in slice operations. You can also use the limit method.
>>> iris.limit(3)
sepallength sepalwidth petallength petalwidth name
0 5.1 3.5 1.4 0.2 Iris-setosa
1 4.9 3.0 1.4 0.2 Iris-setosa
2 4.7 3.2 1.3 0.2 Iris-setosa
Note
Additionally, you can perform slice operations on Collection objects only, not Sequence objects.
Execution
Deferred execution
The operations in DataFrame are not executed automatically. They are executed only when you explicitly call the execute action, or an action that internally calls execute.
These actions include:
Action | Description | Return value
---|---|---
persist | Saves the execution result to a MaxCompute table. | PyODPS DataFrame
execute | Executes the operations and returns all results. | ResultFrame
head | Executes the operations and returns the first N rows of results. | ResultFrame
tail | Executes the operations and returns the last N rows of results. | ResultFrame
to_pandas | Converts a Collection object to a pandas DataFrame, or a Sequence object to a pandas Series. | A pandas DataFrame by default; a PyODPS DataFrame when wrap is set to True
plot, hist, boxplot | Plotting-related methods. |
Note
In an interactive environment, PyODPS DataFrame objects automatically call the execute method when repr or print is invoked, so you do not need to call execute manually.
>>> iris[iris.sepallength < 5][:5]
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
You can manually disable automatic execution as follows:
>>> from odps import options
>>> options.interactive = False
>>>
>>> iris[iris.sepallength < 5][:5]
Collection: ref_0
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
name : string
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
Now the entire abstract syntax tree is displayed when print or repr is called. To actually execute this DataFrame, call the execute method manually:
>>> iris[iris.sepallength < 5][:5].execute()
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
Obtaining execution results
Outputs of the execute and head methods are ResultFrame instances, from which you can obtain execution results.
Note
The ResultFrame is a result set and cannot be used in subsequent calculations.
You can use an iterative method to retrieve all records from ResultFrame.
>>> result = iris.head(3)
>>> for r in result:
>>> print(list(r))
[5.0999999999999996, 3.5, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.9000000000000004, 3.0, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.7000000000000002, 3.2000000000000002, 1.3, 0.20000000000000001, u'Iris-setosa']
When pandas is installed, a ResultFrame can be converted into a pandas DataFrame or a PyODPS DataFrame with pandas backend.
>>> pd_df = iris.head(3).to_pandas() # returns a pandas DataFrame
>>> wrapped_df = iris.head(3).to_pandas(wrap=True) # returns a PyODPS DataFrame with pandas backend
For details about how to use pandas, see the pandas documentation. pandas is an open-source library, and ODPS is not responsible for its results.
Save results to MaxCompute tables
For Collection objects, you can use the persist method, which takes a table name as a parameter and returns a new DataFrame object.
>>> iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2')
>>> iris2.head(5)
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
You can pass a partitions parameter to persist. A table is then created with the partition fields specified by this parameter, and each row of the input DataFrame is written into the partition determined by the values of these fields. For example, when partitions is ['name'] and the value of the name field in a row is test, that row is written into the partition name=test. This parameter is useful when the target partitions need to be computed from the data.
>>> iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris3', partitions=['name'])
>>> iris3.data
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris3`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
partitions:
name : string
To write to a partition of an existing table, you can pass a partition parameter to persist to indicate the target partition (for example, ds=******). The table must contain all fields of the DataFrame object, with the same types. The drop_partition and create_partition parameters are only valid in this case, indicating whether to drop the partition if it exists and whether to create it if it does not.
>>> iris[iris.sepalwidth < 2.5].persist('pyodps_iris4', partition='ds=test', drop_partition=True, create_partition=True)
Persisting a DataFrame overwrites existing data by default. For instance, persisting into a partitioned table overwrites data in the corresponding partitions, while persisting into a non-partitioned table overwrites all of its data. To append data to existing tables or partitions, add overwrite=False.
You can also specify the lifecycle of a table when writing to it. The following example sets the lifecycle of a table to 10 days.
>>> iris[iris.sepalwidth < 2.5].persist('pyodps_iris5', lifecycle=10)
If the data source contains no ODPS objects, for example, only pandas data, you need to manually specify the ODPS object or mark the object as global when calling persist. For example:
>>> # we assume the entrance object of PyODPS is o
>>> # specify entrance object with odps argument
>>> df.persist('table_name', odps=o)
>>> # or mark the entrance object as global
>>> o.to_global()
>>> df.persist('table_name')
Save results to pandas DataFrame
You can use the to_pandas method. If wrap is set to True, a PyODPS DataFrame object is returned.
>>> type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
>>> type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame
Note
There are no differences between pandas DataFrames returned by to_pandas() calls and pandas DataFrames created directly with pandas. Therefore, all data storage and computation are done locally. Even if you add wrap=True and the call returns a PyODPS DataFrame, the data is still stored and computed locally. If you are handling a huge amount of data, or your running environment is restrictive, be cautious when using to_pandas.
Set runtime parameters
For actions such as execute, persist, and to_pandas, you can set runtime parameters. This is only valid for the MaxCompute SQL backend.
You can also set global parameters. For details, see SQL - runtime parameters.
Additionally, you can use the hints parameter. Parameters passed this way are only valid for the current calculation.
>>> iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})
Display details at runtime
You sometimes need to modify the global configuration to view the logview of an instance.
>>> from odps import options
>>> options.verbose = True
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
You can specify a logging function as follows:
>>> my_logs = []
>>> def my_logger(x):
>>> my_logs.append(x)
>>>
>>> options.verbose_log = my_logger
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
>>> print(my_logs)
['Sql compiled:', 'SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'logview:', u'http://logview']
Cache intermediate results
When some Collection objects are used multiple times during a DataFrame calculation, or when you need to inspect the result of an intermediate step, you can use the cache method to mark a Collection object so that it is calculated first.
Note
cache delays execution; calling this method does not trigger calculation immediately.
>>> cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
>>> df = cached.head(3)
>>> df
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
>>> cached.head(3) # results can be fetched immediately as cached has already been computed
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
Asynchronous and parallel executions
DataFrame supports asynchronous operations. For actions such as execute, persist, head, tail, and to_pandas, you can pass in the async_ parameter to enable asynchronous execution. The timeout parameter specifies the timeout period. Asynchronous executions return Future objects.
>>> future = iris[iris.sepal_width < 10].head(10, async_=True)
>>> future.result()
sepal_length sepal_width petal_length petal_width category
0 5.1 3.5 1.4 0.2 Iris-setosa
1 4.9 3.0 1.4 0.2 Iris-setosa
2 4.7 3.2 1.3 0.2 Iris-setosa
3 4.6 3.1 1.5 0.2 Iris-setosa
4 5.0 3.6 1.4 0.2 Iris-setosa
5 5.4 3.9 1.7 0.4 Iris-setosa
6 4.6 3.4 1.4 0.3 Iris-setosa
7 5.0 3.4 1.5 0.2 Iris-setosa
8 4.4 2.9 1.4 0.2 Iris-setosa
9 4.9 3.1 1.5 0.1 Iris-setosa
Parallel execution of DataFrame can be achieved using multiple threads. You can use the n_parallel parameter to specify the degree of parallelism for each expr execution. This parameter takes effect when the multiple cached DataFrame objects that a single DataFrame execution depends on can be executed in parallel.
>>> expr1 = iris.groupby('category').agg(value=iris.sepal_width.sum()).cache()
>>> expr2 = iris.groupby('category').agg(value=iris.sepal_length.mean()).cache()
>>> expr3 = iris.groupby('category').agg(value=iris.petal_length.min()).cache()
>>> expr = expr1.union(expr2).union(expr3)
>>> future = expr.execute(n_parallel=3, async_=True, timeout=2) # Concurrent and asynchronous execution with 2 seconds timeout, return Future object
>>> future.result()
category value
0 Iris-setosa 5.006
1 Iris-versicolor 5.936
2 Iris-virginica 6.588
3 Iris-setosa 170.900
4 Iris-versicolor 138.500
5 Iris-virginica 148.700
6 Iris-setosa 1.000
7 Iris-versicolor 3.000
8 Iris-virginica 4.500
You can use multiple threads to execute multiple expr objects in parallel, but when two DataFrame objects share the same dependency, that dependency may be executed twice.
A Delay API is provided to solve this. It delays the execution of actions (including execute, persist, head, tail, and to_pandas) and returns Future objects. When the delay is triggered, the shared dependency is executed with the degree of parallelism you specify. Asynchronous execution is also supported.
>>> from odps.df import Delay
>>> delay = Delay() # create Delay object
>>>
>>> df = iris[iris.sepal_width < 5].cache() # common dependency of subsequent expressions
>>> future1 = df.sepal_width.sum().execute(delay=delay) # return Future object, execution not started yet
>>> future2 = df.sepal_width.mean().execute(delay=delay)
>>> future3 = df.sepal_length.max().execute(delay=delay)
>>> delay.execute(n_parallel=3) # execution starts here with 3 concurrent threads
|==========================================| 1 / 1 (100.00%) 21s
>>> future1.result()
458.10000000000014
>>> future2.result()
3.0540000000000007
As shown in the example above, the shared dependency is executed first, after which future1 to future3 are executed with the degree of parallelism set to 3. When n_parallel is set to 1, the execution time reaches 37 seconds.
You can also pass in the async_ parameter to delay.execute to enable asynchronous execution. When it is enabled, you can use the timeout parameter to specify the timeout period.