@@ -2,7 +2,9 @@ package act
22
33import (
44"context"
5+ "encoding/json"
56"errors"
7+ "fmt"
68"slices"
79"time"
810
@@ -26,6 +28,10 @@ type Registry struct {
2628// Default timeout for running actions
2729DefaultActionTimeout time.Duration
2830
31+ // TaskPublisher is the publisher for async actions.
32+ // if not given, will invoke simple goroutine to run async actions
33+ TaskPublisher * Publisher
34+
2935Signals map [string ]* sdkAct.Signal
3036Policies map [string ]* sdkAct.Policy
3137Actions map [string ]* sdkAct.Action
@@ -34,6 +40,27 @@ type Registry struct {
3440DefaultSignal * sdkAct.Signal
3541}
3642
43+ type AsyncActionMessage struct {
44+ Output * sdkAct.Output
45+ Params []sdkAct.Parameter
46+ }
47+
48+ // Encode marshals the AsyncActionMessage struct to JSON bytes.
49+ func (msg * AsyncActionMessage ) Encode () ([]byte , error ) {
50+ marshaled , err := json .Marshal (msg )
51+ if err != nil {
52+ return nil , fmt .Errorf ("error encoding JSON: %w" , err )
53+ }
54+ return marshaled , nil
55+ }
56+
57+ func (msg * AsyncActionMessage ) Decode (data []byte ) error {
58+ if err := json .Unmarshal (data , msg ); err != nil {
59+ return fmt .Errorf ("error decoding JSON: %w" , err )
60+ }
61+ return nil
62+ }
63+
3764var _ IRegistry = (* Registry )(nil )
3865
3966// NewActRegistry creates a new act registry with the specified default policy and timeout
@@ -88,6 +115,7 @@ func NewActRegistry(
88115Actions : registry .Actions ,
89116DefaultPolicy : registry .Policies [registry .DefaultPolicyName ],
90117DefaultSignal : registry .Signals [registry .DefaultPolicyName ],
118+ TaskPublisher : registry .TaskPublisher ,
91119}
92120}
93121
@@ -234,6 +262,18 @@ func (r *Registry) Run(
234262if action .Timeout > 0 {
235263timeout = time .Duration (action .Timeout ) * time .Second
236264}
265+
266+ // if task is async and publisher is configured, publish it and do not run it
267+ if r .TaskPublisher != nil && ! action .Sync {
268+ err := r .publishTask (output , params )
269+ if err != nil {
270+ r .Logger .Error ().Err (err ).Msg ("Error publishing async action" )
271+ return nil , gerr .ErrPublishingAsyncAction
272+ }
273+ return nil , gerr .ErrAsyncAction
274+ }
275+
276+ // no publisher, or sync action. run the action
237277var ctx context.Context
238278var cancel context.CancelFunc
239279// if timeout is zero, then the context should not have timeout
@@ -248,14 +288,83 @@ func (r *Registry) Run(
248288return runActionWithTimeout (ctx , action , output , params , r .Logger )
249289}
250290
251- // Run the action asynchronously .
291+ // If the action is asynchronous, run it in a goroutine and return the sentinel error .
252292go func () {
253293defer cancel ()
254294_ , _ = runActionWithTimeout (ctx , action , output , params , r .Logger )
255295}()
296+
256297return nil , gerr .ErrAsyncAction
257298}
258299
300+ func (r * Registry ) publishTask (output * sdkAct.Output , params []sdkAct.Parameter ) error {
301+ r .Logger .Debug ().Msg ("Publishing async action" )
302+ task := AsyncActionMessage {
303+ Output : output ,
304+ Params : params ,
305+ }
306+ payload , err := task .Encode ()
307+ if err != nil {
308+ return err
309+ }
310+ if err := r .TaskPublisher .Publish (context .Background (), payload ); err != nil {
311+ return fmt .Errorf ("error publishing task: %w" , err )
312+ }
313+ return nil
314+ }
315+
316+ func (r * Registry ) runAsyncActionFn (ctx context.Context , message []byte ) error {
317+ msg := & AsyncActionMessage {}
318+ if err := msg .Decode (message ); err != nil {
319+ r .Logger .Error ().Err (err ).Msg ("Error decoding message" )
320+ return err
321+ }
322+ output := msg .Output
323+ params := msg .Params
324+
325+ // In certain cases, the output may be nil, for example, if the policy
326+ // evaluation fails. In this case, the run is aborted.
327+ if output == nil {
328+ // This should never happen, since the output is always set by the registry
329+ // to be the default policy if no signals are provided.
330+ r .Logger .Debug ().Msg ("Output is nil, run aborted" )
331+ return gerr .ErrNilPointer
332+ }
333+
334+ action , ok := r .Actions [output .MatchedPolicy ]
335+ if ! ok {
336+ r .Logger .Warn ().Str ("matchedPolicy" , output .MatchedPolicy ).Msg (
337+ "Action does not exist, run aborted" )
338+ return gerr .ErrActionNotExist
339+ }
340+
341+ // Prepend the logger to the parameters if needed.
342+ if len (params ) == 0 || params [0 ].Key != LoggerKey {
343+ params = append ([]sdkAct.Parameter {WithLogger (r .Logger )}, params ... )
344+ } else {
345+ params [0 ] = WithLogger (r .Logger )
346+ }
347+
348+ timeout := r .DefaultActionTimeout
349+ if action .Timeout > 0 {
350+ timeout = time .Duration (action .Timeout ) * time .Second
351+ }
352+ var ctxWithTimeout context.Context
353+ var cancel context.CancelFunc
354+ // if timeout is zero, then the context should not have timeout
355+ if timeout > 0 {
356+ ctxWithTimeout , cancel = context .WithTimeout (ctx , timeout )
357+ } else {
358+ ctxWithTimeout , cancel = context .WithCancel (ctx )
359+ }
360+ // If the action is synchronous, run it and return the result immediately.
361+ defer cancel ()
362+ if _ , err := runActionWithTimeout (ctxWithTimeout , action , output , params , r .Logger ); err != nil {
363+ return err
364+ }
365+ return nil
366+ }
367+
259368func runActionWithTimeout (
260369ctx context.Context ,
261370action * sdkAct.Action ,
@@ -293,7 +402,7 @@ func runActionWithTimeout(
293402}
294403}
295404
296- // WithLogger returns a parameter with the logger to be used by the action.
405+ // WithLogger returns a parameter with the Logger to be used by the action.
297406// This is automatically prepended to the parameters when running an action.
298407func WithLogger (logger zerolog.Logger ) sdkAct.Parameter {
299408return sdkAct.Parameter {
0 commit comments