Report the correct number of shards

This commit is contained in:
Maxime Bury 2016-03-01 10:46:39 -08:00
parent c9e3b7f51d
commit ae8627f92a
1 changed files with 13 additions and 1 deletions

View File

@ -247,7 +247,7 @@ func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamStat
err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) { err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) {
state.arn = aws.StringValue(page.StreamDescription.StreamARN) state.arn = aws.StringValue(page.StreamDescription.StreamARN)
state.status = aws.StringValue(page.StreamDescription.StreamStatus) state.status = aws.StringValue(page.StreamDescription.StreamStatus)
state.shardCount += len(page.StreamDescription.Shards) state.shardCount += len(openShards(page.StreamDescription.Shards))
state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours) state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
return !last return !last
}) })
@ -270,3 +270,15 @@ func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefr
return state, state.status, nil return state, state.status, nil
} }
} }
// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
func openShards(shards []*kinesis.Shard) []*kinesis.Shard {
var open []*kinesis.Shard
for _, s := range shards {
if s.SequenceNumberRange.EndingSequenceNumber == nil {
open = append(open, s)
}
}
return open
}