1
1
//! Convenience async functions for creating gWasm tasks, connecting to a
2
2
//! Golem instance, and listening for task's progress as it's computed
3
3
//! on Golem.
4
- use super :: {
5
- error:: Error ,
6
- task:: { ComputedTask , Task } ,
7
- Net , ProgressUpdate ,
8
- } ;
4
+ use super :: error:: { Error , Result } ;
5
+ use super :: task:: { ComputedTask , Task } ;
6
+ use super :: { Net , ProgressUpdate } ;
9
7
use actix:: { Actor , ActorContext , Context , Handler , Message } ;
10
8
use actix_wamp:: RpcEndpoint ;
11
- use futures:: {
12
- future,
13
- stream:: { self , Stream } ,
14
- Future ,
15
- } ;
16
- use golem_rpc_api:: {
17
- comp:: { AsGolemComp , TaskStatus as GolemTaskStatus } ,
18
- connect_to_app,
19
- } ;
9
+ use futures:: future:: FutureExt ;
10
+ use futures:: stream:: { self , Stream , StreamExt , TryStreamExt } ;
11
+ use futures:: { pin_mut, select} ;
12
+ use golem_rpc_api:: comp:: { AsGolemComp , TaskStatus as GolemTaskStatus } ;
13
+ use golem_rpc_api:: connect_to_app;
20
14
use serde_json:: json;
21
- use std:: { convert:: TryInto , path:: Path , time:: Duration } ;
22
- use tokio:: timer:: Interval ;
23
- use tokio_ctrlc_error:: AsyncCtrlc ;
15
+ use std:: convert:: TryInto ;
16
+ use std:: path:: { Path , PathBuf } ;
17
+ use std:: pin:: Pin ;
18
+ use std:: time:: Duration ;
19
+ use tokio:: { signal, time} ;
24
20
25
21
/// A convenience function for running a gWasm [`Task`] on Golem
26
22
///
@@ -34,46 +30,51 @@ use tokio_ctrlc_error::AsyncCtrlc;
34
30
/// [`Task`]: ../task/struct.Task.html
35
31
/// [`ComputedTask`]: ../task/struct.ComputedTask.html
36
32
/// [`gwasm_api::compute`]: ../fn.compute.html
37
- pub fn compute < P , S > (
33
+ pub async fn compute < P , S > (
38
34
datadir : P ,
39
35
address : S ,
40
36
port : u16 ,
41
37
task : Task ,
42
38
net : Net ,
43
39
progress_handler : impl ProgressUpdate + ' static ,
44
40
polling_interval : Option < Duration > ,
45
- ) -> impl Future < Item = ComputedTask , Error = Error > + ' static
41
+ ) -> Result < ComputedTask >
46
42
where
47
- P : AsRef < Path > ,
48
- S : AsRef < str > ,
43
+ P : Into < PathBuf > ,
44
+ S : Into < String > ,
49
45
{
50
- create_task ( datadir. as_ref ( ) , address. as_ref ( ) , port, net, task. clone ( ) )
51
- . and_then ( move |( endpoint, task_id) | {
52
- poll_task_progress ( endpoint. clone ( ) , task_id. clone ( ) , polling_interval)
53
- . fold (
54
- ProgressActor :: new ( progress_handler) . start ( ) ,
55
- |addr, task_status| {
56
- addr. send ( Update {
57
- progress : task_status. progress ,
58
- } )
59
- . and_then ( |_| Ok ( addr) )
60
- } ,
61
- )
62
- . and_then ( |addr| addr. send ( Finish ) . map_err ( Error :: from) )
63
- . ctrlc_as_error ( )
64
- . or_else ( move |e : Error | match e {
65
- Error :: KeyboardInterrupt ( e) => {
66
- future:: Either :: A ( endpoint. as_golem_comp ( ) . abort_task ( task_id) . then (
67
- |res| match res {
68
- Ok ( ( ) ) => future:: err ( Error :: KeyboardInterrupt ( e) ) ,
69
- Err ( e) => future:: err ( e. into ( ) ) ,
70
- } ,
71
- ) )
72
- }
73
- e => future:: Either :: B ( future:: err ( e) ) ,
46
+ let ( endpoint, task_id) =
47
+ create_task ( & datadir. into ( ) , & address. into ( ) , port, net, task. clone ( ) ) . await ?;
48
+ let poll_stream = poll_task_progress ( endpoint. clone ( ) , task_id. clone ( ) , polling_interval) ;
49
+ let progress = poll_stream
50
+ . try_fold (
51
+ ProgressActor :: new ( progress_handler) . start ( ) ,
52
+ |addr, task_status| async move {
53
+ addr. send ( Update {
54
+ progress : task_status. progress ,
74
55
} )
75
- } )
76
- . and_then ( |( ) | task. try_into ( ) )
56
+ . await ?;
57
+ Ok ( addr)
58
+ } ,
59
+ )
60
+ . fuse ( ) ;
61
+ let ctrlc = signal:: ctrl_c ( ) . fuse ( ) ;
62
+
63
+ pin_mut ! ( ctrlc, progress) ;
64
+
65
+ select ! {
66
+ maybe_ctrlc = ctrlc => {
67
+ maybe_ctrlc?;
68
+ endpoint. as_golem_comp( ) . abort_task( task_id) . await ?;
69
+ Err ( Error :: KeyboardInterrupt )
70
+ }
71
+ maybe_addr = progress => {
72
+ let addr = maybe_addr?;
73
+ addr. send( Finish ) . await ?;
74
+ let task: ComputedTask = task. try_into( ) ?;
75
+ Ok ( task)
76
+ }
77
+ }
77
78
}
78
79
79
80
/// A convenience function for creating a gWasm [`Task`] on Golem
@@ -84,21 +85,16 @@ where
84
85
/// [`Task`]: ../task/struct.Task.html
85
86
/// [`RpcEndpoint`]:
86
87
/// https://golemfactory.github.io/golem-client/latest/actix_wamp/trait.RpcEndpoint.html
87
- pub fn create_task (
88
+ pub async fn create_task (
88
89
datadir : & Path ,
89
90
address : & str ,
90
91
port : u16 ,
91
92
net : Net ,
92
93
task : Task ,
93
- ) -> impl Future < Item = ( impl Clone + Send + RpcEndpoint , String ) , Error = Error > + ' static {
94
- connect_to_app ( datadir, Some ( net) , Some ( ( address, port) ) )
95
- . and_then ( move |endpoint| {
96
- endpoint
97
- . as_golem_comp ( )
98
- . create_task ( json ! ( task) )
99
- . map ( |task_id| ( endpoint, task_id) )
100
- } )
101
- . from_err ( )
94
+ ) -> Result < ( impl Clone + Send + RpcEndpoint , String ) > {
95
+ let endpoint = connect_to_app ( datadir, Some ( net) , Some ( ( address, port) ) ) . await ?;
96
+ let task_id = endpoint. as_golem_comp ( ) . create_task ( json ! ( task) ) . await ?;
97
+ Ok ( ( endpoint, task_id) )
102
98
}
103
99
104
100
/// A convenience function for polling gWasm [`Task`]'s computation progress on Golem
@@ -114,72 +110,60 @@ pub fn poll_task_progress(
114
110
endpoint : impl Clone + Send + RpcEndpoint + ' static ,
115
111
task_id : String ,
116
112
polling_interval : Option < Duration > ,
117
- ) -> impl Stream < Item = TaskStatus , Error = Error > + ' static {
118
- stream:: unfold ( TaskState :: new ( endpoint, task_id) , |state| {
113
+ ) -> impl Stream < Item = Result < TaskStatus > > {
114
+ stream:: try_unfold ( TaskState :: new ( endpoint, task_id) , |state| async move {
119
115
if let Some ( status) = state. task_status . status {
120
116
match status {
121
- GolemTaskStatus :: Finished => return None ,
122
- GolemTaskStatus :: Aborted => {
123
- return Some ( future:: Either :: A ( future:: err ( Error :: TaskAborted ) ) )
124
- }
125
- GolemTaskStatus :: Timeout => {
126
- return Some ( future:: Either :: A ( future:: err ( Error :: TaskTimedOut ) ) )
127
- }
117
+ GolemTaskStatus :: Finished => return Ok ( None ) ,
118
+ GolemTaskStatus :: Aborted => return Err ( Error :: TaskAborted ) ,
119
+ GolemTaskStatus :: Timeout => return Err ( Error :: TaskTimedOut ) ,
128
120
_ => { }
129
121
}
130
122
}
131
123
132
124
let mut next_state = TaskState :: new ( state. endpoint . clone ( ) , state. task_id . clone ( ) ) ;
133
- Some ( future:: Either :: B (
134
- state
135
- . endpoint
136
- . as_golem_comp ( )
137
- . get_task ( state. task_id . clone ( ) )
138
- . map_err ( Error :: from)
139
- . and_then ( move |task_info| {
140
- let task_info = task_info. ok_or ( Error :: EmptyTaskInfo ) ?;
141
- next_state. task_status . status = Some ( task_info. status ) ;
142
- next_state. task_status . progress =
143
- task_info. progress . ok_or ( Error :: EmptyProgress ) ?;
144
- Ok ( ( next_state. task_status . clone ( ) , next_state) )
145
- } ) ,
146
- ) )
125
+ let task_info = state
126
+ . endpoint
127
+ . as_golem_comp ( )
128
+ . get_task ( state. task_id . clone ( ) )
129
+ . await ?;
130
+ let task_info = task_info. ok_or ( Error :: EmptyTaskInfo ) ?;
131
+ next_state. task_status . status = Some ( task_info. status ) ;
132
+ next_state. task_status . progress = task_info. progress . ok_or ( Error :: EmptyProgress ) ?;
133
+ Ok ( Some ( ( next_state. task_status . clone ( ) , next_state) ) )
147
134
} )
148
- . zip (
149
- Interval :: new_interval ( polling_interval. unwrap_or_else ( || Duration :: from_secs ( 2 ) ) )
150
- . from_err ( ) ,
151
- )
135
+ . zip ( time:: interval (
136
+ polling_interval. unwrap_or_else ( || Duration :: from_secs ( 2 ) ) ,
137
+ ) )
152
138
. map ( |( x, _) | x)
153
139
}
154
140
155
- #[ derive( Message ) ]
156
141
struct Update {
157
142
progress : f64 ,
158
143
}
159
144
160
- #[ derive( Message ) ]
145
+ impl Message for Update {
146
+ type Result = ( ) ;
147
+ }
148
+
161
149
struct Finish ;
162
150
163
- struct ProgressActor < T >
164
- where
165
- T : ProgressUpdate + ' static ,
166
- {
167
- handler : T ,
151
+ impl Message for Finish {
152
+ type Result = ( ) ;
168
153
}
169
154
170
- impl < T > ProgressActor < T >
171
- where
172
- T : ProgressUpdate + ' static ,
173
- {
174
- fn new ( handler : T ) -> Self {
155
+ struct ProgressActor {
156
+ handler : Pin < Box < dyn ProgressUpdate > > ,
157
+ }
158
+
159
+ impl ProgressActor {
160
+ fn new < T : ProgressUpdate + ' static > ( handler : T ) -> Self {
161
+ let handler = Box :: pin ( handler) ;
175
162
Self { handler }
176
163
}
177
164
}
178
165
179
- impl < T > Actor for ProgressActor < T >
180
- where
181
- T : ProgressUpdate + ' static ,
182
- {
166
+ impl Actor for ProgressActor {
183
167
type Context = Context < Self > ;
184
168
185
169
fn started ( & mut self , _ctx : & mut Self :: Context ) {
@@ -191,21 +175,15 @@ where
191
175
}
192
176
}
193
177
194
- impl < T > Handler < Update > for ProgressActor < T >
195
- where
196
- T : ProgressUpdate + ' static ,
197
- {
178
+ impl Handler < Update > for ProgressActor {
198
179
type Result = ( ) ;
199
180
200
181
fn handle ( & mut self , msg : Update , _ctx : & mut Self :: Context ) -> Self :: Result {
201
182
self . handler . update ( msg. progress ) ;
202
183
}
203
184
}
204
185
205
- impl < T > Handler < Finish > for ProgressActor < T >
206
- where
207
- T : ProgressUpdate + ' static ,
208
- {
186
+ impl Handler < Finish > for ProgressActor {
209
187
type Result = ( ) ;
210
188
211
189
fn handle ( & mut self , _msg : Finish , ctx : & mut Self :: Context ) -> Self :: Result {
0 commit comments