Merge pull request #11657 from hashicorp/f-dynamic-walk
dag: new walker, supports walk-time updates
This commit is contained in:
commit
a612b43987
94
dag/dag.go
94
dag/dag.go
|
@ -2,11 +2,8 @@ package dag
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
)
|
)
|
||||||
|
@ -169,94 +166,9 @@ func (g *AcyclicGraph) Cycles() [][]Vertex {
|
||||||
func (g *AcyclicGraph) Walk(cb WalkFunc) error {
|
func (g *AcyclicGraph) Walk(cb WalkFunc) error {
|
||||||
defer g.debug.BeginOperation(typeWalk, "").End("")
|
defer g.debug.BeginOperation(typeWalk, "").End("")
|
||||||
|
|
||||||
// Cache the vertices since we use it multiple times
|
w := &Walker{Callback: cb, Reverse: true}
|
||||||
vertices := g.Vertices()
|
w.Update(g)
|
||||||
|
return w.Wait()
|
||||||
// Build the waitgroup that signals when we're done
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(vertices))
|
|
||||||
doneCh := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(doneCh)
|
|
||||||
wg.Wait()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// The map of channels to watch to wait for vertices to finish
|
|
||||||
vertMap := make(map[Vertex]chan struct{})
|
|
||||||
for _, v := range vertices {
|
|
||||||
vertMap[v] = make(chan struct{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// The map of whether a vertex errored or not during the walk
|
|
||||||
var errLock sync.Mutex
|
|
||||||
var errs error
|
|
||||||
errMap := make(map[Vertex]bool)
|
|
||||||
for _, v := range vertices {
|
|
||||||
// Build our list of dependencies and the list of channels to
|
|
||||||
// wait on until we start executing for this vertex.
|
|
||||||
deps := AsVertexList(g.DownEdges(v))
|
|
||||||
depChs := make([]<-chan struct{}, len(deps))
|
|
||||||
for i, dep := range deps {
|
|
||||||
depChs[i] = vertMap[dep]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get our channel so that we can close it when we're done
|
|
||||||
ourCh := vertMap[v]
|
|
||||||
|
|
||||||
// Start the goroutine to wait for our dependencies
|
|
||||||
readyCh := make(chan bool)
|
|
||||||
go func(v Vertex, deps []Vertex, chs []<-chan struct{}, readyCh chan<- bool) {
|
|
||||||
// First wait for all the dependencies
|
|
||||||
for i, ch := range chs {
|
|
||||||
DepSatisfied:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
break DepSatisfied
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
log.Printf("[DEBUG] vertex %q, waiting for: %q",
|
|
||||||
VertexName(v), VertexName(deps[i]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Printf("[DEBUG] vertex %q, got dep: %q",
|
|
||||||
VertexName(v), VertexName(deps[i]))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then, check the map to see if any of our dependencies failed
|
|
||||||
errLock.Lock()
|
|
||||||
defer errLock.Unlock()
|
|
||||||
for _, dep := range deps {
|
|
||||||
if errMap[dep] {
|
|
||||||
errMap[v] = true
|
|
||||||
readyCh <- false
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
readyCh <- true
|
|
||||||
}(v, deps, depChs, readyCh)
|
|
||||||
|
|
||||||
// Start the goroutine that executes
|
|
||||||
go func(v Vertex, doneCh chan<- struct{}, readyCh <-chan bool) {
|
|
||||||
defer close(doneCh)
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
var err error
|
|
||||||
if ready := <-readyCh; ready {
|
|
||||||
err = cb(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
errLock.Lock()
|
|
||||||
defer errLock.Unlock()
|
|
||||||
if err != nil {
|
|
||||||
errMap[v] = true
|
|
||||||
errs = multierror.Append(errs, err)
|
|
||||||
}
|
|
||||||
}(v, ourCh, readyCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
<-doneCh
|
|
||||||
return errs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// simple convenience helper for converting a dag.Set to a []Vertex
|
// simple convenience helper for converting a dag.Set to a []Vertex
|
||||||
|
|
22
dag/set.go
22
dag/set.go
|
@ -48,6 +48,9 @@ func (s *Set) Include(v interface{}) bool {
|
||||||
// Intersection computes the set intersection with other.
|
// Intersection computes the set intersection with other.
|
||||||
func (s *Set) Intersection(other *Set) *Set {
|
func (s *Set) Intersection(other *Set) *Set {
|
||||||
result := new(Set)
|
result := new(Set)
|
||||||
|
if s == nil {
|
||||||
|
return result
|
||||||
|
}
|
||||||
if other != nil {
|
if other != nil {
|
||||||
for _, v := range s.m {
|
for _, v := range s.m {
|
||||||
if other.Include(v) {
|
if other.Include(v) {
|
||||||
|
@ -59,6 +62,25 @@ func (s *Set) Intersection(other *Set) *Set {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Difference returns a set with the elements that s has but
|
||||||
|
// other doesn't.
|
||||||
|
func (s *Set) Difference(other *Set) *Set {
|
||||||
|
result := new(Set)
|
||||||
|
if s != nil {
|
||||||
|
for k, v := range s.m {
|
||||||
|
var ok bool
|
||||||
|
if other != nil {
|
||||||
|
_, ok = other.m[k]
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
result.Add(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// Len is the number of items in the set.
|
// Len is the number of items in the set.
|
||||||
func (s *Set) Len() int {
|
func (s *Set) Len() int {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
package dag
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSetDifference(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
Name string
|
||||||
|
A, B []interface{}
|
||||||
|
Expected []interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"same",
|
||||||
|
[]interface{}{1, 2, 3},
|
||||||
|
[]interface{}{3, 1, 2},
|
||||||
|
[]interface{}{},
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"A has extra elements",
|
||||||
|
[]interface{}{1, 2, 3},
|
||||||
|
[]interface{}{3, 2},
|
||||||
|
[]interface{}{1},
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"B has extra elements",
|
||||||
|
[]interface{}{1, 2, 3},
|
||||||
|
[]interface{}{3, 2, 1, 4},
|
||||||
|
[]interface{}{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tc := range cases {
|
||||||
|
t.Run(fmt.Sprintf("%d-%s", i, tc.Name), func(t *testing.T) {
|
||||||
|
var one, two, expected Set
|
||||||
|
for _, v := range tc.A {
|
||||||
|
one.Add(v)
|
||||||
|
}
|
||||||
|
for _, v := range tc.B {
|
||||||
|
two.Add(v)
|
||||||
|
}
|
||||||
|
for _, v := range tc.Expected {
|
||||||
|
expected.Add(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := one.Difference(&two)
|
||||||
|
match := actual.Intersection(&expected)
|
||||||
|
if match.Len() != expected.Len() {
|
||||||
|
t.Fatalf("bad: %#v", actual.List())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,445 @@
|
||||||
|
package dag
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Walker is used to walk every vertex of a graph in parallel.
|
||||||
|
//
|
||||||
|
// A vertex will only be walked when the dependencies of that vertex have
|
||||||
|
// been walked. If two vertices can be walked at the same time, they will be.
|
||||||
|
//
|
||||||
|
// Update can be called to update the graph. This can be called even during
|
||||||
|
// a walk, cahnging vertices/edges mid-walk. This should be done carefully.
|
||||||
|
// If a vertex is removed but has already been executed, the result of that
|
||||||
|
// execution (any error) is still returned by Wait. Changing or re-adding
|
||||||
|
// a vertex that has already executed has no effect. Changing edges of
|
||||||
|
// a vertex that has already executed has no effect.
|
||||||
|
//
|
||||||
|
// Non-parallelism can be enforced by introducing a lock in your callback
|
||||||
|
// function. However, the goroutine overhead of a walk will remain.
|
||||||
|
// Walker will create V*2 goroutines (one for each vertex, and dependency
|
||||||
|
// waiter for each vertex). In general this should be of no concern unless
|
||||||
|
// there are a huge number of vertices.
|
||||||
|
//
|
||||||
|
// The walk is depth first by default. This can be changed with the Reverse
|
||||||
|
// option.
|
||||||
|
//
|
||||||
|
// A single walker is only valid for one graph walk. After the walk is complete
|
||||||
|
// you must construct a new walker to walk again. State for the walk is never
|
||||||
|
// deleted in case vertices or edges are changed.
|
||||||
|
type Walker struct {
|
||||||
|
// Callback is what is called for each vertex
|
||||||
|
Callback WalkFunc
|
||||||
|
|
||||||
|
// Reverse, if true, causes the source of an edge to depend on a target.
|
||||||
|
// When false (default), the target depends on the source.
|
||||||
|
Reverse bool
|
||||||
|
|
||||||
|
// changeLock must be held to modify any of the fields below. Only Update
|
||||||
|
// should modify these fields. Modifying them outside of Update can cause
|
||||||
|
// serious problems.
|
||||||
|
changeLock sync.Mutex
|
||||||
|
vertices Set
|
||||||
|
edges Set
|
||||||
|
vertexMap map[Vertex]*walkerVertex
|
||||||
|
|
||||||
|
// wait is done when all vertices have executed. It may become "undone"
|
||||||
|
// if new vertices are added.
|
||||||
|
wait sync.WaitGroup
|
||||||
|
|
||||||
|
// errMap contains the errors recorded so far for execution. Reading
|
||||||
|
// and writing should hold errLock.
|
||||||
|
errMap map[Vertex]error
|
||||||
|
errLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type walkerVertex struct {
|
||||||
|
// These should only be set once on initialization and never written again.
|
||||||
|
// They are not protected by a lock since they don't need to be since
|
||||||
|
// they are write-once.
|
||||||
|
|
||||||
|
// DoneCh is closed when this vertex has completed execution, regardless
|
||||||
|
// of success.
|
||||||
|
//
|
||||||
|
// CancelCh is closed when the vertex should cancel execution. If execution
|
||||||
|
// is already complete (DoneCh is closed), this has no effect. Otherwise,
|
||||||
|
// execution is cancelled as quickly as possible.
|
||||||
|
DoneCh chan struct{}
|
||||||
|
CancelCh chan struct{}
|
||||||
|
|
||||||
|
// Dependency information. Any changes to any of these fields requires
|
||||||
|
// holding DepsLock.
|
||||||
|
//
|
||||||
|
// DepsCh is sent a single value that denotes whether the upstream deps
|
||||||
|
// were successful (no errors). Any value sent means that the upstream
|
||||||
|
// dependencies are complete. No other values will ever be sent again.
|
||||||
|
//
|
||||||
|
// DepsUpdateCh is closed when there is a new DepsCh set.
|
||||||
|
DepsCh chan bool
|
||||||
|
DepsUpdateCh chan struct{}
|
||||||
|
DepsLock sync.Mutex
|
||||||
|
|
||||||
|
// Below is not safe to read/write in parallel. This behavior is
|
||||||
|
// enforced by changes only happening in Update. Nothing else should
|
||||||
|
// ever modify these.
|
||||||
|
deps map[Vertex]chan struct{}
|
||||||
|
depsCancelCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// errWalkUpstream is used in the errMap of a walk to note that an upstream
|
||||||
|
// dependency failed so this vertex wasn't run. This is not shown in the final
|
||||||
|
// user-returned error.
|
||||||
|
var errWalkUpstream = errors.New("upstream dependency failed")
|
||||||
|
|
||||||
|
// Wait waits for the completion of the walk and returns any errors (
|
||||||
|
// in the form of a multierror) that occurred. Update should be called
|
||||||
|
// to populate the walk with vertices and edges prior to calling this.
|
||||||
|
//
|
||||||
|
// Wait will return as soon as all currently known vertices are complete.
|
||||||
|
// If you plan on calling Update with more vertices in the future, you
|
||||||
|
// should not call Wait until after this is done.
|
||||||
|
func (w *Walker) Wait() error {
|
||||||
|
// Wait for completion
|
||||||
|
w.wait.Wait()
|
||||||
|
|
||||||
|
// Grab the error lock
|
||||||
|
w.errLock.Lock()
|
||||||
|
defer w.errLock.Unlock()
|
||||||
|
|
||||||
|
// Build the error
|
||||||
|
var result error
|
||||||
|
for v, err := range w.errMap {
|
||||||
|
if err != nil && err != errWalkUpstream {
|
||||||
|
result = multierror.Append(result, fmt.Errorf(
|
||||||
|
"%s: %s", VertexName(v), err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update updates the currently executing walk with the given graph.
|
||||||
|
// This will perform a diff of the vertices and edges and update the walker.
|
||||||
|
// Already completed vertices remain completed (including any errors during
|
||||||
|
// their execution).
|
||||||
|
//
|
||||||
|
// This returns immediately once the walker is updated; it does not wait
|
||||||
|
// for completion of the walk.
|
||||||
|
//
|
||||||
|
// Multiple Updates can be called in parallel. Update can be called at any
|
||||||
|
// time during a walk.
|
||||||
|
func (w *Walker) Update(g *AcyclicGraph) {
|
||||||
|
var v, e *Set
|
||||||
|
if g != nil {
|
||||||
|
v, e = g.vertices, g.edges
|
||||||
|
}
|
||||||
|
|
||||||
|
// Grab the change lock so no more updates happen but also so that
|
||||||
|
// no new vertices are executed during this time since we may be
|
||||||
|
// removing them.
|
||||||
|
w.changeLock.Lock()
|
||||||
|
defer w.changeLock.Unlock()
|
||||||
|
|
||||||
|
// Initialize fields
|
||||||
|
if w.vertexMap == nil {
|
||||||
|
w.vertexMap = make(map[Vertex]*walkerVertex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate all our sets
|
||||||
|
newEdges := e.Difference(&w.edges)
|
||||||
|
oldEdges := w.edges.Difference(e)
|
||||||
|
newVerts := v.Difference(&w.vertices)
|
||||||
|
oldVerts := w.vertices.Difference(v)
|
||||||
|
|
||||||
|
// Add the new vertices
|
||||||
|
for _, raw := range newVerts.List() {
|
||||||
|
v := raw.(Vertex)
|
||||||
|
|
||||||
|
// Add to the waitgroup so our walk is not done until everything finishes
|
||||||
|
w.wait.Add(1)
|
||||||
|
|
||||||
|
// Add to our own set so we know about it already
|
||||||
|
log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v))
|
||||||
|
w.vertices.Add(raw)
|
||||||
|
|
||||||
|
// Initialize the vertex info
|
||||||
|
info := &walkerVertex{
|
||||||
|
DoneCh: make(chan struct{}),
|
||||||
|
CancelCh: make(chan struct{}),
|
||||||
|
deps: make(map[Vertex]chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add it to the map and kick off the walk
|
||||||
|
w.vertexMap[v] = info
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the old vertices
|
||||||
|
for _, raw := range oldVerts.List() {
|
||||||
|
v := raw.(Vertex)
|
||||||
|
|
||||||
|
// Get the vertex info so we can cancel it
|
||||||
|
info, ok := w.vertexMap[v]
|
||||||
|
if !ok {
|
||||||
|
// This vertex for some reason was never in our map. This
|
||||||
|
// shouldn't be possible.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel the vertex
|
||||||
|
close(info.CancelCh)
|
||||||
|
|
||||||
|
// Delete it out of the map
|
||||||
|
delete(w.vertexMap, v)
|
||||||
|
|
||||||
|
log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v))
|
||||||
|
w.vertices.Delete(raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the new edges
|
||||||
|
var changedDeps Set
|
||||||
|
for _, raw := range newEdges.List() {
|
||||||
|
edge := raw.(Edge)
|
||||||
|
waiter, dep := w.edgeParts(edge)
|
||||||
|
|
||||||
|
// Get the info for the waiter
|
||||||
|
waiterInfo, ok := w.vertexMap[waiter]
|
||||||
|
if !ok {
|
||||||
|
// Vertex doesn't exist... shouldn't be possible but ignore.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the info for the dep
|
||||||
|
depInfo, ok := w.vertexMap[dep]
|
||||||
|
if !ok {
|
||||||
|
// Vertex doesn't exist... shouldn't be possible but ignore.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the dependency to our waiter
|
||||||
|
waiterInfo.deps[dep] = depInfo.DoneCh
|
||||||
|
|
||||||
|
// Record that the deps changed for this waiter
|
||||||
|
changedDeps.Add(waiter)
|
||||||
|
|
||||||
|
log.Printf(
|
||||||
|
"[DEBUG] dag/walk: added edge: %q waiting on %q",
|
||||||
|
VertexName(waiter), VertexName(dep))
|
||||||
|
w.edges.Add(raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process reoved edges
|
||||||
|
for _, raw := range oldEdges.List() {
|
||||||
|
edge := raw.(Edge)
|
||||||
|
waiter, dep := w.edgeParts(edge)
|
||||||
|
|
||||||
|
// Get the info for the waiter
|
||||||
|
waiterInfo, ok := w.vertexMap[waiter]
|
||||||
|
if !ok {
|
||||||
|
// Vertex doesn't exist... shouldn't be possible but ignore.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the dependency from the waiter
|
||||||
|
delete(waiterInfo.deps, dep)
|
||||||
|
|
||||||
|
// Record that the deps changed for this waiter
|
||||||
|
changedDeps.Add(waiter)
|
||||||
|
|
||||||
|
log.Printf(
|
||||||
|
"[DEBUG] dag/walk: removed edge: %q waiting on %q",
|
||||||
|
VertexName(waiter), VertexName(dep))
|
||||||
|
w.edges.Delete(raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each vertex with changed dependencies, we need to kick off
|
||||||
|
// a new waiter and notify the vertex of the changes.
|
||||||
|
for _, raw := range changedDeps.List() {
|
||||||
|
v := raw.(Vertex)
|
||||||
|
info, ok := w.vertexMap[v]
|
||||||
|
if !ok {
|
||||||
|
// Vertex doesn't exist... shouldn't be possible but ignore.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new done channel
|
||||||
|
doneCh := make(chan bool, 1)
|
||||||
|
|
||||||
|
// Create the channel we close for cancellation
|
||||||
|
cancelCh := make(chan struct{})
|
||||||
|
|
||||||
|
// Build a new deps copy
|
||||||
|
deps := make(map[Vertex]<-chan struct{})
|
||||||
|
for k, v := range info.deps {
|
||||||
|
deps[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the update channel
|
||||||
|
info.DepsLock.Lock()
|
||||||
|
if info.DepsUpdateCh != nil {
|
||||||
|
close(info.DepsUpdateCh)
|
||||||
|
}
|
||||||
|
info.DepsCh = doneCh
|
||||||
|
info.DepsUpdateCh = make(chan struct{})
|
||||||
|
info.DepsLock.Unlock()
|
||||||
|
|
||||||
|
// Cancel the older waiter
|
||||||
|
if info.depsCancelCh != nil {
|
||||||
|
close(info.depsCancelCh)
|
||||||
|
}
|
||||||
|
info.depsCancelCh = cancelCh
|
||||||
|
|
||||||
|
log.Printf(
|
||||||
|
"[DEBUG] dag/walk: dependencies changed for %q, sending new deps",
|
||||||
|
VertexName(v))
|
||||||
|
|
||||||
|
// Start the waiter
|
||||||
|
go w.waitDeps(v, deps, doneCh, cancelCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start all the new vertices. We do this at the end so that all
|
||||||
|
// the edge waiters and changes are setup above.
|
||||||
|
for _, raw := range newVerts.List() {
|
||||||
|
v := raw.(Vertex)
|
||||||
|
go w.walkVertex(v, w.vertexMap[v])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// edgeParts returns the waiter and the dependency, in that order.
|
||||||
|
// The waiter is waiting on the dependency.
|
||||||
|
func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) {
|
||||||
|
if w.Reverse {
|
||||||
|
return e.Source(), e.Target()
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.Target(), e.Source()
|
||||||
|
}
|
||||||
|
|
||||||
|
// walkVertex walks a single vertex, waiting for any dependencies before
|
||||||
|
// executing the callback.
|
||||||
|
func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
|
||||||
|
// When we're done executing, lower the waitgroup count
|
||||||
|
defer w.wait.Done()
|
||||||
|
|
||||||
|
// When we're done, always close our done channel
|
||||||
|
defer close(info.DoneCh)
|
||||||
|
|
||||||
|
// Wait for our dependencies. We create a [closed] deps channel so
|
||||||
|
// that we can immediately fall through to load our actual DepsCh.
|
||||||
|
var depsSuccess bool
|
||||||
|
var depsUpdateCh chan struct{}
|
||||||
|
depsCh := make(chan bool, 1)
|
||||||
|
depsCh <- true
|
||||||
|
close(depsCh)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-info.CancelCh:
|
||||||
|
// Cancel
|
||||||
|
return
|
||||||
|
|
||||||
|
case depsSuccess = <-depsCh:
|
||||||
|
// Deps complete! Mark as nil to trigger completion handling.
|
||||||
|
depsCh = nil
|
||||||
|
|
||||||
|
case <-depsUpdateCh:
|
||||||
|
// New deps, reloop
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have updated dependencies. This can happen if the
|
||||||
|
// dependencies were satisfied exactly prior to an Update occuring.
|
||||||
|
// In that case, we'd like to take into account new dependencies
|
||||||
|
// if possible.
|
||||||
|
info.DepsLock.Lock()
|
||||||
|
if info.DepsCh != nil {
|
||||||
|
depsCh = info.DepsCh
|
||||||
|
info.DepsCh = nil
|
||||||
|
}
|
||||||
|
if info.DepsUpdateCh != nil {
|
||||||
|
depsUpdateCh = info.DepsUpdateCh
|
||||||
|
}
|
||||||
|
info.DepsLock.Unlock()
|
||||||
|
|
||||||
|
// If we still have no deps channel set, then we're done!
|
||||||
|
if depsCh == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we passed dependencies, we just want to check once more that
|
||||||
|
// we're not cancelled, since this can happen just as dependencies pass.
|
||||||
|
select {
|
||||||
|
case <-info.CancelCh:
|
||||||
|
// Cancelled during an update while dependencies completed.
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run our callback or note that our upstream failed
|
||||||
|
var err error
|
||||||
|
if depsSuccess {
|
||||||
|
log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v))
|
||||||
|
err = w.Callback(v)
|
||||||
|
} else {
|
||||||
|
log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v))
|
||||||
|
err = errWalkUpstream
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record the error
|
||||||
|
if err != nil {
|
||||||
|
w.errLock.Lock()
|
||||||
|
defer w.errLock.Unlock()
|
||||||
|
|
||||||
|
if w.errMap == nil {
|
||||||
|
w.errMap = make(map[Vertex]error)
|
||||||
|
}
|
||||||
|
w.errMap[v] = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Walker) waitDeps(
|
||||||
|
v Vertex,
|
||||||
|
deps map[Vertex]<-chan struct{},
|
||||||
|
doneCh chan<- bool,
|
||||||
|
cancelCh <-chan struct{}) {
|
||||||
|
// For each dependency given to us, wait for it to complete
|
||||||
|
for dep, depCh := range deps {
|
||||||
|
DepSatisfied:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-depCh:
|
||||||
|
// Dependency satisfied!
|
||||||
|
break DepSatisfied
|
||||||
|
|
||||||
|
case <-cancelCh:
|
||||||
|
// Wait cancelled. Note that we didn't satisfy dependencies
|
||||||
|
// so that anything waiting on us also doesn't run.
|
||||||
|
doneCh <- false
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q",
|
||||||
|
VertexName(v), VertexName(dep))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dependencies satisfied! We need to check if any errored
|
||||||
|
w.errLock.Lock()
|
||||||
|
defer w.errLock.Unlock()
|
||||||
|
for dep, _ := range deps {
|
||||||
|
if w.errMap[dep] != nil {
|
||||||
|
// One of our dependencies failed, so return false
|
||||||
|
doneCh <- false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All dependencies satisfied and successful
|
||||||
|
doneCh <- true
|
||||||
|
}
|
|
@ -0,0 +1,281 @@
|
||||||
|
package dag
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWalker_basic(t *testing.T) {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var order []interface{}
|
||||||
|
w := &Walker{Callback: walkCbRecord(&order)}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1, 2}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_updateNilGraph(t *testing.T) {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var order []interface{}
|
||||||
|
w := &Walker{Callback: walkCbRecord(&order)}
|
||||||
|
w.Update(&g)
|
||||||
|
w.Update(nil)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_error(t *testing.T) {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Add(3)
|
||||||
|
g.Add(4)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
g.Connect(BasicEdge(2, 3))
|
||||||
|
g.Connect(BasicEdge(3, 4))
|
||||||
|
|
||||||
|
// Record function
|
||||||
|
var order []interface{}
|
||||||
|
recordF := walkCbRecord(&order)
|
||||||
|
|
||||||
|
// Build a callback that delays until we close a channel
|
||||||
|
cb := func(v Vertex) error {
|
||||||
|
if v == 2 {
|
||||||
|
return fmt.Errorf("error!")
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordF(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
w := &Walker{Callback: cb}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err == nil {
|
||||||
|
t.Fatal("expect error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_newVertex(t *testing.T) {
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
|
||||||
|
var order []interface{}
|
||||||
|
w := &Walker{Callback: walkCbRecord(&order)}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait a bit
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// Update the graph
|
||||||
|
g.Add(3)
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Update the graph again but with the same vertex
|
||||||
|
g.Add(3)
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1, 2, 3}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_removeVertex(t *testing.T) {
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
|
||||||
|
// Record function
|
||||||
|
var order []interface{}
|
||||||
|
recordF := walkCbRecord(&order)
|
||||||
|
|
||||||
|
// Build a callback that delays until we close a channel
|
||||||
|
var w *Walker
|
||||||
|
cb := func(v Vertex) error {
|
||||||
|
if v == 1 {
|
||||||
|
g.Remove(2)
|
||||||
|
w.Update(&g)
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordF(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the initial vertices
|
||||||
|
w = &Walker{Callback: cb}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_newEdge(t *testing.T) {
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
|
||||||
|
// Record function
|
||||||
|
var order []interface{}
|
||||||
|
recordF := walkCbRecord(&order)
|
||||||
|
|
||||||
|
// Build a callback that delays until we close a channel
|
||||||
|
var w *Walker
|
||||||
|
cb := func(v Vertex) error {
|
||||||
|
if v == 1 {
|
||||||
|
g.Add(3)
|
||||||
|
g.Connect(BasicEdge(3, 2))
|
||||||
|
w.Update(&g)
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordF(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the initial vertices
|
||||||
|
w = &Walker{Callback: cb}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1, 3, 2}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalker_removeEdge(t *testing.T) {
|
||||||
|
// Run it a bunch of times since it is timing dependent
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
var g AcyclicGraph
|
||||||
|
g.Add(1)
|
||||||
|
g.Add(2)
|
||||||
|
g.Add(3)
|
||||||
|
g.Connect(BasicEdge(1, 2))
|
||||||
|
g.Connect(BasicEdge(3, 2))
|
||||||
|
|
||||||
|
// Record function
|
||||||
|
var order []interface{}
|
||||||
|
recordF := walkCbRecord(&order)
|
||||||
|
|
||||||
|
// The way this works is that our original graph forces
|
||||||
|
// the order of 1 => 3 => 2. During the execution of 1, we
|
||||||
|
// remove the edge forcing 3 before 2. Then, during the execution
|
||||||
|
// of 3, we wait on a channel that is only closed by 2, implicitly
|
||||||
|
// forcing 2 before 3 via the callback (and not the graph). If
|
||||||
|
// 2 cannot execute before 3 (edge removal is non-functional), then
|
||||||
|
// this test will timeout.
|
||||||
|
var w *Walker
|
||||||
|
gateCh := make(chan struct{})
|
||||||
|
cb := func(v Vertex) error {
|
||||||
|
if v == 1 {
|
||||||
|
g.RemoveEdge(BasicEdge(3, 2))
|
||||||
|
w.Update(&g)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v == 2 {
|
||||||
|
close(gateCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v == 3 {
|
||||||
|
select {
|
||||||
|
case <-gateCh:
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
return fmt.Errorf("timeout 3 waiting for 2")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordF(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the initial vertices
|
||||||
|
w = &Walker{Callback: cb}
|
||||||
|
w.Update(&g)
|
||||||
|
|
||||||
|
// Wait
|
||||||
|
if err := w.Wait(); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
expected := []interface{}{1, 2, 3}
|
||||||
|
if !reflect.DeepEqual(order, expected) {
|
||||||
|
t.Fatalf("bad: %#v", order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// walkCbRecord is a test helper callback that just records the order called.
|
||||||
|
func walkCbRecord(order *[]interface{}) WalkFunc {
|
||||||
|
var l sync.Mutex
|
||||||
|
return func(v Vertex) error {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
*order = append(*order, v)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue