From 77c445a8382255a77a9e0d48016b665d41d1d10a Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 10:03:20 -0800 Subject: [PATCH 01/11] dag: Set difference --- dag/set.go | 22 +++++++++++++++++++ dag/set_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 dag/set_test.go diff --git a/dag/set.go b/dag/set.go index d4b29226b..3929c9d0e 100644 --- a/dag/set.go +++ b/dag/set.go @@ -48,6 +48,9 @@ func (s *Set) Include(v interface{}) bool { // Intersection computes the set intersection with other. func (s *Set) Intersection(other *Set) *Set { result := new(Set) + if s == nil { + return result + } if other != nil { for _, v := range s.m { if other.Include(v) { @@ -59,6 +62,25 @@ func (s *Set) Intersection(other *Set) *Set { return result } +// Difference returns a set with the elements that s has but +// other doesn't. +func (s *Set) Difference(other *Set) *Set { + result := new(Set) + if s != nil { + for k, v := range s.m { + var ok bool + if other != nil { + _, ok = other.m[k] + } + if !ok { + result.Add(v) + } + } + } + + return result +} + // Len is the number of items in the set. func (s *Set) Len() int { if s == nil { diff --git a/dag/set_test.go b/dag/set_test.go new file mode 100644 index 000000000..8aeae7073 --- /dev/null +++ b/dag/set_test.go @@ -0,0 +1,56 @@ +package dag + +import ( + "fmt" + "testing" +) + +func TestSetDifference(t *testing.T) { + cases := []struct { + Name string + A, B []interface{} + Expected []interface{} + }{ + { + "same", + []interface{}{1, 2, 3}, + []interface{}{3, 1, 2}, + []interface{}{}, + }, + + { + "A has extra elements", + []interface{}{1, 2, 3}, + []interface{}{3, 2}, + []interface{}{1}, + }, + + { + "B has extra elements", + []interface{}{1, 2, 3}, + []interface{}{3, 2, 1, 4}, + []interface{}{}, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d-%s", i, tc.Name), func(t *testing.T) { + var one, two, expected Set + for _, v := range tc.A { + one.Add(v) + } + for _, v := range tc.B { + two.Add(v) + } + for _, v := range tc.Expected { + expected.Add(v) + } + + actual := one.Difference(&two) + match := actual.Intersection(&expected) + if match.Len() != expected.Len() { + t.Fatalf("bad: %#v", actual.List()) + } + }) + } +} From cbc71d95085e7490650dccfaac299ecdc6dc8f6f Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 11:41:46 -0800 Subject: [PATCH 02/11] dag: basic working update-able walker --- dag/walk.go | 267 +++++++++++++++++++++++++++++++++++++++++++++++ dag/walk_test.go | 43 ++++++++ 2 files changed, 310 insertions(+) create mode 100644 dag/walk.go create mode 100644 dag/walk_test.go diff --git a/dag/walk.go b/dag/walk.go new file mode 100644 index 000000000..d219234f7 --- /dev/null +++ b/dag/walk.go @@ -0,0 +1,267 @@ +package dag + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/hashicorp/go-multierror" +) + +// walker performs a graph walk +type walker struct { + Callback WalkFunc + + vertices *Set + edges *Set + vertexMap map[Vertex]*walkerVertex + + wait sync.WaitGroup + changeLock sync.Mutex + + errMap map[Vertex]error + errLock sync.Mutex +} + +type walkerVertex struct { + sync.Mutex + + DoneCh chan struct{} + CancelCh chan struct{} + DepsCh chan struct{} + DepsUpdateCh chan chan struct{} + + deps map[Vertex]chan struct{} + depsCancelCh chan struct{} +} + +// Wait waits for the completion of the walk and returns any errors ( +// in the form of a multierror) that occurred. Update should be called +// to populate the walk with vertices and edges. +func (w *walker) Wait() error { + // Wait for completion + w.wait.Wait() + + // Grab the error lock + w.errLock.Lock() + defer w.errLock.Unlock() + + // Build the error + var result error + for v, err := range w.errMap { + result = multierror.Append(result, fmt.Errorf( + "%s: %s", VertexName(v), err)) + } + + return result +} + +// Update updates the currently executing walk with the given vertices +// and edges. It does not block until completion. +// +// Update can be called in parallel to Walk. +func (w *walker) Update(v, e *Set) { + // Grab the change lock so no more updates happen but also so that + // no new vertices are executed during this time since we may be + // removing them. + w.changeLock.Lock() + defer w.changeLock.Unlock() + + // Initialize fields + if w.vertexMap == nil { + w.vertexMap = make(map[Vertex]*walkerVertex) + } + + // Calculate all our sets + newEdges := e.Difference(w.edges) + newVerts := v.Difference(w.vertices) + oldVerts := w.vertices.Difference(v) + + // Add the new vertices + for _, raw := range newVerts.List() { + v := raw.(Vertex) + + // Add to the waitgroup so our walk is not done until everything finishes + w.wait.Add(1) + + // Initialize the vertex info + info := &walkerVertex{ + DoneCh: make(chan struct{}), + CancelCh: make(chan struct{}), + DepsCh: make(chan struct{}), + DepsUpdateCh: make(chan chan struct{}, 5), + deps: make(map[Vertex]chan struct{}), + } + + // Close the deps channel immediately so it passes + close(info.DepsCh) + + // Add it to the map and kick off the walk + w.vertexMap[v] = info + } + + // Remove the old vertices + for _, raw := range oldVerts.List() { + v := raw.(Vertex) + + // Get the vertex info so we can cancel it + info, ok := w.vertexMap[v] + if !ok { + // This vertex for some reason was never in our map. This + // shouldn't be possible. + continue + } + + // Cancel the vertex + close(info.CancelCh) + + // Delete it out of the map + delete(w.vertexMap, v) + } + + // Add the new edges + var changedDeps Set + for _, raw := range newEdges.List() { + edge := raw.(Edge) + + // waiter is the vertex that is "waiting" on this edge + waiter := edge.Target() + + // dep is the dependency we're waiting on + dep := edge.Source() + + // Get the info for the waiter + waiterInfo, ok := w.vertexMap[waiter] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Get the info for the dep + depInfo, ok := w.vertexMap[dep] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Add the dependency to our waiter + waiterInfo.deps[dep] = depInfo.DoneCh + + // Record that the deps changed for this waiter + changedDeps.Add(waiter) + } + + for _, raw := range changedDeps.List() { + v := raw.(Vertex) + info, ok := w.vertexMap[v] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Create a new done channel + doneCh := make(chan struct{}) + + // Create the channel we close for cancellation + cancelCh := make(chan struct{}) + info.depsCancelCh = cancelCh + + // Build a new deps copy + deps := make(map[Vertex]<-chan struct{}) + for k, v := range info.deps { + deps[k] = v + } + + // Update the update channel + info.DepsUpdateCh <- doneCh + + // Start the waiter + go w.waitDeps(v, deps, doneCh, cancelCh) + } + + // Kickstart all the vertices + for _, raw := range newVerts.List() { + v := raw.(Vertex) + go w.walkVertex(v, w.vertexMap[v]) + } +} + +// walkVertex walks a single vertex, waiting for any dependencies before +// executing the callback. +func (w *walker) walkVertex(v Vertex, info *walkerVertex) { + // When we're done executing, lower the waitgroup count + defer w.wait.Done() + + // When we're done, always close our done channel + defer close(info.DoneCh) + + // Wait for our dependencies + depsCh := info.DepsCh + for { + select { + case <-info.CancelCh: + // Cancel + return + + case <-depsCh: + // Deps complete! + depsCh = nil + + case depsCh = <-info.DepsUpdateCh: + // New deps, reloop + } + + if depsCh == nil { + // One final check if we have an update + select { + case depsCh = <-info.DepsUpdateCh: + default: + } + + if depsCh == nil { + break + } + } + } + + // Call our callback + if err := w.Callback(v); err != nil { + w.errLock.Lock() + defer w.errLock.Unlock() + + if w.errMap == nil { + w.errMap = make(map[Vertex]error) + } + w.errMap[v] = err + } +} + +func (w *walker) waitDeps( + v Vertex, + deps map[Vertex]<-chan struct{}, + doneCh chan<- struct{}, + cancelCh <-chan struct{}) { + // Whenever we return, mark ourselves as complete + defer close(doneCh) + + // For each dependency given to us, wait for it to complete + for dep, depCh := range deps { + DepSatisfied: + for { + select { + case <-depCh: + // Dependency satisfied! + break DepSatisfied + + case <-cancelCh: + // Wait cancelled + return + + case <-time.After(time.Second * 5): + log.Printf("[DEBUG] vertex %q, waiting for: %q", + VertexName(v), VertexName(dep)) + } + } + } +} diff --git a/dag/walk_test.go b/dag/walk_test.go new file mode 100644 index 000000000..d3bcb432a --- /dev/null +++ b/dag/walk_test.go @@ -0,0 +1,43 @@ +package dag + +import ( + "reflect" + "sync" + "testing" +) + +func TestWalker_basic(t *testing.T) { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var order []interface{} + w := &walker{Callback: walkCbRecord(&order)} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 2} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + +// walkCbRecord is a test helper callback that just records the order called. +func walkCbRecord(order *[]interface{}) WalkFunc { + var l sync.Mutex + return func(v Vertex) error { + l.Lock() + defer l.Unlock() + *order = append(*order, v) + return nil + } +} From 5d49e7e6b64b8c14357fa3304e60bdc029b61524 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 12:09:26 -0800 Subject: [PATCH 03/11] dag: tests for adding edges/vertices during walk-time --- dag/walk.go | 79 ++++++++++++++++++++---------- dag/walk_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 26 deletions(-) diff --git a/dag/walk.go b/dag/walk.go index d219234f7..6339e1736 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -13,8 +13,8 @@ import ( type walker struct { Callback WalkFunc - vertices *Set - edges *Set + vertices Set + edges Set vertexMap map[Vertex]*walkerVertex wait sync.WaitGroup @@ -25,13 +25,18 @@ type walker struct { } type walkerVertex struct { - sync.Mutex + // These should only be set once on initialization and never written again + DoneCh chan struct{} + CancelCh chan struct{} - DoneCh chan struct{} - CancelCh chan struct{} + // Dependency information. Any changes to any of these fields requires + // holding DepsLock. DepsCh chan struct{} - DepsUpdateCh chan chan struct{} + DepsUpdateCh chan struct{} + DepsLock sync.Mutex + // Below is not safe to read/write in parallel. This behavior is + // enforced by changes only happening in Update. deps map[Vertex]chan struct{} depsCancelCh chan struct{} } @@ -74,8 +79,8 @@ func (w *walker) Update(v, e *Set) { } // Calculate all our sets - newEdges := e.Difference(w.edges) - newVerts := v.Difference(w.vertices) + newEdges := e.Difference(&w.edges) + newVerts := v.Difference(&w.vertices) oldVerts := w.vertices.Difference(v) // Add the new vertices @@ -85,13 +90,16 @@ func (w *walker) Update(v, e *Set) { // Add to the waitgroup so our walk is not done until everything finishes w.wait.Add(1) + // Add to our own set so we know about it already + log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v)) + w.vertices.Add(raw) + // Initialize the vertex info info := &walkerVertex{ - DoneCh: make(chan struct{}), - CancelCh: make(chan struct{}), - DepsCh: make(chan struct{}), - DepsUpdateCh: make(chan chan struct{}, 5), - deps: make(map[Vertex]chan struct{}), + DoneCh: make(chan struct{}), + CancelCh: make(chan struct{}), + DepsCh: make(chan struct{}), + deps: make(map[Vertex]chan struct{}), } // Close the deps channel immediately so it passes @@ -118,6 +126,9 @@ func (w *walker) Update(v, e *Set) { // Delete it out of the map delete(w.vertexMap, v) + + log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v)) + w.vertices.Delete(raw) } // Add the new edges @@ -152,6 +163,8 @@ func (w *walker) Update(v, e *Set) { changedDeps.Add(waiter) } + // For each vertex with changed dependencies, we need to kick off + // a new waiter and notify the vertex of the changes. for _, raw := range changedDeps.List() { v := raw.(Vertex) info, ok := w.vertexMap[v] @@ -165,7 +178,6 @@ func (w *walker) Update(v, e *Set) { // Create the channel we close for cancellation cancelCh := make(chan struct{}) - info.depsCancelCh = cancelCh // Build a new deps copy deps := make(map[Vertex]<-chan struct{}) @@ -174,13 +186,26 @@ func (w *walker) Update(v, e *Set) { } // Update the update channel - info.DepsUpdateCh <- doneCh + info.DepsLock.Lock() + if info.DepsUpdateCh != nil { + close(info.DepsUpdateCh) + } + info.DepsCh = doneCh + info.DepsUpdateCh = make(chan struct{}) + info.DepsLock.Unlock() + + // Cancel the older waiter + if info.depsCancelCh != nil { + close(info.depsCancelCh) + } + info.depsCancelCh = cancelCh // Start the waiter go w.waitDeps(v, deps, doneCh, cancelCh) } - // Kickstart all the vertices + // Start all the new vertices. We do this at the end so that all + // the edge waiters and changes are setup above. for _, raw := range newVerts.List() { v := raw.(Vertex) go w.walkVertex(v, w.vertexMap[v]) @@ -208,24 +233,26 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Deps complete! depsCh = nil - case depsCh = <-info.DepsUpdateCh: + case <-info.DepsUpdateCh: // New deps, reloop } - if depsCh == nil { - // One final check if we have an update - select { - case depsCh = <-info.DepsUpdateCh: - default: - } + // Check if we have updated dependencies + info.DepsLock.Lock() + if info.DepsCh != nil { + depsCh = info.DepsCh + info.DepsCh = nil + } + info.DepsLock.Unlock() - if depsCh == nil { - break - } + // If we still have no deps channel set, then we're done! + if depsCh == nil { + break } } // Call our callback + log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) if err := w.Callback(v); err != nil { w.errLock.Lock() defer w.errLock.Unlock() diff --git a/dag/walk_test.go b/dag/walk_test.go index d3bcb432a..5051b9a9b 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -4,6 +4,7 @@ import ( "reflect" "sync" "testing" + "time" ) func TestWalker_basic(t *testing.T) { @@ -31,6 +32,129 @@ func TestWalker_basic(t *testing.T) { } } +func TestWalker_newVertex(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + var order []interface{} + w := &walker{Callback: walkCbRecord(&order)} + w.Update(g.vertices, g.edges) + + // Wait a bit + time.Sleep(10 * time.Millisecond) + + // Update the graph + g.Add(3) + w.Update(g.vertices, g.edges) + + // Update the graph again but with the same vertex + g.Add(3) + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 2, 3} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + +func TestWalker_removeVertex(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // Build a callback that delays until we close a channel + gateCh := make(chan struct{}) + cb := func(v Vertex) error { + if v == 1 { + <-gateCh + } + + return recordF(v) + } + + // Add the initial vertices + w := &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Remove a vertex + g.Remove(2) + w.Update(g.vertices, g.edges) + + // Open gate + close(gateCh) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + +func TestWalker_newEdge(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // Build a callback that delays until we close a channel + var w *walker + cb := func(v Vertex) error { + if v == 1 { + g.Add(3) + g.Connect(BasicEdge(3, 2)) + w.Update(g.vertices, g.edges) + } + + return recordF(v) + } + + // Add the initial vertices + w = &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 3, 2} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + // walkCbRecord is a test helper callback that just records the order called. func walkCbRecord(order *[]interface{}) WalkFunc { var l sync.Mutex From 7f61f1172337d04870793a496707d6b930d63bef Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 16:38:47 -0600 Subject: [PATCH 04/11] dag: support removing edges --- dag/walk.go | 35 +++++++++++++++++++++++++++ dag/walk_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/dag/walk.go b/dag/walk.go index 6339e1736..12c421681 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -80,6 +80,7 @@ func (w *walker) Update(v, e *Set) { // Calculate all our sets newEdges := e.Difference(&w.edges) + oldEdges := w.edges.Difference(e) newVerts := v.Difference(&w.vertices) oldVerts := w.vertices.Difference(v) @@ -161,6 +162,40 @@ func (w *walker) Update(v, e *Set) { // Record that the deps changed for this waiter changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: added edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Add(raw) + } + + // Process reoved edges + for _, raw := range oldEdges.List() { + edge := raw.(Edge) + + // waiter is the vertex that is "waiting" on this edge + waiter := edge.Target() + + // dep is the dependency we're waiting on + dep := edge.Source() + + // Get the info for the waiter + waiterInfo, ok := w.vertexMap[waiter] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Delete the dependency from the waiter + delete(waiterInfo.deps, dep) + + // Record that the deps changed for this waiter + changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: removed edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Delete(raw) } // For each vertex with changed dependencies, we need to kick off diff --git a/dag/walk_test.go b/dag/walk_test.go index 5051b9a9b..c720e8a54 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -1,6 +1,7 @@ package dag import ( + "fmt" "reflect" "sync" "testing" @@ -155,6 +156,67 @@ func TestWalker_newEdge(t *testing.T) { } } +func TestWalker_removeEdge(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Add(3) + g.Connect(BasicEdge(1, 2)) + g.Connect(BasicEdge(3, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // The way this works is that our original graph forces + // the order of 1 => 3 => 2. During the execution of 1, we + // remove the edge forcing 3 before 2. Then, during the execution + // of 3, we wait on a channel that is only closed by 2, implicitly + // forcing 2 before 3 via the callback (and not the graph). If + // 2 cannot execute before 3 (edge removal is non-functional), then + // this test will timeout. + var w *walker + gateCh := make(chan struct{}) + cb := func(v Vertex) error { + if v == 1 { + g.RemoveEdge(BasicEdge(3, 2)) + w.Update(g.vertices, g.edges) + } + + if v == 2 { + close(gateCh) + } + + if v == 3 { + select { + case <-gateCh: + case <-time.After(50 * time.Millisecond): + return fmt.Errorf("timeout 3 waiting for 2") + } + } + + return recordF(v) + } + + // Add the initial vertices + w = &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 2, 3} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + // walkCbRecord is a test helper callback that just records the order called. func walkCbRecord(order *[]interface{}) WalkFunc { var l sync.Mutex From b1aa6fd598cd88f05e016befb58fe2dea5d30dcc Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 16:43:49 -0600 Subject: [PATCH 05/11] dag: improved comments --- dag/walk.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/dag/walk.go b/dag/walk.go index 12c421681..eb49b0c41 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -9,17 +9,30 @@ import ( "github.com/hashicorp/go-multierror" ) -// walker performs a graph walk +// walker performs a graph walk and supports walk-time changing of vertices +// and edges. +// +// A single walker is only valid for one graph walk. After the walk is complete +// you must construct a new walker to walk again. State for the walk is never +// deleted in case vertices or edges are changed. type walker struct { + // Callback is what is called for each vertex Callback WalkFunc - vertices Set - edges Set - vertexMap map[Vertex]*walkerVertex - - wait sync.WaitGroup + // changeLock must be held to modify any of the fields below. Only Update + // should modify these fields. Modifying them outside of Update can cause + // serious problems. changeLock sync.Mutex + vertices Set + edges Set + vertexMap map[Vertex]*walkerVertex + // wait is done when all vertices have executed. It may become "undone" + // if new vertices are added. + wait sync.WaitGroup + + // errMap contains the errors recorded so far for execution. Reading + // and writing should hold errLock. errMap map[Vertex]error errLock sync.Mutex } @@ -43,7 +56,11 @@ type walkerVertex struct { // Wait waits for the completion of the walk and returns any errors ( // in the form of a multierror) that occurred. Update should be called -// to populate the walk with vertices and edges. +// to populate the walk with vertices and edges prior to calling this. +// +// Wait will return as soon as all currently known vertices are complete. +// If you plan on calling Update with more vertices in the future, you +// should not call Wait until after this is done. func (w *walker) Wait() error { // Wait for completion w.wait.Wait() @@ -272,7 +289,10 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // New deps, reloop } - // Check if we have updated dependencies + // Check if we have updated dependencies. This can happen if the + // dependencies were satisfied exactly prior to an Update occuring. + // In that case, we'd like to take into account new dependencies + // if possible. info.DepsLock.Lock() if info.DepsCh != nil { depsCh = info.DepsCh From 28fff99ea88e5ae5faa252a067fcc9bb554a78c3 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:04:39 +0100 Subject: [PATCH 06/11] dag: replace dag.Walk with our walker --- dag/dag.go | 94 ++----------------------------------- dag/walk.go | 118 ++++++++++++++++++++++++++++++++++------------- dag/walk_test.go | 52 +++++++++++++++++---- 3 files changed, 131 insertions(+), 133 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index ed7d77e99..4af78448b 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -2,11 +2,8 @@ package dag import ( "fmt" - "log" "sort" "strings" - "sync" - "time" "github.com/hashicorp/go-multierror" ) @@ -169,94 +166,9 @@ func (g *AcyclicGraph) Cycles() [][]Vertex { func (g *AcyclicGraph) Walk(cb WalkFunc) error { defer g.debug.BeginOperation(typeWalk, "").End("") - // Cache the vertices since we use it multiple times - vertices := g.Vertices() - - // Build the waitgroup that signals when we're done - var wg sync.WaitGroup - wg.Add(len(vertices)) - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - wg.Wait() - }() - - // The map of channels to watch to wait for vertices to finish - vertMap := make(map[Vertex]chan struct{}) - for _, v := range vertices { - vertMap[v] = make(chan struct{}) - } - - // The map of whether a vertex errored or not during the walk - var errLock sync.Mutex - var errs error - errMap := make(map[Vertex]bool) - for _, v := range vertices { - // Build our list of dependencies and the list of channels to - // wait on until we start executing for this vertex. - deps := AsVertexList(g.DownEdges(v)) - depChs := make([]<-chan struct{}, len(deps)) - for i, dep := range deps { - depChs[i] = vertMap[dep] - } - - // Get our channel so that we can close it when we're done - ourCh := vertMap[v] - - // Start the goroutine to wait for our dependencies - readyCh := make(chan bool) - go func(v Vertex, deps []Vertex, chs []<-chan struct{}, readyCh chan<- bool) { - // First wait for all the dependencies - for i, ch := range chs { - DepSatisfied: - for { - select { - case <-ch: - break DepSatisfied - case <-time.After(time.Second * 5): - log.Printf("[DEBUG] vertex %q, waiting for: %q", - VertexName(v), VertexName(deps[i])) - } - } - log.Printf("[DEBUG] vertex %q, got dep: %q", - VertexName(v), VertexName(deps[i])) - } - - // Then, check the map to see if any of our dependencies failed - errLock.Lock() - defer errLock.Unlock() - for _, dep := range deps { - if errMap[dep] { - errMap[v] = true - readyCh <- false - return - } - } - - readyCh <- true - }(v, deps, depChs, readyCh) - - // Start the goroutine that executes - go func(v Vertex, doneCh chan<- struct{}, readyCh <-chan bool) { - defer close(doneCh) - defer wg.Done() - - var err error - if ready := <-readyCh; ready { - err = cb(v) - } - - errLock.Lock() - defer errLock.Unlock() - if err != nil { - errMap[v] = true - errs = multierror.Append(errs, err) - } - }(v, ourCh, readyCh) - } - - <-doneCh - return errs + w := &walker{Callback: cb, Reverse: true} + w.Update(g.vertices, g.edges) + return w.Wait() } // simple convenience helper for converting a dag.Set to a []Vertex diff --git a/dag/walk.go b/dag/walk.go index eb49b0c41..30dd207a6 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -1,6 +1,7 @@ package dag import ( + "errors" "fmt" "log" "sync" @@ -12,6 +13,9 @@ import ( // walker performs a graph walk and supports walk-time changing of vertices // and edges. // +// The walk is depth first by default. This can be changed with the Reverse +// option. +// // A single walker is only valid for one graph walk. After the walk is complete // you must construct a new walker to walk again. State for the walk is never // deleted in case vertices or edges are changed. @@ -19,6 +23,10 @@ type walker struct { // Callback is what is called for each vertex Callback WalkFunc + // Reverse, if true, causes the source of an edge to depend on a target. + // When false (default), the target depends on the source. + Reverse bool + // changeLock must be held to modify any of the fields below. Only Update // should modify these fields. Modifying them outside of Update can cause // serious problems. @@ -44,7 +52,7 @@ type walkerVertex struct { // Dependency information. Any changes to any of these fields requires // holding DepsLock. - DepsCh chan struct{} + DepsCh chan bool DepsUpdateCh chan struct{} DepsLock sync.Mutex @@ -54,6 +62,11 @@ type walkerVertex struct { depsCancelCh chan struct{} } +// errWalkUpstream is used in the errMap of a walk to note that an upstream +// dependency failed so this vertex wasn't run. This is not shown in the final +// user-returned error. +var errWalkUpstream = errors.New("upstream dependency failed") + // Wait waits for the completion of the walk and returns any errors ( // in the form of a multierror) that occurred. Update should be called // to populate the walk with vertices and edges prior to calling this. @@ -72,8 +85,10 @@ func (w *walker) Wait() error { // Build the error var result error for v, err := range w.errMap { - result = multierror.Append(result, fmt.Errorf( - "%s: %s", VertexName(v), err)) + if err != nil && err != errWalkUpstream { + result = multierror.Append(result, fmt.Errorf( + "%s: %s", VertexName(v), err)) + } } return result @@ -116,12 +131,12 @@ func (w *walker) Update(v, e *Set) { info := &walkerVertex{ DoneCh: make(chan struct{}), CancelCh: make(chan struct{}), - DepsCh: make(chan struct{}), + DepsCh: make(chan bool, 1), deps: make(map[Vertex]chan struct{}), } - // Close the deps channel immediately so it passes - close(info.DepsCh) + // Pass dependencies immediately assuming we have no edges + info.DepsCh <- true // Add it to the map and kick off the walk w.vertexMap[v] = info @@ -153,12 +168,7 @@ func (w *walker) Update(v, e *Set) { var changedDeps Set for _, raw := range newEdges.List() { edge := raw.(Edge) - - // waiter is the vertex that is "waiting" on this edge - waiter := edge.Target() - - // dep is the dependency we're waiting on - dep := edge.Source() + waiter, dep := w.edgeParts(edge) // Get the info for the waiter waiterInfo, ok := w.vertexMap[waiter] @@ -189,12 +199,7 @@ func (w *walker) Update(v, e *Set) { // Process reoved edges for _, raw := range oldEdges.List() { edge := raw.(Edge) - - // waiter is the vertex that is "waiting" on this edge - waiter := edge.Target() - - // dep is the dependency we're waiting on - dep := edge.Source() + waiter, dep := w.edgeParts(edge) // Get the info for the waiter waiterInfo, ok := w.vertexMap[waiter] @@ -226,7 +231,7 @@ func (w *walker) Update(v, e *Set) { } // Create a new done channel - doneCh := make(chan struct{}) + doneCh := make(chan bool, 1) // Create the channel we close for cancellation cancelCh := make(chan struct{}) @@ -252,6 +257,10 @@ func (w *walker) Update(v, e *Set) { } info.depsCancelCh = cancelCh + log.Printf( + "[DEBUG] dag/walk: dependencies changed for %q, sending new deps", + VertexName(v)) + // Start the waiter go w.waitDeps(v, deps, doneCh, cancelCh) } @@ -264,6 +273,16 @@ func (w *walker) Update(v, e *Set) { } } +// edgeParts returns the waiter and the dependency, in that order. +// The waiter is waiting on the dependency. +func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { + if w.Reverse { + return e.Source(), e.Target() + } + + return e.Target(), e.Source() +} + // walkVertex walks a single vertex, waiting for any dependencies before // executing the callback. func (w *walker) walkVertex(v Vertex, info *walkerVertex) { @@ -273,16 +292,20 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // When we're done, always close our done channel defer close(info.DoneCh) - // Wait for our dependencies - depsCh := info.DepsCh + // Wait for our dependencies. We create a [closed] deps channel so + // that we can immediately fall through to load our actual DepsCh. + var depsSuccess bool + depsCh := make(chan bool, 1) + depsCh <- true + close(depsCh) for { select { case <-info.CancelCh: // Cancel return - case <-depsCh: - // Deps complete! + case depsSuccess = <-depsCh: + // Deps complete! Mark as nil to trigger completion handling. depsCh = nil case <-info.DepsUpdateCh: @@ -306,9 +329,27 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { } } - // Call our callback - log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) - if err := w.Callback(v); err != nil { + // If we passed dependencies, we just want to check once more that + // we're not cancelled, since this can happen just as dependencies pass. + select { + case <-info.CancelCh: + // Cancelled during an update while dependencies completed. + return + default: + } + + // Run our callback or note that our upstream failed + var err error + if depsSuccess { + log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) + err = w.Callback(v) + } else { + log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v)) + err = errWalkUpstream + } + + // Record the error + if err != nil { w.errLock.Lock() defer w.errLock.Unlock() @@ -322,11 +363,8 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { func (w *walker) waitDeps( v Vertex, deps map[Vertex]<-chan struct{}, - doneCh chan<- struct{}, + doneCh chan<- bool, cancelCh <-chan struct{}) { - // Whenever we return, mark ourselves as complete - defer close(doneCh) - // For each dependency given to us, wait for it to complete for dep, depCh := range deps { DepSatisfied: @@ -337,13 +375,29 @@ func (w *walker) waitDeps( break DepSatisfied case <-cancelCh: - // Wait cancelled + // Wait cancelled. Note that we didn't satisfy dependencies + // so that anything waiting on us also doesn't run. + doneCh <- false return case <-time.After(time.Second * 5): - log.Printf("[DEBUG] vertex %q, waiting for: %q", + log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q", VertexName(v), VertexName(dep)) } } } + + // Dependencies satisfied! We need to check if any errored + w.errLock.Lock() + defer w.errLock.Unlock() + for dep, _ := range deps { + if w.errMap[dep] != nil { + // One of our dependencies failed, so return false + doneCh <- false + return + } + } + + // All dependencies satisfied and successful + doneCh <- true } diff --git a/dag/walk_test.go b/dag/walk_test.go index c720e8a54..9927eb48e 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -33,6 +33,44 @@ func TestWalker_basic(t *testing.T) { } } +func TestWalker_error(t *testing.T) { + var g Graph + g.Add(1) + g.Add(2) + g.Add(3) + g.Add(4) + g.Connect(BasicEdge(1, 2)) + g.Connect(BasicEdge(2, 3)) + g.Connect(BasicEdge(3, 4)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // Build a callback that delays until we close a channel + cb := func(v Vertex) error { + if v == 2 { + return fmt.Errorf("error!") + } + + return recordF(v) + } + + w := &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err == nil { + t.Fatal("expect error") + } + + // Check + expected := []interface{}{1} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } +} + func TestWalker_newVertex(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { @@ -82,26 +120,20 @@ func TestWalker_removeVertex(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - gateCh := make(chan struct{}) + var w *walker cb := func(v Vertex) error { if v == 1 { - <-gateCh + g.Remove(2) + w.Update(g.vertices, g.edges) } return recordF(v) } // Add the initial vertices - w := &walker{Callback: cb} + w = &walker{Callback: cb} w.Update(g.vertices, g.edges) - // Remove a vertex - g.Remove(2) - w.Update(g.vertices, g.edges) - - // Open gate - close(gateCh) - // Wait if err := w.Wait(); err != nil { t.Fatalf("err: %s", err) From 72a717f2de804d4fdc70b00e1eb9f3de7a0b7beb Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:11:37 +0100 Subject: [PATCH 07/11] dag: change the type sig of Update to Graph so its external friendly --- dag/dag.go | 2 +- dag/walk.go | 4 +++- dag/walk_test.go | 22 +++++++++++----------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index 4af78448b..4177cd07e 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -167,7 +167,7 @@ func (g *AcyclicGraph) Walk(cb WalkFunc) error { defer g.debug.BeginOperation(typeWalk, "").End("") w := &walker{Callback: cb, Reverse: true} - w.Update(g.vertices, g.edges) + w.Update(&g.Graph) return w.Wait() } diff --git a/dag/walk.go b/dag/walk.go index 30dd207a6..73045e0fe 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -98,7 +98,9 @@ func (w *walker) Wait() error { // and edges. It does not block until completion. // // Update can be called in parallel to Walk. -func (w *walker) Update(v, e *Set) { +func (w *walker) Update(g *Graph) { + v, e := g.vertices, g.edges + // Grab the change lock so no more updates happen but also so that // no new vertices are executed during this time since we may be // removing them. diff --git a/dag/walk_test.go b/dag/walk_test.go index 9927eb48e..8f99d9046 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -18,7 +18,7 @@ func TestWalker_basic(t *testing.T) { for i := 0; i < 50; i++ { var order []interface{} w := &walker{Callback: walkCbRecord(&order)} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err != nil { @@ -57,7 +57,7 @@ func TestWalker_error(t *testing.T) { } w := &walker{Callback: cb} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err == nil { @@ -81,18 +81,18 @@ func TestWalker_newVertex(t *testing.T) { var order []interface{} w := &walker{Callback: walkCbRecord(&order)} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait a bit time.Sleep(10 * time.Millisecond) // Update the graph g.Add(3) - w.Update(g.vertices, g.edges) + w.Update(&g) // Update the graph again but with the same vertex g.Add(3) - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err != nil { @@ -124,7 +124,7 @@ func TestWalker_removeVertex(t *testing.T) { cb := func(v Vertex) error { if v == 1 { g.Remove(2) - w.Update(g.vertices, g.edges) + w.Update(&g) } return recordF(v) @@ -132,7 +132,7 @@ func TestWalker_removeVertex(t *testing.T) { // Add the initial vertices w = &walker{Callback: cb} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err != nil { @@ -165,7 +165,7 @@ func TestWalker_newEdge(t *testing.T) { if v == 1 { g.Add(3) g.Connect(BasicEdge(3, 2)) - w.Update(g.vertices, g.edges) + w.Update(&g) } return recordF(v) @@ -173,7 +173,7 @@ func TestWalker_newEdge(t *testing.T) { // Add the initial vertices w = &walker{Callback: cb} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err != nil { @@ -214,7 +214,7 @@ func TestWalker_removeEdge(t *testing.T) { cb := func(v Vertex) error { if v == 1 { g.RemoveEdge(BasicEdge(3, 2)) - w.Update(g.vertices, g.edges) + w.Update(&g) } if v == 2 { @@ -234,7 +234,7 @@ func TestWalker_removeEdge(t *testing.T) { // Add the initial vertices w = &walker{Callback: cb} - w.Update(g.vertices, g.edges) + w.Update(&g) // Wait if err := w.Wait(); err != nil { From 65752cd51aaef3db8a133184f5f15637bce773e0 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:22:26 +0100 Subject: [PATCH 08/11] dag: improve docs and read access during a lock --- dag/dag.go | 2 +- dag/walk.go | 69 ++++++++++++++++++++++++++++++++++++++---------- dag/walk_test.go | 18 ++++++------- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index 4177cd07e..0026fd9fd 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -166,7 +166,7 @@ func (g *AcyclicGraph) Cycles() [][]Vertex { func (g *AcyclicGraph) Walk(cb WalkFunc) error { defer g.debug.BeginOperation(typeWalk, "").End("") - w := &walker{Callback: cb, Reverse: true} + w := &Walker{Callback: cb, Reverse: true} w.Update(&g.Graph) return w.Wait() } diff --git a/dag/walk.go b/dag/walk.go index 73045e0fe..826b7816e 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -10,8 +10,23 @@ import ( "github.com/hashicorp/go-multierror" ) -// walker performs a graph walk and supports walk-time changing of vertices -// and edges. +// Walker is used to walk every vertex of a graph in parallel. +// +// A vertex will only be walked when the dependencies of that vertex have +// been walked. If two vertices can be walked at the same time, they will be. +// +// Update can be called to update the graph. This can be called even during +// a walk, cahnging vertices/edges mid-walk. This should be done carefully. +// If a vertex is removed but has already been executed, the result of that +// execution (any error) is still returned by Wait. Changing or re-adding +// a vertex that has already executed has no effect. Changing edges of +// a vertex that has already executed has no effect. +// +// Non-parallelism can be enforced by introducing a lock in your callback +// function. However, the goroutine overhead of a walk will remain. +// Walker will create V*2 goroutines (one for each vertex, and dependency +// waiter for each vertex). In general this should be of no concern unless +// there are a huge number of vertices. // // The walk is depth first by default. This can be changed with the Reverse // option. @@ -19,7 +34,7 @@ import ( // A single walker is only valid for one graph walk. After the walk is complete // you must construct a new walker to walk again. State for the walk is never // deleted in case vertices or edges are changed. -type walker struct { +type Walker struct { // Callback is what is called for each vertex Callback WalkFunc @@ -46,18 +61,34 @@ type walker struct { } type walkerVertex struct { - // These should only be set once on initialization and never written again + // These should only be set once on initialization and never written again. + // They are not protected by a lock since they don't need to be since + // they are write-once. + + // DoneCh is closed when this vertex has completed execution, regardless + // of success. + // + // CancelCh is closed when the vertex should cancel execution. If execution + // is already complete (DoneCh is closed), this has no effect. Otherwise, + // execution is cancelled as quickly as possible. DoneCh chan struct{} CancelCh chan struct{} // Dependency information. Any changes to any of these fields requires // holding DepsLock. + // + // DepsCh is sent a single value that denotes whether the upstream deps + // were successful (no errors). Any value sent means that the upstream + // dependencies are complete. No other values will ever be sent again. + // + // DepsUpdateCh is closed when there is a new DepsCh set. DepsCh chan bool DepsUpdateCh chan struct{} DepsLock sync.Mutex // Below is not safe to read/write in parallel. This behavior is - // enforced by changes only happening in Update. + // enforced by changes only happening in Update. Nothing else should + // ever modify these. deps map[Vertex]chan struct{} depsCancelCh chan struct{} } @@ -74,7 +105,7 @@ var errWalkUpstream = errors.New("upstream dependency failed") // Wait will return as soon as all currently known vertices are complete. // If you plan on calling Update with more vertices in the future, you // should not call Wait until after this is done. -func (w *walker) Wait() error { +func (w *Walker) Wait() error { // Wait for completion w.wait.Wait() @@ -94,11 +125,17 @@ func (w *walker) Wait() error { return result } -// Update updates the currently executing walk with the given vertices -// and edges. It does not block until completion. +// Update updates the currently executing walk with the given graph. +// This will perform a diff of the vertices and edges and update the walker. +// Already completed vertices remain completed (including any errors during +// their execution). // -// Update can be called in parallel to Walk. -func (w *walker) Update(g *Graph) { +// This returns immediately once the walker is updated; it does not wait +// for completion of the walk. +// +// Multiple Updates can be called in parallel. Update can be called at any +// time during a walk. +func (w *Walker) Update(g *Graph) { v, e := g.vertices, g.edges // Grab the change lock so no more updates happen but also so that @@ -277,7 +314,7 @@ func (w *walker) Update(g *Graph) { // edgeParts returns the waiter and the dependency, in that order. // The waiter is waiting on the dependency. -func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { +func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { if w.Reverse { return e.Source(), e.Target() } @@ -287,7 +324,7 @@ func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { // walkVertex walks a single vertex, waiting for any dependencies before // executing the callback. -func (w *walker) walkVertex(v Vertex, info *walkerVertex) { +func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { // When we're done executing, lower the waitgroup count defer w.wait.Done() @@ -297,6 +334,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Wait for our dependencies. We create a [closed] deps channel so // that we can immediately fall through to load our actual DepsCh. var depsSuccess bool + var depsUpdateCh chan struct{} depsCh := make(chan bool, 1) depsCh <- true close(depsCh) @@ -310,7 +348,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Deps complete! Mark as nil to trigger completion handling. depsCh = nil - case <-info.DepsUpdateCh: + case <-depsUpdateCh: // New deps, reloop } @@ -323,6 +361,9 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { depsCh = info.DepsCh info.DepsCh = nil } + if info.DepsUpdateCh != nil { + depsUpdateCh = info.DepsUpdateCh + } info.DepsLock.Unlock() // If we still have no deps channel set, then we're done! @@ -362,7 +403,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { } } -func (w *walker) waitDeps( +func (w *Walker) waitDeps( v Vertex, deps map[Vertex]<-chan struct{}, doneCh chan<- bool, diff --git a/dag/walk_test.go b/dag/walk_test.go index 8f99d9046..04e6ebe3e 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -17,7 +17,7 @@ func TestWalker_basic(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { var order []interface{} - w := &walker{Callback: walkCbRecord(&order)} + w := &Walker{Callback: walkCbRecord(&order)} w.Update(&g) // Wait @@ -56,7 +56,7 @@ func TestWalker_error(t *testing.T) { return recordF(v) } - w := &walker{Callback: cb} + w := &Walker{Callback: cb} w.Update(&g) // Wait @@ -80,7 +80,7 @@ func TestWalker_newVertex(t *testing.T) { g.Connect(BasicEdge(1, 2)) var order []interface{} - w := &walker{Callback: walkCbRecord(&order)} + w := &Walker{Callback: walkCbRecord(&order)} w.Update(&g) // Wait a bit @@ -120,7 +120,7 @@ func TestWalker_removeVertex(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - var w *walker + var w *Walker cb := func(v Vertex) error { if v == 1 { g.Remove(2) @@ -131,7 +131,7 @@ func TestWalker_removeVertex(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait @@ -160,7 +160,7 @@ func TestWalker_newEdge(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - var w *walker + var w *Walker cb := func(v Vertex) error { if v == 1 { g.Add(3) @@ -172,7 +172,7 @@ func TestWalker_newEdge(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait @@ -209,7 +209,7 @@ func TestWalker_removeEdge(t *testing.T) { // forcing 2 before 3 via the callback (and not the graph). If // 2 cannot execute before 3 (edge removal is non-functional), then // this test will timeout. - var w *walker + var w *Walker gateCh := make(chan struct{}) cb := func(v Vertex) error { if v == 1 { @@ -233,7 +233,7 @@ func TestWalker_removeEdge(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait From 636648880971c1ed48da6518c87486d995f445d7 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:26:58 +0100 Subject: [PATCH 09/11] dag: Update can be called with a nil graph --- dag/walk.go | 5 ++++- dag/walk_test.go | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/dag/walk.go b/dag/walk.go index 826b7816e..72dad61de 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -136,7 +136,10 @@ func (w *Walker) Wait() error { // Multiple Updates can be called in parallel. Update can be called at any // time during a walk. func (w *Walker) Update(g *Graph) { - v, e := g.vertices, g.edges + var v, e *Set + if g != nil { + v, e = g.vertices, g.edges + } // Grab the change lock so no more updates happen but also so that // no new vertices are executed during this time since we may be diff --git a/dag/walk_test.go b/dag/walk_test.go index 04e6ebe3e..5843ca64b 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -33,6 +33,26 @@ func TestWalker_basic(t *testing.T) { } } +func TestWalker_updateNilGraph(t *testing.T) { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var order []interface{} + w := &Walker{Callback: walkCbRecord(&order)} + w.Update(&g) + w.Update(nil) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + } +} + func TestWalker_error(t *testing.T) { var g Graph g.Add(1) From 07ce5a76240f4282730b2cd5fba94e4a621c66cf Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:28:14 +0100 Subject: [PATCH 10/11] dag: cleanup some code that had no effect anymore --- dag/walk.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dag/walk.go b/dag/walk.go index 72dad61de..47965764c 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -173,13 +173,9 @@ func (w *Walker) Update(g *Graph) { info := &walkerVertex{ DoneCh: make(chan struct{}), CancelCh: make(chan struct{}), - DepsCh: make(chan bool, 1), deps: make(map[Vertex]chan struct{}), } - // Pass dependencies immediately assuming we have no edges - info.DepsCh <- true - // Add it to the map and kick off the walk w.vertexMap[v] = info } From 6702a2207490b4ed0218d00ed729968209cc8ad3 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 3 Feb 2017 11:48:09 +0100 Subject: [PATCH 11/11] dag: require acyclic graph --- dag/dag.go | 2 +- dag/walk.go | 2 +- dag/walk_test.go | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index 0026fd9fd..f8776bc51 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -167,7 +167,7 @@ func (g *AcyclicGraph) Walk(cb WalkFunc) error { defer g.debug.BeginOperation(typeWalk, "").End("") w := &Walker{Callback: cb, Reverse: true} - w.Update(&g.Graph) + w.Update(g) return w.Wait() } diff --git a/dag/walk.go b/dag/walk.go index 47965764c..a74f1142a 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -135,7 +135,7 @@ func (w *Walker) Wait() error { // // Multiple Updates can be called in parallel. Update can be called at any // time during a walk. -func (w *Walker) Update(g *Graph) { +func (w *Walker) Update(g *AcyclicGraph) { var v, e *Set if g != nil { v, e = g.vertices, g.edges diff --git a/dag/walk_test.go b/dag/walk_test.go index 5843ca64b..628ae6f64 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -9,7 +9,7 @@ import ( ) func TestWalker_basic(t *testing.T) { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Connect(BasicEdge(1, 2)) @@ -34,7 +34,7 @@ func TestWalker_basic(t *testing.T) { } func TestWalker_updateNilGraph(t *testing.T) { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Connect(BasicEdge(1, 2)) @@ -54,7 +54,7 @@ func TestWalker_updateNilGraph(t *testing.T) { } func TestWalker_error(t *testing.T) { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Add(3) @@ -94,7 +94,7 @@ func TestWalker_error(t *testing.T) { func TestWalker_newVertex(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Connect(BasicEdge(1, 2)) @@ -130,7 +130,7 @@ func TestWalker_newVertex(t *testing.T) { func TestWalker_removeVertex(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Connect(BasicEdge(1, 2)) @@ -170,7 +170,7 @@ func TestWalker_removeVertex(t *testing.T) { func TestWalker_newEdge(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Connect(BasicEdge(1, 2)) @@ -211,7 +211,7 @@ func TestWalker_newEdge(t *testing.T) { func TestWalker_removeEdge(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { - var g Graph + var g AcyclicGraph g.Add(1) g.Add(2) g.Add(3)