re-go-fmt after rebase

use us-west-2 region in tests

update test with working config

provider/aws: Update EMR contribution with passing test, polling for instance in DELETE method

remove defaulted role

document emr_cluster

rename aws_emr -> aws_emr_cluster

update docs for name change

update delete timeout/polling

rename emr taskgroup to emr instance group

default instance group count to 0, down from 60

update to ref emr_cluster, emr_instance_group

more cleanups for instance groups; need to read and update

add read, delete method for instance groups

refactor the read method to seperate out the fetching of the specific group

more refactoring for finding instance groups

update emr instance group docs

err check on reading HTTP. Dont' return the error, just log it

refactor the create method to catch optionals

additional cleanups, added a read method

update test to be non-master-only

wrap up the READ method for clusters

poll for instance group to be running after a modification

patch up a possible deref

provider/aws: EMR cleanups

fix test naming

remove outdated docs

randomize emr_profile names
This commit is contained in:
clint shryock 2016-09-22 14:59:13 -05:00 committed by Clint
parent ad8679e916
commit dad6face2b
12 changed files with 2115 additions and 980 deletions

View File

@ -232,13 +232,13 @@ func Provider() terraform.ResourceProvider {
"aws_elasticsearch_domain": resourceAwsElasticSearchDomain(),
"aws_elastictranscoder_pipeline": resourceAwsElasticTranscoderPipeline(),
"aws_elastictranscoder_preset": resourceAwsElasticTranscoderPreset(),
"aws_elb": resourceAwsElb(),
"aws_elb_attachment": resourceAwsElbAttachment(),
"aws_emr": resourceAwsEMR(),
"aws_emr_task_group": resourceAwsEMRTaskGroup(),
"aws_flow_log": resourceAwsFlowLog(),
"aws_glacier_vault": resourceAwsGlacierVault(),
"aws_iam_access_key": resourceAwsIamAccessKey(),
"aws_elb": resourceAwsElb(),
"aws_elb_attachment": resourceAwsElbAttachment(),
"aws_emr_cluster": resourceAwsEMRCluster(),
"aws_emr_instance_group": resourceAwsEMRInstanceGroup(),
"aws_flow_log": resourceAwsFlowLog(),
"aws_glacier_vault": resourceAwsGlacierVault(),
"aws_iam_access_key": resourceAwsIamAccessKey(),
"aws_iam_account_password_policy": resourceAwsIamAccountPasswordPolicy(),
"aws_iam_group_policy": resourceAwsIamGroupPolicy(),
"aws_iam_group": resourceAwsIamGroup(),

View File

@ -1,498 +0,0 @@
package aws
import (
"log"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
"io/ioutil"
"net/http"
"strings"
"time"
)
func resourceAwsEMR() *schema.Resource {
return &schema.Resource{
Create: resourceAwsEMRCreate,
Read: resourceAwsEMRRead,
Update: resourceAwsEMRUpdate,
Delete: resourceAwsEMRDelete,
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"release_label": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"master_instance_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"core_instance_type": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Computed: true,
},
"core_instance_count": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"log_uri": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"applications": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
},
"ec2_attributes": &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"key_name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"subnet_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"additional_master_security_groups": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"additional_slave_security_groups": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"emr_managed_master_security_group": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"emr_managed_slave_security_group": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"instance_profile": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
},
},
},
"bootstrap_action": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"path": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"args": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
},
},
},
},
"tags": tagsSchema(),
"configurations": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"service_role": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"visible_to_all_users": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: true,
},
},
}
}
func resourceAwsEMRCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
log.Printf("[DEBUG] Creating EMR cluster")
masterInstanceType := d.Get("master_instance_type").(string)
coreInstanceType := masterInstanceType
if v, ok := d.GetOk("core_instance_type"); ok {
coreInstanceType = v.(string)
}
coreInstanceCount := d.Get("core_instance_count").(int)
applications := d.Get("applications").(*schema.Set).List()
var userKey, subnet, extraMasterSecGrp, extraSlaveSecGrp, emrMasterSecGrp, emrSlaveSecGrp, instanceProfile, serviceRole string
instanceProfile = "EMR_EC2_DefaultRole"
if a, ok := d.GetOk("ec2_attributes"); ok {
ec2Attributes := a.([]interface{})
attributes := ec2Attributes[0].(map[string]interface{})
userKey = attributes["key_name"].(string)
subnet = attributes["subnet_id"].(string)
extraMasterSecGrp = attributes["additional_master_security_groups"].(string)
extraSlaveSecGrp = attributes["additional_slave_security_groups"].(string)
emrMasterSecGrp = attributes["emr_managed_master_security_group"].(string)
emrSlaveSecGrp = attributes["emr_managed_slave_security_group"].(string)
if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 {
instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string))
}
}
if v, ok := d.GetOk("service_role"); ok {
serviceRole = v.(string)
} else {
serviceRole = "EMR_DefaultRole"
}
emrApps := expandApplications(applications)
params := &emr.RunJobFlowInput{
Instances: &emr.JobFlowInstancesConfig{
Ec2KeyName: aws.String(userKey),
Ec2SubnetId: aws.String(subnet),
InstanceCount: aws.Int64(int64(coreInstanceCount + 1)),
KeepJobFlowAliveWhenNoSteps: aws.Bool(true),
MasterInstanceType: aws.String(masterInstanceType),
SlaveInstanceType: aws.String(coreInstanceType),
TerminationProtected: aws.Bool(false),
AdditionalMasterSecurityGroups: []*string{
aws.String(extraMasterSecGrp),
},
AdditionalSlaveSecurityGroups: []*string{
aws.String(extraSlaveSecGrp),
},
EmrManagedMasterSecurityGroup: aws.String(emrMasterSecGrp),
EmrManagedSlaveSecurityGroup: aws.String(emrSlaveSecGrp),
},
Name: aws.String(d.Get("name").(string)),
Applications: emrApps,
JobFlowRole: aws.String(instanceProfile),
ReleaseLabel: aws.String(d.Get("release_label").(string)),
ServiceRole: aws.String(serviceRole),
VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)),
}
if v, ok := d.GetOk("log_uri"); ok {
logUrl := v.(string)
params.LogUri = aws.String(logUrl)
}
if v, ok := d.GetOk("bootstrap_action"); ok {
bootstrapActions := v.(*schema.Set).List()
log.Printf("[DEBUG] %v\n", bootstrapActions)
params.BootstrapActions = expandBootstrapActions(bootstrapActions)
}
if v, ok := d.GetOk("tags"); ok {
tagsIn := v.(map[string]interface{})
params.Tags = expandTags(tagsIn)
}
if v, ok := d.GetOk("configurations"); ok {
confUrl := v.(string)
params.Configurations = expandConfigures(confUrl)
}
log.Printf("[DEBUG] EMR Cluster create options: %s", params)
resp, err := conn.RunJobFlow(params)
if err != nil {
log.Printf("[ERROR] %s", err)
return err
}
log.Printf("[DEBUG] Created EMR Cluster done...")
d.SetId(*resp.JobFlowId)
log.Println(
"[INFO] Waiting for EMR Cluster to be available")
stateConf := &resource.StateChangeConf{
Pending: []string{"STARTING", "BOOTSTRAPPING"},
Target: []string{"WAITING", "RUNNING"},
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
Timeout: 40 * time.Minute,
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}
_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\": %s", err)
}
return resourceAwsEMRRead(d, meta)
}
func resourceAwsEMRRead(d *schema.ResourceData, meta interface{}) error {
emrconn := meta.(*AWSClient).emrconn
req := &emr.DescribeClusterInput{
ClusterId: aws.String(d.Id()),
}
resp, err := emrconn.DescribeCluster(req)
if err != nil {
return fmt.Errorf("Error reading EMR cluster: %s", err)
}
if resp.Cluster == nil {
d.SetId("")
log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id())
return nil
}
instance := resp.Cluster
if instance.Status != nil {
if *resp.Cluster.Status.State == "TERMINATED" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id())
d.SetId("")
return nil
}
if *resp.Cluster.Status.State == "TERMINATED_WITH_ERRORS" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id())
d.SetId("")
return nil
}
}
instanceGroups, errGrps := loadGroups(d, meta)
if errGrps == nil {
coreGroup := findGroup(instanceGroups, "CORE")
if coreGroup != nil {
d.Set("core_instance_type", coreGroup.InstanceType)
}
}
return nil
}
func resourceAwsEMRUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
if d.HasChange("core_instance_count") {
log.Printf("[DEBUG] Modify EMR cluster")
req := &emr.ListInstanceGroupsInput{
ClusterId: aws.String(d.Id()),
}
respGrps, errGrps := conn.ListInstanceGroups(req)
if errGrps != nil {
return fmt.Errorf("Error reading EMR cluster: %s", errGrps)
}
instanceGroups := respGrps.InstanceGroups
coreInstanceCount := d.Get("core_instance_count").(int)
coreGroup := findGroup(instanceGroups, "CORE")
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(*coreGroup.Id),
InstanceCount: aws.Int64(int64(coreInstanceCount)),
},
},
}
_, errModify := conn.ModifyInstanceGroups(params)
if errModify != nil {
log.Printf("[ERROR] %s", errModify)
return errModify
}
log.Printf("[DEBUG] Modify EMR Cluster done...")
}
return resourceAwsEMRRead(d, meta)
}
func resourceAwsEMRDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
req := &emr.TerminateJobFlowsInput{
JobFlowIds: []*string{
aws.String(d.Id()),
},
}
_, err := conn.TerminateJobFlows(req)
if err != nil {
log.Printf("[ERROR], %s", err)
return err
}
d.SetId("")
return nil
}
func expandApplications(apps []interface{}) []*emr.Application {
appOut := make([]*emr.Application, 0, len(apps))
for _, appName := range expandStringList(apps) {
app := &emr.Application{
Name: appName,
}
appOut = append(appOut, app)
}
return appOut
}
func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) {
emrconn := meta.(*AWSClient).emrconn
reqGrps := &emr.ListInstanceGroupsInput{
ClusterId: aws.String(d.Id()),
}
respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps)
if errGrps != nil {
return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps)
}
return respGrps.InstanceGroups, nil
}
func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup {
for _, grp := range grps {
if *grp.InstanceGroupType == typ {
return grp
}
}
return nil
}
func expandTags(m map[string]interface{}) []*emr.Tag {
var result []*emr.Tag
for k, v := range m {
result = append(result, &emr.Tag{
Key: aws.String(k),
Value: aws.String(v.(string)),
})
}
return result
}
func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig {
actionsOut := []*emr.BootstrapActionConfig{}
for _, raw := range bootstrapActions {
actionAttributes := raw.(map[string]interface{})
actionName := actionAttributes["name"].(string)
actionPath := actionAttributes["path"].(string)
actionArgs := actionAttributes["args"].(*schema.Set).List()
action := &emr.BootstrapActionConfig{
Name: aws.String(actionName),
ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{
Path: aws.String(actionPath),
Args: expandStringList(actionArgs),
},
}
actionsOut = append(actionsOut, action)
}
return actionsOut
}
func expandConfigures(input string) []*emr.Configuration {
configsOut := []*emr.Configuration{}
if strings.HasPrefix(input, "http") {
readHttpJson(input, &configsOut)
} else if strings.HasSuffix(input, ".json") {
readLocalJson(input, &configsOut)
} else {
readBodyJson(input, &configsOut)
}
log.Printf("[DEBUG] Configures %v\n", configsOut)
return configsOut
}
func readHttpJson(url string, target interface{}) error {
r, err := http.Get(url)
if err != nil {
return err
}
defer r.Body.Close()
return json.NewDecoder(r.Body).Decode(target)
}
func readLocalJson(localFile string, target interface{}) error {
file, e := ioutil.ReadFile(localFile)
if e != nil {
log.Printf("[ERROR] %s", e)
return e
}
return json.Unmarshal(file, target)
}
func readBodyJson(body string, target interface{}) error {
log.Printf("[DEBUG] Raw Body %s\n", body)
err := json.Unmarshal([]byte(body), target)
if err != nil {
log.Printf("[ERROR] parsing JSON %s", err)
return err
}
return nil
}
func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
conn := meta.(*AWSClient).emrconn
log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id())
params := &emr.DescribeClusterInput{
ClusterId: aws.String(d.Id()),
}
resp, err := conn.DescribeCluster(params)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if "ClusterNotFound" == awsErr.Code() {
return 42, "destroyed", nil
}
}
log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err)
return nil, "", err
}
emrc := resp.Cluster
if emrc == nil {
return 42, "destroyed", nil
}
if resp.Cluster.Status != nil {
log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status)
}
return emrc, *emrc.Status.State, nil
}
}

