This topic describes how to perform sequence and execution operations in PyODPS.
Prerequisites
Before you begin, make sure that the following operations are complete:
You have created a MaxCompute project.
You have created a DataWorks workspace. This topic uses a workspace in the public preview version of DataStudio as an example.
You have created a workflow in DataWorks. For more information, see Create a workflow.
Procedure
Download the test dataset and import it to MaxCompute.
Download and extract the Iris dataset, and rename iris.data to iris.csv.
Create a table named pyodps_iris and upload the dataset iris.csv to the table. For more information, see Create tables and upload data.
Sample statement:
CREATE TABLE IF NOT EXISTS pyodps_iris (
    sepallength DOUBLE COMMENT 'sepal length (cm)',
    sepalwidth  DOUBLE COMMENT 'sepal width (cm)',
    petallength DOUBLE COMMENT 'petal length (cm)',
    petalwidth  DOUBLE COMMENT 'petal width (cm)',
    name        STRING COMMENT 'type'
);
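If you prefer to prepare the upload programmatically, each row of iris.csv must match the five-column schema above. The following is a minimal local sketch; parse_iris_rows is a hypothetical helper (not part of PyODPS), and the resulting tuples could then be passed to a bulk-upload call such as PyODPS's o.write_table.

```python
import csv

def parse_iris_rows(lines):
    """Hypothetical helper: turn iris.csv lines into tuples that match
    the pyodps_iris schema (four DOUBLE columns and one STRING column)."""
    records = []
    for row in csv.reader(lines):
        if not row:
            continue  # the raw Iris file ends with a blank line
        records.append((float(row[0]), float(row[1]),
                        float(row[2]), float(row[3]), row[4]))
    return records

# Illustrative input, not the full dataset.
sample = ["5.1,3.5,1.4,0.2,Iris-setosa", ""]
print(parse_iris_rows(sample))
```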
Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. Find the target workspace and click DataStudio in the Actions column to go to the DataStudio page.
On the DataStudio page, right-click your workflow and choose to create a PyODPS node. Enter a name for the node and click Confirm.
On the configuration tab of the PyODPS node, enter the following sample code.
from odps import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))

# Obtain a column.
print(iris.sepallength.head(5))
print(iris['sepallength'].head(5))

# View the data type of a column.
print(iris.sepallength.dtype)

# Change the data type of a column. astype returns a new sequence;
# it does not modify the column in place.
iris.sepallength.astype('int')

# Perform aggregations.
print(iris.groupby('name').sepallength.max().head(5))
print(iris.sepallength.max())

# Rename a column.
print(iris.sepalwidth.rename('speal_width').head(5))

# Perform simple column arithmetic.
print((iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5))
Click Run and view the result on the Run Log tab.
The following result is returned:
Executing user script with PyODPS 0.8.0
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
FLOAT64
Sql compiled:
CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
SELECT MAX(t1.`sepallength`) AS `sepallength_max`
FROM data_service_fr.`pyodps_iris` t1
GROUP BY t1.`name`
   sepallength_max
0              5.8
1              7.0
2              7.9
Collection: ref_0
  odps.Table
    name: data_service_fr.`pyodps_iris`
    schema:
      sepallength : double # Sepal length (cm)
      sepalwidth  : double # Sepal width (cm)
      petallength : double # Petal length (cm)
      petalwidth  : double # Petal width (cm)
      name        : string # Species

max = Max[float64]
  sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Try to fetch data from tunnel
   speal_width
0          3.0
1          3.2
2          3.1
3          3.6
4          3.9
Sql compiled:
CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
FROM data_service_fr.`pyodps_iris` t1
   sum_sepal
0        7.9
1        7.9
2        7.7
3        8.6
4        9.3
2019-08-13 10:48:13 INFO =================================================================
2019-08-13 10:48:13 INFO Exit code of the Shell command 0
2019-08-13 10:48:13 INFO --- Invocation of Shell command completed ---
2019-08-13 10:48:13 INFO Shell run successfully!
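The sepallength_max values in this log (5.8, 7.0, 7.9) come from a standard group-then-aggregate pattern that MaxCompute evaluates as the compiled SQL shown above. The same logic can be sketched locally with the Python standard library (the rows below are illustrative, not the full dataset):

```python
from collections import defaultdict

# A few illustrative (name, sepallength) rows.
rows = [
    ("Iris-setosa", 5.0), ("Iris-setosa", 5.8),
    ("Iris-versicolor", 6.4), ("Iris-versicolor", 7.0),
    ("Iris-virginica", 7.9),
]

# Group the values by species.
groups = defaultdict(list)
for name, sepallength in rows:
    groups[name].append(sepallength)

# Maximum sepallength per species, i.e. the sepallength_max column.
sepallength_max = {name: max(values) for name, values in groups.items()}
print(sepallength_max)
```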
Create and run another PyODPS node named PyExecute.
Use the following sample code:
from odps import options
from odps import DataFrame

# Print the Logview URL of each executed instance.
options.verbose = True

iris = DataFrame(o.get_table('pyodps_iris'))
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()

# Route execution logs to a custom logger instead of stdout.
my_logs = []
def my_loggers(x):
    my_logs.append(x)

options.verbose_log = my_loggers
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
print(my_logs)

# Cache an intermediate collection so that it is computed only once.
cached = iris[iris.sepalwidth < 3.5].cache()
print(cached.head(3))

# Perform asynchronous, parallel execution.
from odps.df import Delay
delay = Delay()  # Create a Delay object.

df = iris[iris.sepalwidth < 5].cache()  # A common dependency.
# Each call immediately returns a future object; execution has not started yet.
future1 = df.sepalwidth.sum().execute(delay=delay)
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepalwidth.max().execute(delay=delay)
delay.execute(n_parallel=3)  # Run the three tasks with three-way parallelism.
print(future1.result())
print(future2.result())
print(future3.result())
The following result is returned:
Executing user script with PyODPS 0.8.0
Sql compiled:
CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM data_service_fr.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
Instance ID: 20190813025233386g04djssa
Log view: http://logview.odps.aliyun.com/logview/XXX
['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u' Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX']
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
454.6
3.05100671141
4.4
2019-08-13 10:52:48 INFO =================================================================
2019-08-13 10:52:48 INFO Exit code of the Shell command 0
2019-08-13 10:52:48 INFO --- Invocation of Shell command completed ---
2019-08-13 10:52:48 INFO Shell run successfully!
2019-08-13 10:52:48 INFO Current task status: FINISH
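The Delay pattern in the PyExecute node defers the three aggregations and then runs them as one batch with n_parallel workers, which is why the sum, mean, and maximum appear together near the end of the log. Conceptually this resembles submitting tasks to a thread pool, as in the following local sketch with the standard library (the names and data here are illustrative, not PyODPS APIs):

```python
from concurrent.futures import ThreadPoolExecutor

data = [4.9, 3.0, 5.8, 4.4]  # stand-in for the cached sepalwidth column

# Submit three aggregations at once; each submit returns a future
# immediately, and result() blocks until that task finishes.
with ThreadPoolExecutor(max_workers=3) as pool:  # analogue of n_parallel=3
    f_sum = pool.submit(sum, data)
    f_mean = pool.submit(lambda: sum(data) / float(len(data)))
    f_max = pool.submit(max, data)
    results = (f_sum.result(), f_mean.result(), f_max.result())

print(results)
```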