Finishing the first pass at Kinesis Firehose. I have only implemented the S3 configuration right now as Terraform doesn't include RedShift support

This commit is contained in:
stack72 2015-11-10 16:24:33 +00:00
parent 5dfa9ac823
commit d14d891367
3 changed files with 165 additions and 11 deletions

View File

@ -171,7 +171,7 @@ func (c *Config) Client() (interface{}, error) {
}
log.Println("[INFO] Initializing Kinesis Firehose Connection")
client.firehoseconn = autoscaling.New(sess)
client.firehoseconn = firehose.New(sess)
log.Println("[INFO] Initializing AutoScaling connection")
client.autoscalingconn = autoscaling.New(sess)

View File

@ -2,6 +2,7 @@ package aws
import (
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
@ -29,6 +30,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Required: true,
ForceNew: true,
StateFunc: func(v interface{}) string {
value := v.(string)
return strings.ToLower(value)
},
},
"role_arn": &schema.Schema{
@ -69,6 +74,18 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Optional: true,
Computed: true,
},
"version_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Computed: true,
},
"destination_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Computed: true,
},
},
}
}
@ -76,6 +93,10 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
if d.Get("destination").(string) != "s3" {
return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation")
}
sn := d.Get("name").(string)
input := &firehose.CreateDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
@ -124,11 +145,56 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
d.SetId(*s.DeliveryStreamARN)
d.Set("arn", s.DeliveryStreamARN)
return resourceAwsKinesisStreamUpdate(d, meta)
return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}
func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) error {
// conn := meta.(*AWSClient).firehoseconn
conn := meta.(*AWSClient).firehoseconn
if d.Get("destination").(string) != "s3" {
return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation")
}
sn := d.Get("name").(string)
s3_config := &firehose.S3DestinationUpdate{}
if d.HasChange("role_arn") {
s3_config.RoleARN = aws.String(d.Get("role_arn").(string))
}
if d.HasChange("s3_bucket_arn") {
s3_config.BucketARN = aws.String(d.Get("s3_bucket_arn").(string))
}
if d.HasChange("s3_prefix") {
s3_config.Prefix = aws.String(d.Get("s3_prefix").(string))
}
if d.HasChange("s3_data_compression") {
s3_config.CompressionFormat = aws.String(d.Get("s3_data_compression").(string))
}
if d.HasChange("s3_buffer_interval") || d.HasChange("s3_buffer_size") {
bufferingHints := &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(d.Get("s3_buffer_interval").(int))),
SizeInMBs: aws.Int64(int64(d.Get("s3_buffer_size").(int))),
}
s3_config.BufferingHints = bufferingHints
}
destOpts := &firehose.UpdateDestinationInput{
DeliveryStreamName: aws.String(sn),
CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
DestinationId: aws.String(d.Get("destination_id").(string)),
S3DestinationUpdate: s3_config,
}
_, err := conn.UpdateDestination(destOpts)
if err != nil {
return fmt.Errorf(
"Error Updating Kinesis Firehose Delivery Stream: %s",
sn, err)
}
return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}
@ -136,7 +202,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
sn := d.Get("name").(string)
describeOpts := &firehose.DeliveryStreamDescription{
describeOpts := &firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
}
resp, err := conn.DescribeDeliveryStream(describeOpts)
@ -152,7 +218,12 @@ func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta i
}
s := resp.DeliveryStreamDescription
d.Set("version_id", s.VersionId)
d.Set("arn", *s.DeliveryStreamARN)
if len(s.Destinations) > 0 {
destination := s.Destinations[0]
d.Set("destination_id", *destination.DestinationId)
}
return nil
}

View File

@ -2,6 +2,7 @@ package aws
import (
"fmt"
"log"
"math/rand"
"strings"
"testing"
@ -16,13 +17,16 @@ import (
func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_basic, ri, ri)
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccKinesisFirehoseDeliveryStreamConfig,
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
@ -32,9 +36,53 @@ func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) {
})
}
func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
preconfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3Updates, ri, ri)
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: preconfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "5"),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "300"),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "UNCOMPRESSED"),
),
},
resource.TestStep{
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "10"),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "400"),
resource.TestCheckResourceAttr(
"aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "GZIP"),
),
},
},
})
}
func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
log.Printf("State: %#v", s.RootModule().Resources)
if !ok {
return fmt.Errorf("Not found: %s", n)
}
@ -60,7 +108,7 @@ func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.
func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc {
return func(s *terraform.State) error {
if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose-test") {
if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose") {
return fmt.Errorf("Bad Stream name: %s", *stream.DeliveryStreamName)
}
for _, rs := range s.RootModule().Resources {
@ -98,9 +146,44 @@ func testAccCheckKinesisFirehoseDeliveryStreamDestroy(s *terraform.State) error
return nil
}
var testAccKinesisFirehoseDeliveryStreamConfig = fmt.Sprintf(`
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-%d"
var testAccKinesisFirehoseDeliveryStreamConfig_basic = `
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket-%d"
acl = "private"
}
`, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-basictest-%d"
destination = "s3"
role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
}`
var testAccKinesisFirehoseDeliveryStreamConfig_s3 = `
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket-%d"
acl = "private"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-s3test-%d"
destination = "s3"
role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
}`
var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = `
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket-01-%d"
acl = "private"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-s3test-%d"
destination = "s3"
role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
s3_buffer_size = 10
s3_buffer_interval = 400
s3_data_compression = "GZIP"
}`