View File

@ -0,0 +1,668 @@
package aws
import (
"log"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceAwsEMRCluster() *schema.Resource {
return &schema.Resource{
Create: resourceAwsEMRClusterCreate,
Read: resourceAwsEMRClusterRead,
Update: resourceAwsEMRClusterUpdate,
Delete: resourceAwsEMRClusterDelete,
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
ForceNew: true,
Required: true,
},
"release_label": &schema.Schema{
Type: schema.TypeString,
ForceNew: true,
Required: true,
},
"master_instance_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"core_instance_type": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Computed: true,
},
"core_instance_count": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"cluster_state": &schema.Schema{
Type: schema.TypeString,
Computed: true,
},
"log_uri": &schema.Schema{
Type: schema.TypeString,
ForceNew: true,
Optional: true,
},
"master_public_dns": &schema.Schema{
Type: schema.TypeString,
Computed: true,
},
"applications": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
},
"ec2_attributes": &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"key_name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"subnet_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"additional_master_security_groups": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"additional_slave_security_groups": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"emr_managed_master_security_group": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"emr_managed_slave_security_group": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"instance_profile": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
},
},
},
"bootstrap_action": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"path": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"args": &schema.Schema{
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
},
},
},
},
"tags": tagsSchema(),
"configurations": &schema.Schema{
Type: schema.TypeString,
ForceNew: true,
Optional: true,
},
"service_role": &schema.Schema{
Type: schema.TypeString,
ForceNew: true,
Required: true,
},
"visible_to_all_users": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: true,
},
},
}
}
func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
log.Printf("[DEBUG] Creating EMR cluster")
masterInstanceType := d.Get("master_instance_type").(string)
coreInstanceType := masterInstanceType
if v, ok := d.GetOk("core_instance_type"); ok {
coreInstanceType = v.(string)
}
coreInstanceCount := d.Get("core_instance_count").(int)
applications := d.Get("applications").(*schema.Set).List()
instanceConfig := &emr.JobFlowInstancesConfig{
MasterInstanceType: aws.String(masterInstanceType),
SlaveInstanceType: aws.String(coreInstanceType),
InstanceCount: aws.Int64(int64(coreInstanceCount)),
// Default values that we can open up in the future
KeepJobFlowAliveWhenNoSteps: aws.Bool(true),
TerminationProtected: aws.Bool(false),
}
var instanceProfile string
if a, ok := d.GetOk("ec2_attributes"); ok {
ec2Attributes := a.([]interface{})
attributes := ec2Attributes[0].(map[string]interface{})
if v, ok := attributes["key_name"]; ok {
instanceConfig.Ec2KeyName = aws.String(v.(string))
}
if v, ok := attributes["subnet_id"]; ok {
instanceConfig.Ec2SubnetId = aws.String(v.(string))
}
if v, ok := attributes["subnet_id"]; ok {
instanceConfig.Ec2SubnetId = aws.String(v.(string))
}
if v, ok := attributes["additional_master_security_groups"]; ok {
strSlice := strings.Split(v.(string), ",")
for i, s := range strSlice {
strSlice[i] = strings.TrimSpace(s)
}
instanceConfig.AdditionalMasterSecurityGroups = aws.StringSlice(strSlice)
}
if v, ok := attributes["additional_slave_security_groups"]; ok {
strSlice := strings.Split(v.(string), ",")
for i, s := range strSlice {
strSlice[i] = strings.TrimSpace(s)
}
instanceConfig.AdditionalSlaveSecurityGroups = aws.StringSlice(strSlice)
}
if v, ok := attributes["emr_managed_master_security_group"]; ok {
instanceConfig.EmrManagedMasterSecurityGroup = aws.String(v.(string))
}
if v, ok := attributes["emr_managed_slave_security_group"]; ok {
instanceConfig.EmrManagedSlaveSecurityGroup = aws.String(v.(string))
}
if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 {
instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string))
}
}
emrApps := expandApplications(applications)
params := &emr.RunJobFlowInput{
Instances: instanceConfig,
Name: aws.String(d.Get("name").(string)),
Applications: emrApps,
ReleaseLabel: aws.String(d.Get("release_label").(string)),
ServiceRole: aws.String(d.Get("service_role").(string)),
VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)),
}
if v, ok := d.GetOk("log_uri"); ok {
params.LogUri = aws.String(v.(string))
}
if instanceProfile != "" {
params.JobFlowRole = aws.String(instanceProfile)
}
if v, ok := d.GetOk("bootstrap_action"); ok {
bootstrapActions := v.(*schema.Set).List()
params.BootstrapActions = expandBootstrapActions(bootstrapActions)
}
if v, ok := d.GetOk("tags"); ok {
tagsIn := v.(map[string]interface{})
params.Tags = expandTags(tagsIn)
}
if v, ok := d.GetOk("configurations"); ok {
confUrl := v.(string)
params.Configurations = expandConfigures(confUrl)
}
log.Printf("[DEBUG] EMR Cluster create options: %s", params)
resp, err := conn.RunJobFlow(params)
if err != nil {
log.Printf("[ERROR] %s", err)
return err
}
d.SetId(*resp.JobFlowId)
log.Println(
"[INFO] Waiting for EMR Cluster to be available")
stateConf := &resource.StateChangeConf{
Pending: []string{"STARTING", "BOOTSTRAPPING"},
Target: []string{"WAITING", "RUNNING"},
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
Timeout: 40 * time.Minute,
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}
_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\": %s", err)
}
return resourceAwsEMRClusterRead(d, meta)
}
func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error {
emrconn := meta.(*AWSClient).emrconn
req := &emr.DescribeClusterInput{
ClusterId: aws.String(d.Id()),
}
resp, err := emrconn.DescribeCluster(req)
if err != nil {
return fmt.Errorf("Error reading EMR cluster: %s", err)
}
if resp.Cluster == nil {
log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id())
d.SetId("")
return nil
}
cluster := resp.Cluster
if cluster.Status != nil {
if *cluster.Status.State == "TERMINATED" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id())
d.SetId("")
return nil
}
if *cluster.Status.State == "TERMINATED_WITH_ERRORS" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id())
d.SetId("")
return nil
}
d.Set("cluster_state", cluster.Status.State)
}
instanceGroups, err := fetchAllEMRInstanceGroups(meta, d.Id())
if err == nil {
coreGroup := findGroup(instanceGroups, "CORE")
if coreGroup != nil {
d.Set("core_instance_type", coreGroup.InstanceType)
}
}
d.Set("name", cluster.Name)
d.Set("service_role", cluster.ServiceRole)
d.Set("release_label", cluster.ReleaseLabel)
d.Set("log_uri", cluster.LogUri)
d.Set("master_public_dns", cluster.MasterPublicDnsName)
d.Set("visible_to_all_users", cluster.VisibleToAllUsers)
d.Set("tags", tagsToMapEMR(cluster.Tags))
if err := d.Set("applications", flattenApplications(cluster.Applications)); err != nil {
log.Printf("[ERR] Error setting EMR Applications for cluster (%s): %s", d.Id(), err)
}
// Configurations is a JSON document. It's built with an expand method but a
// simple string should be returned as JSON
if err := d.Set("configurations", cluster.Configurations); err != nil {
log.Printf("[ERR] Error setting EMR configurations for cluster (%s): %s", d.Id(), err)
}
if err := d.Set("ec2_attributes", flattenEc2Attributes(cluster.Ec2InstanceAttributes)); err != nil {
log.Printf("[ERR] Error setting EMR Ec2 Attributes: %s", err)
}
return nil
}
func resourceAwsEMRClusterUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
if d.HasChange("core_instance_count") {
log.Printf("[DEBUG] Modify EMR cluster")
groups, err := fetchAllEMRInstanceGroups(meta, d.Id())
if err != nil {
log.Printf("[DEBUG] Error finding all instance groups: %s", err)
return err
}
coreInstanceCount := d.Get("core_instance_count").(int)
coreGroup := findGroup(groups, "CORE")
if coreGroup == nil {
return fmt.Errorf("[ERR] Error finding core group")
}
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: coreGroup.Id,
InstanceCount: aws.Int64(int64(coreInstanceCount)),
},
},
}
_, errModify := conn.ModifyInstanceGroups(params)
if errModify != nil {
log.Printf("[ERROR] %s", errModify)
return errModify
}
log.Printf("[DEBUG] Modify EMR Cluster done...")
}
log.Println(
"[INFO] Waiting for EMR Cluster to be available")
stateConf := &resource.StateChangeConf{
Pending: []string{"STARTING", "BOOTSTRAPPING"},
Target: []string{"WAITING", "RUNNING"},
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
Timeout: 40 * time.Minute,
MinTimeout: 10 * time.Second,
Delay: 5 * time.Second,
}
_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\" after modification: %s", err)
}
return resourceAwsEMRClusterRead(d, meta)
}
func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
req := &emr.TerminateJobFlowsInput{
JobFlowIds: []*string{
aws.String(d.Id()),
},
}
_, err := conn.TerminateJobFlows(req)
if err != nil {
log.Printf("[ERROR], %s", err)
return err
}
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
resp, err := conn.ListInstances(&emr.ListInstancesInput{
ClusterId: aws.String(d.Id()),
})
if err != nil {
return resource.NonRetryableError(err)
}
instanceCount := len(resp.Instances)
if resp == nil || instanceCount == 0 {
log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", d.Id())
return nil
}
// Collect instance status states, wait for all instances to be terminated
// before moving on
var terminated []string
for j, i := range resp.Instances {
if i.Status != nil {
if *i.Status.State == "TERMINATED" {
terminated = append(terminated, *i.Ec2InstanceId)
}
} else {
log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, *i.Ec2InstanceId)
}
}
if len(terminated) == instanceCount {
log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, d.Id())
return nil
}
return resource.RetryableError(fmt.Errorf("[DEBUG] EMR Cluster (%s) has (%d) Instances remaining, retrying", d.Id(), len(resp.Instances)))
})
if err != nil {
log.Printf("[ERR] Error waiting for EMR Cluster (%s) Instances to drain", d.Id())
}
d.SetId("")
return nil
}
func expandApplications(apps []interface{}) []*emr.Application {
appOut := make([]*emr.Application, 0, len(apps))
for _, appName := range expandStringList(apps) {
app := &emr.Application{
Name: appName,
}
appOut = append(appOut, app)
}
return appOut
}
func flattenApplications(apps []*emr.Application) []interface{} {
appOut := make([]interface{}, 0, len(apps))
for _, app := range apps {
appOut = append(appOut, *app.Name)
}
return appOut
}
func flattenEc2Attributes(ia *emr.Ec2InstanceAttributes) []map[string]interface{} {
attrs := map[string]interface{}{}
result := make([]map[string]interface{}, 0)
if ia.Ec2KeyName != nil {
attrs["key_name"] = *ia.Ec2KeyName
}
if ia.Ec2SubnetId != nil {
attrs["subnet_id"] = *ia.Ec2SubnetId
}
if ia.IamInstanceProfile != nil {
attrs["instance_profile"] = *ia.IamInstanceProfile
}
if ia.EmrManagedMasterSecurityGroup != nil {
attrs["emr_managed_master_security_group"] = *ia.EmrManagedMasterSecurityGroup
}
if ia.EmrManagedSlaveSecurityGroup != nil {
attrs["emr_managed_slave_security_group"] = *ia.EmrManagedSlaveSecurityGroup
}
if len(ia.AdditionalMasterSecurityGroups) > 0 {
strs := aws.StringValueSlice(ia.AdditionalMasterSecurityGroups)
attrs["additional_master_security_groups"] = strings.Join(strs, ",")
}
if len(ia.AdditionalSlaveSecurityGroups) > 0 {
strs := aws.StringValueSlice(ia.AdditionalSlaveSecurityGroups)
attrs["additional_slave_security_groups"] = strings.Join(strs, ",")
}
result = append(result, attrs)
return result
}
func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) {
emrconn := meta.(*AWSClient).emrconn
reqGrps := &emr.ListInstanceGroupsInput{
ClusterId: aws.String(d.Id()),
}
respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps)
if errGrps != nil {
return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps)
}
return respGrps.InstanceGroups, nil
}
func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup {
for _, grp := range grps {
if grp.InstanceGroupType != nil {
if *grp.InstanceGroupType == typ {
return grp
}
}
}
return nil
}
func expandTags(m map[string]interface{}) []*emr.Tag {
var result []*emr.Tag
for k, v := range m {
result = append(result, &emr.Tag{
Key: aws.String(k),
Value: aws.String(v.(string)),
})
}
return result
}
func tagsToMapEMR(ts []*emr.Tag) map[string]string {
result := make(map[string]string)
for _, t := range ts {
result[*t.Key] = *t.Value
}
return result
}
func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig {
actionsOut := []*emr.BootstrapActionConfig{}
for _, raw := range bootstrapActions {
actionAttributes := raw.(map[string]interface{})
actionName := actionAttributes["name"].(string)
actionPath := actionAttributes["path"].(string)
actionArgs := actionAttributes["args"].(*schema.Set).List()
action := &emr.BootstrapActionConfig{
Name: aws.String(actionName),
ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{
Path: aws.String(actionPath),
Args: expandStringList(actionArgs),
},
}
actionsOut = append(actionsOut, action)
}
return actionsOut
}
func expandConfigures(input string) []*emr.Configuration {
configsOut := []*emr.Configuration{}
if strings.HasPrefix(input, "http") {
if err := readHttpJson(input, &configsOut); err != nil {
log.Printf("[ERR] Error reading HTTP JSON: %s", err)
}
} else if strings.HasSuffix(input, ".json") {
if err := readLocalJson(input, &configsOut); err != nil {
log.Printf("[ERR] Error reading local JSON: %s", err)
}
} else {
if err := readBodyJson(input, &configsOut); err != nil {
log.Printf("[ERR] Error reading body JSON: %s", err)
}
}
log.Printf("[DEBUG] Expanded EMR Configurations %s", configsOut)
return configsOut
}
func readHttpJson(url string, target interface{}) error {
r, err := http.Get(url)
if err != nil {
return err
}
defer r.Body.Close()
return json.NewDecoder(r.Body).Decode(target)
}
func readLocalJson(localFile string, target interface{}) error {
file, e := ioutil.ReadFile(localFile)
if e != nil {
log.Printf("[ERROR] %s", e)
return e
}
return json.Unmarshal(file, target)
}
func readBodyJson(body string, target interface{}) error {
log.Printf("[DEBUG] Raw Body %s\n", body)
err := json.Unmarshal([]byte(body), target)
if err != nil {
log.Printf("[ERROR] parsing JSON %s", err)
return err
}
return nil
}
func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
conn := meta.(*AWSClient).emrconn
log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id())
params := &emr.DescribeClusterInput{
ClusterId: aws.String(d.Id()),
}
resp, err := conn.DescribeCluster(params)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if "ClusterNotFound" == awsErr.Code() {
return 42, "destroyed", nil
}
}
log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err)
return nil, "", err
}
emrc := resp.Cluster
if emrc == nil {
return 42, "destroyed", nil
}
if resp.Cluster.Status != nil {
log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status)
}
return emrc, *emrc.Status.State, nil
}
}

