summaryrefslogtreecommitdiff
diff options
authorMichael Vogt <mvo@ubuntu.com>2016-03-15 14:57:36 +0100
committerMichael Vogt <mvo@ubuntu.com>2016-03-15 14:57:36 +0100
commitb38ca5525897fd1ba6bb1cf5e86d6e89d560f8b8 (patch)
tree7504ea38dee424c2073c2b7a944c3c77f238f8a6
parentcf10fd73549f51979ef4c8f871ecb8b79019d723 (diff)
parent59cc9d3df7d0f84e855fd4e2e53441ba5e19d6a1 (diff)
Merge remote-tracking branch 'upstream/master' into refactor/no-parts3refactor/no-parts3
-rw-r--r--overlord/state/change.go55
-rw-r--r--overlord/state/change_test.go9
-rw-r--r--overlord/state/task.go72
-rw-r--r--overlord/state/task_test.go65
-rw-r--r--overlord/state/taskrunner.go155
-rw-r--r--overlord/state/taskrunner_test.go109
-rw-r--r--snappy/snap_remote_repo.go5
7 files changed, 433 insertions, 37 deletions
diff --git a/overlord/state/change.go b/overlord/state/change.go
index 90c44f5d7c..877e31a525 100644
--- a/overlord/state/change.go
+++ b/overlord/state/change.go
@@ -37,6 +37,42 @@ const (
const nStatuses = ErrorStatus + 1
+type taskIDsSet map[string]bool
+
+func (ts taskIDsSet) add(tid string) {
+ ts[tid] = true
+}
+
+func (ts taskIDsSet) tasks(s *State) []*Task {
+ res := make([]*Task, 0, len(ts))
+ for tid := range ts {
+ res = append(res, s.tasks[tid])
+ }
+ return res
+}
+
+func (ts taskIDsSet) MarshalJSON() ([]byte, error) {
+ l := make([]string, 0, len(ts))
+ for tid := range ts {
+ l = append(l, tid)
+ }
+ return json.Marshal(l)
+}
+
+// NB: it's a bit odd but this one needs to be and works on *taskIDsSet
+func (ts *taskIDsSet) UnmarshalJSON(data []byte) error {
+ var l []string
+ err := json.Unmarshal(data, &l)
+ if err != nil {
+ return err
+ }
+ *ts = make(map[string]bool, len(l))
+ for _, tid := range l {
+ (*ts)[tid] = true
+ }
+ return nil
+}
+
// Change represents a tracked modification to the system state.
//
// The Change provides both the justification for individual tasks
@@ -54,7 +90,7 @@ type Change struct {
summary string
status Status
data customData
- taskIDs map[string]bool
+ taskIDs taskIDsSet
}
func newChange(state *State, id, kind, summary string) *Change {
@@ -64,7 +100,7 @@ func newChange(state *State, id, kind, summary string) *Change {
kind: kind,
summary: summary,
data: make(customData),
- taskIDs: make(map[string]bool),
+ taskIDs: make(taskIDsSet),
}
}
@@ -74,7 +110,7 @@ type marshalledChange struct {
Summary string `json:"summary"`
Status Status `json:"status"`
Data map[string]*json.RawMessage `json:"data"`
- TaskIDs map[string]bool `json:"task-ids"`
+ TaskIDs taskIDsSet `json:"task-ids"`
}
// MarshalJSON makes Change a json.Marshaller
@@ -175,6 +211,11 @@ func (c *Change) SetStatus(s Status) {
c.status = s
}
+// State returns the system State
+func (c *Change) State() *State {
+ return c.state
+}
+
// NewTask creates a new task and registers it as a required task for the
// state change to be accomplished.
func (c *Change) NewTask(kind, summary string) *Task {
@@ -182,7 +223,7 @@ func (c *Change) NewTask(kind, summary string) *Task {
id := c.state.genID()
t := newTask(c.state, id, kind, summary)
c.state.tasks[id] = t
- c.taskIDs[id] = true
+ c.taskIDs.add(id)
return t
}
@@ -191,9 +232,5 @@ func (c *Change) NewTask(kind, summary string) *Task {
// Tasks returns all the tasks this state change depends on.
func (c *Change) Tasks() []*Task {
c.state.ensureLocked()
- res := make([]*Task, 0, len(c.taskIDs))
- for tid := range c.taskIDs {
- res = append(res, c.state.tasks[tid])
- }
- return res
+ return c.taskIDs.tasks(c.state)
}
diff --git a/overlord/state/change_test.go b/overlord/state/change_test.go
index 6d0f0b66f4..7128a3fe33 100644
--- a/overlord/state/change_test.go
+++ b/overlord/state/change_test.go
@@ -177,3 +177,12 @@ func (cs *changeSuite) TestStatusDerivedFromTasks(c *C) {
t2.SetStatus(state.DoneStatus)
c.Check(chg.Status(), Equals, state.DoneStatus)
}
+
+func (cs *changeSuite) TestState(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ chg := st.NewChange("install", "...")
+ st.Unlock()
+
+ c.Assert(chg.State(), Equals, st)
+}
diff --git a/overlord/state/task.go b/overlord/state/task.go
index 5cb74ca6e7..83a6e75812 100644
--- a/overlord/state/task.go
+++ b/overlord/state/task.go
@@ -33,44 +33,48 @@ type progress struct {
//
// See Change for more details.
type Task struct {
- state *State
- id string
- kind string
- summary string
- status Status
- progress progress
- data customData
+ state *State
+ id string
+ kind string
+ summary string
+ status Status
+ progress progress
+ data customData
+ waitTasks taskIDsSet
}
func newTask(state *State, id, kind, summary string) *Task {
return &Task{
- state: state,
- id: id,
- kind: kind,
- summary: summary,
- data: make(customData),
+ state: state,
+ id: id,
+ kind: kind,
+ summary: summary,
+ data: make(customData),
+ waitTasks: make(taskIDsSet),
}
}
type marshalledTask struct {
- ID string `json:"id"`
- Kind string `json:"kind"`
- Summary string `json:"summary"`
- Status Status `json:"status"`
- Progress progress `json:"progress"`
- Data map[string]*json.RawMessage `json:"data"`
+ ID string `json:"id"`
+ Kind string `json:"kind"`
+ Summary string `json:"summary"`
+ Status Status `json:"status"`
+ Progress progress `json:"progress"`
+ Data map[string]*json.RawMessage `json:"data"`
+ WaitTasks taskIDsSet `json:"wait-tasks"`
}
// MarshalJSON makes Task a json.Marshaller
func (t *Task) MarshalJSON() ([]byte, error) {
t.state.ensureLocked()
return json.Marshal(marshalledTask{
- ID: t.id,
- Kind: t.kind,
- Summary: t.summary,
- Status: t.status,
- Progress: t.progress,
- Data: t.data,
+ ID: t.id,
+ Kind: t.kind,
+ Summary: t.summary,
+ Status: t.status,
+ Progress: t.progress,
+ Data: t.data,
+ WaitTasks: t.waitTasks,
})
}
@@ -90,6 +94,7 @@ func (t *Task) UnmarshalJSON(data []byte) error {
t.status = unmarshalled.Status
t.progress = unmarshalled.Progress
t.data = unmarshalled.Data
+ t.waitTasks = unmarshalled.WaitTasks
return nil
}
@@ -124,6 +129,11 @@ func (t *Task) SetStatus(s Status) {
t.status = s
}
+// State returns the system State
+func (t *Task) State() *State {
+ return t.state
+}
+
// Progress returns the current progress for the task.
// If progress is not explicitly set, it returns (0, 1) if the status is
// RunningStatus or WaitingStatus and (1, 1) otherwise.
@@ -159,3 +169,17 @@ func (t *Task) Get(key string, value interface{}) error {
t.state.ensureLocked()
return t.data.get(key, value)
}
+
+// WaitFor registers another task as a requirement for t to make progress
+// and sets the status as WaitingStatus.
+func (t *Task) WaitFor(another *Task) {
+ t.state.ensureLocked()
+ t.status = WaitingStatus
+ t.waitTasks.add(another.ID())
+}
+
+// WaitTasks returns the list of tasks registered for t to wait for.
+func (t *Task) WaitTasks() []*Task {
+ t.state.ensureLocked()
+ return t.waitTasks.tasks(t.state)
+}
diff --git a/overlord/state/task_test.go b/overlord/state/task_test.go
index 513b8b544d..509ee0e7b7 100644
--- a/overlord/state/task_test.go
+++ b/overlord/state/task_test.go
@@ -20,9 +20,12 @@
package state_test
import (
+ "fmt"
+
. "gopkg.in/check.v1"
"github.com/ubuntu-core/snappy/overlord/state"
+ "github.com/ubuntu-core/snappy/testutil"
)
type taskSuite struct{}
@@ -182,3 +185,65 @@ func (ts *taskSuite) TestSetProgressNeedsLock(c *C) {
c.Assert(func() { t.SetProgress(2, 2) }, PanicMatches, "internal error: accessing state without lock")
}
+
+func (ts *taskSuite) TestState(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ chg := st.NewChange("install", "...")
+ t := chg.NewTask("download", "1...")
+ st.Unlock()
+
+ c.Assert(t.State(), Equals, st)
+}
+
+func (ts *taskSuite) TestTaskMarshalsWaitFor(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ defer st.Unlock()
+
+ chg := st.NewChange("install", "...")
+ t1 := chg.NewTask("download", "1...")
+ t2 := chg.NewTask("install", "2...")
+ t2.WaitFor(t1)
+
+ d, err := t2.MarshalJSON()
+ c.Assert(err, IsNil)
+
+ needle := fmt.Sprintf(`"wait-tasks":["%s"`, t1.ID())
+ c.Assert(string(d), testutil.Contains, needle)
+}
+
+func (ts *taskSuite) TestTaskWaitFor(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ defer st.Unlock()
+
+ chg := st.NewChange("install", "...")
+ t1 := chg.NewTask("download", "1...")
+ t2 := chg.NewTask("install", "2...")
+ t2.WaitFor(t1)
+
+ c.Assert(t2.WaitTasks(), DeepEquals, []*state.Task{t1})
+ c.Assert(t2.Status(), Equals, state.WaitingStatus)
+}
+
+func (cs *taskSuite) TestWaitForNeedsLocked(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ chg := st.NewChange("install", "...")
+ t1 := chg.NewTask("download", "1...")
+ t2 := chg.NewTask("install", "2...")
+ st.Unlock()
+
+ c.Assert(func() { t2.WaitFor(t1) }, PanicMatches, "internal error: accessing state without lock")
+}
+
+func (cs *taskSuite) TestWaitTasksNeedsLocked(c *C) {
+ st := state.New(nil)
+ st.Lock()
+ chg := st.NewChange("install", "...")
+ t := chg.NewTask("download", "1...")
+ st.Unlock()
+
+ c.Assert(func() { t.WaitTasks() }, PanicMatches, "internal error: accessing state without lock")
+}
diff --git a/overlord/state/taskrunner.go b/overlord/state/taskrunner.go
new file mode 100644
index 0000000000..5f0c971489
--- /dev/null
+++ b/overlord/state/taskrunner.go
@@ -0,0 +1,155 @@
+// -*- Mode: Go; indent-tabs-mode: t -*-
+
+/*
+ * Copyright (C) 2016 Canonical Ltd
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 3 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package state
+
+import (
+ "sync"
+
+ "gopkg.in/tomb.v2"
+)
+
+// HandlerFunc is the type of function for the hanlders
+type HandlerFunc func(task *Task) error
+
+// TaskRunner controls the running of goroutines to execute known task kinds.
+type TaskRunner struct {
+ state *State
+
+ // locking
+ mu sync.Mutex
+ handlers map[string]HandlerFunc
+
+ // go-routines lifecycle
+ tombs map[string]*tomb.Tomb
+}
+
+// NewTaskRunner creates a new TaskRunner
+func NewTaskRunner(s *State) *TaskRunner {
+ return &TaskRunner{
+ state: s,
+ handlers: make(map[string]HandlerFunc),
+ tombs: make(map[string]*tomb.Tomb),
+ }
+}
+
+// AddHandler registers the function to concurrently call for handling
+// tasks of the given kind.
+func (r *TaskRunner) AddHandler(kind string, fn HandlerFunc) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ r.handlers[kind] = fn
+}
+
+// Handlers returns the map of name/handler functions
+func (r *TaskRunner) Handlers() map[string]HandlerFunc {
+ return r.handlers
+}
+
+// run must be called with the state lock in place
+func (r *TaskRunner) run(fn HandlerFunc, task *Task) {
+ r.tombs[task.ID()] = &tomb.Tomb{}
+ r.tombs[task.ID()].Go(func() error {
+ err := fn(task)
+
+ r.state.Lock()
+ defer r.state.Unlock()
+ if err == nil {
+ task.SetStatus(DoneStatus)
+ } else {
+ task.SetStatus(ErrorStatus)
+ }
+ delete(r.tombs, task.ID())
+
+ return err
+ })
+}
+
+// mustWait must be called with the state lock in place
+func (r *TaskRunner) mustWait(t *Task) bool {
+ for _, wt := range t.WaitTasks() {
+ if wt.Status() != DoneStatus {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Ensure starts new goroutines for all known tasks with no pending
+// dependencies.
+// Note that Ensure will lock the state.
+func (r *TaskRunner) Ensure() {
+ r.state.Lock()
+ defer r.state.Unlock()
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, chg := range r.state.Changes() {
+ if chg.Status() == DoneStatus {
+ continue
+ }
+
+ tasks := chg.Tasks()
+ for _, t := range tasks {
+ // done, nothing to do
+ if t.Status() == DoneStatus {
+ continue
+ }
+
+ // No handler for the given kind of task,
+ // this means another taskrunner is going
+ // to handle this task.
+ if _, ok := r.handlers[t.Kind()]; !ok {
+ continue
+ }
+
+ // we look at the Tomb instead of Status because
+ // a task can be in RunningStatus even when it
+ // is not started yet (like when the daemon
+ // process restarts)
+ if _, ok := r.tombs[t.ID()]; ok {
+ continue
+ }
+
+ // check if there is anything we need to wait for
+ if r.mustWait(t) {
+ continue
+ }
+
+ // the task is ready to run (all prerequists done)
+ // so full steam ahead!
+ r.run(r.handlers[t.Kind()], t)
+ }
+ }
+}
+
+// Stop stops all concurrent activities and returns after that's done.
+// Note that Stop will lock the state.
+func (r *TaskRunner) Stop() {
+ r.state.Lock()
+ defer r.state.Unlock()
+
+ for _, tb := range r.tombs {
+ tb.Kill(nil)
+ tb.Wait()
+ }
+}
diff --git a/overlord/state/taskrunner_test.go b/overlord/state/taskrunner_test.go
new file mode 100644
index 0000000000..5349328a66
--- /dev/null
+++ b/overlord/state/taskrunner_test.go
@@ -0,0 +1,109 @@
+// -*- Mode: Go; indent-tabs-mode: t -*-
+
+/*
+ * Copyright (C) 2016 Canonical Ltd
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 3 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package state_test
+
+import (
+ "sync"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/ubuntu-core/snappy/overlord/state"
+)
+
+type taskRunnerSuite struct{}
+
+var _ = Suite(&taskRunnerSuite{})
+
+func (ts *taskRunnerSuite) TestAddHandler(c *C) {
+ r := state.NewTaskRunner(nil)
+ fn := func(task *state.Task) error {
+ return nil
+ }
+ r.AddHandler("download", fn)
+
+ c.Assert(r.Handlers(), HasLen, 1)
+}
+
+func (ts *taskRunnerSuite) TestEnsureTrivial(c *C) {
+ // we need state
+ st := state.New(nil)
+
+ // setup the download handler
+ taskCompleted := sync.WaitGroup{}
+ r := state.NewTaskRunner(st)
+ fn := func(task *state.Task) error {
+ taskCompleted.Done()
+ return nil
+ }
+ r.AddHandler("download", fn)
+
+ // add a download task to the state tracker
+ st.Lock()
+ chg := st.NewChange("install", "...")
+ chg.NewTask("download", "1...")
+ taskCompleted.Add(1)
+ st.Unlock()
+
+ // ensure just kicks the go routine off
+ r.Ensure()
+ taskCompleted.Wait()
+}
+
+func (ts *taskRunnerSuite) TestEnsureComplex(c *C) {
+ // we need state
+ st := state.New(nil)
+
+ // setup handlers
+ r := state.NewTaskRunner(st)
+
+ var ordering []string
+ fn := func(task *state.Task) error {
+ ordering = append(ordering, task.Kind())
+ return nil
+ }
+ r.AddHandler("download", fn)
+ r.AddHandler("unpack", fn)
+ r.AddHandler("configure", fn)
+
+ // run in a loop to ensure ordering is correct by pure chance
+ for i := 0; i < 100; i++ {
+ ordering = []string{}
+
+ st.Lock()
+ chg := st.NewChange("mock-install", "...")
+
+ // create sub-tasks
+ tDl := chg.NewTask("download", "1...")
+ tUnp := chg.NewTask("unpack", "2...")
+ tUnp.WaitFor(tDl)
+ tConf := chg.NewTask("configure", "3...")
+ tConf.WaitFor(tUnp)
+ st.Unlock()
+
+ // ensure just kicks the go routine off
+ for len(ordering) < 3 {
+ r.Ensure()
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ c.Assert(ordering, DeepEquals, []string{"download", "unpack", "configure"})
+ }
+}
diff --git a/snappy/snap_remote_repo.go b/snappy/snap_remote_repo.go
index 2dab1ad9f6..3aa95651c1 100644
--- a/snappy/snap_remote_repo.go
+++ b/snappy/snap_remote_repo.go
@@ -371,11 +371,8 @@ func (s *SnapUbuntuStoreRepository) Updates() ([]Part, error) {
return parts, nil
}
-// SnapUpdates returns the available updates
+// SnapUpdates returns the available updates as RemoteSnap types
func (s *SnapUbuntuStoreRepository) SnapUpdates() (snaps []*RemoteSnap, err error) {
- // the store only supports apps, gadget and frameworks currently, so no
- // sense in sending it our ubuntu-core snap
- //
// NOTE this *will* send .sideload apps to the store.
installed, err := ActiveSnapIterByType(fullNameWithChannel, snap.TypeApp, snap.TypeFramework, snap.TypeGadget, snap.TypeOS, snap.TypeKernel)
if err != nil || len(installed) == 0 {