diff --git a/builtin/providers/aws/provider.go b/builtin/providers/aws/provider.go index 70e741e0e..c3e48e902 100644 --- a/builtin/providers/aws/provider.go +++ b/builtin/providers/aws/provider.go @@ -232,13 +232,13 @@ func Provider() terraform.ResourceProvider { "aws_elasticsearch_domain": resourceAwsElasticSearchDomain(), "aws_elastictranscoder_pipeline": resourceAwsElasticTranscoderPipeline(), "aws_elastictranscoder_preset": resourceAwsElasticTranscoderPreset(), - "aws_elb": resourceAwsElb(), - "aws_elb_attachment": resourceAwsElbAttachment(), - "aws_emr": resourceAwsEMR(), - "aws_emr_task_group": resourceAwsEMRTaskGroup(), - "aws_flow_log": resourceAwsFlowLog(), - "aws_glacier_vault": resourceAwsGlacierVault(), - "aws_iam_access_key": resourceAwsIamAccessKey(), + "aws_elb": resourceAwsElb(), + "aws_elb_attachment": resourceAwsElbAttachment(), + "aws_emr_cluster": resourceAwsEMRCluster(), + "aws_emr_instance_group": resourceAwsEMRInstanceGroup(), + "aws_flow_log": resourceAwsFlowLog(), + "aws_glacier_vault": resourceAwsGlacierVault(), + "aws_iam_access_key": resourceAwsIamAccessKey(), "aws_iam_account_password_policy": resourceAwsIamAccountPasswordPolicy(), "aws_iam_group_policy": resourceAwsIamGroupPolicy(), "aws_iam_group": resourceAwsIamGroup(), diff --git a/builtin/providers/aws/resource_aws_emr.go b/builtin/providers/aws/resource_aws_emr.go deleted file mode 100644 index 76201cd03..000000000 --- a/builtin/providers/aws/resource_aws_emr.go +++ /dev/null @@ -1,498 +0,0 @@ -package aws - -import ( - "log" - - "encoding/json" - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/emr" - "github.com/hashicorp/terraform/helper/resource" - "github.com/hashicorp/terraform/helper/schema" - "io/ioutil" - "net/http" - "strings" - "time" -) - -func resourceAwsEMR() *schema.Resource { - return &schema.Resource{ - Create: resourceAwsEMRCreate, - Read: resourceAwsEMRRead, - Update: resourceAwsEMRUpdate, - 
Delete: resourceAwsEMRDelete, - Schema: map[string]*schema.Schema{ - "name": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "release_label": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "master_instance_type": &schema.Schema{ - Type: schema.TypeString, - Required: true, - ForceNew: true, - }, - "core_instance_type": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - Computed: true, - }, - "core_instance_count": &schema.Schema{ - Type: schema.TypeInt, - Optional: true, - Default: 0, - }, - "log_uri": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "applications": &schema.Schema{ - Type: schema.TypeSet, - Optional: true, - ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, - "ec2_attributes": &schema.Schema{ - Type: schema.TypeList, - MaxItems: 1, - Optional: true, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "key_name": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "subnet_id": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "additional_master_security_groups": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "additional_slave_security_groups": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "emr_managed_master_security_group": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "emr_managed_slave_security_group": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "instance_profile": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - }, - }, - }, - "bootstrap_action": &schema.Schema{ - Type: schema.TypeSet, - Optional: true, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "name": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "path": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "args": &schema.Schema{ - Type: schema.TypeSet, - Optional: 
true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, - }, - }, - }, - "tags": tagsSchema(), - "configurations": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "service_role": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "visible_to_all_users": &schema.Schema{ - Type: schema.TypeBool, - Optional: true, - Default: true, - }, - }, - } -} - -func resourceAwsEMRCreate(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - log.Printf("[DEBUG] Creating EMR cluster") - masterInstanceType := d.Get("master_instance_type").(string) - coreInstanceType := masterInstanceType - if v, ok := d.GetOk("core_instance_type"); ok { - coreInstanceType = v.(string) - } - coreInstanceCount := d.Get("core_instance_count").(int) - - applications := d.Get("applications").(*schema.Set).List() - var userKey, subnet, extraMasterSecGrp, extraSlaveSecGrp, emrMasterSecGrp, emrSlaveSecGrp, instanceProfile, serviceRole string - instanceProfile = "EMR_EC2_DefaultRole" - - if a, ok := d.GetOk("ec2_attributes"); ok { - ec2Attributes := a.([]interface{}) - attributes := ec2Attributes[0].(map[string]interface{}) - userKey = attributes["key_name"].(string) - subnet = attributes["subnet_id"].(string) - extraMasterSecGrp = attributes["additional_master_security_groups"].(string) - extraSlaveSecGrp = attributes["additional_slave_security_groups"].(string) - emrMasterSecGrp = attributes["emr_managed_master_security_group"].(string) - emrSlaveSecGrp = attributes["emr_managed_slave_security_group"].(string) - - if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 { - instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string)) - } - } - - if v, ok := d.GetOk("service_role"); ok { - serviceRole = v.(string) - } else { - serviceRole = "EMR_DefaultRole" - } - - emrApps := expandApplications(applications) - - params := &emr.RunJobFlowInput{ - Instances: 
&emr.JobFlowInstancesConfig{ - Ec2KeyName: aws.String(userKey), - Ec2SubnetId: aws.String(subnet), - InstanceCount: aws.Int64(int64(coreInstanceCount + 1)), - KeepJobFlowAliveWhenNoSteps: aws.Bool(true), - MasterInstanceType: aws.String(masterInstanceType), - SlaveInstanceType: aws.String(coreInstanceType), - TerminationProtected: aws.Bool(false), - AdditionalMasterSecurityGroups: []*string{ - aws.String(extraMasterSecGrp), - }, - AdditionalSlaveSecurityGroups: []*string{ - aws.String(extraSlaveSecGrp), - }, - EmrManagedMasterSecurityGroup: aws.String(emrMasterSecGrp), - EmrManagedSlaveSecurityGroup: aws.String(emrSlaveSecGrp), - }, - Name: aws.String(d.Get("name").(string)), - Applications: emrApps, - - JobFlowRole: aws.String(instanceProfile), - ReleaseLabel: aws.String(d.Get("release_label").(string)), - ServiceRole: aws.String(serviceRole), - VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)), - } - - if v, ok := d.GetOk("log_uri"); ok { - logUrl := v.(string) - params.LogUri = aws.String(logUrl) - } - if v, ok := d.GetOk("bootstrap_action"); ok { - bootstrapActions := v.(*schema.Set).List() - log.Printf("[DEBUG] %v\n", bootstrapActions) - params.BootstrapActions = expandBootstrapActions(bootstrapActions) - } - if v, ok := d.GetOk("tags"); ok { - tagsIn := v.(map[string]interface{}) - params.Tags = expandTags(tagsIn) - } - if v, ok := d.GetOk("configurations"); ok { - confUrl := v.(string) - params.Configurations = expandConfigures(confUrl) - } - - log.Printf("[DEBUG] EMR Cluster create options: %s", params) - resp, err := conn.RunJobFlow(params) - - if err != nil { - log.Printf("[ERROR] %s", err) - return err - } - - log.Printf("[DEBUG] Created EMR Cluster done...") - d.SetId(*resp.JobFlowId) - - log.Println( - "[INFO] Waiting for EMR Cluster to be available") - - stateConf := &resource.StateChangeConf{ - Pending: []string{"STARTING", "BOOTSTRAPPING"}, - Target: []string{"WAITING", "RUNNING"}, - Refresh: resourceAwsEMRClusterStateRefreshFunc(d, 
meta), - Timeout: 40 * time.Minute, - MinTimeout: 10 * time.Second, - Delay: 30 * time.Second, // Wait 30 secs before starting - } - - _, err = stateConf.WaitForState() - if err != nil { - return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\": %s", err) - } - - return resourceAwsEMRRead(d, meta) -} - -func resourceAwsEMRRead(d *schema.ResourceData, meta interface{}) error { - emrconn := meta.(*AWSClient).emrconn - - req := &emr.DescribeClusterInput{ - ClusterId: aws.String(d.Id()), - } - - resp, err := emrconn.DescribeCluster(req) - if err != nil { - return fmt.Errorf("Error reading EMR cluster: %s", err) - } - - if resp.Cluster == nil { - d.SetId("") - log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id()) - return nil - } - - instance := resp.Cluster - - if instance.Status != nil { - if *resp.Cluster.Status.State == "TERMINATED" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) - d.SetId("") - return nil - } - - if *resp.Cluster.Status.State == "TERMINATED_WITH_ERRORS" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) - d.SetId("") - return nil - } - } - - instanceGroups, errGrps := loadGroups(d, meta) - if errGrps == nil { - coreGroup := findGroup(instanceGroups, "CORE") - if coreGroup != nil { - d.Set("core_instance_type", coreGroup.InstanceType) - } - } - - return nil -} - -func resourceAwsEMRUpdate(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - if d.HasChange("core_instance_count") { - log.Printf("[DEBUG] Modify EMR cluster") - req := &emr.ListInstanceGroupsInput{ - ClusterId: aws.String(d.Id()), - } - - respGrps, errGrps := conn.ListInstanceGroups(req) - if errGrps != nil { - return fmt.Errorf("Error reading EMR cluster: %s", errGrps) - } - instanceGroups := respGrps.InstanceGroups - - coreInstanceCount := d.Get("core_instance_count").(int) - coreGroup := findGroup(instanceGroups, "CORE") - - params := 
&emr.ModifyInstanceGroupsInput{ - InstanceGroups: []*emr.InstanceGroupModifyConfig{ - { - InstanceGroupId: aws.String(*coreGroup.Id), - InstanceCount: aws.Int64(int64(coreInstanceCount)), - }, - }, - } - _, errModify := conn.ModifyInstanceGroups(params) - if errModify != nil { - log.Printf("[ERROR] %s", errModify) - return errModify - } - - log.Printf("[DEBUG] Modify EMR Cluster done...") - } - - return resourceAwsEMRRead(d, meta) -} - -func resourceAwsEMRDelete(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - req := &emr.TerminateJobFlowsInput{ - JobFlowIds: []*string{ - aws.String(d.Id()), - }, - } - - _, err := conn.TerminateJobFlows(req) - if err != nil { - log.Printf("[ERROR], %s", err) - return err - } - - d.SetId("") - return nil -} - -func expandApplications(apps []interface{}) []*emr.Application { - appOut := make([]*emr.Application, 0, len(apps)) - - for _, appName := range expandStringList(apps) { - app := &emr.Application{ - Name: appName, - } - appOut = append(appOut, app) - } - return appOut -} - -func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) { - emrconn := meta.(*AWSClient).emrconn - reqGrps := &emr.ListInstanceGroupsInput{ - ClusterId: aws.String(d.Id()), - } - - respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps) - if errGrps != nil { - return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps) - } - return respGrps.InstanceGroups, nil -} - -func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup { - for _, grp := range grps { - if *grp.InstanceGroupType == typ { - return grp - } - } - return nil -} - -func expandTags(m map[string]interface{}) []*emr.Tag { - var result []*emr.Tag - for k, v := range m { - result = append(result, &emr.Tag{ - Key: aws.String(k), - Value: aws.String(v.(string)), - }) - } - - return result -} - -func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig { - actionsOut := 
[]*emr.BootstrapActionConfig{} - - for _, raw := range bootstrapActions { - actionAttributes := raw.(map[string]interface{}) - actionName := actionAttributes["name"].(string) - actionPath := actionAttributes["path"].(string) - actionArgs := actionAttributes["args"].(*schema.Set).List() - - action := &emr.BootstrapActionConfig{ - Name: aws.String(actionName), - ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{ - Path: aws.String(actionPath), - Args: expandStringList(actionArgs), - }, - } - actionsOut = append(actionsOut, action) - } - - return actionsOut -} - -func expandConfigures(input string) []*emr.Configuration { - configsOut := []*emr.Configuration{} - if strings.HasPrefix(input, "http") { - readHttpJson(input, &configsOut) - } else if strings.HasSuffix(input, ".json") { - readLocalJson(input, &configsOut) - } else { - readBodyJson(input, &configsOut) - } - log.Printf("[DEBUG] Configures %v\n", configsOut) - - return configsOut -} - -func readHttpJson(url string, target interface{}) error { - r, err := http.Get(url) - if err != nil { - return err - } - defer r.Body.Close() - - return json.NewDecoder(r.Body).Decode(target) -} - -func readLocalJson(localFile string, target interface{}) error { - file, e := ioutil.ReadFile(localFile) - if e != nil { - log.Printf("[ERROR] %s", e) - return e - } - - return json.Unmarshal(file, target) -} - -func readBodyJson(body string, target interface{}) error { - log.Printf("[DEBUG] Raw Body %s\n", body) - err := json.Unmarshal([]byte(body), target) - if err != nil { - log.Printf("[ERROR] parsing JSON %s", err) - return err - } - return nil -} - -func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { - return func() (interface{}, string, error) { - conn := meta.(*AWSClient).emrconn - - log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id()) - params := &emr.DescribeClusterInput{ - ClusterId: aws.String(d.Id()), - } - - resp, err := 
conn.DescribeCluster(params) - - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if "ClusterNotFound" == awsErr.Code() { - return 42, "destroyed", nil - } - } - log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err) - return nil, "", err - } - - emrc := resp.Cluster - - if emrc == nil { - return 42, "destroyed", nil - } - - if resp.Cluster.Status != nil { - log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status) - } - - return emrc, *emrc.Status.State, nil - } -} diff --git a/builtin/providers/aws/resource_aws_emr_cluster.go b/builtin/providers/aws/resource_aws_emr_cluster.go new file mode 100644 index 000000000..82f744c49 --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_cluster.go @@ -0,0 +1,668 @@ +package aws + +import ( + "log" + + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/helper/schema" +) + +func resourceAwsEMRCluster() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsEMRClusterCreate, + Read: resourceAwsEMRClusterRead, + Update: resourceAwsEMRClusterUpdate, + Delete: resourceAwsEMRClusterDelete, + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "release_label": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "master_instance_type": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "core_instance_type": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Computed: true, + }, + "core_instance_count": &schema.Schema{ + Type: schema.TypeInt, + Optional: true, + Default: 0, + }, + "cluster_state": &schema.Schema{ + Type: schema.TypeString, + Computed: 
true, + }, + "log_uri": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Optional: true, + }, + "master_public_dns": &schema.Schema{ + Type: schema.TypeString, + Computed: true, + }, + "applications": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + "ec2_attributes": &schema.Schema{ + Type: schema.TypeList, + MaxItems: 1, + Optional: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "key_name": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "subnet_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_master_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_slave_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_master_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_slave_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "instance_profile": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + }, + }, + }, + "bootstrap_action": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "path": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "args": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + }, + }, + }, + "tags": tagsSchema(), + "configurations": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Optional: true, + }, + "service_role": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "visible_to_all_users": &schema.Schema{ + Type: 
schema.TypeBool, + Optional: true, + ForceNew: true, + Default: true, + }, + }, + } +} + +func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + log.Printf("[DEBUG] Creating EMR cluster") + masterInstanceType := d.Get("master_instance_type").(string) + coreInstanceType := masterInstanceType + if v, ok := d.GetOk("core_instance_type"); ok { + coreInstanceType = v.(string) + } + coreInstanceCount := d.Get("core_instance_count").(int) + + applications := d.Get("applications").(*schema.Set).List() + + instanceConfig := &emr.JobFlowInstancesConfig{ + MasterInstanceType: aws.String(masterInstanceType), + SlaveInstanceType: aws.String(coreInstanceType), + InstanceCount: aws.Int64(int64(coreInstanceCount)), + // Default values that we can open up in the future + KeepJobFlowAliveWhenNoSteps: aws.Bool(true), + TerminationProtected: aws.Bool(false), + } + + var instanceProfile string + if a, ok := d.GetOk("ec2_attributes"); ok { + ec2Attributes := a.([]interface{}) + attributes := ec2Attributes[0].(map[string]interface{}) + + if v, ok := attributes["key_name"]; ok { + instanceConfig.Ec2KeyName = aws.String(v.(string)) + } + if v, ok := attributes["subnet_id"]; ok { + instanceConfig.Ec2SubnetId = aws.String(v.(string)) + } + if v, ok := attributes["subnet_id"]; ok { + instanceConfig.Ec2SubnetId = aws.String(v.(string)) + } + + if v, ok := attributes["additional_master_security_groups"]; ok { + strSlice := strings.Split(v.(string), ",") + for i, s := range strSlice { + strSlice[i] = strings.TrimSpace(s) + } + instanceConfig.AdditionalMasterSecurityGroups = aws.StringSlice(strSlice) + } + + if v, ok := attributes["additional_slave_security_groups"]; ok { + strSlice := strings.Split(v.(string), ",") + for i, s := range strSlice { + strSlice[i] = strings.TrimSpace(s) + } + instanceConfig.AdditionalSlaveSecurityGroups = aws.StringSlice(strSlice) + } + + if v, ok := attributes["emr_managed_master_security_group"]; 
ok { + instanceConfig.EmrManagedMasterSecurityGroup = aws.String(v.(string)) + } + if v, ok := attributes["emr_managed_slave_security_group"]; ok { + instanceConfig.EmrManagedSlaveSecurityGroup = aws.String(v.(string)) + } + + if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 { + instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string)) + } + } + + emrApps := expandApplications(applications) + + params := &emr.RunJobFlowInput{ + Instances: instanceConfig, + Name: aws.String(d.Get("name").(string)), + Applications: emrApps, + + ReleaseLabel: aws.String(d.Get("release_label").(string)), + ServiceRole: aws.String(d.Get("service_role").(string)), + VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)), + } + + if v, ok := d.GetOk("log_uri"); ok { + params.LogUri = aws.String(v.(string)) + } + + if instanceProfile != "" { + params.JobFlowRole = aws.String(instanceProfile) + } + + if v, ok := d.GetOk("bootstrap_action"); ok { + bootstrapActions := v.(*schema.Set).List() + params.BootstrapActions = expandBootstrapActions(bootstrapActions) + } + if v, ok := d.GetOk("tags"); ok { + tagsIn := v.(map[string]interface{}) + params.Tags = expandTags(tagsIn) + } + if v, ok := d.GetOk("configurations"); ok { + confUrl := v.(string) + params.Configurations = expandConfigures(confUrl) + } + + log.Printf("[DEBUG] EMR Cluster create options: %s", params) + resp, err := conn.RunJobFlow(params) + + if err != nil { + log.Printf("[ERROR] %s", err) + return err + } + + d.SetId(*resp.JobFlowId) + + log.Println( + "[INFO] Waiting for EMR Cluster to be available") + + stateConf := &resource.StateChangeConf{ + Pending: []string{"STARTING", "BOOTSTRAPPING"}, + Target: []string{"WAITING", "RUNNING"}, + Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), + Timeout: 40 * time.Minute, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + _, err = stateConf.WaitForState() + if err != nil { + return 
fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\": %s", err) + } + + return resourceAwsEMRClusterRead(d, meta) +} + +func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { + emrconn := meta.(*AWSClient).emrconn + + req := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := emrconn.DescribeCluster(req) + if err != nil { + return fmt.Errorf("Error reading EMR cluster: %s", err) + } + + if resp.Cluster == nil { + log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id()) + d.SetId("") + return nil + } + + cluster := resp.Cluster + + if cluster.Status != nil { + if *cluster.Status.State == "TERMINATED" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) + d.SetId("") + return nil + } + + if *cluster.Status.State == "TERMINATED_WITH_ERRORS" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) + d.SetId("") + return nil + } + + d.Set("cluster_state", cluster.Status.State) + } + + instanceGroups, err := fetchAllEMRInstanceGroups(meta, d.Id()) + if err == nil { + coreGroup := findGroup(instanceGroups, "CORE") + if coreGroup != nil { + d.Set("core_instance_type", coreGroup.InstanceType) + } + } + + d.Set("name", cluster.Name) + d.Set("service_role", cluster.ServiceRole) + d.Set("release_label", cluster.ReleaseLabel) + d.Set("log_uri", cluster.LogUri) + d.Set("master_public_dns", cluster.MasterPublicDnsName) + d.Set("visible_to_all_users", cluster.VisibleToAllUsers) + d.Set("tags", tagsToMapEMR(cluster.Tags)) + + if err := d.Set("applications", flattenApplications(cluster.Applications)); err != nil { + log.Printf("[ERR] Error setting EMR Applications for cluster (%s): %s", d.Id(), err) + } + + // Configurations is a JSON document. 
It's built with an expand method but a + // simple string should be returned as JSON + if err := d.Set("configurations", cluster.Configurations); err != nil { + log.Printf("[ERR] Error setting EMR configurations for cluster (%s): %s", d.Id(), err) + } + + if err := d.Set("ec2_attributes", flattenEc2Attributes(cluster.Ec2InstanceAttributes)); err != nil { + log.Printf("[ERR] Error setting EMR Ec2 Attributes: %s", err) + } + return nil +} + +func resourceAwsEMRClusterUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + if d.HasChange("core_instance_count") { + log.Printf("[DEBUG] Modify EMR cluster") + groups, err := fetchAllEMRInstanceGroups(meta, d.Id()) + if err != nil { + log.Printf("[DEBUG] Error finding all instance groups: %s", err) + return err + } + + coreInstanceCount := d.Get("core_instance_count").(int) + coreGroup := findGroup(groups, "CORE") + if coreGroup == nil { + return fmt.Errorf("[ERR] Error finding core group") + } + + params := &emr.ModifyInstanceGroupsInput{ + InstanceGroups: []*emr.InstanceGroupModifyConfig{ + { + InstanceGroupId: coreGroup.Id, + InstanceCount: aws.Int64(int64(coreInstanceCount)), + }, + }, + } + _, errModify := conn.ModifyInstanceGroups(params) + if errModify != nil { + log.Printf("[ERROR] %s", errModify) + return errModify + } + + log.Printf("[DEBUG] Modify EMR Cluster done...") + } + + log.Println( + "[INFO] Waiting for EMR Cluster to be available") + + stateConf := &resource.StateChangeConf{ + Pending: []string{"STARTING", "BOOTSTRAPPING"}, + Target: []string{"WAITING", "RUNNING"}, + Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), + Timeout: 40 * time.Minute, + MinTimeout: 10 * time.Second, + Delay: 5 * time.Second, + } + + _, err := stateConf.WaitForState() + if err != nil { + return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\" after modification: %s", err) + } + + return resourceAwsEMRClusterRead(d, meta) +} + +func 
resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + req := &emr.TerminateJobFlowsInput{ + JobFlowIds: []*string{ + aws.String(d.Id()), + }, + } + + _, err := conn.TerminateJobFlows(req) + if err != nil { + log.Printf("[ERROR], %s", err) + return err + } + + err = resource.Retry(10*time.Minute, func() *resource.RetryError { + resp, err := conn.ListInstances(&emr.ListInstancesInput{ + ClusterId: aws.String(d.Id()), + }) + + if err != nil { + return resource.NonRetryableError(err) + } + + instanceCount := len(resp.Instances) + + if resp == nil || instanceCount == 0 { + log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", d.Id()) + return nil + } + + // Collect instance status states, wait for all instances to be terminated + // before moving on + var terminated []string + for j, i := range resp.Instances { + if i.Status != nil { + if *i.Status.State == "TERMINATED" { + terminated = append(terminated, *i.Ec2InstanceId) + } + } else { + log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, *i.Ec2InstanceId) + } + } + if len(terminated) == instanceCount { + log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, d.Id()) + return nil + } + return resource.RetryableError(fmt.Errorf("[DEBUG] EMR Cluster (%s) has (%d) Instances remaining, retrying", d.Id(), len(resp.Instances))) + }) + + if err != nil { + log.Printf("[ERR] Error waiting for EMR Cluster (%s) Instances to drain", d.Id()) + } + + d.SetId("") + return nil +} + +func expandApplications(apps []interface{}) []*emr.Application { + appOut := make([]*emr.Application, 0, len(apps)) + + for _, appName := range expandStringList(apps) { + app := &emr.Application{ + Name: appName, + } + appOut = append(appOut, app) + } + return appOut +} + +func flattenApplications(apps []*emr.Application) []interface{} { + appOut := make([]interface{}, 0, len(apps)) + + for _, app := range apps { + appOut = append(appOut, 
*app.Name) + } + return appOut +} + +func flattenEc2Attributes(ia *emr.Ec2InstanceAttributes) []map[string]interface{} { + attrs := map[string]interface{}{} + result := make([]map[string]interface{}, 0) + + if ia.Ec2KeyName != nil { + attrs["key_name"] = *ia.Ec2KeyName + } + if ia.Ec2SubnetId != nil { + attrs["subnet_id"] = *ia.Ec2SubnetId + } + if ia.IamInstanceProfile != nil { + attrs["instance_profile"] = *ia.IamInstanceProfile + } + if ia.EmrManagedMasterSecurityGroup != nil { + attrs["emr_managed_master_security_group"] = *ia.EmrManagedMasterSecurityGroup + } + if ia.EmrManagedSlaveSecurityGroup != nil { + attrs["emr_managed_slave_security_group"] = *ia.EmrManagedSlaveSecurityGroup + } + + if len(ia.AdditionalMasterSecurityGroups) > 0 { + strs := aws.StringValueSlice(ia.AdditionalMasterSecurityGroups) + attrs["additional_master_security_groups"] = strings.Join(strs, ",") + } + if len(ia.AdditionalSlaveSecurityGroups) > 0 { + strs := aws.StringValueSlice(ia.AdditionalSlaveSecurityGroups) + attrs["additional_slave_security_groups"] = strings.Join(strs, ",") + } + + result = append(result, attrs) + + return result +} + +func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) { + emrconn := meta.(*AWSClient).emrconn + reqGrps := &emr.ListInstanceGroupsInput{ + ClusterId: aws.String(d.Id()), + } + + respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps) + if errGrps != nil { + return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps) + } + return respGrps.InstanceGroups, nil +} + +func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup { + for _, grp := range grps { + if grp.InstanceGroupType != nil { + if *grp.InstanceGroupType == typ { + return grp + } + } + } + return nil +} + +func expandTags(m map[string]interface{}) []*emr.Tag { + var result []*emr.Tag + for k, v := range m { + result = append(result, &emr.Tag{ + Key: aws.String(k), + Value: aws.String(v.(string)), + }) + } + + return result +} + +func 
tagsToMapEMR(ts []*emr.Tag) map[string]string { + result := make(map[string]string) + for _, t := range ts { + result[*t.Key] = *t.Value + } + + return result +} + +func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig { + actionsOut := []*emr.BootstrapActionConfig{} + + for _, raw := range bootstrapActions { + actionAttributes := raw.(map[string]interface{}) + actionName := actionAttributes["name"].(string) + actionPath := actionAttributes["path"].(string) + actionArgs := actionAttributes["args"].(*schema.Set).List() + + action := &emr.BootstrapActionConfig{ + Name: aws.String(actionName), + ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{ + Path: aws.String(actionPath), + Args: expandStringList(actionArgs), + }, + } + actionsOut = append(actionsOut, action) + } + + return actionsOut +} + +func expandConfigures(input string) []*emr.Configuration { + configsOut := []*emr.Configuration{} + if strings.HasPrefix(input, "http") { + if err := readHttpJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading HTTP JSON: %s", err) + } + } else if strings.HasSuffix(input, ".json") { + if err := readLocalJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading local JSON: %s", err) + } + } else { + if err := readBodyJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading body JSON: %s", err) + } + } + log.Printf("[DEBUG] Expanded EMR Configurations %s", configsOut) + + return configsOut +} + +func readHttpJson(url string, target interface{}) error { + r, err := http.Get(url) + if err != nil { + return err + } + defer r.Body.Close() + + return json.NewDecoder(r.Body).Decode(target) +} + +func readLocalJson(localFile string, target interface{}) error { + file, e := ioutil.ReadFile(localFile) + if e != nil { + log.Printf("[ERROR] %s", e) + return e + } + + return json.Unmarshal(file, target) +} + +func readBodyJson(body string, target interface{}) error { + log.Printf("[DEBUG] Raw Body 
%s\n", body) + err := json.Unmarshal([]byte(body), target) + if err != nil { + log.Printf("[ERROR] parsing JSON %s", err) + return err + } + return nil +} + +func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + conn := meta.(*AWSClient).emrconn + + log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id()) + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := conn.DescribeCluster(params) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if "ClusterNotFound" == awsErr.Code() { + return 42, "destroyed", nil + } + } + log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err) + return nil, "", err + } + + emrc := resp.Cluster + + if emrc == nil { + return 42, "destroyed", nil + } + + if resp.Cluster.Status != nil { + log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status) + } + + return emrc, *emrc.Status.State, nil + } +} diff --git a/builtin/providers/aws/resource_aws_emr_cluster_test.go b/builtin/providers/aws/resource_aws_emr_cluster_test.go new file mode 100644 index 000000000..a871d53e8 --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_cluster_test.go @@ -0,0 +1,373 @@ +package aws + +import ( + "fmt" + "log" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" +) + +func TestAccAWSEMRCluster_basic(t *testing.T) { + var jobFlow emr.RunJobFlowOutput + r := acctest.RandInt() + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: testAccAWSEmrClusterConfig(r), 
+ Check: testAccCheckAWSEmrClusterExists("aws_emr_cluster.tf-test-cluster", &jobFlow), + }, + }, + }) +} + +func testAccCheckAWSEmrDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).emrconn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_emr_cluster" { + continue + } + + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + } + + describe, err := conn.DescribeCluster(params) + + if err == nil { + if describe.Cluster != nil && + *describe.Cluster.Status.State == "WAITING" { + return fmt.Errorf("EMR Cluster still exists") + } + } + + providerErr, ok := err.(awserr.Error) + if !ok { + return err + } + + log.Printf("[ERROR] %v", providerErr) + } + + return nil +} + +func testAccCheckAWSEmrClusterExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No cluster id set") + } + conn := testAccProvider.Meta().(*AWSClient).emrconn + describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + }) + if err != nil { + return fmt.Errorf("EMR error: %v", err) + } + + if describe.Cluster != nil && + *describe.Cluster.Id != rs.Primary.ID { + return fmt.Errorf("EMR cluser not found") + } + + if describe.Cluster != nil && + *describe.Cluster.Status.State != "WAITING" { + return fmt.Errorf("EMR cluser is not up yet") + } + + return nil + } +} + +func testAccAWSEmrClusterConfig(r int) string { + return fmt.Sprintf(` +provider "aws" { + region = "us-west-2" +} + +resource "aws_emr_cluster" "tf-test-cluster" { + name = "emr-test-%d" + release_label = "emr-4.6.0" + applications = ["Spark"] + + ec2_attributes { + subnet_id = "${aws_subnet.main.id}" + emr_managed_master_security_group = "${aws_security_group.allow_all.id}" + emr_managed_slave_security_group = 
"${aws_security_group.allow_all.id}" + instance_profile = "${aws_iam_instance_profile.emr_profile.arn}" + } + + master_instance_type = "m3.xlarge" + core_instance_type = "m3.xlarge" + core_instance_count = 1 + + tags { + role = "rolename" + dns_zone = "env_zone" + env = "env" + name = "name-env" + } + + bootstrap_action { + path = "s3://elasticmapreduce/bootstrap-actions/run-if" + name = "runif" + args = ["instance.isMaster=true", "echo running on master node"] + } + + configurations = "test-fixtures/emr_configurations.json" + + depends_on = ["aws_main_route_table_association.a"] + + service_role = "${aws_iam_role.iam_emr_default_role.arn}" +} + +resource "aws_security_group" "allow_all" { + name = "allow_all" + description = "Allow all inbound traffic" + vpc_id = "${aws_vpc.main.id}" + + ingress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + depends_on = ["aws_subnet.main"] + + lifecycle { + ignore_changes = ["ingress", "egress"] + } + + tags { + name = "emr_test" + } +} + +resource "aws_vpc" "main" { + cidr_block = "168.31.0.0/16" + enable_dns_hostnames = true + + tags { + name = "emr_test" + } +} + +resource "aws_subnet" "main" { + vpc_id = "${aws_vpc.main.id}" + cidr_block = "168.31.0.0/20" + + tags { + name = "emr_test" + } +} + +resource "aws_internet_gateway" "gw" { + vpc_id = "${aws_vpc.main.id}" +} + +resource "aws_route_table" "r" { + vpc_id = "${aws_vpc.main.id}" + + route { + cidr_block = "0.0.0.0/0" + gateway_id = "${aws_internet_gateway.gw.id}" + } +} + +resource "aws_main_route_table_association" "a" { + vpc_id = "${aws_vpc.main.id}" + route_table_id = "${aws_route_table.r.id}" +} + +### + +# IAM things + +### + +# IAM role for EMR Service +resource "aws_iam_role" "iam_emr_default_role" { + name = "iam_emr_default_role_%d" + + assume_role_policy = < **NOTE:** At this time, Instance Groups cannot be destroyed through 
the API nor +web interface. Instance Groups are destroyed when the EMR Cluster is destroyed. +Terraform will resize any Instance Group to zero when destroying the resource. + +## Example Usage + +``` +resource "aws_emr_instance_group" "task" { + cluster_id = "${aws_emr_cluster.tf-test-cluster.id}" + instance_count = 1 + instance_type = "m3.xlarge" + name = "my little instance group" +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Optional) Human-friendly name for this Instance Group +* `cluster_id` - (Required) ID of the EMR Cluster to attach to +* `instance_type` - (Required) Type of instances for this Group +* `instance_count` - (Optional) Count of instances to launch + + + +## Attributes Reference + +The following attributes are exported for the Instance Group: + +* `name` - Human-friendly name for this Instance Group +* `cluster_id` - ID of the EMR Cluster the group is attached to +* `instance_type` - Type of instances for this Group +* `instance_count` - Count of desired instances to launch +* `running_instance_count` - Count of actual running instances in the group +* `status` - State of the instance group. One of `PROVISIONING`, `BOOTSTRAPPING`, `RUNNING`, `RESIZING`, `SUSPENDED`, `TERMINATING`, `TERMINATED`, `ARRESTED`, `SHUTTING_DOWN`, `ENDED` diff --git a/website/source/layouts/aws.erb b/website/source/layouts/aws.erb index 12451035d..07e3bcad0 100644 --- a/website/source/layouts/aws.erb +++ b/website/source/layouts/aws.erb @@ -457,6 +457,19 @@ + > + Elastic MapReduce Resources + + + > ElasticSearch Resources