View File

@ -0,0 +1,373 @@
package aws
import (
"fmt"
"log"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccAWSEMRCluster_basic(t *testing.T) {
var jobFlow emr.RunJobFlowOutput
r := acctest.RandInt()
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSEmrDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccAWSEmrClusterConfig(r),
Check: testAccCheckAWSEmrClusterExists("aws_emr_cluster.tf-test-cluster", &jobFlow),
},
},
})
}
func testAccCheckAWSEmrDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).emrconn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_emr_cluster" {
continue
}
params := &emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
}
describe, err := conn.DescribeCluster(params)
if err == nil {
if describe.Cluster != nil &&
*describe.Cluster.Status.State == "WAITING" {
return fmt.Errorf("EMR Cluster still exists")
}
}
providerErr, ok := err.(awserr.Error)
if !ok {
return err
}
log.Printf("[ERROR] %v", providerErr)
}
return nil
}
func testAccCheckAWSEmrClusterExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No cluster id set")
}
conn := testAccProvider.Meta().(*AWSClient).emrconn
describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
})
if err != nil {
return fmt.Errorf("EMR error: %v", err)
}
if describe.Cluster != nil &&
*describe.Cluster.Id != rs.Primary.ID {
return fmt.Errorf("EMR cluser not found")
}
if describe.Cluster != nil &&
*describe.Cluster.Status.State != "WAITING" {
return fmt.Errorf("EMR cluser is not up yet")
}
return nil
}
}
func testAccAWSEmrClusterConfig(r int) string {
return fmt.Sprintf(`
provider "aws" {
region = "us-west-2"
}
resource "aws_emr_cluster" "tf-test-cluster" {
name = "emr-test-%d"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 1
tags {
role = "rolename"
dns_zone = "env_zone"
env = "env"
name = "name-env"
}
bootstrap_action {
path = "s3://elasticmapreduce/bootstrap-actions/run-if"
name = "runif"
args = ["instance.isMaster=true", "echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
depends_on = ["aws_main_route_table_association.a"]
service_role = "${aws_iam_role.iam_emr_default_role.arn}"
}
resource "aws_security_group" "allow_all" {
name = "allow_all"
description = "Allow all inbound traffic"
vpc_id = "${aws_vpc.main.id}"
ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
depends_on = ["aws_subnet.main"]
lifecycle {
ignore_changes = ["ingress", "egress"]
}
tags {
name = "emr_test"
}
}
resource "aws_vpc" "main" {
cidr_block = "168.31.0.0/16"
enable_dns_hostnames = true
tags {
name = "emr_test"
}
}
resource "aws_subnet" "main" {
vpc_id = "${aws_vpc.main.id}"
cidr_block = "168.31.0.0/20"
tags {
name = "emr_test"
}
}
resource "aws_internet_gateway" "gw" {
vpc_id = "${aws_vpc.main.id}"
}
resource "aws_route_table" "r" {
vpc_id = "${aws_vpc.main.id}"
route {
cidr_block = "0.0.0.0/0"
gateway_id = "${aws_internet_gateway.gw.id}"
}
}
resource "aws_main_route_table_association" "a" {
vpc_id = "${aws_vpc.main.id}"
route_table_id = "${aws_route_table.r.id}"
}
###
# IAM things
###
# IAM role for EMR Service
resource "aws_iam_role" "iam_emr_default_role" {
name = "iam_emr_default_role_%d"
assume_role_policy = <<EOT
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOT
}
resource "aws_iam_role_policy_attachment" "service-attach" {
role = "${aws_iam_role.iam_emr_default_role.id}"
policy_arn = "${aws_iam_policy.iam_emr_default_policy.arn}"
}
resource "aws_iam_policy" "iam_emr_default_policy" {
name = "iam_emr_default_policy_%d"
policy = <<EOT
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"ec2:AuthorizeSecurityGroupEgress",
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CancelSpotInstanceRequests",
"ec2:CreateNetworkInterface",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:DeleteNetworkInterface",
"ec2:DeleteSecurityGroup",
"ec2:DeleteTags",
"ec2:DescribeAvailabilityZones",
"ec2:DescribeAccountAttributes",
"ec2:DescribeDhcpOptions",
"ec2:DescribeInstanceStatus",
"ec2:DescribeInstances",
"ec2:DescribeKeyPairs",
"ec2:DescribeNetworkAcls",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribePrefixLists",
"ec2:DescribeRouteTables",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSpotInstanceRequests",
"ec2:DescribeSpotPriceHistory",
"ec2:DescribeSubnets",
"ec2:DescribeVpcAttribute",
"ec2:DescribeVpcEndpoints",
"ec2:DescribeVpcEndpointServices",
"ec2:DescribeVpcs",
"ec2:DetachNetworkInterface",
"ec2:ModifyImageAttribute",
"ec2:ModifyInstanceAttribute",
"ec2:RequestSpotInstances",
"ec2:RevokeSecurityGroupEgress",
"ec2:RunInstances",
"ec2:TerminateInstances",
"ec2:DeleteVolume",
"ec2:DescribeVolumeStatus",
"ec2:DescribeVolumes",
"ec2:DetachVolume",
"iam:GetRole",
"iam:GetRolePolicy",
"iam:ListInstanceProfiles",
"iam:ListRolePolicies",
"iam:PassRole",
"s3:CreateBucket",
"s3:Get*",
"s3:List*",
"sdb:BatchPutAttributes",
"sdb:Select",
"sqs:CreateQueue",
"sqs:Delete*",
"sqs:GetQueue*",
"sqs:PurgeQueue",
"sqs:ReceiveMessage"
]
}]
}
EOT
}
# IAM Role for EC2 Instance Profile
resource "aws_iam_role" "iam_emr_profile_role" {
name = "iam_emr_profile_role_%d"
assume_role_policy = <<EOT
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOT
}
resource "aws_iam_instance_profile" "emr_profile" {
name = "emr_profile_%d"
roles = ["${aws_iam_role.iam_emr_profile_role.name}"]
}
resource "aws_iam_role_policy_attachment" "profile-attach" {
role = "${aws_iam_role.iam_emr_profile_role.id}"
policy_arn = "${aws_iam_policy.iam_emr_profile_policy.arn}"
}
resource "aws_iam_policy" "iam_emr_profile_policy" {
name = "iam_emr_profile_policy_%d"
policy = <<EOT
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"cloudwatch:*",
"dynamodb:*",
"ec2:Describe*",
"elasticmapreduce:Describe*",
"elasticmapreduce:ListBootstrapActions",
"elasticmapreduce:ListClusters",
"elasticmapreduce:ListInstanceGroups",
"elasticmapreduce:ListInstances",
"elasticmapreduce:ListSteps",
"kinesis:CreateStream",
"kinesis:DeleteStream",
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:MergeShards",
"kinesis:PutRecord",
"kinesis:SplitShard",
"rds:Describe*",
"s3:*",
"sdb:*",
"sns:*",
"sqs:*"
]
}]
}
EOT
}
`, r, r, r, r, r, r)
}

