package aws

import (
	"fmt"
	"log"
	"regexp"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/elasticache"
	"github.com/hashicorp/terraform/helper/resource"
	"github.com/hashicorp/terraform/helper/schema"
)
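
// resourceAwsElasticacheReplicationGroup defines the schema and CRUD
// operations for the ElastiCache replication group resource, building on the
// common ElastiCache schema.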
func resourceAwsElasticacheReplicationGroup() *schema.Resource {
	resourceSchema := resourceAwsElastiCacheCommonSchema()

	resourceSchema["replication_group_id"] = &schema.Schema{
		Type:         schema.TypeString,
		Required:     true,
		ForceNew:     true,
		ValidateFunc: validateAwsElastiCacheReplicationGroupId,
	}

	resourceSchema["automatic_failover_enabled"] = &schema.Schema{
		Type:     schema.TypeBool,
		Optional: true,
		Default:  false,
	}

	resourceSchema["auto_minor_version_upgrade"] = &schema.Schema{
		Type:     schema.TypeBool,
		Optional: true,
		Default:  true,
	}

	resourceSchema["replication_group_description"] = &schema.Schema{
		Type:     schema.TypeString,
		Required: true,
	}

	resourceSchema["number_cache_clusters"] = &schema.Schema{
		Type:     schema.TypeInt,
		Computed: true,
		Optional: true,
		ForceNew: true,
	}

	resourceSchema["primary_endpoint_address"] = &schema.Schema{
		Type:     schema.TypeString,
		Computed: true,
	}

	resourceSchema["configuration_endpoint_address"] = &schema.Schema{
		Type:     schema.TypeString,
		Computed: true,
	}

	resourceSchema["cluster_mode"] = &schema.Schema{
		Type:     schema.TypeSet,
		Optional: true,
		MaxItems: 1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"replicas_per_node_group": {
					Type:     schema.TypeInt,
					Required: true,
					ForceNew: true,
				},
				"num_node_groups": {
					Type:     schema.TypeInt,
					Required: true,
					ForceNew: true,
				},
			},
		},
	}

	// The common ElastiCache schema marks "engine" as required; replication
	// groups only support Redis, so make it optional, default it, and
	// validate that nothing else is configured.
	resourceSchema["engine"].Required = false
	resourceSchema["engine"].Optional = true
	resourceSchema["engine"].Default = "redis"
	resourceSchema["engine"].ValidateFunc = validateAwsElastiCacheReplicationGroupEngine

	return &schema.Resource{
		Create: resourceAwsElasticacheReplicationGroupCreate,
		Read:   resourceAwsElasticacheReplicationGroupRead,
		Update: resourceAwsElasticacheReplicationGroupUpdate,
		Delete: resourceAwsElasticacheReplicationGroupDelete,
		Importer: &schema.ResourceImporter{
			State: schema.ImportStatePassthrough,
		},

		Schema: resourceSchema,
	}
}
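
// resourceAwsElasticacheReplicationGroupCreate builds the
// CreateReplicationGroup request from the configuration and waits for the
// group to become available before reading its state back.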
func resourceAwsElasticacheReplicationGroupCreate(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).elasticacheconn

	tags := tagsFromMapEC(d.Get("tags").(map[string]interface{}))
	params := &elasticache.CreateReplicationGroupInput{
		ReplicationGroupId:          aws.String(d.Get("replication_group_id").(string)),
		ReplicationGroupDescription: aws.String(d.Get("replication_group_description").(string)),
		AutomaticFailoverEnabled:    aws.Bool(d.Get("automatic_failover_enabled").(bool)),
		AutoMinorVersionUpgrade:     aws.Bool(d.Get("auto_minor_version_upgrade").(bool)),
		CacheNodeType:               aws.String(d.Get("node_type").(string)),
		Engine:                      aws.String(d.Get("engine").(string)),
		Port:                        aws.Int64(int64(d.Get("port").(int))),
		Tags:                        tags,
	}

	if v, ok := d.GetOk("engine_version"); ok {
		params.EngineVersion = aws.String(v.(string))
	}

	preferredAZs := d.Get("availability_zones").(*schema.Set).List()
	if len(preferredAZs) > 0 {
		params.PreferredCacheClusterAZs = expandStringList(preferredAZs)
	}

	if v, ok := d.GetOk("parameter_group_name"); ok {
		params.CacheParameterGroupName = aws.String(v.(string))
	}

	if v, ok := d.GetOk("subnet_group_name"); ok {
		params.CacheSubnetGroupName = aws.String(v.(string))
	}

	securityGroupNames := d.Get("security_group_names").(*schema.Set).List()
	if len(securityGroupNames) > 0 {
		params.CacheSecurityGroupNames = expandStringList(securityGroupNames)
	}

	securityGroupIds := d.Get("security_group_ids").(*schema.Set).List()
	if len(securityGroupIds) > 0 {
		params.SecurityGroupIds = expandStringList(securityGroupIds)
	}

	snaps := d.Get("snapshot_arns").(*schema.Set).List()
	if len(snaps) > 0 {
		params.SnapshotArns = expandStringList(snaps)
	}

	if v, ok := d.GetOk("maintenance_window"); ok {
		params.PreferredMaintenanceWindow = aws.String(v.(string))
	}

	if v, ok := d.GetOk("notification_topic_arn"); ok {
		params.NotificationTopicArn = aws.String(v.(string))
	}

	if v, ok := d.GetOk("snapshot_retention_limit"); ok {
		params.SnapshotRetentionLimit = aws.Int64(int64(v.(int)))
	}

	if v, ok := d.GetOk("snapshot_window"); ok {
		params.SnapshotWindow = aws.String(v.(string))
	}

	if v, ok := d.GetOk("snapshot_name"); ok {
		params.SnapshotName = aws.String(v.(string))
	}

	// `cluster_mode` (Redis cluster mode enabled) and `number_cache_clusters`
	// (cluster mode disabled) are mutually exclusive ways to size the group.
	clusterMode, clusterModeOk := d.GetOk("cluster_mode")
	cacheClusters, cacheClustersOk := d.GetOk("number_cache_clusters")

	if (!clusterModeOk && !cacheClustersOk) || (clusterModeOk && cacheClustersOk) {
		return fmt.Errorf("Exactly one of `number_cache_clusters` or `cluster_mode` must be set")
	}

	if clusterModeOk {
		clusterModeAttributes := clusterMode.(*schema.Set).List()
		attributes := clusterModeAttributes[0].(map[string]interface{})

		if v, ok := attributes["num_node_groups"]; ok {
			params.NumNodeGroups = aws.Int64(int64(v.(int)))
		}

		if v, ok := attributes["replicas_per_node_group"]; ok {
			params.ReplicasPerNodeGroup = aws.Int64(int64(v.(int)))
		}
	}

	if cacheClustersOk {
		params.NumCacheClusters = aws.Int64(int64(cacheClusters.(int)))
	}

	resp, err := conn.CreateReplicationGroup(params)
	if err != nil {
		return fmt.Errorf("Error creating Elasticache Replication Group: %s", err)
	}

	d.SetId(*resp.ReplicationGroup.ReplicationGroupId)

	pending := []string{"creating", "modifying", "restoring", "snapshotting"}
	stateConf := &resource.StateChangeConf{
		Pending:    pending,
		Target:     []string{"available"},
		Refresh:    cacheReplicationGroupStateRefreshFunc(conn, d.Id(), "available", pending),
		Timeout:    40 * time.Minute,
		MinTimeout: 10 * time.Second,
		Delay:      30 * time.Second,
	}

	log.Printf("[DEBUG] Waiting for state to become available: %v", d.Id())
	_, sterr := stateConf.WaitForState()
	if sterr != nil {
		return fmt.Errorf("Error waiting for elasticache replication group (%s) to be created: %s", d.Id(), sterr)
	}

	return resourceAwsElasticacheReplicationGroupRead(d, meta)
}
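
// resourceAwsElasticacheReplicationGroupRead refreshes state from the
// DescribeReplicationGroups API, clearing the resource ID if the group no
// longer exists or is being deleted.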
func resourceAwsElasticacheReplicationGroupRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).elasticacheconn
	req := &elasticache.DescribeReplicationGroupsInput{
		ReplicationGroupId: aws.String(d.Id()),
	}

	res, err := conn.DescribeReplicationGroups(req)
	if err != nil {
		if eccErr, ok := err.(awserr.Error); ok && eccErr.Code() == "ReplicationGroupNotFoundFault" {
			log.Printf("[WARN] Elasticache Replication Group (%s) not found", d.Id())
			d.SetId("")
			return nil
		}

		return err
	}

	var rgp *elasticache.ReplicationGroup
	for _, r := range res.ReplicationGroups {
		if *r.ReplicationGroupId == d.Id() {
			rgp = r
		}
	}

	if rgp == nil {
		log.Printf("[WARN] Replication Group (%s) not found", d.Id())
		d.SetId("")
		return nil
	}

	if *rgp.Status == "deleting" {
		log.Printf("[WARN] The Replication Group %q is currently in the `deleting` state", d.Id())
		d.SetId("")
		return nil
	}

	if rgp.AutomaticFailover != nil {
		switch strings.ToLower(*rgp.AutomaticFailover) {
		case "disabled", "disabling":
			d.Set("automatic_failover_enabled", false)
		case "enabled", "enabling":
			d.Set("automatic_failover_enabled", true)
		default:
			log.Printf("[WARN] Unknown AutomaticFailover state %s", *rgp.AutomaticFailover)
		}
	}

	d.Set("replication_group_description", rgp.Description)
	d.Set("number_cache_clusters", len(rgp.MemberClusters))
	d.Set("replication_group_id", rgp.ReplicationGroupId)

	if rgp.NodeGroups != nil {
		if len(rgp.NodeGroups[0].NodeGroupMembers) == 0 {
			return nil
		}

		// Node-level attributes are not exposed on the replication group
		// itself, so describe the first member cache cluster and read them
		// from there.
		cacheCluster := *rgp.NodeGroups[0].NodeGroupMembers[0]

		res, err := conn.DescribeCacheClusters(&elasticache.DescribeCacheClustersInput{
			CacheClusterId:    cacheCluster.CacheClusterId,
			ShowCacheNodeInfo: aws.Bool(true),
		})
		if err != nil {
			return err
		}

		if len(res.CacheClusters) == 0 {
			return nil
		}

		c := res.CacheClusters[0]
		d.Set("node_type", c.CacheNodeType)
		d.Set("engine", c.Engine)
		d.Set("engine_version", c.EngineVersion)
		d.Set("subnet_group_name", c.CacheSubnetGroupName)
		d.Set("security_group_names", flattenElastiCacheSecurityGroupNames(c.CacheSecurityGroups))
		d.Set("security_group_ids", flattenElastiCacheSecurityGroupIds(c.SecurityGroups))

		if c.CacheParameterGroup != nil {
			d.Set("parameter_group_name", c.CacheParameterGroup.CacheParameterGroupName)
		}

		d.Set("maintenance_window", c.PreferredMaintenanceWindow)
		d.Set("snapshot_window", rgp.SnapshotWindow)
		d.Set("snapshot_retention_limit", rgp.SnapshotRetentionLimit)

		// A configuration endpoint is only present when cluster mode is
		// enabled; otherwise expose the node group's primary endpoint.
		if rgp.ConfigurationEndpoint != nil {
			d.Set("port", rgp.ConfigurationEndpoint.Port)
			d.Set("configuration_endpoint_address", rgp.ConfigurationEndpoint.Address)
		} else {
			d.Set("port", rgp.NodeGroups[0].PrimaryEndpoint.Port)
			d.Set("primary_endpoint_address", rgp.NodeGroups[0].PrimaryEndpoint.Address)
		}

		d.Set("auto_minor_version_upgrade", c.AutoMinorVersionUpgrade)
	}

	return nil
}
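
// resourceAwsElasticacheReplicationGroupUpdate sends a ModifyReplicationGroup
// request for any changed arguments and waits for the group to return to the
// available state.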
func resourceAwsElasticacheReplicationGroupUpdate(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).elasticacheconn

	requestUpdate := false
	params := &elasticache.ModifyReplicationGroupInput{
		ApplyImmediately:   aws.Bool(d.Get("apply_immediately").(bool)),
		ReplicationGroupId: aws.String(d.Id()),
	}

	if d.HasChange("replication_group_description") {
		params.ReplicationGroupDescription = aws.String(d.Get("replication_group_description").(string))
		requestUpdate = true
	}

	if d.HasChange("automatic_failover_enabled") {
		params.AutomaticFailoverEnabled = aws.Bool(d.Get("automatic_failover_enabled").(bool))
		requestUpdate = true
	}

	if d.HasChange("auto_minor_version_upgrade") {
		params.AutoMinorVersionUpgrade = aws.Bool(d.Get("auto_minor_version_upgrade").(bool))
		requestUpdate = true
	}

	if d.HasChange("security_group_ids") {
		if attr := d.Get("security_group_ids").(*schema.Set); attr.Len() > 0 {
			params.SecurityGroupIds = expandStringList(attr.List())
			requestUpdate = true
		}
	}

	if d.HasChange("security_group_names") {
		if attr := d.Get("security_group_names").(*schema.Set); attr.Len() > 0 {
			params.CacheSecurityGroupNames = expandStringList(attr.List())
			requestUpdate = true
		}
	}

	if d.HasChange("maintenance_window") {
		params.PreferredMaintenanceWindow = aws.String(d.Get("maintenance_window").(string))
		requestUpdate = true
	}

	if d.HasChange("notification_topic_arn") {
		params.NotificationTopicArn = aws.String(d.Get("notification_topic_arn").(string))
		requestUpdate = true
	}

	if d.HasChange("parameter_group_name") {
		params.CacheParameterGroupName = aws.String(d.Get("parameter_group_name").(string))
		requestUpdate = true
	}

	if d.HasChange("engine_version") {
		params.EngineVersion = aws.String(d.Get("engine_version").(string))
		requestUpdate = true
	}

	if d.HasChange("snapshot_retention_limit") {
		// When snapshotting is first enabled, the API needs a cluster to take
		// snapshots from; fall back to the first cluster in the group.
		o, _ := d.GetChange("snapshot_retention_limit")
		if o.(int) == 0 {
			params.SnapshottingClusterId = aws.String(fmt.Sprintf("%s-001", d.Id()))
		}

		params.SnapshotRetentionLimit = aws.Int64(int64(d.Get("snapshot_retention_limit").(int)))
		requestUpdate = true
	}

	if d.HasChange("snapshot_window") {
		params.SnapshotWindow = aws.String(d.Get("snapshot_window").(string))
		requestUpdate = true
	}

	if d.HasChange("node_type") {
		params.CacheNodeType = aws.String(d.Get("node_type").(string))
		requestUpdate = true
	}

	if requestUpdate {
		_, err := conn.ModifyReplicationGroup(params)
		if err != nil {
			return fmt.Errorf("Error updating Elasticache replication group: %s", err)
		}

		pending := []string{"creating", "modifying", "snapshotting"}
		stateConf := &resource.StateChangeConf{
			Pending:    pending,
			Target:     []string{"available"},
			Refresh:    cacheReplicationGroupStateRefreshFunc(conn, d.Id(), "available", pending),
			Timeout:    40 * time.Minute,
			MinTimeout: 10 * time.Second,
			Delay:      30 * time.Second,
		}

		log.Printf("[DEBUG] Waiting for state to become available: %v", d.Id())
		_, sterr := stateConf.WaitForState()
		if sterr != nil {
			return fmt.Errorf("Error waiting for elasticache replication group (%s) to be updated: %s", d.Id(), sterr)
		}
	}
	return resourceAwsElasticacheReplicationGroupRead(d, meta)
}
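
// resourceAwsElasticacheReplicationGroupDelete deletes the replication group
// and waits for the deletion to complete.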
func resourceAwsElasticacheReplicationGroupDelete(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).elasticacheconn

	req := &elasticache.DeleteReplicationGroupInput{
		ReplicationGroupId: aws.String(d.Id()),
	}

	_, err := conn.DeleteReplicationGroup(req)
	if err != nil {
		if ecErr, ok := err.(awserr.Error); ok && ecErr.Code() == "ReplicationGroupNotFoundFault" {
			d.SetId("")
			return nil
		}

		return fmt.Errorf("Error deleting Elasticache replication group: %s", err)
	}

	log.Printf("[DEBUG] Waiting for deletion: %v", d.Id())
	stateConf := &resource.StateChangeConf{
		Pending:    []string{"creating", "available", "deleting"},
		Target:     []string{},
		Refresh:    cacheReplicationGroupStateRefreshFunc(conn, d.Id(), "", []string{}),
		Timeout:    40 * time.Minute,
		MinTimeout: 10 * time.Second,
		Delay:      30 * time.Second,
	}

	_, sterr := stateConf.WaitForState()
	if sterr != nil {
		return fmt.Errorf("Error waiting for replication group (%s) to delete: %s", d.Id(), sterr)
	}

	return nil
}
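
// cacheReplicationGroupStateRefreshFunc returns a resource.StateRefreshFunc
// that polls DescribeReplicationGroups and reports the replication group's
// current status.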
func cacheReplicationGroupStateRefreshFunc(conn *elasticache.ElastiCache, replicationGroupId, givenState string, pending []string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		resp, err := conn.DescribeReplicationGroups(&elasticache.DescribeReplicationGroupsInput{
			ReplicationGroupId: aws.String(replicationGroupId),
		})
		if err != nil {
			if eccErr, ok := err.(awserr.Error); ok && eccErr.Code() == "ReplicationGroupNotFoundFault" {
				log.Printf("[DEBUG] Replication Group Not Found")
				return nil, "", nil
			}

			log.Printf("[ERROR] cacheReplicationGroupStateRefreshFunc: %s", err)
			return nil, "", err
		}

		if len(resp.ReplicationGroups) == 0 {
			return nil, "", fmt.Errorf("Error: no Cache Replication Groups found for id (%s)", replicationGroupId)
		}

		var rg *elasticache.ReplicationGroup
		for _, replicationGroup := range resp.ReplicationGroups {
			if *replicationGroup.ReplicationGroupId == replicationGroupId {
				log.Printf("[DEBUG] Found matching ElastiCache Replication Group: %s", *replicationGroup.ReplicationGroupId)
				rg = replicationGroup
			}
		}

		if rg == nil {
			return nil, "", fmt.Errorf("Error: no matching ElastiCache Replication Group for id (%s)", replicationGroupId)
		}

		log.Printf("[DEBUG] ElastiCache Replication Group (%s) status: %v", replicationGroupId, *rg.Status)

		// Return the current state if it's in the pending list.
		for _, p := range pending {
			log.Printf("[DEBUG] ElastiCache: checking pending state (%s) for Replication Group (%s), Replication Group status: %s", pending, replicationGroupId, *rg.Status)
			s := *rg.Status
			if p == s {
				log.Printf("[DEBUG] Return with status: %v", *rg.Status)
				return s, p, nil
			}
		}

		return rg, *rg.Status, nil
	}
}
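
// validateAwsElastiCacheReplicationGroupEngine rejects any engine other than
// Redis, the only engine supported by replication groups.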
func validateAwsElastiCacheReplicationGroupEngine(v interface{}, k string) (ws []string, errors []error) {
	if strings.ToLower(v.(string)) != "redis" {
		errors = append(errors, fmt.Errorf("The only acceptable Engine type when using Replication Groups is Redis"))
	}
	return
}
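
// validateAwsElastiCacheReplicationGroupId enforces the naming rules for
// replication group IDs: 1 to 20 alphanumeric characters or hyphens, starting
// with a lowercase letter, with no trailing or consecutive hyphens.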
func validateAwsElastiCacheReplicationGroupId(v interface{}, k string) (ws []string, errors []error) {
	value := v.(string)
	if (len(value) < 1) || (len(value) > 20) {
		errors = append(errors, fmt.Errorf(
			"%q must contain from 1 to 20 alphanumeric characters or hyphens", k))
	}
	if !regexp.MustCompile(`^[0-9a-zA-Z-]+$`).MatchString(value) {
		errors = append(errors, fmt.Errorf(
			"only alphanumeric characters and hyphens allowed in %q", k))
	}
	if !regexp.MustCompile(`^[a-z]`).MatchString(value) {
		errors = append(errors, fmt.Errorf(
			"first character of %q must be a lowercase letter", k))
	}
	if regexp.MustCompile(`--`).MatchString(value) {
		errors = append(errors, fmt.Errorf(
			"%q cannot contain two consecutive hyphens", k))
	}
	if regexp.MustCompile(`-$`).MatchString(value) {
		errors = append(errors, fmt.Errorf(
			"%q cannot end with a hyphen", k))
	}
	return
}