Merge pull request #3833 from stack72/aws-kinesis-firehose

provider/aws: Add Kinesis Firehose resource
This commit is contained in:
James Nugent 2015-11-10 16:04:48 -05:00
commit 7c50e3ed65
6 changed files with 660 additions and 101 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/elasticache" "github.com/aws/aws-sdk-go/service/elasticache"
elasticsearch "github.com/aws/aws-sdk-go/service/elasticsearchservice" elasticsearch "github.com/aws/aws-sdk-go/service/elasticsearchservice"
"github.com/aws/aws-sdk-go/service/elb" "github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/glacier" "github.com/aws/aws-sdk-go/service/glacier"
"github.com/aws/aws-sdk-go/service/iam" "github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
@ -74,6 +75,7 @@ type AWSClient struct {
rdsconn *rds.RDS rdsconn *rds.RDS
iamconn *iam.IAM iamconn *iam.IAM
kinesisconn *kinesis.Kinesis kinesisconn *kinesis.Kinesis
firehoseconn *firehose.Firehose
elasticacheconn *elasticache.ElastiCache elasticacheconn *elasticache.ElastiCache
lambdaconn *lambda.Lambda lambdaconn *lambda.Lambda
opsworksconn *opsworks.OpsWorks opsworksconn *opsworks.OpsWorks
@ -168,6 +170,9 @@ func (c *Config) Client() (interface{}, error) {
errs = append(errs, authErr) errs = append(errs, authErr)
} }
log.Println("[INFO] Initializing Kinesis Firehose Connection")
client.firehoseconn = firehose.New(sess)
log.Println("[INFO] Initializing AutoScaling connection") log.Println("[INFO] Initializing AutoScaling connection")
client.autoscalingconn = autoscaling.New(sess) client.autoscalingconn = autoscaling.New(sess)

View File