View File

@ -0,0 +1,251 @@
package aws
import (
"errors"
"log"
"time"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)
var emrInstanceGroupNotFound = errors.New("No matching EMR Instance Group")
func resourceAwsEMRInstanceGroup() *schema.Resource {
return &schema.Resource{
Create: resourceAwsEMRInstanceGroupCreate,
Read: resourceAwsEMRInstanceGroupRead,
Update: resourceAwsEMRInstanceGroupUpdate,
Delete: resourceAwsEMRInstanceGroupDelete,
Schema: map[string]*schema.Schema{
"cluster_id": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_count": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"running_instance_count": &schema.Schema{
Type: schema.TypeInt,
Computed: true,
},
"status": &schema.Schema{
Type: schema.TypeString,
Computed: true,
},
"name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
},
}
}
func resourceAwsEMRInstanceGroupCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
clusterId := d.Get("cluster_id").(string)
instanceType := d.Get("instance_type").(string)
instanceCount := d.Get("instance_count").(int)
groupName := d.Get("name").(string)
params := &emr.AddInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupConfig{
{
InstanceRole: aws.String("TASK"),
InstanceCount: aws.Int64(int64(instanceCount)),
InstanceType: aws.String(instanceType),
Name: aws.String(groupName),
},
},
JobFlowId: aws.String(clusterId),
}
log.Printf("[DEBUG] Creating EMR task group params: %s", params)
resp, err := conn.AddInstanceGroups(params)
if err != nil {
return err
}
log.Printf("[DEBUG] Created EMR task group finished: %#v", resp)
if resp == nil || len(resp.InstanceGroupIds) == 0 {
return fmt.Errorf("Error creating instance groups: no instance group returned")
}
d.SetId(*resp.InstanceGroupIds[0])
return nil
}
func resourceAwsEMRInstanceGroupRead(d *schema.ResourceData, meta interface{}) error {
group, err := fetchEMRInstanceGroup(meta, d.Get("cluster_id").(string), d.Id())
if err != nil {
switch err {
case emrInstanceGroupNotFound:
log.Printf("[DEBUG] EMR Instance Group (%s) not found, removing", d.Id())
d.SetId("")
return nil
default:
return err
}
}
// Guard against the chance of fetchEMRInstanceGroup returning nil group but
// not a emrInstanceGroupNotFound error
if group == nil {
log.Printf("[DEBUG] EMR Instance Group (%s) not found, removing", d.Id())
d.SetId("")
return nil
}
d.Set("name", group.Name)
d.Set("instance_count", group.RequestedInstanceCount)
d.Set("running_instance_count", group.RunningInstanceCount)
d.Set("instance_type", group.InstanceType)
if group.Status != nil && group.Status.State != nil {
d.Set("status", group.Status.State)
}
return nil
}
func fetchAllEMRInstanceGroups(meta interface{}, clusterId string) ([]*emr.InstanceGroup, error) {
conn := meta.(*AWSClient).emrconn
req := &emr.ListInstanceGroupsInput{
ClusterId: aws.String(clusterId),
}
var groups []*emr.InstanceGroup
marker := aws.String("intitial")
for marker != nil {
log.Printf("[DEBUG] EMR Cluster Instance Marker: %s", *marker)
respGrps, errGrps := conn.ListInstanceGroups(req)
if errGrps != nil {
return nil, fmt.Errorf("[ERR] Error reading EMR cluster (%s): %s", clusterId, errGrps)
}
if respGrps == nil {
return nil, fmt.Errorf("[ERR] Error reading EMR Instance Group for cluster (%s)", clusterId)
}
if respGrps.InstanceGroups != nil {
for _, g := range respGrps.InstanceGroups {
groups = append(groups, g)
}
} else {
log.Printf("[DEBUG] EMR Instance Group list was empty")
}
marker = respGrps.Marker
}
if len(groups) == 0 {
return nil, fmt.Errorf("[WARN] No instance groups found for EMR Cluster (%s)", clusterId)
}
return groups, nil
}
func fetchEMRInstanceGroup(meta interface{}, clusterId, groupId string) (*emr.InstanceGroup, error) {
groups, err := fetchAllEMRInstanceGroups(meta, clusterId)
if err != nil {
return nil, err
}
var group *emr.InstanceGroup
for _, ig := range groups {
if groupId == *ig.Id {
group = ig
break
}
}
if group != nil {
return group, nil
}
return nil, emrInstanceGroupNotFound
}
func resourceAwsEMRInstanceGroupUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
log.Printf("[DEBUG] Modify EMR task group")
instanceCount := d.Get("instance_count").(int)
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(d.Id()),
InstanceCount: aws.Int64(int64(instanceCount)),
},
},
}
_, err := conn.ModifyInstanceGroups(params)
if err != nil {
return err
}
stateConf := &resource.StateChangeConf{
Pending: []string{"PROVISIONING", "BOOTSTRAPPING", "RESIZING"},
Target: []string{"RUNNING"},
Refresh: instanceGroupStateRefresh(conn, d.Get("cluster_id").(string), d.Id()),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}
_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"Error waiting for instance (%s) to terminate: %s", d.Id(), err)
}
return resourceAwsEMRInstanceGroupRead(d, meta)
}
func instanceGroupStateRefresh(meta interface{}, clusterID, igID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
group, err := fetchEMRInstanceGroup(meta, clusterID, igID)
if err != nil {
return nil, "Not Found", err
}
if group.Status == nil || group.Status.State == nil {
log.Printf("[WARN] ERM Instance Group found, but without state")
return nil, "Undefined", fmt.Errorf("Undefined EMR Cluster Instance Group state")
}
return group, *group.Status.State, nil
}
}
func resourceAwsEMRInstanceGroupDelete(d *schema.ResourceData, meta interface{}) error {
log.Printf("[WARN] AWS EMR Instance Group does not support DELETE; resizing cluster to zero before removing from state")
conn := meta.(*AWSClient).emrconn
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(d.Id()),
InstanceCount: aws.Int64(0),
},
},
}
_, err := conn.ModifyInstanceGroups(params)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,356 @@
package aws
import (
"fmt"
"log"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccAWSEMRInstanceGroup_basic(t *testing.T) {
var ig emr.InstanceGroup
rInt := acctest.RandInt()
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSEmrInstanceGroupDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccAWSEmrInstanceGroupConfig(rInt),
Check: testAccCheckAWSEmrInstanceGroupExists("aws_emr_instance_group.task", &ig),
},
},
})
}
func testAccCheckAWSEmrInstanceGroupDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).emrconn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_emr_cluster" {
continue
}
params := &emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
}
describe, err := conn.DescribeCluster(params)
if err == nil {
if describe.Cluster != nil &&
*describe.Cluster.Status.State == "WAITING" {
return fmt.Errorf("EMR Cluster still exists")
}
}
providerErr, ok := err.(awserr.Error)
if !ok {
return err
}
log.Printf("[ERROR] %v", providerErr)
}
return nil
}
func testAccCheckAWSEmrInstanceGroupExists(n string, v *emr.InstanceGroup) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No task group id set")
}
meta := testAccProvider.Meta()
g, err := fetchEMRInstanceGroup(meta, rs.Primary.Attributes["cluster_id"], rs.Primary.ID)
if err != nil {
return fmt.Errorf("EMR error: %v", err)
}
if g == nil {
return fmt.Errorf("No match found for (%s)", n)
}
v = g
return nil
}
}
func testAccAWSEmrInstanceGroupConfig(r int) string {
return fmt.Sprintf(`
provider "aws" {
region = "us-west-2"
}
resource "aws_emr_cluster" "tf-test-cluster" {
name = "tf-test-emr-%d"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 2
tags {
role = "rolename"
dns_zone = "env_zone"
env = "env"
name = "name-env"
}
bootstrap_action {
path = "s3://elasticmapreduce/bootstrap-actions/run-if"
name = "runif"
args = ["instance.isMaster=true", "echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
service_role = "${aws_iam_role.iam_emr_default_role.arn}"
depends_on = ["aws_internet_gateway.gw"]
}
resource "aws_emr_instance_group" "task" {
cluster_id = "${aws_emr_cluster.tf-test-cluster.id}"
instance_count = 1
instance_type = "m3.xlarge"
}
resource "aws_security_group" "allow_all" {
name = "allow_all"
description = "Allow all inbound traffic"
vpc_id = "${aws_vpc.main.id}"
ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
depends_on = ["aws_subnet.main"]
lifecycle {
ignore_changes = ["ingress", "egress"]
}
}
resource "aws_vpc" "main" {
cidr_block = "168.31.0.0/16"
enable_dns_hostnames = true
}
resource "aws_subnet" "main" {
vpc_id = "${aws_vpc.main.id}"
cidr_block = "168.31.0.0/20"
# map_public_ip_on_launch = true
}
resource "aws_internet_gateway" "gw" {
vpc_id = "${aws_vpc.main.id}"
}
resource "aws_route_table" "r" {
vpc_id = "${aws_vpc.main.id}"
route {
cidr_block = "0.0.0.0/0"
gateway_id = "${aws_internet_gateway.gw.id}"
}
}
resource "aws_main_route_table_association" "a" {
vpc_id = "${aws_vpc.main.id}"
route_table_id = "${aws_route_table.r.id}"
}
###
# IAM role for EMR Service
resource "aws_iam_role" "iam_emr_default_role" {
name = "iam_emr_default_role_%d"
assume_role_policy = <<EOT
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOT
}
resource "aws_iam_role_policy_attachment" "service-attach" {
role = "${aws_iam_role.iam_emr_default_role.id}"
policy_arn = "${aws_iam_policy.iam_emr_default_policy.arn}"
}
resource "aws_iam_policy" "iam_emr_default_policy" {
name = "iam_emr_default_policy_%d"
policy = <<EOT
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"ec2:AuthorizeSecurityGroupEgress",
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CancelSpotInstanceRequests",
"ec2:CreateNetworkInterface",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:DeleteNetworkInterface",
"ec2:DeleteSecurityGroup",
"ec2:DeleteTags",
"ec2:DescribeAvailabilityZones",
"ec2:DescribeAccountAttributes",
"ec2:DescribeDhcpOptions",
"ec2:DescribeInstanceStatus",
"ec2:DescribeInstances",
"ec2:DescribeKeyPairs",
"ec2:DescribeNetworkAcls",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribePrefixLists",
"ec2:DescribeRouteTables",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSpotInstanceRequests",
"ec2:DescribeSpotPriceHistory",
"ec2:DescribeSubnets",
"ec2:DescribeVpcAttribute",
"ec2:DescribeVpcEndpoints",
"ec2:DescribeVpcEndpointServices",
"ec2:DescribeVpcs",
"ec2:DetachNetworkInterface",
"ec2:ModifyImageAttribute",
"ec2:ModifyInstanceAttribute",
"ec2:RequestSpotInstances",
"ec2:RevokeSecurityGroupEgress",
"ec2:RunInstances",
"ec2:TerminateInstances",
"ec2:DeleteVolume",
"ec2:DescribeVolumeStatus",
"ec2:DescribeVolumes",
"ec2:DetachVolume",
"iam:GetRole",
"iam:GetRolePolicy",
"iam:ListInstanceProfiles",
"iam:ListRolePolicies",
"iam:PassRole",
"s3:CreateBucket",
"s3:Get*",
"s3:List*",
"sdb:BatchPutAttributes",
"sdb:Select",
"sqs:CreateQueue",
"sqs:Delete*",
"sqs:GetQueue*",
"sqs:PurgeQueue",
"sqs:ReceiveMessage"
]
}]
}
EOT
}
# IAM Role for EC2 Instance Profile
resource "aws_iam_role" "iam_emr_profile_role" {
name = "iam_emr_profile_role_%d"
assume_role_policy = <<EOT
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOT
}
resource "aws_iam_instance_profile" "emr_profile" {
name = "emr_profile_%d"
roles = ["${aws_iam_role.iam_emr_profile_role.name}"]
}
resource "aws_iam_role_policy_attachment" "profile-attach" {
role = "${aws_iam_role.iam_emr_profile_role.id}"
policy_arn = "${aws_iam_policy.iam_emr_profile_policy.arn}"
}
resource "aws_iam_policy" "iam_emr_profile_policy" {
name = "iam_emr_profile_policy_%d"
policy = <<EOT
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"cloudwatch:*",
"dynamodb:*",
"ec2:Describe*",
"elasticmapreduce:Describe*",
"elasticmapreduce:ListBootstrapActions",
"elasticmapreduce:ListClusters",
"elasticmapreduce:ListInstanceGroups",
"elasticmapreduce:ListInstances",
"elasticmapreduce:ListSteps",
"kinesis:CreateStream",
"kinesis:DeleteStream",
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:MergeShards",
"kinesis:PutRecord",
"kinesis:SplitShard",
"rds:Describe*",
"s3:*",
"sdb:*",
"sns:*",
"sqs:*"
]
}]
}
EOT
}`, r, r, r, r, r, r)
}

