Add retry-logic and mutexes to work around inconsistent nature of IAM

This commit is contained in:
Radek Simko 2016-02-12 12:07:11 +00:00
parent 8481625596
commit db0d48eb84
2 changed files with 233 additions and 63 deletions

View File

@ -72,9 +72,17 @@ func resourceAwsLambdaPermission() *schema.Resource {
func resourceAwsLambdaPermissionCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).lambdaconn
functionName := d.Get("function_name").(string)
// There is a bug in the API (reported and acknowledged by AWS)
// which causes some permissions to be ignored when API calls are sent in parallel
// We work around this bug via mutex
awsMutexKV.Lock(functionName)
defer awsMutexKV.Unlock(functionName)
input := lambda.AddPermissionInput{
Action: aws.String(d.Get("action").(string)),
FunctionName: aws.String(d.Get("function_name").(string)),
FunctionName: aws.String(functionName),
Principal: aws.String(d.Get("principal").(string)),
StatementId: aws.String(d.Get("statement_id").(string)),
}
@ -99,7 +107,8 @@ func resourceAwsLambdaPermissionCreate(d *schema.ResourceData, meta interface{})
if awsErr, ok := err.(awserr.Error); ok {
// IAM is eventually consistent :/
if awsErr.Code() == "ResourceConflictException" {
return fmt.Errorf("[WARN] Error creating ELB Listener with SSL Cert, retrying: %s", err)
return fmt.Errorf("[WARN] Error adding new Lambda Permission for %s, retrying: %s",
*input.FunctionName, err)
}
}
return resource.RetryError{Err: err}
@ -115,7 +124,26 @@ func resourceAwsLambdaPermissionCreate(d *schema.ResourceData, meta interface{})
d.SetId(d.Get("statement_id").(string))
return resourceAwsLambdaPermissionRead(d, meta)
err = resource.Retry(5*time.Minute, func() error {
// IAM is eventually cosistent :/
err := resourceAwsLambdaPermissionRead(d, meta)
if err != nil {
if strings.HasPrefix(err.Error(), "Error reading Lambda policy: ResourceNotFoundException") {
return fmt.Errorf("[WARN] Error reading newly created Lambda Permission for %s, retrying: %s",
*input.FunctionName, err)
}
if strings.HasPrefix(err.Error(), "Failed to find statement \""+d.Id()) {
return fmt.Errorf("[WARN] Error reading newly created Lambda Permission statement for %s, retrying: %s",
*input.FunctionName, err)
}
log.Printf("[ERROR] An actual error occured when expecting Lambda policy to be there: %s", err)
return resource.RetryError{Err: err}
}
return nil
})
return err
}
func resourceAwsLambdaPermissionRead(d *schema.ResourceData, meta interface{}) error {
@ -129,20 +157,31 @@ func resourceAwsLambdaPermissionRead(d *schema.ResourceData, meta interface{}) e
}
log.Printf("[DEBUG] Looking for Lambda permission: %s", input)
out, err := conn.GetPolicy(&input)
if err != nil {
return fmt.Errorf("Error reading Lambda policy: %s", err)
}
var out *lambda.GetPolicyOutput
var statement *LambdaPolicyStatement
err := resource.Retry(1*time.Minute, func() error {
// IAM is eventually cosistent :/
var err error
out, err = conn.GetPolicy(&input)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {
return err
}
}
return resource.RetryError{Err: err}
}
d.Set("full_policy", *out.Policy)
policyInBytes := []byte(*out.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return fmt.Errorf("Error unmarshalling Lambda policy: %s", err)
}
policyInBytes := []byte(*out.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return resource.RetryError{Err: fmt.Errorf("Error unmarshalling Lambda policy: %s", err)}
}
statement, err := findLambdaPolicyStatementById(&policy, d.Id())
statement, err = findLambdaPolicyStatementById(&policy, d.Id())
return err
})
if err != nil {
return err
}
@ -182,8 +221,16 @@ func resourceAwsLambdaPermissionRead(d *schema.ResourceData, meta interface{}) e
func resourceAwsLambdaPermissionDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).lambdaconn
functionName := d.Get("function_name").(string)
// There is a bug in the API (reported and acknowledged by AWS)
// which causes some permissions to be ignored when API calls are sent in parallel
// We work around this bug via mutex
awsMutexKV.Lock(functionName)
defer awsMutexKV.Unlock(functionName)
input := lambda.RemovePermissionInput{
FunctionName: aws.String(d.Get("function_name").(string)),
FunctionName: aws.String(functionName),
StatementId: aws.String(d.Id()),
}
@ -197,6 +244,51 @@ func resourceAwsLambdaPermissionDelete(d *schema.ResourceData, meta interface{})
return err
}
err = resource.Retry(5*time.Minute, func() error {
log.Printf("[DEBUG] Checking if Lambda permission %q is deleted", d.Id())
params := &lambda.GetPolicyInput{
FunctionName: aws.String(d.Get("function_name").(string)),
}
if v, ok := d.GetOk("qualifier"); ok {
params.Qualifier = aws.String(v.(string))
}
log.Printf("[DEBUG] Looking for Lambda permission: %s", *params)
resp, err := conn.GetPolicy(params)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {
return nil
}
}
return resource.RetryError{Err: err}
}
if resp.Policy == nil {
return nil
}
policyInBytes := []byte(*resp.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return fmt.Errorf("Error unmarshalling Lambda policy: %s", err)
}
_, err = findLambdaPolicyStatementById(&policy, d.Id())
if err != nil {
return nil
}
log.Printf("[DEBUG] No error when checking if Lambda permission %s is deleted", d.Id())
return nil
})
if err != nil {
return fmt.Errorf("Failed removing Lambda permission: %s", err)
}
log.Printf("[DEBUG] Lambda permission with ID %q removed", d.Id())
d.SetId("")
@ -206,14 +298,14 @@ func resourceAwsLambdaPermissionDelete(d *schema.ResourceData, meta interface{})
func findLambdaPolicyStatementById(policy *LambdaPolicy, id string) (
*LambdaPolicyStatement, error) {
log.Printf("[DEBUG] Received %d statements in Lambda policy", len(policy.Statement))
log.Printf("[DEBUG] Received %d statements in Lambda policy: %s", len(policy.Statement), policy.Statement)
for _, statement := range policy.Statement {
if statement.Sid == id {
return &statement, nil
}
}
return nil, fmt.Errorf("Failed to find statement %q in Lambda policy", id)
return nil, fmt.Errorf("Failed to find statement %q in Lambda policy:\n%s", id, policy.Statement)
}
func getQualifierFromLambdaAliasOrVersionArn(arn string) (string, error) {

View File

@ -3,11 +3,13 @@ package aws
import (
"encoding/json"
"fmt"
"log"
"regexp"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
@ -207,6 +209,8 @@ func TestAccAWSLambdaPermission_multiplePerms(t *testing.T) {
var secondStatement LambdaPolicyStatement
var secondStatementModified LambdaPolicyStatement
var thirdStatement LambdaPolicyStatement
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
@ -242,11 +246,18 @@ func TestAccAWSLambdaPermission_multiplePerms(t *testing.T) {
resource.TestMatchResourceAttr("aws_lambda_permission.first", "function_name",
regexp.MustCompile(":function:lambda_function_name_perm_multiperms$")),
// 2nd
testAccCheckLambdaPermissionExists("aws_lambda_permission.renamed", &secondStatementModified),
resource.TestCheckResourceAttr("aws_lambda_permission.renamed", "action", "lambda:*"),
resource.TestCheckResourceAttr("aws_lambda_permission.renamed", "principal", "events.amazonaws.com"),
resource.TestCheckResourceAttr("aws_lambda_permission.renamed", "statement_id", "AllowExecutionSecond"),
resource.TestMatchResourceAttr("aws_lambda_permission.renamed", "function_name",
testAccCheckLambdaPermissionExists("aws_lambda_permission.sec0nd", &secondStatementModified),
resource.TestCheckResourceAttr("aws_lambda_permission.sec0nd", "action", "lambda:*"),
resource.TestCheckResourceAttr("aws_lambda_permission.sec0nd", "principal", "events.amazonaws.com"),
resource.TestCheckResourceAttr("aws_lambda_permission.sec0nd", "statement_id", "AllowExecutionSec0nd"),
resource.TestMatchResourceAttr("aws_lambda_permission.sec0nd", "function_name",
regexp.MustCompile(":function:lambda_function_name_perm_multiperms$")),
// 3rd
testAccCheckLambdaPermissionExists("aws_lambda_permission.third", &thirdStatement),
resource.TestCheckResourceAttr("aws_lambda_permission.third", "action", "lambda:*"),
resource.TestCheckResourceAttr("aws_lambda_permission.third", "principal", "events.amazonaws.com"),
resource.TestCheckResourceAttr("aws_lambda_permission.third", "statement_id", "AllowExecutionThird"),
resource.TestMatchResourceAttr("aws_lambda_permission.third", "function_name",
regexp.MustCompile(":function:lambda_function_name_perm_multiperms$")),
),
},
@ -311,31 +322,26 @@ func testAccCheckLambdaPermissionExists(n string, statement *LambdaPolicyStateme
}
conn := testAccProvider.Meta().(*AWSClient).lambdaconn
params := &lambda.GetPolicyInput{
FunctionName: aws.String(rs.Primary.Attributes["function_name"]),
}
if v, ok := rs.Primary.Attributes["qualifier"]; ok {
params.Qualifier = aws.String(v)
}
log.Printf("[DEBUG] Looking for Lambda permission: %s", *params)
resp, err := conn.GetPolicy(params)
if err != nil {
return fmt.Errorf("Lambda policy not found: %q", err)
}
if resp.Policy == nil {
return fmt.Errorf("Received Lambda policy is empty")
}
policyInBytes := []byte(*resp.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return fmt.Errorf("Error unmarshalling Lambda policy: %s", err)
}
foundStatement, err := findLambdaPolicyStatementById(&policy, rs.Primary.ID)
// IAM is eventually consistent
var foundStatement *LambdaPolicyStatement
err := resource.Retry(5*time.Minute, func() error {
var err error
foundStatement, err = lambdaPermissionExists(rs, conn)
if err != nil {
if strings.HasPrefix(err.Error(), "ResourceNotFoundException") {
return err
}
if strings.HasPrefix(err.Error(), "Lambda policy not found") {
return err
}
if strings.HasPrefix(err.Error(), "Failed to find statement") {
return err
}
return resource.RetryError{Err: err}
}
return nil
})
if err != nil {
return err
}
@ -354,24 +360,89 @@ func testAccCheckAWSLambdaPermissionDestroy(s *terraform.State) error {
continue
}
params := &lambda.GetPolicyInput{
FunctionName: aws.String(rs.Primary.Attributes["function_name"]),
}
if v, ok := rs.Primary.Attributes["qualifier"]; ok {
params.Qualifier = aws.String(v)
}
resp, err := conn.GetPolicy(params)
if err == nil {
// No permissions should exist at this point
return fmt.Errorf("Lambda Permission still exists: %s\n%s",
rs.Primary.ID, *resp.Policy)
// IAM is eventually consistent
err := resource.Retry(5*time.Minute, func() error {
err := isLambdaPermissionGone(rs, conn)
if err != nil {
if !strings.HasPrefix(err.Error(), "Error unmarshalling Lambda policy") {
return err
}
return resource.RetryError{Err: err}
}
return nil
})
if err != nil {
return err
}
}
return nil
}
func isLambdaPermissionGone(rs *terraform.ResourceState, conn *lambda.Lambda) error {
params := &lambda.GetPolicyInput{
FunctionName: aws.String(rs.Primary.Attributes["function_name"]),
}
if v, ok := rs.Primary.Attributes["qualifier"]; ok {
params.Qualifier = aws.String(v)
}
resp, err := conn.GetPolicy(params)
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {
// no policy found => all statements deleted
return nil
}
}
if err != nil {
return fmt.Errorf("Unexpected error when checking existence of Lambda permission: %s\n%s",
rs.Primary.ID, err)
}
policyInBytes := []byte(*resp.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return fmt.Errorf("Error unmarshalling Lambda policy (%s): %s", *resp.Policy, err)
}
state, err := findLambdaPolicyStatementById(&policy, rs.Primary.ID)
if err != nil {
// statement not found => deleted
return nil
}
return fmt.Errorf("Policy statement expected to be gone (%s):\n%s",
rs.Primary.ID, *state)
}
func lambdaPermissionExists(rs *terraform.ResourceState, conn *lambda.Lambda) (*LambdaPolicyStatement, error) {
params := &lambda.GetPolicyInput{
FunctionName: aws.String(rs.Primary.Attributes["function_name"]),
}
if v, ok := rs.Primary.Attributes["qualifier"]; ok {
params.Qualifier = aws.String(v)
}
resp, err := conn.GetPolicy(params)
if err != nil {
return nil, fmt.Errorf("Lambda policy not found: %q", err)
}
if resp.Policy == nil {
return nil, fmt.Errorf("Received Lambda policy is empty")
}
policyInBytes := []byte(*resp.Policy)
policy := LambdaPolicy{}
err = json.Unmarshal(policyInBytes, &policy)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling Lambda policy: %s", err)
}
return findLambdaPolicyStatementById(&policy, rs.Primary.ID)
}
var testAccAWSLambdaPermissionConfig = `
resource "aws_lambda_permission" "allow_cloudwatch" {
statement_id = "AllowExecutionFromCloudWatch"
@ -496,11 +567,12 @@ resource "aws_lambda_permission" "first" {
}
resource "aws_lambda_permission" "%s" {
statement_id = "AllowExecutionSecond"
statement_id = "%s"
action = "lambda:*"
function_name = "${aws_lambda_function.test_lambda.arn}"
principal = "events.amazonaws.com"
}
%s
resource "aws_lambda_function" "test_lambda" {
filename = "test-fixtures/lambdatest.zip"
@ -530,9 +602,15 @@ EOF
`
var testAccAWSLambdaPermissionConfig_multiplePerms = fmt.Sprintf(
testAccAWSLambdaPermissionConfig_multiplePerms_tpl, "second")
testAccAWSLambdaPermissionConfig_multiplePerms_tpl, "second", "AllowExecutionSecond", "")
var testAccAWSLambdaPermissionConfig_multiplePermsModified = fmt.Sprintf(
testAccAWSLambdaPermissionConfig_multiplePerms_tpl, "renamed")
testAccAWSLambdaPermissionConfig_multiplePerms_tpl, "sec0nd", "AllowExecutionSec0nd", `
resource "aws_lambda_permission" "third" {
statement_id = "AllowExecutionThird"
action = "lambda:*"
function_name = "${aws_lambda_function.test_lambda.arn}"
principal = "events.amazonaws.com"
}`)
var testAccAWSLambdaPermissionConfig_withS3 = `
resource "aws_lambda_permission" "with_s3" {