2016-08-18 20:42:29 +02:00
package aws
import (
"fmt"
"log"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceAwsElasticacheReplicationGroup ( ) * schema . Resource {
resourceSchema := resourceAwsElastiCacheCommonSchema ( )
resourceSchema [ "replication_group_id" ] = & schema . Schema {
Type : schema . TypeString ,
Required : true ,
ForceNew : true ,
ValidateFunc : validateAwsElastiCacheReplicationGroupId ,
}
resourceSchema [ "automatic_failover_enabled" ] = & schema . Schema {
Type : schema . TypeBool ,
Optional : true ,
Default : false ,
}
resourceSchema [ "replication_group_description" ] = & schema . Schema {
Type : schema . TypeString ,
Required : true ,
}
resourceSchema [ "number_cache_clusters" ] = & schema . Schema {
Type : schema . TypeInt ,
Required : true ,
ForceNew : true ,
}
2016-08-23 12:13:26 +02:00
resourceSchema [ "primary_endpoint_address" ] = & schema . Schema {
Type : schema . TypeString ,
Computed : true ,
}
2016-08-18 21:30:12 +02:00
resourceSchema [ "engine" ] . Required = false
resourceSchema [ "engine" ] . Optional = true
resourceSchema [ "engine" ] . Default = "redis"
2016-08-18 20:42:29 +02:00
resourceSchema [ "engine" ] . ValidateFunc = validateAwsElastiCacheReplicationGroupEngine
return & schema . Resource {
Create : resourceAwsElasticacheReplicationGroupCreate ,
Read : resourceAwsElasticacheReplicationGroupRead ,
Update : resourceAwsElasticacheReplicationGroupUpdate ,
Delete : resourceAwsElasticacheReplicationGroupDelete ,
Schema : resourceSchema ,
}
}
func resourceAwsElasticacheReplicationGroupCreate ( d * schema . ResourceData , meta interface { } ) error {
conn := meta . ( * AWSClient ) . elasticacheconn
tags := tagsFromMapEC ( d . Get ( "tags" ) . ( map [ string ] interface { } ) )
params := & elasticache . CreateReplicationGroupInput {
ReplicationGroupId : aws . String ( d . Get ( "replication_group_id" ) . ( string ) ) ,
ReplicationGroupDescription : aws . String ( d . Get ( "replication_group_description" ) . ( string ) ) ,
AutomaticFailoverEnabled : aws . Bool ( d . Get ( "automatic_failover_enabled" ) . ( bool ) ) ,
CacheNodeType : aws . String ( d . Get ( "node_type" ) . ( string ) ) ,
Engine : aws . String ( d . Get ( "engine" ) . ( string ) ) ,
Port : aws . Int64 ( int64 ( d . Get ( "port" ) . ( int ) ) ) ,
NumCacheClusters : aws . Int64 ( int64 ( d . Get ( "number_cache_clusters" ) . ( int ) ) ) ,
Tags : tags ,
}
if v , ok := d . GetOk ( "engine_version" ) ; ok {
params . EngineVersion = aws . String ( v . ( string ) )
}
preferred_azs := d . Get ( "availability_zones" ) . ( * schema . Set ) . List ( )
if len ( preferred_azs ) > 0 {
azs := expandStringList ( preferred_azs )
params . PreferredCacheClusterAZs = azs
}
if v , ok := d . GetOk ( "parameter_group_name" ) ; ok {
params . CacheParameterGroupName = aws . String ( v . ( string ) )
}
if v , ok := d . GetOk ( "subnet_group_name" ) ; ok {
params . CacheSubnetGroupName = aws . String ( v . ( string ) )
}
security_group_names := d . Get ( "security_group_names" ) . ( * schema . Set ) . List ( )
if len ( security_group_names ) > 0 {
params . CacheSecurityGroupNames = expandStringList ( security_group_names )
}
security_group_ids := d . Get ( "security_group_ids" ) . ( * schema . Set ) . List ( )
if len ( security_group_ids ) > 0 {
params . SecurityGroupIds = expandStringList ( security_group_ids )
}
snaps := d . Get ( "snapshot_arns" ) . ( * schema . Set ) . List ( )
if len ( snaps ) > 0 {
params . SnapshotArns = expandStringList ( snaps )
}
if v , ok := d . GetOk ( "maintenance_window" ) ; ok {
params . PreferredMaintenanceWindow = aws . String ( v . ( string ) )
}
if v , ok := d . GetOk ( "notification_topic_arn" ) ; ok {
params . NotificationTopicArn = aws . String ( v . ( string ) )
}
if v , ok := d . GetOk ( "snapshot_retention_limit" ) ; ok {
params . SnapshotRetentionLimit = aws . Int64 ( int64 ( v . ( int ) ) )
}
if v , ok := d . GetOk ( "snapshot_window" ) ; ok {
params . SnapshotWindow = aws . String ( v . ( string ) )
}
resp , err := conn . CreateReplicationGroup ( params )
if err != nil {
return fmt . Errorf ( "Error creating Elasticache Replication Group: %s" , err )
}
d . SetId ( * resp . ReplicationGroup . ReplicationGroupId )
pending := [ ] string { "creating" , "modifying" }
stateConf := & resource . StateChangeConf {
Pending : pending ,
Target : [ ] string { "available" } ,
Refresh : cacheReplicationGroupStateRefreshFunc ( conn , d . Id ( ) , "available" , pending ) ,
Timeout : 40 * time . Minute ,
MinTimeout : 10 * time . Second ,
Delay : 30 * time . Second ,
}
log . Printf ( "[DEBUG] Waiting for state to become available: %v" , d . Id ( ) )
_ , sterr := stateConf . WaitForState ( )
if sterr != nil {
return fmt . Errorf ( "Error waiting for elasticache replication group (%s) to be created: %s" , d . Id ( ) , sterr )
}
return resourceAwsElasticacheReplicationGroupRead ( d , meta )
}
func resourceAwsElasticacheReplicationGroupRead ( d * schema . ResourceData , meta interface { } ) error {
conn := meta . ( * AWSClient ) . elasticacheconn
req := & elasticache . DescribeReplicationGroupsInput {
ReplicationGroupId : aws . String ( d . Id ( ) ) ,
}
res , err := conn . DescribeReplicationGroups ( req )
if err != nil {
if eccErr , ok := err . ( awserr . Error ) ; ok && eccErr . Code ( ) == "ReplicationGroupNotFoundFault" {
log . Printf ( "[WARN] Elasticache Replication Group (%s) not found" , d . Id ( ) )
d . SetId ( "" )
return nil
}
return err
}
var rgp * elasticache . ReplicationGroup
for _ , r := range res . ReplicationGroups {
if * r . ReplicationGroupId == d . Id ( ) {
rgp = r
}
}
if rgp == nil {
log . Printf ( "[WARN] Replication Group (%s) not found" , d . Id ( ) )
d . SetId ( "" )
return nil
}
if * rgp . Status == "deleting" {
log . Printf ( "[WARN] The Replication Group %q is currently in the `deleting` state" , d . Id ( ) )
d . SetId ( "" )
return nil
}
d . Set ( "automatic_failover_enabled" , rgp . AutomaticFailover )
d . Set ( "replication_group_description" , rgp . Description )
d . Set ( "number_cache_clusters" , len ( rgp . MemberClusters ) )
d . Set ( "replication_group_id" , rgp . ReplicationGroupId )
if rgp . NodeGroups != nil {
cacheCluster := * rgp . NodeGroups [ 0 ] . NodeGroupMembers [ 0 ]
res , err := conn . DescribeCacheClusters ( & elasticache . DescribeCacheClustersInput {
CacheClusterId : cacheCluster . CacheClusterId ,
ShowCacheNodeInfo : aws . Bool ( true ) ,
} )
if err != nil {
return err
}
if len ( res . CacheClusters ) == 0 {
return nil
}
c := res . CacheClusters [ 0 ]
d . Set ( "node_type" , c . CacheNodeType )
d . Set ( "engine" , c . Engine )
d . Set ( "engine_version" , c . EngineVersion )
d . Set ( "subnet_group_name" , c . CacheSubnetGroupName )
d . Set ( "security_group_names" , c . CacheSecurityGroups )
d . Set ( "security_group_ids" , c . SecurityGroups )
d . Set ( "parameter_group_name" , c . CacheParameterGroup )
d . Set ( "maintenance_window" , c . PreferredMaintenanceWindow )
d . Set ( "snapshot_window" , c . SnapshotWindow )
d . Set ( "snapshot_retention_limit" , c . SnapshotRetentionLimit )
2016-08-23 12:13:26 +02:00
d . Set ( "primary_endpoint_address" , rgp . NodeGroups [ 0 ] . PrimaryEndpoint . Address )
2016-08-18 20:42:29 +02:00
}
return nil
}
func resourceAwsElasticacheReplicationGroupUpdate ( d * schema . ResourceData , meta interface { } ) error {
conn := meta . ( * AWSClient ) . elasticacheconn
requestUpdate := false
params := & elasticache . ModifyReplicationGroupInput {
ApplyImmediately : aws . Bool ( d . Get ( "apply_immediately" ) . ( bool ) ) ,
ReplicationGroupId : aws . String ( d . Id ( ) ) ,
}
if d . HasChange ( "replication_group_description" ) {
params . ReplicationGroupDescription = aws . String ( d . Get ( "replication_group_description" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "automatic_failover_enabled" ) {
params . AutomaticFailoverEnabled = aws . Bool ( d . Get ( "automatic_failover_enabled" ) . ( bool ) )
requestUpdate = true
}
if d . HasChange ( "security_group_ids" ) {
if attr := d . Get ( "security_group_ids" ) . ( * schema . Set ) ; attr . Len ( ) > 0 {
params . SecurityGroupIds = expandStringList ( attr . List ( ) )
requestUpdate = true
}
}
if d . HasChange ( "security_group_names" ) {
if attr := d . Get ( "security_group_names" ) . ( * schema . Set ) ; attr . Len ( ) > 0 {
params . CacheSecurityGroupNames = expandStringList ( attr . List ( ) )
requestUpdate = true
}
}
if d . HasChange ( "preferred_maintenance_window" ) {
params . PreferredMaintenanceWindow = aws . String ( d . Get ( "preferred_maintenance_window" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "notification_topic_arn" ) {
params . NotificationTopicArn = aws . String ( d . Get ( "notification_topic_arn" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "parameter_group_name" ) {
params . CacheParameterGroupName = aws . String ( d . Get ( "cache_parameter_group_name" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "engine_version" ) {
params . EngineVersion = aws . String ( d . Get ( "engine_version" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "snapshot_retention_limit" ) {
params . SnapshotRetentionLimit = aws . Int64 ( int64 ( d . Get ( "snapshot_retention_limit" ) . ( int ) ) )
requestUpdate = true
}
if d . HasChange ( "snapshot_window" ) {
params . SnapshotWindow = aws . String ( d . Get ( "snapshot_window" ) . ( string ) )
requestUpdate = true
}
if d . HasChange ( "node_type" ) {
params . CacheNodeType = aws . String ( d . Get ( "node_type" ) . ( string ) )
requestUpdate = true
}
if requestUpdate {
_ , err := conn . ModifyReplicationGroup ( params )
if err != nil {
return fmt . Errorf ( "Error updating Elasticache replication group: %s" , err )
}
pending := [ ] string { "creating" , "modifying" , "snapshotting" }
stateConf := & resource . StateChangeConf {
Pending : pending ,
Target : [ ] string { "available" } ,
Refresh : cacheReplicationGroupStateRefreshFunc ( conn , d . Id ( ) , "available" , pending ) ,
Timeout : 40 * time . Minute ,
MinTimeout : 10 * time . Second ,
Delay : 30 * time . Second ,
}
log . Printf ( "[DEBUG] Waiting for state to become available: %v" , d . Id ( ) )
_ , sterr := stateConf . WaitForState ( )
if sterr != nil {
return fmt . Errorf ( "Error waiting for elasticache replication group (%s) to be created: %s" , d . Id ( ) , sterr )
}
}
return resourceAwsElasticacheReplicationGroupRead ( d , meta )
}
func resourceAwsElasticacheReplicationGroupDelete ( d * schema . ResourceData , meta interface { } ) error {
conn := meta . ( * AWSClient ) . elasticacheconn
req := & elasticache . DeleteReplicationGroupInput {
ReplicationGroupId : aws . String ( d . Id ( ) ) ,
}
_ , err := conn . DeleteReplicationGroup ( req )
if err != nil {
if ec2err , ok := err . ( awserr . Error ) ; ok && ec2err . Code ( ) == "ReplicationGroupNotFoundFault" {
d . SetId ( "" )
return nil
}
return fmt . Errorf ( "Error deleting Elasticache replication group: %s" , err )
}
log . Printf ( "[DEBUG] Waiting for deletion: %v" , d . Id ( ) )
stateConf := & resource . StateChangeConf {
Pending : [ ] string { "creating" , "available" , "deleting" } ,
Target : [ ] string { } ,
Refresh : cacheReplicationGroupStateRefreshFunc ( conn , d . Id ( ) , "" , [ ] string { } ) ,
Timeout : 40 * time . Minute ,
MinTimeout : 10 * time . Second ,
Delay : 30 * time . Second ,
}
_ , sterr := stateConf . WaitForState ( )
if sterr != nil {
return fmt . Errorf ( "Error waiting for replication group (%s) to delete: %s" , d . Id ( ) , sterr )
}
return nil
}
func cacheReplicationGroupStateRefreshFunc ( conn * elasticache . ElastiCache , replicationGroupId , givenState string , pending [ ] string ) resource . StateRefreshFunc {
return func ( ) ( interface { } , string , error ) {
resp , err := conn . DescribeReplicationGroups ( & elasticache . DescribeReplicationGroupsInput {
ReplicationGroupId : aws . String ( replicationGroupId ) ,
} )
if err != nil {
if eccErr , ok := err . ( awserr . Error ) ; ok && eccErr . Code ( ) == "ReplicationGroupNotFoundFault" {
log . Printf ( "[DEBUG] Replication Group Not Found" )
return nil , "" , nil
}
log . Printf ( "[ERROR] cacheClusterReplicationGroupStateRefreshFunc: %s" , err )
return nil , "" , err
}
if len ( resp . ReplicationGroups ) == 0 {
return nil , "" , fmt . Errorf ( "[WARN] Error: no Cache Replication Groups found for id (%s)" , replicationGroupId )
}
var rg * elasticache . ReplicationGroup
for _ , replicationGroup := range resp . ReplicationGroups {
if * replicationGroup . ReplicationGroupId == replicationGroupId {
log . Printf ( "[DEBUG] Found matching ElastiCache Replication Group: %s" , * replicationGroup . ReplicationGroupId )
rg = replicationGroup
}
}
if rg == nil {
return nil , "" , fmt . Errorf ( "[WARN] Error: no matching ElastiCache Replication Group for id (%s)" , replicationGroupId )
}
log . Printf ( "[DEBUG] ElastiCache Replication Group (%s) status: %v" , replicationGroupId , * rg . Status )
// return the current state if it's in the pending array
for _ , p := range pending {
log . Printf ( "[DEBUG] ElastiCache: checking pending state (%s) for Replication Group (%s), Replication Group status: %s" , pending , replicationGroupId , * rg . Status )
s := * rg . Status
if p == s {
log . Printf ( "[DEBUG] Return with status: %v" , * rg . Status )
return s , p , nil
}
}
return rg , * rg . Status , nil
}
}
func validateAwsElastiCacheReplicationGroupEngine ( v interface { } , k string ) ( ws [ ] string , errors [ ] error ) {
if strings . ToLower ( v . ( string ) ) != "redis" {
errors = append ( errors , fmt . Errorf ( "The only acceptable Engine type when using Replication Groups is Redis" ) )
}
return
}
func validateAwsElastiCacheReplicationGroupId ( v interface { } , k string ) ( ws [ ] string , errors [ ] error ) {
value := v . ( string )
2016-08-23 12:05:54 +02:00
if ( len ( value ) < 1 ) || ( len ( value ) > 16 ) {
2016-08-18 20:42:29 +02:00
errors = append ( errors , fmt . Errorf (
2016-08-23 12:05:54 +02:00
"%q must contain from 1 to 16 alphanumeric characters or hyphens" , k ) )
2016-08-18 20:42:29 +02:00
}
if ! regexp . MustCompile ( ` ^[0-9a-zA-Z-]+$ ` ) . MatchString ( value ) {
errors = append ( errors , fmt . Errorf (
"only alphanumeric characters and hyphens allowed in %q" , k ) )
}
if ! regexp . MustCompile ( ` ^[a-z] ` ) . MatchString ( value ) {
errors = append ( errors , fmt . Errorf (
"first character of %q must be a letter" , k ) )
}
if regexp . MustCompile ( ` -- ` ) . MatchString ( value ) {
errors = append ( errors , fmt . Errorf (
"%q cannot contain two consecutive hyphens" , k ) )
}
if regexp . MustCompile ( ` -$ ` ) . MatchString ( value ) {
errors = append ( errors , fmt . Errorf (
"%q cannot end with a hyphen" , k ) )
}
return
}