diff options
| author | Michael Vogt <mvo@ubuntu.com> | 2016-03-15 14:57:36 +0100 |
|---|---|---|
| committer | Michael Vogt <mvo@ubuntu.com> | 2016-03-15 14:57:36 +0100 |
| commit | b38ca5525897fd1ba6bb1cf5e86d6e89d560f8b8 (patch) | |
| tree | 7504ea38dee424c2073c2b7a944c3c77f238f8a6 | |
| parent | cf10fd73549f51979ef4c8f871ecb8b79019d723 (diff) | |
| parent | 59cc9d3df7d0f84e855fd4e2e53441ba5e19d6a1 (diff) | |
Merge remote-tracking branch 'upstream/master' into refactor/no-parts3refactor/no-parts3
| -rw-r--r-- | overlord/state/change.go | 55 | ||||
| -rw-r--r-- | overlord/state/change_test.go | 9 | ||||
| -rw-r--r-- | overlord/state/task.go | 72 | ||||
| -rw-r--r-- | overlord/state/task_test.go | 65 | ||||
| -rw-r--r-- | overlord/state/taskrunner.go | 155 | ||||
| -rw-r--r-- | overlord/state/taskrunner_test.go | 109 | ||||
| -rw-r--r-- | snappy/snap_remote_repo.go | 5 |
7 files changed, 433 insertions, 37 deletions
diff --git a/overlord/state/change.go b/overlord/state/change.go index 90c44f5d7c..877e31a525 100644 --- a/overlord/state/change.go +++ b/overlord/state/change.go @@ -37,6 +37,42 @@ const ( const nStatuses = ErrorStatus + 1 +type taskIDsSet map[string]bool + +func (ts taskIDsSet) add(tid string) { + ts[tid] = true +} + +func (ts taskIDsSet) tasks(s *State) []*Task { + res := make([]*Task, 0, len(ts)) + for tid := range ts { + res = append(res, s.tasks[tid]) + } + return res +} + +func (ts taskIDsSet) MarshalJSON() ([]byte, error) { + l := make([]string, 0, len(ts)) + for tid := range ts { + l = append(l, tid) + } + return json.Marshal(l) +} + +// NB: it's a bit odd but this one needs to be and works on *taskIDsSet +func (ts *taskIDsSet) UnmarshalJSON(data []byte) error { + var l []string + err := json.Unmarshal(data, &l) + if err != nil { + return err + } + *ts = make(map[string]bool, len(l)) + for _, tid := range l { + (*ts)[tid] = true + } + return nil +} + // Change represents a tracked modification to the system state. // // The Change provides both the justification for individual tasks @@ -54,7 +90,7 @@ type Change struct { summary string status Status data customData - taskIDs map[string]bool + taskIDs taskIDsSet } func newChange(state *State, id, kind, summary string) *Change { @@ -64,7 +100,7 @@ func newChange(state *State, id, kind, summary string) *Change { kind: kind, summary: summary, data: make(customData), - taskIDs: make(map[string]bool), + taskIDs: make(taskIDsSet), } } @@ -74,7 +110,7 @@ type marshalledChange struct { Summary string `json:"summary"` Status Status `json:"status"` Data map[string]*json.RawMessage `json:"data"` - TaskIDs map[string]bool `json:"task-ids"` + TaskIDs taskIDsSet `json:"task-ids"` } // MarshalJSON makes Change a json.Marshaller @@ -175,6 +211,11 @@ func (c *Change) SetStatus(s Status) { c.status = s } +// State returns the system State +func (c *Change) State() *State { + return c.state +} + // NewTask creates a new task and registers it as a required task for the // state change to be accomplished. func (c *Change) NewTask(kind, summary string) *Task { @@ -182,7 +223,7 @@ func (c *Change) NewTask(kind, summary string) *Task { id := c.state.genID() t := newTask(c.state, id, kind, summary) c.state.tasks[id] = t - c.taskIDs[id] = true + c.taskIDs.add(id) return t } @@ -191,9 +232,5 @@ func (c *Change) NewTask(kind, summary string) *Task { // Tasks returns all the tasks this state change depends on. func (c *Change) Tasks() []*Task { c.state.ensureLocked() - res := make([]*Task, 0, len(c.taskIDs)) - for tid := range c.taskIDs { - res = append(res, c.state.tasks[tid]) - } - return res + return c.taskIDs.tasks(c.state) } diff --git a/overlord/state/change_test.go b/overlord/state/change_test.go index 6d0f0b66f4..7128a3fe33 100644 --- a/overlord/state/change_test.go +++ b/overlord/state/change_test.go @@ -177,3 +177,12 @@ func (cs *changeSuite) TestStatusDerivedFromTasks(c *C) { t2.SetStatus(state.DoneStatus) c.Check(chg.Status(), Equals, state.DoneStatus) } + +func (cs *changeSuite) TestState(c *C) { + st := state.New(nil) + st.Lock() + chg := st.NewChange("install", "...") + st.Unlock() + + c.Assert(chg.State(), Equals, st) +} diff --git a/overlord/state/task.go b/overlord/state/task.go index 5cb74ca6e7..83a6e75812 100644 --- a/overlord/state/task.go +++ b/overlord/state/task.go @@ -33,44 +33,48 @@ type progress struct { // // See Change for more details. type Task struct { - state *State - id string - kind string - summary string - status Status - progress progress - data customData + state *State + id string + kind string + summary string + status Status + progress progress + data customData + waitTasks taskIDsSet } func newTask(state *State, id, kind, summary string) *Task { return &Task{ - state: state, - id: id, - kind: kind, - summary: summary, - data: make(customData), + state: state, + id: id, + kind: kind, + summary: summary, + data: make(customData), + waitTasks: make(taskIDsSet), } } type marshalledTask struct { - ID string `json:"id"` - Kind string `json:"kind"` - Summary string `json:"summary"` - Status Status `json:"status"` - Progress progress `json:"progress"` - Data map[string]*json.RawMessage `json:"data"` + ID string `json:"id"` + Kind string `json:"kind"` + Summary string `json:"summary"` + Status Status `json:"status"` + Progress progress `json:"progress"` + Data map[string]*json.RawMessage `json:"data"` + WaitTasks taskIDsSet `json:"wait-tasks"` } // MarshalJSON makes Task a json.Marshaller func (t *Task) MarshalJSON() ([]byte, error) { t.state.ensureLocked() return json.Marshal(marshalledTask{ - ID: t.id, - Kind: t.kind, - Summary: t.summary, - Status: t.status, - Progress: t.progress, - Data: t.data, + ID: t.id, + Kind: t.kind, + Summary: t.summary, + Status: t.status, + Progress: t.progress, + Data: t.data, + WaitTasks: t.waitTasks, }) } @@ -90,6 +94,7 @@ func (t *Task) UnmarshalJSON(data []byte) error { t.status = unmarshalled.Status t.progress = unmarshalled.Progress t.data = unmarshalled.Data + t.waitTasks = unmarshalled.WaitTasks return nil } @@ -124,6 +129,11 @@ func (t *Task) SetStatus(s Status) { t.status = s } +// State returns the system State +func (t *Task) State() *State { + return t.state +} + // Progress returns the current progress for the task. // If progress is not explicitly set, it returns (0, 1) if the status is // RunningStatus or WaitingStatus and (1, 1) otherwise. @@ -159,3 +169,17 @@ func (t *Task) Get(key string, value interface{}) error { t.state.ensureLocked() return t.data.get(key, value) } + +// WaitFor registers another task as a requirement for t to make progress +// and sets the status as WaitingStatus. +func (t *Task) WaitFor(another *Task) { + t.state.ensureLocked() + t.status = WaitingStatus + t.waitTasks.add(another.ID()) +} + +// WaitTasks returns the list of tasks registered for t to wait for. +func (t *Task) WaitTasks() []*Task { + t.state.ensureLocked() + return t.waitTasks.tasks(t.state) +} diff --git a/overlord/state/task_test.go b/overlord/state/task_test.go index 513b8b544d..509ee0e7b7 100644 --- a/overlord/state/task_test.go +++ b/overlord/state/task_test.go @@ -20,9 +20,12 @@ package state_test import ( + "fmt" + . "gopkg.in/check.v1" "github.com/ubuntu-core/snappy/overlord/state" + "github.com/ubuntu-core/snappy/testutil" ) type taskSuite struct{} @@ -182,3 +185,65 @@ func (ts *taskSuite) TestSetProgressNeedsLock(c *C) { c.Assert(func() { t.SetProgress(2, 2) }, PanicMatches, "internal error: accessing state without lock") } + +func (ts *taskSuite) TestState(c *C) { + st := state.New(nil) + st.Lock() + chg := st.NewChange("install", "...") + t := chg.NewTask("download", "1...") + st.Unlock() + + c.Assert(t.State(), Equals, st) +} + +func (ts *taskSuite) TestTaskMarshalsWaitFor(c *C) { + st := state.New(nil) + st.Lock() + defer st.Unlock() + + chg := st.NewChange("install", "...") + t1 := chg.NewTask("download", "1...") + t2 := chg.NewTask("install", "2...") + t2.WaitFor(t1) + + d, err := t2.MarshalJSON() + c.Assert(err, IsNil) + + needle := fmt.Sprintf(`"wait-tasks":["%s"`, t1.ID()) + c.Assert(string(d), testutil.Contains, needle) +} + +func (ts *taskSuite) TestTaskWaitFor(c *C) { + st := state.New(nil) + st.Lock() + defer st.Unlock() + + chg := st.NewChange("install", "...") + t1 := chg.NewTask("download", "1...") + t2 := chg.NewTask("install", "2...") + t2.WaitFor(t1) + + c.Assert(t2.WaitTasks(), DeepEquals, []*state.Task{t1}) + c.Assert(t2.Status(), Equals, state.WaitingStatus) +} + +func (cs *taskSuite) TestWaitForNeedsLocked(c *C) { + st := state.New(nil) + st.Lock() + chg := st.NewChange("install", "...") + t1 := chg.NewTask("download", "1...") + t2 := chg.NewTask("install", "2...") + st.Unlock() + + c.Assert(func() { t2.WaitFor(t1) }, PanicMatches, "internal error: accessing state without lock") +} + +func (cs *taskSuite) TestWaitTasksNeedsLocked(c *C) { + st := state.New(nil) + st.Lock() + chg := st.NewChange("install", "...") + t := chg.NewTask("download", "1...") + st.Unlock() + + c.Assert(func() { t.WaitTasks() }, PanicMatches, "internal error: accessing state without lock") +} diff --git a/overlord/state/taskrunner.go b/overlord/state/taskrunner.go new file mode 100644 index 0000000000..5f0c971489 --- /dev/null +++ b/overlord/state/taskrunner.go @@ -0,0 +1,155 @@ +// -*- Mode: Go; indent-tabs-mode: t -*- + +/* + * Copyright (C) 2016 Canonical Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package state + +import ( + "sync" + + "gopkg.in/tomb.v2" +) + +// HandlerFunc is the type of function for the hanlders +type HandlerFunc func(task *Task) error + +// TaskRunner controls the running of goroutines to execute known task kinds. +type TaskRunner struct { + state *State + + // locking + mu sync.Mutex + handlers map[string]HandlerFunc + + // go-routines lifecycle + tombs map[string]*tomb.Tomb +} + +// NewTaskRunner creates a new TaskRunner +func NewTaskRunner(s *State) *TaskRunner { + return &TaskRunner{ + state: s, + handlers: make(map[string]HandlerFunc), + tombs: make(map[string]*tomb.Tomb), + } +} + +// AddHandler registers the function to concurrently call for handling +// tasks of the given kind. +func (r *TaskRunner) AddHandler(kind string, fn HandlerFunc) { + r.mu.Lock() + defer r.mu.Unlock() + + r.handlers[kind] = fn +} + +// Handlers returns the map of name/handler functions +func (r *TaskRunner) Handlers() map[string]HandlerFunc { + return r.handlers +} + +// run must be called with the state lock in place +func (r *TaskRunner) run(fn HandlerFunc, task *Task) { + r.tombs[task.ID()] = &tomb.Tomb{} + r.tombs[task.ID()].Go(func() error { + err := fn(task) + + r.state.Lock() + defer r.state.Unlock() + if err == nil { + task.SetStatus(DoneStatus) + } else { + task.SetStatus(ErrorStatus) + } + delete(r.tombs, task.ID()) + + return err + }) +} + +// mustWait must be called with the state lock in place +func (r *TaskRunner) mustWait(t *Task) bool { + for _, wt := range t.WaitTasks() { + if wt.Status() != DoneStatus { + return true + } + } + + return false +} + +// Ensure starts new goroutines for all known tasks with no pending +// dependencies. +// Note that Ensure will lock the state. +func (r *TaskRunner) Ensure() { + r.state.Lock() + defer r.state.Unlock() + + r.mu.Lock() + defer r.mu.Unlock() + + for _, chg := range r.state.Changes() { + if chg.Status() == DoneStatus { + continue + } + + tasks := chg.Tasks() + for _, t := range tasks { + // done, nothing to do + if t.Status() == DoneStatus { + continue + } + + // No handler for the given kind of task, + // this means another taskrunner is going + // to handle this task. + if _, ok := r.handlers[t.Kind()]; !ok { + continue + } + + // we look at the Tomb instead of Status because + // a task can be in RunningStatus even when it + // is not started yet (like when the daemon + // process restarts) + if _, ok := r.tombs[t.ID()]; ok { + continue + } + + // check if there is anything we need to wait for + if r.mustWait(t) { + continue + } + + // the task is ready to run (all prerequists done) + // so full steam ahead! + r.run(r.handlers[t.Kind()], t) + } + } +} + +// Stop stops all concurrent activities and returns after that's done. +// Note that Stop will lock the state. +func (r *TaskRunner) Stop() { + r.state.Lock() + defer r.state.Unlock() + + for _, tb := range r.tombs { + tb.Kill(nil) + tb.Wait() + } +} diff --git a/overlord/state/taskrunner_test.go b/overlord/state/taskrunner_test.go new file mode 100644 index 0000000000..5349328a66 --- /dev/null +++ b/overlord/state/taskrunner_test.go @@ -0,0 +1,109 @@ +// -*- Mode: Go; indent-tabs-mode: t -*- + +/* + * Copyright (C) 2016 Canonical Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package state_test + +import ( + "sync" + "time" + + . "gopkg.in/check.v1" + + "github.com/ubuntu-core/snappy/overlord/state" +) + +type taskRunnerSuite struct{} + +var _ = Suite(&taskRunnerSuite{}) + +func (ts *taskRunnerSuite) TestAddHandler(c *C) { + r := state.NewTaskRunner(nil) + fn := func(task *state.Task) error { + return nil + } + r.AddHandler("download", fn) + + c.Assert(r.Handlers(), HasLen, 1) +} + +func (ts *taskRunnerSuite) TestEnsureTrivial(c *C) { + // we need state + st := state.New(nil) + + // setup the download handler + taskCompleted := sync.WaitGroup{} + r := state.NewTaskRunner(st) + fn := func(task *state.Task) error { + taskCompleted.Done() + return nil + } + r.AddHandler("download", fn) + + // add a download task to the state tracker + st.Lock() + chg := st.NewChange("install", "...") + chg.NewTask("download", "1...") + taskCompleted.Add(1) + st.Unlock() + + // ensure just kicks the go routine off + r.Ensure() + taskCompleted.Wait() +} + +func (ts *taskRunnerSuite) TestEnsureComplex(c *C) { + // we need state + st := state.New(nil) + + // setup handlers + r := state.NewTaskRunner(st) + + var ordering []string + fn := func(task *state.Task) error { + ordering = append(ordering, task.Kind()) + return nil + } + r.AddHandler("download", fn) + r.AddHandler("unpack", fn) + r.AddHandler("configure", fn) + + // run in a loop to ensure ordering is correct by pure chance + for i := 0; i < 100; i++ { + ordering = []string{} + + st.Lock() + chg := st.NewChange("mock-install", "...") + + // create sub-tasks + tDl := chg.NewTask("download", "1...") + tUnp := chg.NewTask("unpack", "2...") + tUnp.WaitFor(tDl) + tConf := chg.NewTask("configure", "3...") + tConf.WaitFor(tUnp) + st.Unlock() + + // ensure just kicks the go routine off + for len(ordering) < 3 { + r.Ensure() + time.Sleep(1 * time.Millisecond) + } + + c.Assert(ordering, DeepEquals, []string{"download", "unpack", "configure"}) + } +} diff --git a/snappy/snap_remote_repo.go b/snappy/snap_remote_repo.go index 2dab1ad9f6..3aa95651c1 100644 --- a/snappy/snap_remote_repo.go +++ b/snappy/snap_remote_repo.go @@ -371,11 +371,8 @@ func (s *SnapUbuntuStoreRepository) Updates() ([]Part, error) { return parts, nil } -// SnapUpdates returns the available updates +// SnapUpdates returns the available updates as RemoteSnap types func (s *SnapUbuntuStoreRepository) SnapUpdates() (snaps []*RemoteSnap, err error) { - // the store only supports apps, gadget and frameworks currently, so no - // sense in sending it our ubuntu-core snap - // // NOTE this *will* send .sideload apps to the store. installed, err := ActiveSnapIterByType(fullNameWithChannel, snap.TypeApp, snap.TypeFramework, snap.TypeGadget, snap.TypeOS, snap.TypeKernel) if err != nil || len(installed) == 0 { |
