summaryrefslogtreecommitdiff
diff options
-rw-r--r--daemon/api_general.go1
-rw-r--r--daemon/api_general_test.go74
-rw-r--r--daemon/export_api_general_test.go3
-rw-r--r--daemon/response.go32
-rw-r--r--overlord/state/state.go1
5 files changed, 100 insertions, 11 deletions
diff --git a/daemon/api_general.go b/daemon/api_general.go
index b347ebba53..40170e9e3e 100644
--- a/daemon/api_general.go
+++ b/daemon/api_general.go
@@ -248,6 +248,7 @@ func getChanges(c *Command, r *http.Request, user *auth.UserState) Response {
st := c.d.overlord.State()
st.Lock()
defer st.Unlock()
+ // XXX: provide a way to stop these when the daemon stops
return newFollowChangesSeqResponse(st)
}
diff --git a/daemon/api_general_test.go b/daemon/api_general_test.go
index 81564a580a..0a410ee09b 100644
--- a/daemon/api_general_test.go
+++ b/daemon/api_general_test.go
@@ -755,3 +755,77 @@ func (s *generalSuite) TestAckWarnings(c *check.C) {
c.Check(calls, check.Equals, "ok")
c.Check(result, check.DeepEquals, 0)
}
+
+// XXX: test invalid follow/filter combinations
+func (s *generalSuite) TestStateChangesFollowHappy(c *check.C) {
+ restore := state.MockTime(time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC))
+ defer restore()
+
+ // Setup
+ d := s.daemon(c)
+ st := d.Overlord().State()
+
+ // Execute
+ req, err := http.NewRequest("GET", "/v2/changes?follow=true", nil)
+ c.Assert(err, check.IsNil)
+ rawRsp := s.req(c, req, nil)
+ rsp := rawRsp.(*daemon.FollowChangesSeqResponse)
+
+ rec := httptest.NewRecorder()
+ go func() {
+ c.Assert(rec.Code, check.Equals, 200)
+
+ st.Lock()
+ chg1 := st.NewChange("install", "install...")
+ t1 := st.NewTask("download", "1...")
+ t2 := st.NewTask("activate", "2...")
+ chg1.AddAll(state.NewTaskSet(t1, t2))
+ t1.SetStatus(state.DoingStatus)
+ st.Unlock()
+
+ var res string
+ waitRes := func() string {
+ for {
+ res1 := rec.Body.String()
+ if res != res1 {
+ return res1
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+
+ jsonSeq1 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Doing","old-status":"Default"}`
+ res = waitRes()
+ c.Check(string(res), check.Equals, "\x1e"+jsonSeq1+"\n")
+
+ st.Lock()
+ t1.SetStatus(state.DoneStatus)
+ t2.SetStatus(state.DoingStatus)
+ st.Unlock()
+
+ jsonSeq2 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Do","old-status":"Doing"}`
+ jsonSeq3 := `{"id":"1","kind":"install","summary":"install...","ready":false,"status":"Doing","old-status":"Do"}`
+ res = waitRes()
+ c.Check(string(res), check.Equals,
+ "\x1e"+jsonSeq1+"\n"+
+ "\x1e"+jsonSeq2+"\n"+
+ "\x1e"+jsonSeq3+"\n")
+
+ st.Lock()
+ t2.SetStatus(state.DoneStatus)
+ st.Unlock()
+
+ jsonSeq4 := `{"id":"1","kind":"install","summary":"install...","ready":true,"status":"Done","old-status":"Doing"}`
+ res = waitRes()
+ c.Check(string(res), check.Equals,
+ "\x1e"+jsonSeq1+"\n"+
+ "\x1e"+jsonSeq2+"\n"+
+ "\x1e"+jsonSeq3+"\n"+
+ "\x1e"+jsonSeq4+"\n")
+
+ // finished with the test
+ rsp.Stop()
+ }()
+
+ rsp.ServeHTTP(rec, nil)
+}
diff --git a/daemon/export_api_general_test.go b/daemon/export_api_general_test.go
index 75ac65064e..531064cdf2 100644
--- a/daemon/export_api_general_test.go
+++ b/daemon/export_api_general_test.go
@@ -54,5 +54,6 @@ func MockWarningsAccessors(okay func(*state.State, time.Time) int, all func(*sta
}
type (
- ChangeInfo = changeInfo
+ ChangeInfo = changeInfo
+ FollowChangesSeqResponse = followChangesSeqResponse
)
diff --git a/daemon/response.go b/daemon/response.go
index 8636f4b54c..04f2437b45 100644
--- a/daemon/response.go
+++ b/daemon/response.go
@@ -284,39 +284,47 @@ type followChangeJSON struct {
type followChangesSeqResponse struct {
activeChanges map[*state.Change]state.Status
- chgsCh chan followChangeJSON
+ chgsCh chan *followChangeJSON
}
func newFollowChangesSeqResponse(st *state.State) *followChangesSeqResponse {
rsp := &followChangesSeqResponse{
activeChanges: make(map[*state.Change]state.Status),
- chgsCh: make(chan followChangeJSON),
+ // XXX: use a small buffer size
+ chgsCh: make(chan *followChangeJSON, 10),
}
st.AddTaskStatusChangedObserver(rsp.onTaskChanged)
return rsp
}
-func (fc *followChangesSeqResponse) onTaskChanged(t *state.Task, old, new state.Status) {
+// XXX: better name?
+func (fc *followChangesSeqResponse) Stop() {
+ close(fc.chgsCh)
+}
+
+func (fc *followChangesSeqResponse) onTaskChanged(t *state.Task, oldTaskStatus, newTaskStatus state.Status) {
chg := t.Change()
if chg == nil {
return
}
- newChgStatus := chg.Status()
- oldChgStatus, ok := fc.activeChanges[chg]
+ new := chg.Status()
+ old, ok := fc.activeChanges[chg]
if !ok {
- oldChgStatus = state.DefaultStatus
+ old = state.DefaultStatus
}
- if newChgStatus.Ready() {
+ if new.Ready() {
delete(fc.activeChanges, chg)
} else {
- fc.activeChanges[chg] = newChgStatus
+ fc.activeChanges[chg] = new
}
// XXX: do we need more filtering here? there is a lot of
// Doing->Done->Doing->Done right now
- if newChgStatus != oldChgStatus {
- fc.chgsCh <- followChangeJSON{
+ if new != old {
+ // XXX: this is fragile, if the channel blocks we would
+ // block the state/taskrunner
+ fc.chgsCh <- &followChangeJSON{
Id: chg.ID(),
Kind: chg.Kind(),
Summary: chg.Summary(),
@@ -336,7 +344,11 @@ func (fc *followChangesSeqResponse) ServeHTTP(w http.ResponseWriter, r *http.Req
writer := bufio.NewWriter(w)
enc := json.NewEncoder(writer)
for {
+ // XXX: provide a way to close this channel/exit this loop
chg := <-fc.chgsCh
+ if chg == nil {
+ break
+ }
writer.WriteByte(0x1E) // RS -- see ascii(7), and RFC7464
diff --git a/overlord/state/state.go b/overlord/state/state.go
index e5e6c5edd3..874096fab9 100644
--- a/overlord/state/state.go
+++ b/overlord/state/state.go
@@ -495,6 +495,7 @@ func (s *State) AddTaskStatusChangedObserver(cb func(t *Task, old, new Status))
func (s *State) notifyTaskStatusChangedObservers(t *Task, old, new Status) {
for _, f := range s.taskStatusChangedObservers {
+ // XXX: this must not block or we cannot run changes anymore
f(t, old, new)
}
}