fix concurrent test relying on sleep
Make an old concurrent test deterministic so that it does not rely on sleep for any synchronization.
parent c8f83e184b
commit 1e5a8e4dae
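The rewritten test bounds concurrency with a buffered channel used as a counting semaphore: each apply call must place a token into a channel of capacity "par" before doing work, and a select with a default branch turns any over-subscription into an immediate failure instead of a timing-dependent measurement. The sketch below distills that check outside of Terraform; it is not part of the commit, and the doWork helper and plain worker goroutines are invented for illustration (t.Error is used rather than t.Fatal because these goroutines are not the test goroutine).

package example

import (
	"sync"
	"testing"
	"time"
)

// doWork stands in for a real apply operation; the duration is arbitrary.
func doWork() { time.Sleep(10 * time.Millisecond) }

func TestBoundedConcurrency(t *testing.T) {
	const par = 4
	sem := make(chan struct{}, par) // counting semaphore: at most "par" tokens

	var wg sync.WaitGroup
	for i := 0; i < par; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Acquire a token. If the semaphore is already full, more than
			// "par" workers are running at once and the test fails right away.
			select {
			case sem <- struct{}{}:
				defer func() { <-sem }() // release the token when done
			default:
				t.Error("too many concurrent workers")
				return
			}

			doWork()
		}()
	}
	wg.Wait()
}

Releasing the token in a defer mirrors the test's own defer func() { <-started }(), so the semaphore count always reflects work that is actually in flight.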
@@ -2,6 +2,7 @@ package command
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -143,34 +144,8 @@ func TestApply_lockedStateWait(t *testing.T) {
 	}
 }
 
-// high water mark counter
-type hwm struct {
-	sync.Mutex
-	val int
-	max int
-}
-
-func (t *hwm) Inc() {
-	t.Lock()
-	defer t.Unlock()
-	t.val++
-	if t.val > t.max {
-		t.max = t.val
-	}
-}
-
-func (t *hwm) Dec() {
-	t.Lock()
-	defer t.Unlock()
-	t.val--
-}
-
-func (t *hwm) Max() int {
-	t.Lock()
-	defer t.Unlock()
-	return t.max
-}
-
+// Verify that the parallelism flag allows no more than the desired number of
+// concurrent calls to ApplyResourceChange.
 func TestApply_parallelism(t *testing.T) {
 	// Create a temporary working directory that is empty
 	td := tempDir(t)
@@ -182,13 +157,15 @@ func TestApply_parallelism(t *testing.T) {
 
 	par := 4
 
-	// This blocks all the apply functions. We close it when we exit so
-	// they end quickly after this test finishes.
-	block := make(chan struct{})
-	// signal how many goroutines have started
-	started := make(chan int, 100)
+	// started is a semaphore that we use to ensure that we never have more
+	// than "par" apply operations happening concurrently
+	started := make(chan struct{}, par)
 
-	runCount := &hwm{}
+	// beginCtx is used as a starting gate to hold back ApplyResourceChange
+	// calls until we reach the desired concurrency. The cancel func "begin" is
+	// called once we reach the desired concurrency, allowing all apply calls
+	// to proceed in unison.
+	beginCtx, begin := context.WithCancel(context.Background())
 
 	// Since our mock provider has its own mutex preventing concurrent calls
 	// to ApplyResourceChange, we need to use a number of separate providers
@@ -209,12 +186,29 @@ func TestApply_parallelism(t *testing.T) {
 		}
 	}
 	provider.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
-		// Increment so we're counting parallelism
-		started <- 1
-		runCount.Inc()
-		defer runCount.Dec()
-		// Block here to stage up our max number of parallel instances
-		<-block
+
+		// If we ever have more than our intended parallelism number of
+		// apply operations running concurrently, the semaphore will fail.
+		select {
+		case started <- struct{}{}:
+			defer func() {
+				<-started
+			}()
+		default:
+			t.Fatal("too many concurrent apply operations")
+		}
+
+		// If we never reach our intended parallelism, the context will
+		// never be canceled and the test will time out.
+		if len(started) >= par {
+			begin()
+		}
+		<-beginCtx.Done()
+
+		// do some "work"
+		// Not required for correctness, but makes it easier to spot a
+		// failure when there is more overlap.
+		time.Sleep(10 * time.Millisecond)
 
 		return providers.ApplyResourceChangeResponse{
 			NewState: cty.EmptyObjectVal,
@@ -240,41 +234,10 @@ func TestApply_parallelism(t *testing.T) {
 		fmt.Sprintf("-parallelism=%d", par),
 	}
 
-	// Run in a goroutine. We can get any errors from the ui.OutputWriter
-	doneCh := make(chan int, 1)
-	go func() {
-		doneCh <- c.Run(args)
-	}()
-
-	timeout := time.After(5 * time.Second)
-
-	// ensure things are running
-	for i := 0; i < par; i++ {
-		select {
-		case <-timeout:
-			t.Fatal("timeout waiting for all goroutines to start")
-		case <-started:
-		}
-	}
-
-	// a little extra sleep, since we can't ensure all goroutines from the walk have
-	// really started
-	time.Sleep(100 * time.Millisecond)
-	close(block)
-
-	select {
-	case res := <-doneCh:
-		if res != 0 {
-			t.Fatal(ui.OutputWriter.String())
-		}
-	case <-timeout:
-		t.Fatal("timeout waiting from Run()")
-	}
-
-	// The total in flight should equal the parallelism
-	if runCount.Max() != par {
-		t.Fatalf("Expected parallelism: %d, got: %d", par, runCount.Max())
-	}
+	res := c.Run(args)
+	if res != 0 {
+		t.Fatal(ui.OutputWriter.String())
+	}
 }
 
 func TestApply_configInvalid(t *testing.T) {
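The counterpart to the semaphore is the starting gate: every ApplyResourceChange call parks on beginCtx.Done(), and whichever call observes that the semaphore has filled up invokes the cancel func begin, releasing all calls in unison; if the intended parallelism is never reached, nothing cancels the context and the test times out rather than silently passing. A standalone sketch of that gate, not taken from the commit and with an invented TestStartingGate harness, might look like this:

package example

import (
	"context"
	"sync"
	"testing"
)

func TestStartingGate(t *testing.T) {
	const par = 4

	// begin() opens the gate; every worker waits on gateCtx.Done().
	gateCtx, begin := context.WithCancel(context.Background())

	arrived := make(chan struct{}, par) // records how many workers have arrived

	var wg sync.WaitGroup
	for i := 0; i < par; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			arrived <- struct{}{}
			// The last worker to arrive opens the gate for everyone.
			if len(arrived) >= par {
				begin()
			}

			// Wait at the gate; all workers pass this point together.
			<-gateCtx.Done()
		}()
	}
	wg.Wait()

	if len(arrived) != par {
		t.Fatalf("expected %d workers to arrive, got %d", par, len(arrived))
	}
}

Because cancelation is idempotent, it does no harm that several workers may call begin; once canceled, the gate simply stays open, which is what lets the test drop all of its sleeps.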