dag: improve docs and read access during a lock

This commit is contained in:
Mitchell Hashimoto 2017-02-03 11:22:26 +01:00
parent 72a717f2de
commit 65752cd51a
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
3 changed files with 65 additions and 24 deletions

View File

@ -166,7 +166,7 @@ func (g *AcyclicGraph) Cycles() [][]Vertex {
func (g *AcyclicGraph) Walk(cb WalkFunc) error { func (g *AcyclicGraph) Walk(cb WalkFunc) error {
defer g.debug.BeginOperation(typeWalk, "").End("") defer g.debug.BeginOperation(typeWalk, "").End("")
w := &walker{Callback: cb, Reverse: true} w := &Walker{Callback: cb, Reverse: true}
w.Update(&g.Graph) w.Update(&g.Graph)
return w.Wait() return w.Wait()
} }

View File

@ -10,8 +10,23 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
) )
// walker performs a graph walk and supports walk-time changing of vertices // Walker is used to walk every vertex of a graph in parallel.
// and edges. //
// A vertex will only be walked when the dependencies of that vertex have
// been walked. If two vertices can be walked at the same time, they will be.
//
// Update can be called to update the graph. This can be called even during
// a walk, cahnging vertices/edges mid-walk. This should be done carefully.
// If a vertex is removed but has already been executed, the result of that
// execution (any error) is still returned by Wait. Changing or re-adding
// a vertex that has already executed has no effect. Changing edges of
// a vertex that has already executed has no effect.
//
// Non-parallelism can be enforced by introducing a lock in your callback
// function. However, the goroutine overhead of a walk will remain.
// Walker will create V*2 goroutines (one for each vertex, and dependency
// waiter for each vertex). In general this should be of no concern unless
// there are a huge number of vertices.
// //
// The walk is depth first by default. This can be changed with the Reverse // The walk is depth first by default. This can be changed with the Reverse
// option. // option.
@ -19,7 +34,7 @@ import (
// A single walker is only valid for one graph walk. After the walk is complete // A single walker is only valid for one graph walk. After the walk is complete
// you must construct a new walker to walk again. State for the walk is never // you must construct a new walker to walk again. State for the walk is never
// deleted in case vertices or edges are changed. // deleted in case vertices or edges are changed.
type walker struct { type Walker struct {
// Callback is what is called for each vertex // Callback is what is called for each vertex
Callback WalkFunc Callback WalkFunc
@ -46,18 +61,34 @@ type walker struct {
} }
type walkerVertex struct { type walkerVertex struct {
// These should only be set once on initialization and never written again // These should only be set once on initialization and never written again.
// They are not protected by a lock since they don't need to be since
// they are write-once.
// DoneCh is closed when this vertex has completed execution, regardless
// of success.
//
// CancelCh is closed when the vertex should cancel execution. If execution
// is already complete (DoneCh is closed), this has no effect. Otherwise,
// execution is cancelled as quickly as possible.
DoneCh chan struct{} DoneCh chan struct{}
CancelCh chan struct{} CancelCh chan struct{}
// Dependency information. Any changes to any of these fields requires // Dependency information. Any changes to any of these fields requires
// holding DepsLock. // holding DepsLock.
//
// DepsCh is sent a single value that denotes whether the upstream deps
// were successful (no errors). Any value sent means that the upstream
// dependencies are complete. No other values will ever be sent again.
//
// DepsUpdateCh is closed when there is a new DepsCh set.
DepsCh chan bool DepsCh chan bool
DepsUpdateCh chan struct{} DepsUpdateCh chan struct{}
DepsLock sync.Mutex DepsLock sync.Mutex
// Below is not safe to read/write in parallel. This behavior is // Below is not safe to read/write in parallel. This behavior is
// enforced by changes only happening in Update. // enforced by changes only happening in Update. Nothing else should
// ever modify these.
deps map[Vertex]chan struct{} deps map[Vertex]chan struct{}
depsCancelCh chan struct{} depsCancelCh chan struct{}
} }
@ -74,7 +105,7 @@ var errWalkUpstream = errors.New("upstream dependency failed")
// Wait will return as soon as all currently known vertices are complete. // Wait will return as soon as all currently known vertices are complete.
// If you plan on calling Update with more vertices in the future, you // If you plan on calling Update with more vertices in the future, you
// should not call Wait until after this is done. // should not call Wait until after this is done.
func (w *walker) Wait() error { func (w *Walker) Wait() error {
// Wait for completion // Wait for completion
w.wait.Wait() w.wait.Wait()
@ -94,11 +125,17 @@ func (w *walker) Wait() error {
return result return result
} }
// Update updates the currently executing walk with the given vertices // Update updates the currently executing walk with the given graph.
// and edges. It does not block until completion. // This will perform a diff of the vertices and edges and update the walker.
// Already completed vertices remain completed (including any errors during
// their execution).
// //
// Update can be called in parallel to Walk. // This returns immediately once the walker is updated; it does not wait
func (w *walker) Update(g *Graph) { // for completion of the walk.
//
// Multiple Updates can be called in parallel. Update can be called at any
// time during a walk.
func (w *Walker) Update(g *Graph) {
v, e := g.vertices, g.edges v, e := g.vertices, g.edges
// Grab the change lock so no more updates happen but also so that // Grab the change lock so no more updates happen but also so that
@ -277,7 +314,7 @@ func (w *walker) Update(g *Graph) {
// edgeParts returns the waiter and the dependency, in that order. // edgeParts returns the waiter and the dependency, in that order.
// The waiter is waiting on the dependency. // The waiter is waiting on the dependency.
func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) {
if w.Reverse { if w.Reverse {
return e.Source(), e.Target() return e.Source(), e.Target()
} }
@ -287,7 +324,7 @@ func (w *walker) edgeParts(e Edge) (Vertex, Vertex) {
// walkVertex walks a single vertex, waiting for any dependencies before // walkVertex walks a single vertex, waiting for any dependencies before
// executing the callback. // executing the callback.
func (w *walker) walkVertex(v Vertex, info *walkerVertex) { func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
// When we're done executing, lower the waitgroup count // When we're done executing, lower the waitgroup count
defer w.wait.Done() defer w.wait.Done()
@ -297,6 +334,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) {
// Wait for our dependencies. We create a [closed] deps channel so // Wait for our dependencies. We create a [closed] deps channel so
// that we can immediately fall through to load our actual DepsCh. // that we can immediately fall through to load our actual DepsCh.
var depsSuccess bool var depsSuccess bool
var depsUpdateCh chan struct{}
depsCh := make(chan bool, 1) depsCh := make(chan bool, 1)
depsCh <- true depsCh <- true
close(depsCh) close(depsCh)
@ -310,7 +348,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) {
// Deps complete! Mark as nil to trigger completion handling. // Deps complete! Mark as nil to trigger completion handling.
depsCh = nil depsCh = nil
case <-info.DepsUpdateCh: case <-depsUpdateCh:
// New deps, reloop // New deps, reloop
} }
@ -323,6 +361,9 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) {
depsCh = info.DepsCh depsCh = info.DepsCh
info.DepsCh = nil info.DepsCh = nil
} }
if info.DepsUpdateCh != nil {
depsUpdateCh = info.DepsUpdateCh
}
info.DepsLock.Unlock() info.DepsLock.Unlock()
// If we still have no deps channel set, then we're done! // If we still have no deps channel set, then we're done!
@ -362,7 +403,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) {
} }
} }
func (w *walker) waitDeps( func (w *Walker) waitDeps(
v Vertex, v Vertex,
deps map[Vertex]<-chan struct{}, deps map[Vertex]<-chan struct{},
doneCh chan<- bool, doneCh chan<- bool,

View File

@ -17,7 +17,7 @@ func TestWalker_basic(t *testing.T) {
// Run it a bunch of times since it is timing dependent // Run it a bunch of times since it is timing dependent
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
var order []interface{} var order []interface{}
w := &walker{Callback: walkCbRecord(&order)} w := &Walker{Callback: walkCbRecord(&order)}
w.Update(&g) w.Update(&g)
// Wait // Wait
@ -56,7 +56,7 @@ func TestWalker_error(t *testing.T) {
return recordF(v) return recordF(v)
} }
w := &walker{Callback: cb} w := &Walker{Callback: cb}
w.Update(&g) w.Update(&g)
// Wait // Wait
@ -80,7 +80,7 @@ func TestWalker_newVertex(t *testing.T) {
g.Connect(BasicEdge(1, 2)) g.Connect(BasicEdge(1, 2))
var order []interface{} var order []interface{}
w := &walker{Callback: walkCbRecord(&order)} w := &Walker{Callback: walkCbRecord(&order)}
w.Update(&g) w.Update(&g)
// Wait a bit // Wait a bit
@ -120,7 +120,7 @@ func TestWalker_removeVertex(t *testing.T) {
recordF := walkCbRecord(&order) recordF := walkCbRecord(&order)
// Build a callback that delays until we close a channel // Build a callback that delays until we close a channel
var w *walker var w *Walker
cb := func(v Vertex) error { cb := func(v Vertex) error {
if v == 1 { if v == 1 {
g.Remove(2) g.Remove(2)
@ -131,7 +131,7 @@ func TestWalker_removeVertex(t *testing.T) {
} }
// Add the initial vertices // Add the initial vertices
w = &walker{Callback: cb} w = &Walker{Callback: cb}
w.Update(&g) w.Update(&g)
// Wait // Wait
@ -160,7 +160,7 @@ func TestWalker_newEdge(t *testing.T) {
recordF := walkCbRecord(&order) recordF := walkCbRecord(&order)
// Build a callback that delays until we close a channel // Build a callback that delays until we close a channel
var w *walker var w *Walker
cb := func(v Vertex) error { cb := func(v Vertex) error {
if v == 1 { if v == 1 {
g.Add(3) g.Add(3)
@ -172,7 +172,7 @@ func TestWalker_newEdge(t *testing.T) {
} }
// Add the initial vertices // Add the initial vertices
w = &walker{Callback: cb} w = &Walker{Callback: cb}
w.Update(&g) w.Update(&g)
// Wait // Wait
@ -209,7 +209,7 @@ func TestWalker_removeEdge(t *testing.T) {
// forcing 2 before 3 via the callback (and not the graph). If // forcing 2 before 3 via the callback (and not the graph). If
// 2 cannot execute before 3 (edge removal is non-functional), then // 2 cannot execute before 3 (edge removal is non-functional), then
// this test will timeout. // this test will timeout.
var w *walker var w *Walker
gateCh := make(chan struct{}) gateCh := make(chan struct{})
cb := func(v Vertex) error { cb := func(v Vertex) error {
if v == 1 { if v == 1 {
@ -233,7 +233,7 @@ func TestWalker_removeEdge(t *testing.T) {
} }
// Add the initial vertices // Add the initial vertices
w = &walker{Callback: cb} w = &Walker{Callback: cb}
w.Update(&g) w.Update(&g)
// Wait // Wait