@ -216,6 +216,7 @@ func Provider() terraform.ResourceProvider {
"aws_instance": resourceAwsInstance(), "aws_instance": resourceAwsInstance(),
"aws_internet_gateway": resourceAwsInternetGateway(), "aws_internet_gateway": resourceAwsInternetGateway(),
"aws_key_pair": resourceAwsKeyPair(), "aws_key_pair": resourceAwsKeyPair(),
"aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": resourceAwsKinesisStream(), "aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_lambda_function": resourceAwsLambdaFunction(), "aws_lambda_function": resourceAwsLambdaFunction(),
"aws_launch_configuration": resourceAwsLaunchConfiguration(), "aws_launch_configuration": resourceAwsLaunchConfiguration(),

View File

@ -0,0 +1,281 @@
package aws
import (
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)
// resourceAwsKinesisFirehoseDeliveryStream returns the schema and CRUD
// handlers for the aws_kinesis_firehose_delivery_stream resource.
func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
	return &schema.Resource{
		Create: resourceAwsKinesisFirehoseDeliveryStreamCreate,
		Read:   resourceAwsKinesisFirehoseDeliveryStreamRead,
		Update: resourceAwsKinesisFirehoseDeliveryStreamUpdate,
		Delete: resourceAwsKinesisFirehoseDeliveryStreamDelete,

		Schema: map[string]*schema.Schema{
			// Delivery stream name; changing it forces replacement.
			"name": &schema.Schema{
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},

			// Destination type. Only "s3" is accepted by the Create/Update
			// handlers in this implementation.
			"destination": &schema.Schema{
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
				// Normalize to lower case so "S3" and "s3" compare equal
				// and do not produce a spurious diff.
				StateFunc: func(v interface{}) string {
					value := v.(string)
					return strings.ToLower(value)
				},
			},

			// ARN of the IAM role Firehose assumes to write to the bucket.
			"role_arn": &schema.Schema{
				Type:     schema.TypeString,
				Required: true,
			},

			// ARN of the destination S3 bucket.
			"s3_bucket_arn": &schema.Schema{
				Type:     schema.TypeString,
				Required: true,
			},

			// Optional key prefix prepended to delivered S3 objects.
			"s3_prefix": &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
			},

			// Buffer size in MB before delivery (AWS default: 5).
			"s3_buffer_size": &schema.Schema{
				Type:     schema.TypeInt,
				Optional: true,
				Default:  5,
			},

			// Buffer interval in seconds before delivery (AWS default: 300).
			"s3_buffer_interval": &schema.Schema{
				Type:     schema.TypeInt,
				Optional: true,
				Default:  300,
			},

			// Compression format for delivered objects.
			"s3_data_compression": &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
				Default:  "UNCOMPRESSED",
			},

			// Computed attributes populated from the AWS API after creation.
			"arn": &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
				Computed: true,
			},

			"version_id": &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
				Computed: true,
			},

			"destination_id": &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
				Computed: true,
			},
		},
	}
}
// resourceAwsKinesisFirehoseDeliveryStreamCreate creates a delivery stream
// with an S3 destination and waits for it to reach the ACTIVE state before
// reading its attributes back into state. Only the "s3" destination is
// supported by this first implementation.
func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).firehoseconn

	if d.Get("destination").(string) != "s3" {
		return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation")
	}

	sn := d.Get("name").(string)
	input := &firehose.CreateDeliveryStreamInput{
		DeliveryStreamName: aws.String(sn),
	}

	// Assemble the S3 destination from the flat s3_* attributes.
	s3_config := &firehose.S3DestinationConfiguration{
		BucketARN: aws.String(d.Get("s3_bucket_arn").(string)),
		RoleARN:   aws.String(d.Get("role_arn").(string)),
		BufferingHints: &firehose.BufferingHints{
			IntervalInSeconds: aws.Int64(int64(d.Get("s3_buffer_interval").(int))),
			SizeInMBs:         aws.Int64(int64(d.Get("s3_buffer_size").(int))),
		},
		CompressionFormat: aws.String(d.Get("s3_data_compression").(string)),
	}
	if v, ok := d.GetOk("s3_prefix"); ok {
		s3_config.Prefix = aws.String(v.(string))
	}
	input.S3DestinationConfiguration = s3_config

	_, err := conn.CreateDeliveryStream(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			return fmt.Errorf("[WARN] Error creating Kinesis Firehose Delivery Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
		}
		return err
	}

	// Creation is asynchronous: poll until the stream reports ACTIVE.
	stateConf := &resource.StateChangeConf{
		Pending:    []string{"CREATING"},
		Target:     "ACTIVE",
		Refresh:    firehoseStreamStateRefreshFunc(conn, sn),
		Timeout:    5 * time.Minute,
		Delay:      10 * time.Second,
		MinTimeout: 3 * time.Second,
	}

	firehoseStream, err := stateConf.WaitForState()
	if err != nil {
		// Fixed: message previously said "Kinesis Stream", which belongs to
		// the separate aws_kinesis_stream resource.
		return fmt.Errorf(
			"Error waiting for Kinesis Firehose Delivery Stream (%s) to become active: %s",
			sn, err)
	}

	s := firehoseStream.(*firehose.DeliveryStreamDescription)
	d.SetId(*s.DeliveryStreamARN)
	d.Set("arn", s.DeliveryStreamARN)

	return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}
// resourceAwsKinesisFirehoseDeliveryStreamUpdate pushes changed s3_*
// attributes to the existing delivery stream via UpdateDestination, using
// the version_id/destination_id captured by the last Read for optimistic
// concurrency control.
func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).firehoseconn

	if d.Get("destination").(string) != "s3" {
		return fmt.Errorf("[ERROR] AWS Kinesis Firehose only supports S3 destinations for the first implementation")
	}

	sn := d.Get("name").(string)

	// Only send the fields that actually changed; unset fields are left
	// untouched by the API.
	s3_config := &firehose.S3DestinationUpdate{}

	if d.HasChange("role_arn") {
		s3_config.RoleARN = aws.String(d.Get("role_arn").(string))
	}

	if d.HasChange("s3_bucket_arn") {
		s3_config.BucketARN = aws.String(d.Get("s3_bucket_arn").(string))
	}

	if d.HasChange("s3_prefix") {
		s3_config.Prefix = aws.String(d.Get("s3_prefix").(string))
	}

	if d.HasChange("s3_data_compression") {
		s3_config.CompressionFormat = aws.String(d.Get("s3_data_compression").(string))
	}

	// BufferingHints must be sent as a pair, so rebuild both values when
	// either one changes.
	if d.HasChange("s3_buffer_interval") || d.HasChange("s3_buffer_size") {
		bufferingHints := &firehose.BufferingHints{
			IntervalInSeconds: aws.Int64(int64(d.Get("s3_buffer_interval").(int))),
			SizeInMBs:         aws.Int64(int64(d.Get("s3_buffer_size").(int))),
		}
		s3_config.BufferingHints = bufferingHints
	}

	destOpts := &firehose.UpdateDestinationInput{
		DeliveryStreamName:             aws.String(sn),
		CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
		DestinationId:                  aws.String(d.Get("destination_id").(string)),
		S3DestinationUpdate:            s3_config,
	}

	_, err := conn.UpdateDestination(destOpts)
	if err != nil {
		// Fixed: the format string had a single verb for two arguments,
		// which swallowed the underlying error (a `go vet` failure).
		return fmt.Errorf(
			"Error Updating Kinesis Firehose Delivery Stream: \"%s\": %s",
			sn, err)
	}

	return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}
