XFlow 和模型

XFlow

XFlow 是 ODPS 对算法包的封装,使用 PyODPS 可以执行 XFlow。对于下面的 PAI 命令:

PAI -name AlgoName -project algo_public -Dparam1=param_value1 -Dparam2=param_value2 ...

可以使用如下方法调用:

>>> # 异步调用
>>> inst = o.run_xflow('AlgoName', 'algo_public',
                       parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})

或者使用同步调用:

>>> # 同步调用
>>> inst = o.execute_xflow('AlgoName', 'algo_public',
                           parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})

参数不应包含命令两端的引号(如果有),也不应该包含末尾的分号。

这两个方法都会返回一个 Instance 对象。由于 XFlow 的一个 Instance 包含若干个子 Instance,需要使用下面的方法来获得每个 Instance 的 LogView:

>>> for sub_inst_name, sub_inst in o.get_xflow_sub_instances(inst).items():
>>>     print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))

需要注意的是,get_xflow_sub_instances 返回的是 Instance 当前的子 Instance,可能会随时间变化,因而可能需要定时查询。 为简化这一步骤,可以使用 iter_xflow_sub_instances 方法。该方法返回一个迭代器,会阻塞执行直至发现新的子 Instance 或者主 Instance 结束。同时需要注意的是, iter_xflow_sub_instances 默认不会检查 Instance 是否报错,建议在循环结束时手动检查 Instance 是否报错,以免遗漏可能的问题,或者增加 check=True 参数在 iter_xflow_sub_instances 退出时自动检查:

>>> # 此处建议使用异步调用
>>> inst = o.run_xflow('AlgoName', 'algo_public',
                       parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
>>> # 如果循环中没有 break,该循环会执行到 instance 退出
>>> for sub_inst_name, sub_inst in o.iter_xflow_sub_instances(inst):
>>>     print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
>>> # 手动检查 instance 是否成功,以避免遗漏 instance 报错
>>> instance.wait_for_success()

或者

>>> # 此处建议使用异步调用
>>> inst = o.run_xflow('AlgoName', 'algo_public',
                       parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
>>> # 增加 check=True,在循环结束时自动检查报错。如果循环中 break,instance 错误不会被抛出
>>> for sub_inst_name, sub_inst in o.iter_xflow_sub_instances(inst, check=True):
>>>     print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))

在调用 run_xflow 或者 execute_xflow 时,也可以指定运行参数,指定的方法与 SQL 类似:

>>> parameters = {'param1': 'param_value1', 'param2': 'param_value2', ...}
>>> o.execute_xflow('AlgoName', 'algo_public', parameters=parameters, hints={'odps.xxx.yyy': 10})

例如,如果需要任务运行到指定卡型的机器上,可以在 hints 中增加如下配置:

>>> hints={"settings": json.dumps({"odps.algo.hybrid.deploy.info": "xxxxx"})}

使用 options.ml.xflow_settings 可以配置全局设置:

>>> from odps import options
>>> options.ml.xflow_settings = {'odps.xxx.yyy': 10}
>>> parameters = {'param1': 'param_value1', 'param2': 'param_value2', ...}
>>> o.execute_xflow('AlgoName', 'algo_public', parameters=parameters)

PAI 命令的文档可以参考 这份文档 里列出的各个"组件参考"章节。

离线模型

离线模型是 XFlow 分类 / 回归算法输出的模型。用户可以使用 PyODPS ML 或直接使用 odps.run_xflow 创建一个离线模型,例如下面使用 run_xflow 的例子:

>>> o.run_xflow('LogisticRegression', 'algo_public', dict(modelName='logistic_regression_model_name',
>>>             regularizedLevel='1', maxIter='100', regularizedType='l1', epsilon='0.000001', labelColName='y',
>>>             featureColNames='pdays,emp_var_rate', goodValue='1', inputTableName='bank_data'))

在模型创建后,用户可以列出当前 Project 下的模型:

>>> models = o.list_offline_models(prefix='prefix')

也可以通过模型名获取模型并读取模型 PMML(如果支持):

>>> model = o.get_offline_model('logistic_regression_model_name')
>>> pmml = model.get_model()

复制离线模型可以使用下列语句:

>>> model = o.get_offline_model('logistic_regression_model_name')
>>> # 复制到当前 project
>>> new_model = model.copy('logistic_regression_model_name_new')
>>> # 复制到其他 project
>>> new_model2 = model.copy('logistic_regression_model_name_new2', project='new_project')

删除模型可使用下列语句:

>>> o.delete_offline_model('logistic_regression_model_name')