XFlow and models
XFlow
XFlow is a MaxCompute algorithm package. You can use PyODPS to execute XFlow tasks. For the following PAI command:
PAI -name AlgoName -project algo_public -Dparam1=param_value1 -Dparam2=param_value2 ...
You can call run_xflow() to execute it asynchronously:
>>> # call asynchronously
>>> inst = o.run_xflow('AlgoName', 'algo_public',
parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
Or call execute_xflow() to execute it synchronously:
>>> # call synchronously
>>> inst = o.execute_xflow('AlgoName', 'algo_public',
parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
Parameters should not include quotation marks at the ends of argument values in PAI command if they are in the PAI command, nor the semicolon at the end of the command.
Both methods return an Instance object. An XFlow instance contains several sub-instances. You can obtain the LogView of each Instance by using the following method:
>>> for sub_inst_name, sub_inst in o.get_xflow_sub_instances(inst).items():
>>> print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
需要注意的是,get_xflow_sub_instances() 返回的是 Instance 当前的子 Instance,可能会随时间变化,因而可能需要定时查询。为简化这一步骤,可以使用 iter_xflow_sub_instances()
方法。该方法返回一个迭代器,会阻塞执行直至发现新的子 Instance 或者主 Instance 结束。同时需要注意的是,
iter_xflow_sub_instances() 默认不会检查 Instance 是否报错,建议在循环结束时手动检查
Instance 是否报错,以免遗漏可能的问题,或者增加 check=True 参数在 iter_xflow_sub_instances()
退出时自动检查:
>>> # asynchronous call is recommended here
>>> inst = o.run_xflow('AlgoName', 'algo_public',
parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
>>> # if no break in loop, will run till instance exits
>>> for sub_inst_name, sub_inst in o.iter_xflow_sub_instances(inst):
>>> print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
>>> # check if instance succeeds in case of uncaught errors
>>> instance.wait_for_success()
Or
>>> # asynchronous call is recommended here
>>> inst = o.run_xflow('AlgoName', 'algo_public',
parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
>>> # add check=True to check if instance succeeds at exit.
>>> # check not available if break in loop
>>> for sub_inst_name, sub_inst in o.iter_xflow_sub_instances(inst):
>>> print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
You can specify runtime parameters when calling run_xflow or execute_xflow. This process is similar to executing SQL statements:
>>> parameters = {'param1': 'param_value1', 'param2': 'param_value2', ...}
>>> o.execute_xflow('AlgoName', 'algo_public', parameters=parameters, hints={'odps.xxx.yyy': 10})
For instance, if you want to run your code in hosts with certain hardware, you can add a configuration in hints:
>>> hints={"settings": json.dumps({"odps.algo.hybrid.deploy.info": "xxxxx"})}
You can use options.ml.xflow_settings to configure the global settings:
>>> from odps import options
>>> options.ml.xflow_settings = {'odps.xxx.yyy': 10}
>>> parameters = {'param1': 'param_value1', 'param2': 'param_value2', ...}
>>> o.execute_xflow('AlgoName', 'algo_public', parameters=parameters)
Details about PAI commands can be found in chapters about different components linked in this page 。
Offline models
Offline models are outputs of XFlow classification or regression algorithms. You can directly call odps.run_xflow to create an offline model. For example:
>>> o.run_xflow('LogisticRegression', 'algo_public', dict(modelName='logistic_regression_model_name',
>>> regularizedLevel='1', maxIter='100', regularizedType='l1', epsilon='0.000001', labelColName='y',
>>> featureColNames='pdays,emp_var_rate', goodValue='1', inputTableName='bank_data'))
After creating the models, you can list the models under the current project as follows:
>>> models = o.list_offline_models(prefix='prefix')
You can also retrieve the models and read their PMML (if supported) by the model names:
>>> model = o.get_offline_model('logistic_regression_model_name')
>>> pmml = model.get_model()
You can copy a model using the following statement:
>>> model = o.get_offline_model('logistic_regression_model_name')
>>> # copy to current project
>>> new_model = model.copy('logistic_regression_model_name_new')
>>> # copy to another project
>>> new_model2 = model.copy('logistic_regression_model_name_new2', project='new_project')
You can delete a model using the following statement:
>>> o.delete_offline_model('logistic_regression_model_name')