View File

@ -1,115 +0,0 @@
package aws
import (
"log"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceAwsEMRTaskGroup() *schema.Resource {
return &schema.Resource{
Create: resourceAwsEMRTaskGroupCreate,
Read: resourceAwsEMRTaskGroupRead,
Update: resourceAwsEMRTaskGroupUpdate,
Delete: resourceAwsEMRTaskGroupDelete,
Schema: map[string]*schema.Schema{
"cluster_id": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_count": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 60,
},
"name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
},
}
}
func resourceAwsEMRTaskGroupCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
clusterId := d.Get("cluster_id").(string)
instanceType := d.Get("instance_type").(string)
instanceCount := d.Get("instance_count").(int)
groupName := d.Get("name").(string)
log.Printf("[DEBUG] Creating EMR task group")
params := &emr.AddInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupConfig{
{
InstanceRole: aws.String("TASK"),
InstanceCount: aws.Int64(int64(instanceCount)),
InstanceType: aws.String(instanceType),
Name: aws.String(groupName),
},
},
JobFlowId: aws.String(clusterId),
}
resp, err := conn.AddInstanceGroups(params)
if err != nil {
log.Printf("[ERROR] %s", err)
return err
}
log.Printf("[DEBUG] Created EMR task group finished: %#v", resp)
d.SetId(*resp.InstanceGroupIds[0])
return nil
}
func resourceAwsEMRTaskGroupRead(d *schema.ResourceData, meta interface{}) error {
return nil
}
func resourceAwsEMRTaskGroupUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
log.Printf("[DEBUG] Modify EMR task group")
instanceCount := d.Get("instance_count").(int)
if d.HasChange("name") {
return fmt.Errorf("[WARN] Error updating task group, change name is not supported by api")
}
if d.HasChange("instance_type") {
return fmt.Errorf("[WARN] Error updating task group, change instance_type is not supported by api")
}
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(d.Id()),
InstanceCount: aws.Int64(int64(instanceCount)),
},
},
}
resp, err := conn.ModifyInstanceGroups(params)
if err != nil {
log.Printf("[ERROR] %s", err)
return err
}
log.Printf("[DEBUG] Modify EMR task group finished: %#v", resp)
return nil
}
func resourceAwsEMRTaskGroupDelete(d *schema.ResourceData, meta interface{}) error {
return nil
}