// resourceAwsKinesisFirehoseDeliveryStreamRead refreshes state from the
// DescribeDeliveryStream API, clearing the resource ID when the stream no
// longer exists so Terraform plans a re-create.
func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).firehoseconn

	streamName := d.Get("name").(string)
	resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
		DeliveryStreamName: aws.String(streamName),
	})
	if err != nil {
		awsErr, isAwsErr := err.(awserr.Error)
		if !isAwsErr {
			return err
		}
		if awsErr.Code() == "ResourceNotFoundException" {
			// Stream was deleted out-of-band; remove it from state.
			d.SetId("")
			return nil
		}
		return fmt.Errorf("[WARN] Error reading Kinesis Firehose Delivery Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
	}

	desc := resp.DeliveryStreamDescription
	d.Set("version_id", desc.VersionId)
	d.Set("arn", *desc.DeliveryStreamARN)
	if len(desc.Destinations) > 0 {
		d.Set("destination_id", *desc.Destinations[0].DestinationId)
	}

	return nil
}
// resourceAwsKinesisFirehoseDeliveryStreamDelete deletes the delivery stream
// and blocks until AWS reports it fully destroyed.
func resourceAwsKinesisFirehoseDeliveryStreamDelete(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).firehoseconn

	streamName := d.Get("name").(string)
	if _, err := conn.DeleteDeliveryStream(&firehose.DeleteDeliveryStreamInput{
		DeliveryStreamName: aws.String(streamName),
	}); err != nil {
		return err
	}

	// Deletion is asynchronous: poll until the refresh func reports the
	// synthetic "DESTROYED" state (ResourceNotFoundException).
	waitConf := &resource.StateChangeConf{
		Pending:    []string{"DELETING"},
		Target:     "DESTROYED",
		Refresh:    firehoseStreamStateRefreshFunc(conn, streamName),
		Timeout:    5 * time.Minute,
		Delay:      10 * time.Second,
		MinTimeout: 3 * time.Second,
	}
	if _, err := waitConf.WaitForState(); err != nil {
		return fmt.Errorf(
			"Error waiting for Delivery Stream (%s) to be destroyed: %s",
			streamName, err)
	}

	d.SetId("")
	return nil
}
// firehoseStreamStateRefreshFunc builds a StateRefreshFunc that reports the
// stream's current status, mapping ResourceNotFoundException to the
// synthetic "DESTROYED" state consumed by the delete waiter.
func firehoseStreamStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
			DeliveryStreamName: aws.String(sn),
		})
		if err == nil {
			desc := resp.DeliveryStreamDescription
			return desc, *desc.DeliveryStreamStatus, nil
		}

		awsErr, ok := err.(awserr.Error)
		if !ok {
			return nil, "failed", err
		}
		if awsErr.Code() == "ResourceNotFoundException" {
			// WaitForState requires a non-nil result alongside a terminal
			// status; the value itself is never inspected.
			return 42, "DESTROYED", nil
		}
		return nil, awsErr.Code(), err
	}
}

View File

@ -0,0 +1,189 @@
package aws
import (
"fmt"
"log"
"math/rand"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
// TestAccAWSKinesisFirehoseDeliveryStream_basic creates a minimal delivery
// stream and verifies it exists with sane attributes.
func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) {
	var stream firehose.DeliveryStreamDescription

	// Randomize names so concurrent test runs do not collide.
	rInt := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
	cfg := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_basic, rInt, rInt)

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
		Steps: []resource.TestStep{
			resource.TestStep{
				Config: cfg,
				Check: resource.ComposeTestCheckFunc(
					testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
					testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
				),
			},
		},
	})
}
// TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates creates a stream
// with default S3 settings, then applies an updated config and verifies the
// buffer size/interval and compression changes are reflected in state.
func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
	var stream firehose.DeliveryStreamDescription

	rInt := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
	initialConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3, rInt, rInt)
	updatedConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3Updates, rInt, rInt)

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
		Steps: []resource.TestStep{
			// Step 1: create with defaults and confirm them.
			resource.TestStep{
				Config: initialConfig,
				Check: resource.ComposeTestCheckFunc(
					testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
					testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "5"),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "300"),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "UNCOMPRESSED"),
				),
			},
			// Step 2: apply updated buffering/compression and confirm.
			resource.TestStep{
				Config: updatedConfig,
				Check: resource.ComposeTestCheckFunc(
					testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
					testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_size", "10"),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_buffer_interval", "400"),
					resource.TestCheckResourceAttr(
						"aws_kinesis_firehose_delivery_stream.test_stream", "s3_data_compression", "GZIP"),
				),
			},
		},
	})
}
// testAccCheckKinesisFirehoseDeliveryStreamExists asserts that the named
// resource is in state and that the stream exists in AWS, copying its
// description into *stream for later attribute checks.
func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, ok := s.RootModule().Resources[n]
		log.Printf("State: %#v", s.RootModule().Resources)
		if !ok {
			return fmt.Errorf("Not found: %s", n)
		}

		if rs.Primary.ID == "" {
			return fmt.Errorf("No Kinesis Firehose ID is set")
		}

		conn := testAccProvider.Meta().(*AWSClient).firehoseconn
		resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
			DeliveryStreamName: aws.String(rs.Primary.Attributes["name"]),
		})
		if err != nil {
			return err
		}

		*stream = *resp.DeliveryStreamDescription
		return nil
	}
}
// testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes verifies the live
// stream's name prefix and that its ARN matches the value stored in state.
func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose") {
			return fmt.Errorf("Bad Stream name: %s", *stream.DeliveryStreamName)
		}

		for _, res := range s.RootModule().Resources {
			if res.Type != "aws_kinesis_firehose_delivery_stream" {
				continue
			}
			stateARN := res.Primary.Attributes["arn"]
			if stateARN != *stream.DeliveryStreamARN {
				return fmt.Errorf("Bad Delivery Stream ARN\n\t expected: %s\n\tgot: %s\n", stateARN, *stream.DeliveryStreamARN)
			}
		}
		return nil
	}
}
// testAccCheckKinesisFirehoseDeliveryStreamDestroy verifies that every
// delivery stream tracked in state has been deleted (or is at least in the
// DELETING state) after the test tears down.
func testAccCheckKinesisFirehoseDeliveryStreamDestroy(s *terraform.State) error {
	for _, rs := range s.RootModule().Resources {
		if rs.Type != "aws_kinesis_firehose_delivery_stream" {
			continue
		}

		conn := testAccProvider.Meta().(*AWSClient).firehoseconn
		describeOpts := &firehose.DescribeDeliveryStreamInput{
			DeliveryStreamName: aws.String(rs.Primary.Attributes["name"]),
		}
		// A DescribeDeliveryStream error (ResourceNotFoundException) means
		// the stream is gone, which is the success case here.
		resp, err := conn.DescribeDeliveryStream(describeOpts)
		if err == nil {
			if resp.DeliveryStreamDescription != nil && *resp.DeliveryStreamDescription.DeliveryStreamStatus != "DELETING" {
				return fmt.Errorf("Error: Delivery Stream still exists")
			}
		}
		// Fixed: the original returned nil here, short-circuiting the loop
		// after the first matching resource so additional streams were
		// never checked for destruction.
	}

	return nil
}
// Minimal configuration: a private S3 bucket and a delivery stream relying
// on the provider defaults for buffering and compression. The two %d verbs
// receive the same random int to keep names unique per test run.
var testAccKinesisFirehoseDeliveryStreamConfig_basic = `
resource "aws_s3_bucket" "bucket" {
  bucket = "tf-test-bucket-%d"
  acl = "private"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
	name = "terraform-kinesis-firehose-basictest-%d"
	destination = "s3"
	role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
	s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
}`

// Pre-update configuration for the s3ConfigUpdates test: defaults only.
var testAccKinesisFirehoseDeliveryStreamConfig_s3 = `
resource "aws_s3_bucket" "bucket" {
  bucket = "tf-test-bucket-%d"
  acl = "private"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
	name = "terraform-kinesis-firehose-s3test-%d"
	destination = "s3"
	role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
	s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
}`

// Post-update configuration: overrides buffer size/interval and compression
// to exercise the Update (UpdateDestination) path.
var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = `
resource "aws_s3_bucket" "bucket" {
  bucket = "tf-test-bucket-01-%d"
  acl = "private"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
	name = "terraform-kinesis-firehose-s3test-%d"
	destination = "s3"
	role_arn = "arn:aws:iam::946579370547:role/firehose_delivery_role"
	s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
	s3_buffer_size = 10
	s3_buffer_interval = 400
	s3_data_compression = "GZIP"
}`

View File

@ -0,0 +1,72 @@
---
layout: "aws"
page_title: "AWS: aws_kinesis_firehose_delivery_stream"
sidebar_current: "docs-aws-resource-kinesis-firehose-delivery-stream"
description: |-
Provides an AWS Kinesis Firehose Delivery Stream
---
# aws\_kinesis\_firehose\_delivery\_stream
Provides a Kinesis Firehose Delivery Stream resource. Amazon Kinesis Firehose is a fully managed, elastic service to easily deliver real-time data streams to destinations such as Amazon S3 and Amazon Redshift.
For more details, see the [Amazon Kinesis Firehose Documentation][1].
## Example Usage
```
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket"
acl = "private"
}
resource "aws_iam_role" "firehose_role" {
name = "firehose_test_role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-stream"
destination = "s3"
role_arn = "${aws_iam_role.firehose_role.arn}"
s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
```
~> **NOTE:** Kinesis Firehose is currently only supported in us-east-1, us-west-2 and eu-west-1. This implementation of Kinesis Firehose only supports the s3 destination type as Terraform doesn't support Redshift yet.
## Argument Reference
The following arguments are supported:
* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `destination` - (Required) The destination to which the data is delivered. Currently only `s3` is supported; `redshift` is planned but not yet implemented.
* `role_arn` - (Required) The ARN of the AWS credentials.
* `s3_bucket_arn` - (Required) The ARN of the S3 bucket
* `s3_prefix` - (Optional) The "YYYY/MM/DD/HH" time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket
* `s3_buffer_size` - (Optional) Buffer incoming data to the specified size, in MBs, before delivering it to the destination. The default value is 5.
We recommend setting `s3_buffer_size` to a value greater than the amount of data you typically ingest into the delivery stream in 10 seconds. For example, if you typically ingest data at 1 MB/sec, set `s3_buffer_size` to be 10 MB or higher.
* `s3_buffer_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination. The default value is 300
* `s3_data_compression` - (Optional) The compression format. If no value is specified, the default is `UNCOMPRESSED`. Other supported values are GZIP, ZIP & Snappy
## Attributes Reference
* `arn` - The Amazon Resource Name (ARN) specifying the Stream
[1]: http://aws.amazon.com/documentation/firehose/

View File

@ -324,6 +324,17 @@
</ul> </ul>
</li> </li>
<li<%= sidebar_current(/^docs-aws-resource-kinesis-firehose/) %>>
<a href="#">Kinesis Firehose Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-aws-resource-kinesis-firehose-delivery-stream") %>>
<a href="/docs/providers/aws/r/kinesis_firehose_delivery_stream.html">aws_kinesis_firehose_delivery_stream</a>
</li>
</ul>
</li>
<li<%= sidebar_current(/^docs-aws-resource-lambda/) %>> <li<%= sidebar_current(/^docs-aws-resource-lambda/) %>>
<a href="#">Lambda Resources</a> <a href="#">Lambda Resources</a>