From d14d891367b2e13cef77f5c986473fdf1af8ba02 Mon Sep 17 00:00:00 2001 From: stack72 Date: Tue, 10 Nov 2015 16:24:33 +0000 Subject: [PATCH] Finishing the first pass at Kinesis Firehose. I have only implemented the S3 configuration right now as Terraform doesn't include RedShift support --- builtin/providers/aws/config.go | 2 +- ...ce_aws_kinesis_firehose_delivery_stream.go | 77 ++++++++++++++- ...s_kinesis_firehose_delivery_stream_test.go | 97 +++++++++++++++++-- 3 files changed, 165 insertions(+), 11 deletions(-) diff --git a/builtin/providers/aws/config.go b/builtin/providers/aws/config.go index bd7b08924..d8a9ff862 100644 --- a/builtin/providers/aws/config.go +++ b/builtin/providers/aws/config.go @@ -171,7 +171,7 @@ func (c *Config) Client() (interface{}, error) { } log.Println("[INFO] Initializing Kinesis Firehose Connection") - client.firehoseconn = autoscaling.New(sess) + client.firehoseconn = firehose.New(sess) log.Println("[INFO] Initializing AutoScaling connection") client.autoscalingconn = autoscaling.New(sess) diff --git a/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream.go b/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream.go index 320e078b2..c1b973454 100644 --- a/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream.go +++ b/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream.go @@ -2,6 +2,7 @@ package aws import ( "fmt" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -29,6 +30,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Type: schema.TypeString, Required: true, ForceNew: true, + StateFunc: func(v interface{}) string { + value := v.(string) + return strings.ToLower(value) + }, }, "role_arn": &schema.Schema{ @@ -69,6 +74,18 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { Optional: true, Computed: true, }, + + "version_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + + "destination_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Computed: true, + }, }, } } @@ -76,6 +93,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).firehoseconn + if d.Get("destination").(string) != "s3" { + return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation") + } + sn := d.Get("name").(string) input := &firehose.CreateDeliveryStreamInput{ DeliveryStreamName: aws.String(sn), @@ -124,11 +145,56 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta d.SetId(*s.DeliveryStreamARN) d.Set("arn", s.DeliveryStreamARN) - return resourceAwsKinesisStreamUpdate(d, meta) + return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta) } func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) error { - // conn := meta.(*AWSClient).firehoseconn + conn := meta.(*AWSClient).firehoseconn + + if d.Get("destination").(string) != "s3" { + return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation") + } + + sn := d.Get("name").(string) + s3_config := &firehose.S3DestinationUpdate{} + + if d.HasChange("role_arn") { + s3_config.RoleARN = aws.String(d.Get("role_arn").(string)) + } + + if d.HasChange("s3_bucket_arn") { + s3_config.BucketARN = aws.String(d.Get("s3_bucket_arn").(string)) + } + + if d.HasChange("s3_prefix") { + s3_config.Prefix = aws.String(d.Get("s3_prefix").(string)) + } + + if d.HasChange("s3_data_compression") { + s3_config.CompressionFormat = aws.String(d.Get("s3_data_compression").(string)) + } + + if d.HasChange("s3_buffer_interval") || d.HasChange("s3_buffer_size") { + bufferingHints := &firehose.BufferingHints{ + IntervalInSeconds: aws.Int64(int64(d.Get("s3_buffer_interval").(int))), + SizeInMBs: aws.Int64(int64(d.Get("s3_buffer_size").(int))), + } + s3_config.BufferingHints = bufferingHints + } + + destOpts := &firehose.UpdateDestinationInput{ + DeliveryStreamName: aws.String(sn), + CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)), + DestinationId: aws.String(d.Get("destination_id").(string)), + S3DestinationUpdate: s3_config, + } + + _, err := conn.UpdateDestination(destOpts) + if err != nil { + return fmt.Errorf( + "Error Updating Kinesis Firehose Delivery Stream: %s", + sn, err) + } return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta) } @@ -136,7 +202,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).firehoseconn sn := d.Get("name").(string) - describeOpts := &firehose.DeliveryStreamDescription{ + describeOpts := &firehose.DescribeDeliveryStreamInput{ DeliveryStreamName: aws.String(sn), } resp, err := conn.DescribeDeliveryStream(describeOpts) @@ -152,7 +218,12 @@ func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta i } s := resp.DeliveryStreamDescription + d.Set("version_id", s.VersionId) d.Set("arn", *s.DeliveryStreamARN) + if len(s.Destinations) > 0 { + destination := s.Destinations[0] + d.Set("destination_id", *destination.DestinationId) + } return nil } diff --git a/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream_test.go b/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream_test.go index 1414f7ae6..611e196ce 100644 --- a/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream_test.go +++ b/builtin/providers/aws/resource_aws_kinesis_firehose_delivery_stream_test.go @@ -2,6 +2,7 @@ package aws import ( "fmt" + "log" "math/rand" "strings" "testing" @@ -16,13 +17,16 @@ import ( func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) { var stream firehose.DeliveryStreamDescription + ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_basic, ri, ri) + resource.Test(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy, Steps: []resource.TestStep{ resource.TestStep{ - Config: testAccKinesisFirehoseDeliveryStreamConfig, + Config: config, Check: resource.ComposeTestCheckFunc( testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream), testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream), @@ -32,9 +36,53 @@ func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) { }) } +func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) { + var stream firehose.DeliveryStreamDescription + + ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + preconfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3, ri, ri) + postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3Updates, ri, ri) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: preconfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "5"), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "300"), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "UNCOMPRESSED"), + ), + }, + + resource.TestStep{ + Config: postConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "10"), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "400"), + resource.TestCheckResourceAttr( + "aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "GZIP"), + ), + }, + }, + }) +} + func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] + log.Printf("State: %#v", s.RootModule().Resources) if !ok { return fmt.Errorf("Not found: %s", n) } @@ -60,7 +108,7 @@ func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose. func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc { return func(s *terraform.State) error { - if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose-test") { + if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose") { return fmt.Errorf("Bad Stream name: %s", *stream.DeliveryStreamName) } for _, rs := range s.RootModule().Resources { @@ -98,9 +146,44 @@ func testAccCheckKinesisFirehoseDeliveryStreamDestroy(s *terraform.State) error return nil } -var testAccKinesisFirehoseDeliveryStreamConfig = fmt.Sprintf(` -resource "aws_kinesis_firehose_delivery_stream" "test_stream" { - name = "terraform-kinesis-firehose-test-%d" - +var testAccKinesisFirehoseDeliveryStreamConfig_basic = ` +resource "aws_s3_bucket" "bucket" { + bucket = "tf-test-bucket-%d" + acl = "private" } -`, rand.New(rand.NewSource(time.Now().UnixNano())).Int()) + +resource "aws_kinesis_firehose_delivery_stream" "test_stream" { + name = "terraform-kinesis-firehose-basictest-%d" + destination = "s3" + role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role" + s3_bucket_arn = "${aws_s3_bucket.bucket.arn}" +}` + +var testAccKinesisFirehoseDeliveryStreamConfig_s3 = ` +resource "aws_s3_bucket" "bucket" { + bucket = "tf-test-bucket-%d" + acl = "private" +} + +resource "aws_kinesis_firehose_delivery_stream" "test_stream" { + name = "terraform-kinesis-firehose-s3test-%d" + destination = "s3" + role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role" + s3_bucket_arn = "${aws_s3_bucket.bucket.arn}" +}` + +var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = ` +resource "aws_s3_bucket" "bucket" { + bucket = "tf-test-bucket-01-%d" + acl = "private" +} + +resource "aws_kinesis_firehose_delivery_stream" "test_stream" { + name = "terraform-kinesis-firehose-s3test-%d" + destination = "s3" + role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role" + s3_bucket_arn = "${aws_s3_bucket.bucket.arn}" + s3_buffer_size = 10 + s3_buffer_interval = 400 + s3_data_compression = "GZIP" +}`