View File

@ -1,176 +0,0 @@
package aws
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
"log"
"testing"
)
func TestAccAWSEmrTaskGroup_basic(t *testing.T) {
var jobFlow emr.RunJobFlowOutput
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSEmrTaskGroupDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccAWSEmrTaskGroupConfig,
Check: testAccCheckAWSEmrTaskGroupExists("aws_emr_task_group.task", &jobFlow),
},
},
})
}
func testAccCheckAWSEmrTaskGroupDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).emrconn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_emr" {
continue
}
params := &emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
}
describe, err := conn.DescribeCluster(params)
if err == nil {
if describe.Cluster != nil &&
*describe.Cluster.Status.State == "WAITING" {
return fmt.Errorf("EMR Cluster still exists")
}
}
providerErr, ok := err.(awserr.Error)
if !ok {
return err
}
log.Printf("[ERROR] %v", providerErr)
}
return nil
}
func testAccCheckAWSEmrTaskGroupExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No task group id set")
}
conn := testAccProvider.Meta().(*AWSClient).emrconn
_, err := conn.DescribeCluster(&emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.Attributes["cluster_id"]),
})
if err != nil {
return fmt.Errorf("EMR error: %v", err)
}
return nil
}
}
var testAccAWSEmrTaskGroupConfig = fmt.Sprintf(`
provider "aws" {
region = "ap-southeast-2"
}
resource "aws_emr" "tf-test-cluster" {
name = "emr-%s"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 1
tags {
role = "rolename"
dns_zone = "env_zone"
env = "env"
name = "name-env"
}
bootstrap_action {
path ="s3://elasticmapreduce/bootstrap-actions/run-if"
name ="runif"
args =["instance.isMaster=true","echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
}
resource "aws_emr_task_group" "task" {
cluster_id = "${aws_emr.tf-test-cluster.id}"
instance_count = 1
instance_type = "m3.xlarge"
}
resource "aws_security_group" "allow_all" {
name = "allow_all"
description = "Allow all inbound traffic"
vpc_id = "${aws_vpc.main.id}"
ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
depends_on = ["aws_subnet.main"]
lifecycle {
ignore_changes = ["ingress", "egress"]
}
}
resource "aws_vpc" "main" {
cidr_block = "168.31.0.0/16"
enable_dns_hostnames = true
}
resource "aws_subnet" "main" {
vpc_id = "${aws_vpc.main.id}"
cidr_block = "168.31.0.0/20"
# map_public_ip_on_launch = true
}
resource "aws_internet_gateway" "gw" {
vpc_id = "${aws_vpc.main.id}"
}
resource "aws_route_table" "r" {
vpc_id = "${aws_vpc.main.id}"
route {
cidr_block = "0.0.0.0/0"
gateway_id = "${aws_internet_gateway.gw.id}"
}
}
resource "aws_main_route_table_association" "a" {
vpc_id = "${aws_vpc.main.id}"
route_table_id = "${aws_route_table.r.id}"
}
`, acctest.RandString(10))

