From 5d49e7e6b64b8c14357fa3304e60bdc029b61524 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 2 Feb 2017 12:09:26 -0800 Subject: [PATCH] dag: tests for adding edges/vertices during walk-time --- dag/walk.go | 79 ++++++++++++++++++++---------- dag/walk_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 26 deletions(-) diff --git a/dag/walk.go b/dag/walk.go index d219234f7..6339e1736 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -13,8 +13,8 @@ import ( type walker struct { Callback WalkFunc - vertices *Set - edges *Set + vertices Set + edges Set vertexMap map[Vertex]*walkerVertex wait sync.WaitGroup @@ -25,13 +25,18 @@ type walker struct { } type walkerVertex struct { - sync.Mutex + // These should only be set once on initialization and never written again + DoneCh chan struct{} + CancelCh chan struct{} - DoneCh chan struct{} - CancelCh chan struct{} + // Dependency information. Any changes to any of these fields requires + // holding DepsLock. DepsCh chan struct{} - DepsUpdateCh chan chan struct{} + DepsUpdateCh chan struct{} + DepsLock sync.Mutex + // Below is not safe to read/write in parallel. This behavior is + // enforced by changes only happening in Update. deps map[Vertex]chan struct{} depsCancelCh chan struct{} } @@ -74,8 +79,8 @@ func (w *walker) Update(v, e *Set) { } // Calculate all our sets - newEdges := e.Difference(w.edges) - newVerts := v.Difference(w.vertices) + newEdges := e.Difference(&w.edges) + newVerts := v.Difference(&w.vertices) oldVerts := w.vertices.Difference(v) // Add the new vertices @@ -85,13 +90,16 @@ func (w *walker) Update(v, e *Set) { // Add to the waitgroup so our walk is not done until everything finishes w.wait.Add(1) + // Add to our own set so we know about it already + log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v)) + w.vertices.Add(raw) + // Initialize the vertex info info := &walkerVertex{ - DoneCh: make(chan struct{}), - CancelCh: make(chan struct{}), - DepsCh: make(chan struct{}), - DepsUpdateCh: make(chan chan struct{}, 5), - deps: make(map[Vertex]chan struct{}), + DoneCh: make(chan struct{}), + CancelCh: make(chan struct{}), + DepsCh: make(chan struct{}), + deps: make(map[Vertex]chan struct{}), } // Close the deps channel immediately so it passes @@ -118,6 +126,9 @@ func (w *walker) Update(v, e *Set) { // Delete it out of the map delete(w.vertexMap, v) + + log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v)) + w.vertices.Delete(raw) } // Add the new edges @@ -152,6 +163,8 @@ func (w *walker) Update(v, e *Set) { changedDeps.Add(waiter) } + // For each vertex with changed dependencies, we need to kick off + // a new waiter and notify the vertex of the changes. for _, raw := range changedDeps.List() { v := raw.(Vertex) info, ok := w.vertexMap[v] @@ -165,7 +178,6 @@ func (w *walker) Update(v, e *Set) { // Create the channel we close for cancellation cancelCh := make(chan struct{}) - info.depsCancelCh = cancelCh // Build a new deps copy deps := make(map[Vertex]<-chan struct{}) @@ -174,13 +186,26 @@ func (w *walker) Update(v, e *Set) { } // Update the update channel - info.DepsUpdateCh <- doneCh + info.DepsLock.Lock() + if info.DepsUpdateCh != nil { + close(info.DepsUpdateCh) + } + info.DepsCh = doneCh + info.DepsUpdateCh = make(chan struct{}) + info.DepsLock.Unlock() + + // Cancel the older waiter + if info.depsCancelCh != nil { + close(info.depsCancelCh) + } + info.depsCancelCh = cancelCh // Start the waiter go w.waitDeps(v, deps, doneCh, cancelCh) } - // Kickstart all the vertices + // Start all the new vertices. We do this at the end so that all + // the edge waiters and changes are setup above. for _, raw := range newVerts.List() { v := raw.(Vertex) go w.walkVertex(v, w.vertexMap[v]) @@ -208,24 +233,26 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Deps complete! depsCh = nil - case depsCh = <-info.DepsUpdateCh: + case <-info.DepsUpdateCh: // New deps, reloop } - if depsCh == nil { - // One final check if we have an update - select { - case depsCh = <-info.DepsUpdateCh: - default: - } + // Check if we have updated dependencies + info.DepsLock.Lock() + if info.DepsCh != nil { + depsCh = info.DepsCh + info.DepsCh = nil + } + info.DepsLock.Unlock() - if depsCh == nil { - break - } + // If we still have no deps channel set, then we're done! + if depsCh == nil { + break } } // Call our callback + log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) if err := w.Callback(v); err != nil { w.errLock.Lock() defer w.errLock.Unlock() diff --git a/dag/walk_test.go b/dag/walk_test.go index d3bcb432a..5051b9a9b 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -4,6 +4,7 @@ import ( "reflect" "sync" "testing" + "time" ) func TestWalker_basic(t *testing.T) { @@ -31,6 +32,129 @@ func TestWalker_basic(t *testing.T) { } } +func TestWalker_newVertex(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + var order []interface{} + w := &walker{Callback: walkCbRecord(&order)} + w.Update(g.vertices, g.edges) + + // Wait a bit + time.Sleep(10 * time.Millisecond) + + // Update the graph + g.Add(3) + w.Update(g.vertices, g.edges) + + // Update the graph again but with the same vertex + g.Add(3) + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 2, 3} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + +func TestWalker_removeVertex(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // Build a callback that delays until we close a channel + gateCh := make(chan struct{}) + cb := func(v Vertex) error { + if v == 1 { + <-gateCh + } + + return recordF(v) + } + + // Add the initial vertices + w := &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Remove a vertex + g.Remove(2) + w.Update(g.vertices, g.edges) + + // Open gate + close(gateCh) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + +func TestWalker_newEdge(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Connect(BasicEdge(1, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // Build a callback that delays until we close a channel + var w *walker + cb := func(v Vertex) error { + if v == 1 { + g.Add(3) + g.Connect(BasicEdge(3, 2)) + w.Update(g.vertices, g.edges) + } + + return recordF(v) + } + + // Add the initial vertices + w = &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 3, 2} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + // walkCbRecord is a test helper callback that just records the order called. func walkCbRecord(order *[]interface{}) WalkFunc { var l sync.Mutex