Make TestApply_parallelism more reliable
Wait for the expected number of goroutines to be staged and ready, then allow Run() to complete. Use a high-water-mark counter to verify that the number of concurrent goroutines never exceeded the expected maximum at any point during the run.
commit 54d50c90b1
parent f86198c155
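For reference, a minimal standalone sketch of the staging-and-high-water-mark pattern this commit applies: stage a known number of goroutines behind a channel, track concurrency with a high-water-mark counter, then release them and check the observed peak. The hwm type mirrors the counter added in the diff below; the package, worker loop, and the par constant are illustrative only and are not part of the commit.

// Sketch of the pattern used by the test: stage par goroutines, record the
// peak concurrency, release them, and report the high-water mark.
package main

import (
	"fmt"
	"sync"
)

// hwm is a high-water-mark counter: Inc/Dec maintain the current value,
// Max reports the largest value ever observed.
type hwm struct {
	sync.Mutex
	val int
	max int
}

func (t *hwm) Inc() {
	t.Lock()
	defer t.Unlock()
	t.val++
	if t.val > t.max {
		t.max = t.val
	}
}

func (t *hwm) Dec() {
	t.Lock()
	defer t.Unlock()
	t.val--
}

func (t *hwm) Max() int {
	t.Lock()
	defer t.Unlock()
	return t.max
}

func main() {
	const par = 4

	block := make(chan struct{})   // holds every goroutine at the staging point
	started := make(chan int, par) // signals how many goroutines have begun
	count := &hwm{}

	var wg sync.WaitGroup
	for i := 0; i < par; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			count.Inc()
			defer count.Dec()
			started <- 1
			<-block // stay resident until released
		}()
	}

	// Wait until all par goroutines have checked in, then release them.
	for i := 0; i < par; i++ {
		<-started
	}
	close(block)
	wg.Wait()

	fmt.Printf("peak concurrency: %d (expected %d)\n", count.Max(), par)
}

Note that the sketch increments the counter before signalling on started, so the peak is deterministic once all signals are received. The test itself signals first and compensates with a short sleep before closing block, since it cannot directly observe every goroutine started by the walk.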
@@ -12,7 +12,6 @@ import (
 	"reflect"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -59,24 +58,57 @@ func TestApply(t *testing.T) {
 	}
 }
 
+// high water mark counter
+type hwm struct {
+	sync.Mutex
+	val int
+	max int
+}
+
+func (t *hwm) Inc() {
+	t.Lock()
+	defer t.Unlock()
+	t.val++
+	if t.val > t.max {
+		t.max = t.val
+	}
+}
+
+func (t *hwm) Dec() {
+	t.Lock()
+	defer t.Unlock()
+	t.val--
+}
+
+func (t *hwm) Max() int {
+	t.Lock()
+	defer t.Unlock()
+	return t.max
+}
+
 func TestApply_parallelism(t *testing.T) {
 	provider := testProvider()
 	statePath := testTempFile(t)
 
+	par := 4
+
 	// This blocks all the appy functions. We close it when we exit so
 	// they end quickly after this test finishes.
 	block := make(chan struct{})
-	defer close(block)
+	// signal how many goroutines have started
+	started := make(chan int, 100)
+
+	runCount := &hwm{}
 
-	var runCount uint64
 	provider.ApplyFn = func(
 		i *terraform.InstanceInfo,
 		s *terraform.InstanceState,
 		d *terraform.InstanceDiff) (*terraform.InstanceState, error) {
 		// Increment so we're counting parallelism
-		atomic.AddUint64(&runCount, 1)
-
-		// Block until we're done
+		started <- 1
+		runCount.Inc()
+		defer runCount.Dec()
+		// Block here to stage up our max number of parallel instances
 		<-block
 
 		return nil, nil
@@ -90,30 +122,46 @@ func TestApply_parallelism(t *testing.T) {
 		},
 	}
 
-	par := uint64(5)
 	args := []string{
 		"-state", statePath,
 		fmt.Sprintf("-parallelism=%d", par),
 		testFixturePath("parallelism"),
 	}
 
-	// Run in a goroutine. We still try to catch any errors and
-	// get them on the error channel.
-	errCh := make(chan string, 1)
+	// Run in a goroutine. We can get any errors from the ui.OutputWriter
+	doneCh := make(chan int, 1)
 	go func() {
-		if code := c.Run(args); code != 0 {
-			errCh <- ui.OutputWriter.String()
-		}
+		doneCh <- c.Run(args)
 	}()
 
+	timeout := time.After(5 * time.Second)
+
+	// ensure things are running
+	for i := 0; i < par; i++ {
+		select {
+		case <-timeout:
+			t.Fatal("timeout waiting for all goroutines to start")
+		case <-started:
+		}
+	}
+
+	// a little extra sleep, since we can't ensure all goroutines from the walk have
+	// really started
+	time.Sleep(100 * time.Millisecond)
+	close(block)
+
 	select {
-	case <-time.After(1000 * time.Millisecond):
-	case err := <-errCh:
-		t.Fatalf("err: %s", err)
+	case res := <-doneCh:
+		if res != 0 {
+			t.Fatal(ui.OutputWriter.String())
+		}
+	case <-timeout:
+		t.Fatal("timeout waiting from Run()")
 	}
 
 	// The total in flight should equal the parallelism
-	if rc := atomic.LoadUint64(&runCount); rc != par {
-		t.Fatalf("Expected parallelism: %d, got: %d", par, rc)
+	if runCount.Max() != par {
+		t.Fatalf("Expected parallelism: %d, got: %d", par, runCount.Max())
 	}
 }