View File

@ -1,184 +0,0 @@
package aws
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
"log"
"testing"
)
func TestAccAWSEmrCluster_basic(t *testing.T) {
var jobFlow emr.RunJobFlowOutput
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSEmrDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccAWSEmrClusterConfig,
Check: testAccCheckAWSEmrClusterExists("aws_emr.tf-test-cluster", &jobFlow),
},
},
})
}
func testAccCheckAWSEmrDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).emrconn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_emr" {
continue
}
params := &emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
}
describe, err := conn.DescribeCluster(params)
if err == nil {
if describe.Cluster != nil &&
*describe.Cluster.Status.State == "WAITING" {
return fmt.Errorf("EMR Cluster still exists")
}
}
providerErr, ok := err.(awserr.Error)
if !ok {
return err
}
log.Printf("[ERROR] %v", providerErr)
}
return nil
}
func testAccCheckAWSEmrClusterExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No cluster id set")
}
conn := testAccProvider.Meta().(*AWSClient).emrconn
describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
})
if err != nil {
return fmt.Errorf("EMR error: %v", err)
}
if describe.Cluster != nil &&
*describe.Cluster.Id != rs.Primary.ID {
return fmt.Errorf("EMR cluser not found")
}
if describe.Cluster != nil &&
*describe.Cluster.Status.State != "WAITING" {
return fmt.Errorf("EMR cluser is not up yet")
}
return nil
}
}
var testAccAWSEmrClusterConfig = fmt.Sprintf(`
provider "aws" {
region = "ap-southeast-2"
}
resource "aws_emr" "tf-test-cluster" {
name = "emr-%s"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 1
tags {
role = "rolename"
dns_zone = "env_zone"
env = "env"
name = "name-env"
}
bootstrap_action {
path ="s3://elasticmapreduce/bootstrap-actions/run-if"
name ="runif"
args =["instance.isMaster=true","echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
depends_on = ["aws_main_route_table_association.a"]
}
resource "aws_security_group" "allow_all" {
name = "allow_all"
description = "Allow all inbound traffic"
vpc_id = "${aws_vpc.main.id}"
ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
depends_on = ["aws_subnet.main"]
lifecycle {
ignore_changes = ["ingress", "egress"]
}
}
resource "aws_vpc" "main" {
cidr_block = "168.31.0.0/16"
enable_dns_hostnames = true
}
resource "aws_subnet" "main" {
vpc_id = "${aws_vpc.main.id}"
cidr_block = "168.31.0.0/20"
# map_public_ip_on_launch = true
}
resource "aws_internet_gateway" "gw" {
vpc_id = "${aws_vpc.main.id}"
}
resource "aws_route_table" "r" {
vpc_id = "${aws_vpc.main.id}"
route {
cidr_block = "0.0.0.0/0"
gateway_id = "${aws_internet_gateway.gw.id}"
}
}
resource "aws_main_route_table_association" "a" {
vpc_id = "${aws_vpc.main.id}"
route_table_id = "${aws_route_table.r.id}"
}
`, acctest.RandString(10))

View File

