dag: tests for adding edges/vertices during walk-time

This commit is contained in:
Mitchell Hashimoto 2017-02-02 12:09:26 -08:00
parent cbc71d9508
commit 5d49e7e6b6
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 177 additions and 26 deletions

View File

@ -13,8 +13,8 @@ import (
type walker struct { type walker struct {
Callback WalkFunc Callback WalkFunc
vertices *Set vertices Set
edges *Set edges Set
vertexMap map[Vertex]*walkerVertex vertexMap map[Vertex]*walkerVertex
wait sync.WaitGroup wait sync.WaitGroup
@ -25,13 +25,18 @@ type walker struct {
} }
type walkerVertex struct { type walkerVertex struct {
sync.Mutex // These should only be set once on initialization and never written again
DoneCh chan struct{} DoneCh chan struct{}
CancelCh chan struct{} CancelCh chan struct{}
DepsCh chan struct{}
DepsUpdateCh chan chan struct{}
// Dependency information. Any changes to any of these fields requires
// holding DepsLock.
DepsCh chan struct{}
DepsUpdateCh chan struct{}
DepsLock sync.Mutex
// Below is not safe to read/write in parallel. This behavior is
// enforced by changes only happening in Update.
deps map[Vertex]chan struct{} deps map[Vertex]chan struct{}
depsCancelCh chan struct{} depsCancelCh chan struct{}
} }
@ -74,8 +79,8 @@ func (w *walker) Update(v, e *Set) {
} }
// Calculate all our sets // Calculate all our sets
newEdges := e.Difference(w.edges) newEdges := e.Difference(&w.edges)
newVerts := v.Difference(w.vertices) newVerts := v.Difference(&w.vertices)
oldVerts := w.vertices.Difference(v) oldVerts := w.vertices.Difference(v)
// Add the new vertices // Add the new vertices
@ -85,12 +90,15 @@ func (w *walker) Update(v, e *Set) {
// Add to the waitgroup so our walk is not done until everything finishes // Add to the waitgroup so our walk is not done until everything finishes
w.wait.Add(1) w.wait.Add(1)
// Add to our own set so we know about it already
log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v))
w.vertices.Add(raw)
// Initialize the vertex info // Initialize the vertex info
info := &walkerVertex{ info := &walkerVertex{
DoneCh: make(chan struct{}), DoneCh: make(chan struct{}),
CancelCh: make(chan struct{}), CancelCh: make(chan struct{}),
DepsCh: make(chan struct{}), DepsCh: make(chan struct{}),
DepsUpdateCh: make(chan chan struct{}, 5),
deps: make(map[Vertex]chan struct{}), deps: make(map[Vertex]chan struct{}),
} }
@ -118,6 +126,9 @@ func (w *walker) Update(v, e *Set) {
// Delete it out of the map // Delete it out of the map
delete(w.vertexMap, v) delete(w.vertexMap, v)
log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v))
w.vertices.Delete(raw)
} }
// Add the new edges // Add the new edges
@ -152,6 +163,8 @@ func (w *walker) Update(v, e *Set) {
changedDeps.Add(waiter) changedDeps.Add(waiter)
} }
// For each vertex with changed dependencies, we need to kick off
// a new waiter and notify the vertex of the changes.
for _, raw := range changedDeps.List() { for _, raw := range changedDeps.List() {
v := raw.(Vertex) v := raw.(Vertex)
info, ok := w.vertexMap[v] info, ok := w.vertexMap[v]
@ -165,7 +178,6 @@ func (w *walker) Update(v, e *Set) {
// Create the channel we close for cancellation // Create the channel we close for cancellation
cancelCh := make(chan struct{}) cancelCh := make(chan struct{})
info.depsCancelCh = cancelCh
// Build a new deps copy // Build a new deps copy
deps := make(map[Vertex]<-chan struct{}) deps := make(map[Vertex]<-chan struct{})
@ -174,13 +186,26 @@ func (w *walker) Update(v, e *Set) {
} }
// Update the update channel // Update the update channel
info.DepsUpdateCh <- doneCh info.DepsLock.Lock()
if info.DepsUpdateCh != nil {
close(info.DepsUpdateCh)
}
info.DepsCh = doneCh
info.DepsUpdateCh = make(chan struct{})
info.DepsLock.Unlock()
// Cancel the older waiter
if info.depsCancelCh != nil {
close(info.depsCancelCh)
}
info.depsCancelCh = cancelCh
// Start the waiter // Start the waiter
go w.waitDeps(v, deps, doneCh, cancelCh) go w.waitDeps(v, deps, doneCh, cancelCh)
} }
// Kickstart all the vertices // Start all the new vertices. We do this at the end so that all
// the edge waiters and changes are setup above.
for _, raw := range newVerts.List() { for _, raw := range newVerts.List() {
v := raw.(Vertex) v := raw.(Vertex)
go w.walkVertex(v, w.vertexMap[v]) go w.walkVertex(v, w.vertexMap[v])
@ -208,24 +233,26 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) {
// Deps complete! // Deps complete!
depsCh = nil depsCh = nil
case depsCh = <-info.DepsUpdateCh: case <-info.DepsUpdateCh:
// New deps, reloop // New deps, reloop
} }
if depsCh == nil { // Check if we have updated dependencies
// One final check if we have an update info.DepsLock.Lock()
select { if info.DepsCh != nil {
case depsCh = <-info.DepsUpdateCh: depsCh = info.DepsCh
default: info.DepsCh = nil
} }
info.DepsLock.Unlock()
// If we still have no deps channel set, then we're done!
if depsCh == nil { if depsCh == nil {
break break
} }
} }
}
// Call our callback // Call our callback
log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v))
if err := w.Callback(v); err != nil { if err := w.Callback(v); err != nil {
w.errLock.Lock() w.errLock.Lock()
defer w.errLock.Unlock() defer w.errLock.Unlock()

View File

@ -4,6 +4,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"time"
) )
func TestWalker_basic(t *testing.T) { func TestWalker_basic(t *testing.T) {
@ -31,6 +32,129 @@ func TestWalker_basic(t *testing.T) {
} }
} }
func TestWalker_newVertex(t *testing.T) {
// Run it a bunch of times since it is timing dependent
for i := 0; i < 50; i++ {
var g Graph
g.Add(1)
g.Add(2)
g.Connect(BasicEdge(1, 2))
var order []interface{}
w := &walker{Callback: walkCbRecord(&order)}
w.Update(g.vertices, g.edges)
// Wait a bit
time.Sleep(10 * time.Millisecond)
// Update the graph
g.Add(3)
w.Update(g.vertices, g.edges)
// Update the graph again but with the same vertex
g.Add(3)
w.Update(g.vertices, g.edges)
// Wait
if err := w.Wait(); err != nil {
t.Fatalf("err: %s", err)
}
// Check
expected := []interface{}{1, 2, 3}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
}
}
}
func TestWalker_removeVertex(t *testing.T) {
// Run it a bunch of times since it is timing dependent
for i := 0; i < 50; i++ {
var g Graph
g.Add(1)
g.Add(2)
g.Connect(BasicEdge(1, 2))
// Record function
var order []interface{}
recordF := walkCbRecord(&order)
// Build a callback that delays until we close a channel
gateCh := make(chan struct{})
cb := func(v Vertex) error {
if v == 1 {
<-gateCh
}
return recordF(v)
}
// Add the initial vertices
w := &walker{Callback: cb}
w.Update(g.vertices, g.edges)
// Remove a vertex
g.Remove(2)
w.Update(g.vertices, g.edges)
// Open gate
close(gateCh)
// Wait
if err := w.Wait(); err != nil {
t.Fatalf("err: %s", err)
}
// Check
expected := []interface{}{1}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
}
}
}
func TestWalker_newEdge(t *testing.T) {
// Run it a bunch of times since it is timing dependent
for i := 0; i < 50; i++ {
var g Graph
g.Add(1)
g.Add(2)
g.Connect(BasicEdge(1, 2))
// Record function
var order []interface{}
recordF := walkCbRecord(&order)
// Build a callback that delays until we close a channel
var w *walker
cb := func(v Vertex) error {
if v == 1 {
g.Add(3)
g.Connect(BasicEdge(3, 2))
w.Update(g.vertices, g.edges)
}
return recordF(v)
}
// Add the initial vertices
w = &walker{Callback: cb}
w.Update(g.vertices, g.edges)
// Wait
if err := w.Wait(); err != nil {
t.Fatalf("err: %s", err)
}
// Check
expected := []interface{}{1, 3, 2}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
}
}
}
// walkCbRecord is a test helper callback that just records the order called. // walkCbRecord is a test helper callback that just records the order called.
func walkCbRecord(order *[]interface{}) WalkFunc { func walkCbRecord(order *[]interface{}) WalkFunc {
var l sync.Mutex var l sync.Mutex