provider/aws: serialize SG rule access to fix race condition
Because `aws_security_group_rule` resources are an abstraction on top of Security Groups, they must interact with the AWS Security Group APIs in a pattern that often results in lots of parallel requests interacting with the same security group. We've found that this pattern can trigger race conditions resulting in inconsistent behavior, including: * Rules that report as created but don't actually exist on AWS's side * Rules that show up in AWS but don't register as being created locally, resulting in follow-up attempts to authorize the rule failing with Duplicate errors Here, we introduce a per-SG mutex that must be held by any security group before it is allowed to interact with AWS APIs. This protects the space between `DescribeSecurityGroup` and `Authorize*` / `Revoke*` calls, ensuring that no other rules interact with the SG during that span. The included test exposes the race by applying a security group with lots of rules, which based on the dependency graph can all be handled in parallel. This fails most of the time without the new locking behavior. I've omitted the mutex from `Read`, since it is only called during the Refresh walk when no changes are being made, meaning a bunch of parallel `DescribeSecurityGroup` API calls should be consistent in that case.
This commit is contained in:
parent
e7a054c9dd
commit
6b6b5a43c3
|
@ -6,6 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/helper/hashcode"
|
"github.com/hashicorp/terraform/helper/hashcode"
|
||||||
|
"github.com/hashicorp/terraform/helper/mutexkv"
|
||||||
"github.com/hashicorp/terraform/helper/schema"
|
"github.com/hashicorp/terraform/helper/schema"
|
||||||
"github.com/hashicorp/terraform/terraform"
|
"github.com/hashicorp/terraform/terraform"
|
||||||
|
|
||||||
|
@ -321,3 +322,6 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
|
||||||
|
|
||||||
return config.Client()
|
return config.Client()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is a global MutexKV for use within this plugin.
|
||||||
|
var awsMutexKV = mutexkv.NewMutexKV()
|
||||||
|
|
|
@ -84,8 +84,11 @@ func resourceAwsSecurityGroupRule() *schema.Resource {
|
||||||
func resourceAwsSecurityGroupRuleCreate(d *schema.ResourceData, meta interface{}) error {
|
func resourceAwsSecurityGroupRuleCreate(d *schema.ResourceData, meta interface{}) error {
|
||||||
conn := meta.(*AWSClient).ec2conn
|
conn := meta.(*AWSClient).ec2conn
|
||||||
sg_id := d.Get("security_group_id").(string)
|
sg_id := d.Get("security_group_id").(string)
|
||||||
sg, err := findResourceSecurityGroup(conn, sg_id)
|
|
||||||
|
|
||||||
|
awsMutexKV.Lock(sg_id)
|
||||||
|
defer awsMutexKV.Unlock(sg_id)
|
||||||
|
|
||||||
|
sg, err := findResourceSecurityGroup(conn, sg_id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -249,8 +252,11 @@ func resourceAwsSecurityGroupRuleRead(d *schema.ResourceData, meta interface{})
|
||||||
func resourceAwsSecurityGroupRuleDelete(d *schema.ResourceData, meta interface{}) error {
|
func resourceAwsSecurityGroupRuleDelete(d *schema.ResourceData, meta interface{}) error {
|
||||||
conn := meta.(*AWSClient).ec2conn
|
conn := meta.(*AWSClient).ec2conn
|
||||||
sg_id := d.Get("security_group_id").(string)
|
sg_id := d.Get("security_group_id").(string)
|
||||||
sg, err := findResourceSecurityGroup(conn, sg_id)
|
|
||||||
|
|
||||||
|
awsMutexKV.Lock(sg_id)
|
||||||
|
defer awsMutexKV.Unlock(sg_id)
|
||||||
|
|
||||||
|
sg, err := findResourceSecurityGroup(conn, sg_id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package aws
|
package aws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -339,7 +340,24 @@ func TestAccAWSSecurityGroupRule_PartialMatching_Source(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccAWSSecurityGroupRule_Race(t *testing.T) {
|
||||||
|
var group ec2.SecurityGroup
|
||||||
|
|
||||||
|
resource.Test(t, resource.TestCase{
|
||||||
|
PreCheck: func() { testAccPreCheck(t) },
|
||||||
|
Providers: testAccProviders,
|
||||||
|
CheckDestroy: testAccCheckAWSSecurityGroupRuleDestroy,
|
||||||
|
Steps: []resource.TestStep{
|
||||||
|
resource.TestStep{
|
||||||
|
Config: testAccAWSSecurityGroupRuleRace,
|
||||||
|
Check: resource.ComposeTestCheckFunc(
|
||||||
|
testAccCheckAWSSecurityGroupRuleExists("aws_security_group.race", &group),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAccCheckAWSSecurityGroupRuleDestroy(s *terraform.State) error {
|
func testAccCheckAWSSecurityGroupRuleDestroy(s *terraform.State) error {
|
||||||
|
@ -718,3 +736,41 @@ resource "aws_security_group_rule" "other_ingress" {
|
||||||
security_group_id = "${aws_security_group.web.id}"
|
security_group_id = "${aws_security_group.web.id}"
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var testAccAWSSecurityGroupRuleRace = func() string {
|
||||||
|
var b bytes.Buffer
|
||||||
|
iterations := 50
|
||||||
|
b.WriteString(fmt.Sprintf(`
|
||||||
|
resource "aws_vpc" "default" {
|
||||||
|
cidr_block = "10.0.0.0/16"
|
||||||
|
tags { Name = "tf-sg-rule-race" }
|
||||||
|
}
|
||||||
|
|
||||||
|
resource "aws_security_group" "race" {
|
||||||
|
name = "tf-sg-rule-race-group-%d"
|
||||||
|
vpc_id = "${aws_vpc.default.id}"
|
||||||
|
}
|
||||||
|
`, genRandInt()))
|
||||||
|
for i := 1; i < iterations; i++ {
|
||||||
|
b.WriteString(fmt.Sprintf(`
|
||||||
|
resource "aws_security_group_rule" "ingress%d" {
|
||||||
|
security_group_id = "${aws_security_group.race.id}"
|
||||||
|
type = "ingress"
|
||||||
|
from_port = %d
|
||||||
|
to_port = %d
|
||||||
|
protocol = "tcp"
|
||||||
|
cidr_blocks = ["10.0.0.%d/32"]
|
||||||
|
}
|
||||||
|
|
||||||
|
resource "aws_security_group_rule" "egress%d" {
|
||||||
|
security_group_id = "${aws_security_group.race.id}"
|
||||||
|
type = "egress"
|
||||||
|
from_port = %d
|
||||||
|
to_port = %d
|
||||||
|
protocol = "tcp"
|
||||||
|
cidr_blocks = ["10.0.0.%d/32"]
|
||||||
|
}
|
||||||
|
`, i, i, i, i, i, i, i, i))
|
||||||
|
}
|
||||||
|
return b.String()
|
||||||
|
}()
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
package mutexkv
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MutexKV is a simple key/value store for arbitrary mutexes. It can be used
// to serialize changes across arbitrary collaborators that share knowledge of
// the keys they must serialize on.
//
// The initial use case is to let aws_security_group_rule resources serialize
// their access to individual security groups based on SG ID.
type MutexKV struct {
	lock  sync.Mutex             // guards store
	store map[string]*sync.Mutex // lazily-created mutex per key
}

// Lock acquires the mutex for the given key, blocking until it is available.
// The caller is responsible for calling Unlock with the same key afterwards.
func (m *MutexKV) Lock(key string) {
	log.Printf("[DEBUG] Locking %q", key)
	m.get(key).Lock()
	log.Printf("[DEBUG] Locked %q", key)
}

// Unlock releases the mutex for the given key. The caller must have acquired
// it via Lock with the same key first.
func (m *MutexKV) Unlock(key string) {
	log.Printf("[DEBUG] Unlocking %q", key)
	m.get(key).Unlock()
	log.Printf("[DEBUG] Unlocked %q", key)
}

// get returns the mutex stored under key, creating it on first use. It makes
// no guarantee about the returned mutex's lock state.
func (m *MutexKV) get(key string) *sync.Mutex {
	m.lock.Lock()
	defer m.lock.Unlock()

	if mu, found := m.store[key]; found {
		return mu
	}
	mu := &sync.Mutex{}
	m.store[key] = mu
	return mu
}

// NewMutexKV returns a properly initialized MutexKV.
func NewMutexKV() *MutexKV {
	return &MutexKV{store: map[string]*sync.Mutex{}}
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package mutexkv
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMutexKVLock(t *testing.T) {
|
||||||
|
mkv := NewMutexKV()
|
||||||
|
|
||||||
|
mkv.Lock("foo")
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
mkv.Lock("foo")
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
t.Fatal("Second lock was able to be taken. This shouldn't happen.")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// pass
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMutexKVUnlock(t *testing.T) {
|
||||||
|
mkv := NewMutexKV()
|
||||||
|
|
||||||
|
mkv.Lock("foo")
|
||||||
|
mkv.Unlock("foo")
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
mkv.Lock("foo")
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
// pass
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatal("Second lock blocked after unlock. This shouldn't happen.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMutexKVDifferentKeys(t *testing.T) {
|
||||||
|
mkv := NewMutexKV()
|
||||||
|
|
||||||
|
mkv.Lock("foo")
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
mkv.Lock("bar")
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
// pass
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatal("Second lock on a different key blocked. This shouldn't happen.")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue