provider/aws: Kinesis DescribeStream pagination

Each call to the Kinesis DescribeStream API returns a limited number of
shards. When interrogating AWS for the state of a Kinesis stream, the
client needs to page through the API's responses to get the true number
of shards.
This commit is contained in:
Spencer Nelson 2015-12-14 11:26:44 -05:00
parent 86882e39ed
commit 063d770e51
1 changed files with 32 additions and 16 deletions

View File

@ -74,9 +74,10 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er
sn, err) sn, err)
} }
s := streamRaw.(*kinesis.StreamDescription) s := streamRaw.(kinesisStreamState)
d.SetId(*s.StreamARN) d.SetId(s.arn)
d.Set("arn", s.StreamARN) d.Set("arn", s.arn)
d.Set("shard_count", s.shardCount)
return resourceAwsKinesisStreamUpdate(d, meta) return resourceAwsKinesisStreamUpdate(d, meta)
} }
@ -98,10 +99,8 @@ func resourceAwsKinesisStreamUpdate(d *schema.ResourceData, meta interface{}) er
func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error { func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn conn := meta.(*AWSClient).kinesisconn
sn := d.Get("name").(string) sn := d.Get("name").(string)
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(sn), state, err := readKinesisStreamState(conn, sn)
}
resp, err := conn.DescribeStream(describeOpts)
if err != nil { if err != nil {
if awsErr, ok := err.(awserr.Error); ok { if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" { if awsErr.Code() == "ResourceNotFoundException" {
@ -111,11 +110,10 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro
return fmt.Errorf("[WARN] Error reading Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) return fmt.Errorf("[WARN] Error reading Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
} }
return err return err
}
s := resp.StreamDescription }
d.Set("arn", *s.StreamARN) d.Set("arn", state.arn)
d.Set("shard_count", len(s.Shards)) d.Set("shard_count", state.shardCount)
// set tags // set tags
describeTagsOpts := &kinesis.ListTagsForStreamInput{ describeTagsOpts := &kinesis.ListTagsForStreamInput{
@ -162,12 +160,30 @@ func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) er
return nil return nil
} }
type kinesisStreamState struct {
arn string
status string
shardCount int
}
func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) {
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(sn),
}
var state kinesisStreamState
err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) {
state.arn = aws.StringValue(page.StreamDescription.StreamARN)
state.status = aws.StringValue(page.StreamDescription.StreamStatus)
state.shardCount += len(page.StreamDescription.Shards)
return !last
})
return state, err
}
func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc { func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) { return func() (interface{}, string, error) {
describeOpts := &kinesis.DescribeStreamInput{ state, err := readKinesisStreamState(conn, sn)
StreamName: aws.String(sn),
}
resp, err := conn.DescribeStream(describeOpts)
if err != nil { if err != nil {
if awsErr, ok := err.(awserr.Error); ok { if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" { if awsErr.Code() == "ResourceNotFoundException" {
@ -178,6 +194,6 @@ func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefr
return nil, "failed", err return nil, "failed", err
} }
return resp.StreamDescription, *resp.StreamDescription.StreamStatus, nil return state, state.status, nil
} }
} }