@ -0,0 +1,397 @@
---
layout: "aws"
page_title: "AWS: aws_emr_cluster"
sidebar_current: "docs-aws-resource-emr-cluster"
description: |-
Provides an Elastic MapReduce Cluster
---
# aws\_emr\_cluster
Provides an Elastic MapReduce Cluster, a web service that makes it easy to
process large amounts of data efficiently. See [Amazon Elastic MapReduce Documentation](https://aws.amazon.com/documentation/elastic-mapreduce/)
for more information.
## Example Usage
```
resource "aws_emr_cluster" "emr-test-cluster" {
name = "emr-test-arn"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.sg.id}"
emr_managed_slave_security_group = "${aws_security_group.sg.id}"
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 1
tags {
role = "rolename"
env = "env"
}
bootstrap_action {
path = "s3://elasticmapreduce/bootstrap-actions/run-if"
name = "runif"
args = ["instance.isMaster=true", "echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
service_role = "${aws_iam_role.iam_emr_service_role.arn}"
}
```
The `aws_emr_cluster` resource typically requires two IAM roles, one for the EMR Cluster
to use as a service, and another to place on your Cluster Instances to interact
with AWS from those instances. The suggested role policy template for the EMR service is `AmazonElasticMapReduceRole`,
and `AmazonElasticMapReduceforEC2Role` for the EC2 profile. See the [Getting
Started](http://docs.aws.amazon.com/fr_fr/ElasticMapReduce/latest/ManagementGuide/emr-gs-launch-sample-cluster.html)
guide for more information on these IAM roles. There is also a fully-bootable
example Terraform configuration at the bottom of this page.
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the job flow
* `release_label` - (Required) The release label for the Amazon EMR release
* `master_instance_type` - (Required) The EC2 instance type of the master node
* `core_instance_type` - (Optional) The EC2 instance type of the slave nodes
* `core_instance_count` - (Optional) number of Amazon EC2 instances used to execute the job flow. Default `0`
* `log_uri` - (Optional) S3 bucket to write the log files of the job flow. If a value
is not provided, logs are not created
* `applications` - (Optional) A list of applications for the cluster. Valid values are: `Hadoop`, `Hive`,
`Mahout`, `Pig`, and `Spark.` Case insensitive
* `ec2_attributes` - (Optional) attributes for the EC2 instances running the job
flow. Defined below
* `bootstrap_action` - (Optional) list of bootstrap actions that will be run before Hadoop is started on
the cluster nodes. Defined below
* `configurations` - (Optional) list of configurations supplied for the EMR cluster you are creating
* `service_role` - (Optional) IAM role that will be assumed by the Amazon EMR service to access AWS resources
* `visible_to_all_users` - (Optional) Whether the job flow is visible to all IAM users of the AWS account associated with the job flow. Default `true`
* `tags` - (Optional) list of tags to apply to the EMR Cluster
## ec2\_attributes
Attributes for the Amazon EC2 instances running the job flow
* `key_name` - (Optional) Amazon EC2 key pair that can be used to ssh to the master
node as the user called `hadoop`
* `subnet_id` - (Optional) VPC subnet id where you want the job flow to launch.
Cannot specify the `cc1.4xlarge` instance type for nodes of a job flow launched in a Amazon VPC
* `additional_master_security_groups` - (Optional) list of additional Amazon EC2 security group IDs for the master node
* `additional_slave_security_groups` - (Optional) list of additional Amazon EC2 security group IDs for the slave nodes
* `emr_managed_master_security_group` - (Optional) identifier of the Amazon EC2 security group for the master node
* `emr_managed_slave_security_group` - (Optional) identifier of the Amazon EC2 security group for the slave nodes
* `instance_profile` - (Optional) Instance Profile for EC2 instances of the cluster assume this role
## bootstrap\_action
* `name` - (Required) name of the bootstrap action
* `path` - (Required) location of the script to run during a bootstrap action. Can be either a location in Amazon S3 or on a local file system
* `args` - (Optional) list of command line arguments to pass to the bootstrap action script
## Attributes Reference
The following attributes are exported:
* `id` - The ID of the EMR Cluster
* `name`
* `release_label`
* `master_instance_type`
* `core_instance_type`
* `core_instance_count`
* `log_uri`
* `applications`
* `ec2_attributes`
* `bootstrap_action`
* `configurations`
* `service_role`
* `visible_to_all_users`
* `tags`
## Example bootable config
**NOTE:** This configuration demonstrates a minimal configuration needed to
boot an example EMR Cluster. It is not meant to display best practices. Please
use at your own risk.
```
provider "aws" {
region = "us-west-2"
}
resource "aws_emr_cluster" "tf-test-cluster" {
name = "emr-test-arn"
release_label = "emr-4.6.0"
applications = ["Spark"]
ec2_attributes {
subnet_id = "${aws_subnet.main.id}"
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
}
master_instance_type = "m3.xlarge"
core_instance_type = "m3.xlarge"
core_instance_count = 1
tags {
role = "rolename"
dns_zone = "env_zone"
env = "env"
name = "name-env"
}
bootstrap_action {
path = "s3://elasticmapreduce/bootstrap-actions/run-if"
name = "runif"
args = ["instance.isMaster=true", "echo running on master node"]
}
configurations = "test-fixtures/emr_configurations.json"
service_role = "${aws_iam_role.iam_emr_service_role.arn}"
}
resource "aws_security_group" "allow_all" {
name = "allow_all"
description = "Allow all inbound traffic"
vpc_id = "${aws_vpc.main.id}"
ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
depends_on = ["aws_subnet.main"]
lifecycle {
ignore_changes = ["ingress", "egress"]
}
tags {
name = "emr_test"
}
}
resource "aws_vpc" "main" {
cidr_block = "168.31.0.0/16"
enable_dns_hostnames = true
tags {
name = "emr_test"
}
}
resource "aws_subnet" "main" {
vpc_id = "${aws_vpc.main.id}"
cidr_block = "168.31.0.0/20"
tags {
name = "emr_test"
}
}
resource "aws_internet_gateway" "gw" {
vpc_id = "${aws_vpc.main.id}"
}
resource "aws_route_table" "r" {
vpc_id = "${aws_vpc.main.id}"
route {
cidr_block = "0.0.0.0/0"
gateway_id = "${aws_internet_gateway.gw.id}"
}
}
resource "aws_main_route_table_association" "a" {
vpc_id = "${aws_vpc.main.id}"
route_table_id = "${aws_route_table.r.id}"
}
###
# IAM Role setups
###
# IAM role for EMR Service
resource "aws_iam_role" "iam_emr_service_role" {
name = "iam_emr_service_role"
assume_role_policy = <<EOF
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}
resource "aws_iam_role_policy" "iam_emr_service_policy" {
name = "iam_emr_service_policy"
role = "${aws_iam_role.iam_emr_service_role.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"ec2:AuthorizeSecurityGroupEgress",
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CancelSpotInstanceRequests",
"ec2:CreateNetworkInterface",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:DeleteNetworkInterface",
"ec2:DeleteSecurityGroup",
"ec2:DeleteTags",
"ec2:DescribeAvailabilityZones",
"ec2:DescribeAccountAttributes",
"ec2:DescribeDhcpOptions",
"ec2:DescribeInstanceStatus",
"ec2:DescribeInstances",
"ec2:DescribeKeyPairs",
"ec2:DescribeNetworkAcls",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribePrefixLists",
"ec2:DescribeRouteTables",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSpotInstanceRequests",
"ec2:DescribeSpotPriceHistory",
"ec2:DescribeSubnets",
"ec2:DescribeVpcAttribute",
"ec2:DescribeVpcEndpoints",
"ec2:DescribeVpcEndpointServices",
"ec2:DescribeVpcs",
"ec2:DetachNetworkInterface",
"ec2:ModifyImageAttribute",
"ec2:ModifyInstanceAttribute",
"ec2:RequestSpotInstances",
"ec2:RevokeSecurityGroupEgress",
"ec2:RunInstances",
"ec2:TerminateInstances",
"ec2:DeleteVolume",
"ec2:DescribeVolumeStatus",
"ec2:DescribeVolumes",
"ec2:DetachVolume",
"iam:GetRole",
"iam:GetRolePolicy",
"iam:ListInstanceProfiles",
"iam:ListRolePolicies",
"iam:PassRole",
"s3:CreateBucket",
"s3:Get*",
"s3:List*",
"sdb:BatchPutAttributes",
"sdb:Select",
"sqs:CreateQueue",
"sqs:Delete*",
"sqs:GetQueue*",
"sqs:PurgeQueue",
"sqs:ReceiveMessage"
]
}]
}
EOF
}
# IAM Role for EC2 Instance Profile
resource "aws_iam_role" "iam_emr_profile_role" {
name = "iam_emr_profile_role"
assume_role_policy = <<EOF
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}
resource "aws_iam_instance_profile" "emr_profile" {
name = "emr_profile"
roles = ["${aws_iam_role.iam_emr_profile_role.name}"]
}
resource "aws_iam_role_policy" "iam_emr_profile_policy" {
name = "iam_emr_profile_policy"
role = "${aws_iam_role.iam_emr_profile_role.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"cloudwatch:*",
"dynamodb:*",
"ec2:Describe*",
"elasticmapreduce:Describe*",
"elasticmapreduce:ListBootstrapActions",
"elasticmapreduce:ListClusters",
"elasticmapreduce:ListInstanceGroups",
"elasticmapreduce:ListInstances",
"elasticmapreduce:ListSteps",
"kinesis:CreateStream",
"kinesis:DeleteStream",
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:MergeShards",
"kinesis:PutRecord",
"kinesis:SplitShard",
"rds:Describe*",
"s3:*",
"sdb:*",
"sns:*",
"sqs:*"
]
}]
}
EOF
}
```

View File

@ -0,0 +1,50 @@
---
layout: "aws"
page_title: "AWS: aws_emr_instance_group"
sidebar_current: "docs-aws-resource-emr-instance-group"
description: |-
Provides an Elastic MapReduce Cluster Instance Group
---
# aws\_emr\_instance\_group
Provides an Elastic MapReduce Cluster Instance Group configuration.
See [Amazon Elastic MapReduce Documentation](http://docs.aws.amazon.com/en_en/ElasticMapReduce/latest/ManagementGuide/InstanceGroups.html)
for more information.
~> **NOTE:** At this time, Instance Groups cannot be destroyed through the API nor
web interface. Instance Groups are destroyed when the EMR Cluster is destroyed.
Terraform will resize any Instance Group to zero when destroying the resource.
## Example Usage
```
resource "aws_emr_cluster_instance_group" "task" {
cluster_id = "${aws_emr_cluster.tf-test-cluster.id}"
instance_count = 1
instance_type = "m3.xlarge"
name = "my little instance group"
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Optional) Optional human friendly name for this Instance Group
* `cluster_id` - (Required) ID of the EMR Cluster to attach to
* `instance_type` - (Required) Type of instances for this Group
* `instance_count` - (Optional) Count of instances to launch
## ec2\_attributes
Attributes for the Instance Group
* `name` - Human friendly name for this Instance Group
* `cluster_id` - ID of the EMR Cluster the group is attached to
* `instance_type` - Type of instances for this Group
* `instance_count` - Count of desired instances to launch
* `running_instance_count` - Count of actual running instances in the group
* `status` - State of the instance group. One of `PROVISIONING`, `BOOTSTRAPPING`, `RUNNING`, `RESIZING`, `SUSPENDED`, `TERMINATING`, `TERMINATED`, `ARRESTED`, `SHUTTING_DOWN`, `ENDED`

View File

@ -457,6 +457,19 @@
</ul>
</li>
<li<%= sidebar_current(/^docs-aws-resource-emr/) %>>
<a href="#">Elastic Map Reduce Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-aws-resource-emr-cluster") %>>
<a href="/docs/providers/aws/r/emr_cluster.html">aws_emr_cluster</a>
</li>
<li<%= sidebar_current("docs-aws-resource-emr-instance-group") %>>
<a href="/docs/providers/aws/r/emr_instance_group.html">aws_emr_instance_group</a>
</li>
</ul>
</li>
<li<%= sidebar_current(/^docs-aws-resource-elasticsearch/) %>>
<a href="#">ElasticSearch Resources</a>
<ul class="nav nav-visible">