diff options
| author | Michael Vogt <mvo@ubuntu.com> | 2023-06-17 11:48:49 +0200 |
|---|---|---|
| committer | Michael Vogt <mvo@ubuntu.com> | 2023-06-17 12:59:59 +0200 |
| commit | f04cfae8db641ab77b80e2990ed4af55aee6ff08 (patch) | |
| tree | 18225eafa8656a86d9500c5187f4631b0c19afc5 | |
| parent | d7e85dbf2aa3ac78da9a8c135cb5603f8750edbb (diff) | |
state,daemon: add test, provide way to stop followChangesSeqResponse
| -rw-r--r-- | daemon/api_general.go | 1 | ||||
| -rw-r--r-- | daemon/api_general_test.go | 74 | ||||
| -rw-r--r-- | daemon/export_api_general_test.go | 3 | ||||
| -rw-r--r-- | daemon/response.go | 32 | ||||
| -rw-r--r-- | overlord/state/state.go | 1 |
5 files changed, 100 insertions, 11 deletions
diff --git a/daemon/api_general.go b/daemon/api_general.go index b347ebba53..40170e9e3e 100644 --- a/daemon/api_general.go +++ b/daemon/api_general.go @@ -248,6 +248,7 @@ func getChanges(c *Command, r *http.Request, user *auth.UserState) Response { st := c.d.overlord.State() st.Lock() defer st.Unlock() + // XXX: provide a way to stop these when the daemon stops return newFollowChangesSeqResponse(st) } diff --git a/daemon/api_general_test.go b/daemon/api_general_test.go index 81564a580a..0a410ee09b 100644 --- a/daemon/api_general_test.go +++ b/daemon/api_general_test.go @@ -755,3 +755,77 @@ func (s *generalSuite) TestAckWarnings(c *check.C) { c.Check(calls, check.Equals, "ok") c.Check(result, check.DeepEquals, 0) } + +// XXX: test invalid follow/filter combinations +func (s *generalSuite) TestStateChangesFollowHappy(c *check.C) { + restore := state.MockTime(time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC)) + defer restore() + + // Setup + d := s.daemon(c) + st := d.Overlord().State() + + // Execute + req, err := http.NewRequest("GET", "/v2/changes?follow=true", nil) + c.Assert(err, check.IsNil) + rawRsp := s.req(c, req, nil) + rsp := rawRsp.(*daemon.FollowChangesSeqResponse) + + rec := httptest.NewRecorder() + go func() { + c.Assert(rec.Code, check.Equals, 200) + + st.Lock() + chg1 := st.NewChange("install", "install...") + t1 := st.NewTask("download", "1...") + t2 := st.NewTask("activate", "2...") + chg1.AddAll(state.NewTaskSet(t1, t2)) + t1.SetStatus(state.DoingStatus) + st.Unlock() + + var res string + waitRes := func() string { + for { + res1 := rec.Body.String() + if res != res1 { + return res1 + } + time.Sleep(100 * time.Millisecond) + } + } + + jsonSeq1 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Doing","old-status":"Default"}` + res = waitRes() + c.Check(string(res), check.Equals, "\x1e"+jsonSeq1+"\n") + + st.Lock() + t1.SetStatus(state.DoneStatus) + t2.SetStatus(state.DoingStatus) + st.Unlock() + + jsonSeq2 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Do","old-status":"Doing"}` + jsonSeq3 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Doing","old-status":"Do"}` + res = waitRes() + c.Check(string(res), check.Equals, + "\x1e"+jsonSeq1+"\n"+ + "\x1e"+jsonSeq2+"\n"+ + "\x1e"+jsonSeq3+"\n") + + st.Lock() + t2.SetStatus(state.DoneStatus) + st.Unlock() + + jsonSeq4 := `{"id":"1","kind":"install","summary":"install...","ready":true,"status":"Done","old-status":"Doing"}` + res = waitRes() + c.Check(string(res), check.Equals, + "\x1e"+jsonSeq1+"\n"+ + "\x1e"+jsonSeq2+"\n"+ + "\x1e"+jsonSeq3+"\n"+ + "\x1e"+jsonSeq4+"\n") + + // finished with the test + rsp.Stop() + }() + + rsp.ServeHTTP(rec, nil) +} diff --git a/daemon/export_api_general_test.go b/daemon/export_api_general_test.go index 75ac65064e..531064cdf2 100644 --- a/daemon/export_api_general_test.go +++ b/daemon/export_api_general_test.go @@ -54,5 +54,6 @@ func MockWarningsAccessors(okay func(*state.State, time.Time) int, all func(*sta } type ( - ChangeInfo = changeInfo + ChangeInfo = changeInfo + FollowChangesSeqResponse = followChangesSeqResponse ) diff --git a/daemon/response.go b/daemon/response.go index 8636f4b54c..04f2437b45 100644 --- a/daemon/response.go +++ b/daemon/response.go @@ -284,39 +284,47 @@ type followChangeJSON struct { type followChangesSeqResponse struct { activeChanges map[*state.Change]state.Status - chgsCh chan followChangeJSON + chgsCh chan *followChangeJSON } func newFollowChangesSeqResponse(st *state.State) *followChangesSeqResponse { rsp := &followChangesSeqResponse{ activeChanges: make(map[*state.Change]state.Status), - chgsCh: make(chan followChangeJSON), + // XXX: use a small buffer size + chgsCh: make(chan *followChangeJSON, 10), } st.AddTaskStatusChangedObserver(rsp.onTaskChanged) return rsp } -func (fc *followChangesSeqResponse) onTaskChanged(t *state.Task, old, new state.Status) { +// XXX: better name? +func (fc *followChangesSeqResponse) Stop() { + close(fc.chgsCh) +} + +func (fc *followChangesSeqResponse) onTaskChanged(t *state.Task, oldTaskStatus, newTaskStatus state.Status) { chg := t.Change() if chg == nil { return } - newChgStatus := chg.Status() - oldChgStatus, ok := fc.activeChanges[chg] + new := chg.Status() + old, ok := fc.activeChanges[chg] if !ok { - oldChgStatus = state.DefaultStatus + old = state.DefaultStatus } - if newChgStatus.Ready() { + if new.Ready() { delete(fc.activeChanges, chg) } else { - fc.activeChanges[chg] = newChgStatus + fc.activeChanges[chg] = new } // XXX: do we need more filtering here? there is a lot of // Doing->Done->Doing->Done right now - if newChgStatus != oldChgStatus { - fc.chgsCh <- followChangeJSON{ + if new != old { + // XXX: this is fragile, if the channel blocks we would + // block the state/taskrunner + fc.chgsCh <- &followChangeJSON{ Id: chg.ID(), Kind: chg.Kind(), Summary: chg.Summary(), @@ -336,7 +344,11 @@ func (fc *followChangesSeqResponse) ServeHTTP(w http.ResponseWriter, r *http.Req writer := bufio.NewWriter(w) enc := json.NewEncoder(writer) for { + // XXX: provide a way to close this channel/exit this loop chg := <-fc.chgsCh + if chg == nil { + break + } writer.WriteByte(0x1E) // RS -- see ascii(7), and RFC7464 diff --git a/overlord/state/state.go b/overlord/state/state.go index e5e6c5edd3..874096fab9 100644 --- a/overlord/state/state.go +++ b/overlord/state/state.go @@ -495,6 +495,7 @@ func (s *State) AddTaskStatusChangedObserver(cb func(t *Task, old, new Status)) func (s *State) notifyTaskStatusChangedObservers(t *Task, old, new Status) { for _, f := range s.taskStatusChangedObservers { + // XXX: this must not block or we cannot run changes anymore f(t, old, new) } } |
