basic emr implementation
quick emr resize implementation ass task group not force new add task group check empty slices clean up rename to initial_instance_count add task instance group as resource cluster resize core group clean up add name option log info clean up change log debug format clean up add missing security groups for master and slave add bootstrap actions add options for bootstrap action add tags option clean up fix for tags array support delimiters : = bootstrap actions fix add configurations item load local or remote config rename function support multiple bootstrap actions default value 0 for core group follow aws api able to create a master only tags use terraform tag schema option item for log_uri ec2_attribute as option add emr task group accTests add embedded json config add embedded json config add service_role and instance_profile add partial state support for either the "TERMINATED" or "TERMINATED_WITH_ERRORS" state not allowing to change name or instance_type for task group "core_instance_type" change into "Optional" and "Computed" apply MaxItems for ec2Attributes remove all debug "fmt.Println" clean up debug info and useless variable Expose visible_to_all_users as an option, default will be true remove debug info logging should happen before setId("") add hanChange checking first clean up debug log add some force new double check the core group existed add waiting and polling, until cluster up testcase add EMR cluster id and status checking clean up using common way to read ec2_attributes
This commit is contained in:
parent
aecb86edad
commit
ad8679e916
|
@ -232,11 +232,13 @@ func Provider() terraform.ResourceProvider {
|
|||
"aws_elasticsearch_domain": resourceAwsElasticSearchDomain(),
|
||||
"aws_elastictranscoder_pipeline": resourceAwsElasticTranscoderPipeline(),
|
||||
"aws_elastictranscoder_preset": resourceAwsElasticTranscoderPreset(),
|
||||
"aws_elb": resourceAwsElb(),
|
||||
"aws_elb_attachment": resourceAwsElbAttachment(),
|
||||
"aws_flow_log": resourceAwsFlowLog(),
|
||||
"aws_glacier_vault": resourceAwsGlacierVault(),
|
||||
"aws_iam_access_key": resourceAwsIamAccessKey(),
|
||||
"aws_elb": resourceAwsElb(),
|
||||
"aws_elb_attachment": resourceAwsElbAttachment(),
|
||||
"aws_emr": resourceAwsEMR(),
|
||||
"aws_emr_task_group": resourceAwsEMRTaskGroup(),
|
||||
"aws_flow_log": resourceAwsFlowLog(),
|
||||
"aws_glacier_vault": resourceAwsGlacierVault(),
|
||||
"aws_iam_access_key": resourceAwsIamAccessKey(),
|
||||
"aws_iam_account_password_policy": resourceAwsIamAccountPasswordPolicy(),
|
||||
"aws_iam_group_policy": resourceAwsIamGroupPolicy(),
|
||||
"aws_iam_group": resourceAwsIamGroup(),
|
||||
|
|
|
@ -0,0 +1,498 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/service/emr"
|
||||
"github.com/hashicorp/terraform/helper/resource"
|
||||
"github.com/hashicorp/terraform/helper/schema"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func resourceAwsEMR() *schema.Resource {
|
||||
return &schema.Resource{
|
||||
Create: resourceAwsEMRCreate,
|
||||
Read: resourceAwsEMRRead,
|
||||
Update: resourceAwsEMRUpdate,
|
||||
Delete: resourceAwsEMRDelete,
|
||||
Schema: map[string]*schema.Schema{
|
||||
"name": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
},
|
||||
"release_label": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
},
|
||||
"master_instance_type": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
ForceNew: true,
|
||||
},
|
||||
"core_instance_type": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
Computed: true,
|
||||
},
|
||||
"core_instance_count": &schema.Schema{
|
||||
Type: schema.TypeInt,
|
||||
Optional: true,
|
||||
Default: 0,
|
||||
},
|
||||
"log_uri": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"applications": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Optional: true,
|
||||
ForceNew: true,
|
||||
Elem: &schema.Schema{Type: schema.TypeString},
|
||||
Set: schema.HashString,
|
||||
},
|
||||
"ec2_attributes": &schema.Schema{
|
||||
Type: schema.TypeList,
|
||||
MaxItems: 1,
|
||||
Optional: true,
|
||||
Elem: &schema.Resource{
|
||||
Schema: map[string]*schema.Schema{
|
||||
"key_name": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"subnet_id": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"additional_master_security_groups": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"additional_slave_security_groups": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"emr_managed_master_security_group": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"emr_managed_slave_security_group": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"instance_profile": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"bootstrap_action": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Optional: true,
|
||||
Elem: &schema.Resource{
|
||||
Schema: map[string]*schema.Schema{
|
||||
"name": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
},
|
||||
"path": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
},
|
||||
"args": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Optional: true,
|
||||
Elem: &schema.Schema{Type: schema.TypeString},
|
||||
Set: schema.HashString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"tags": tagsSchema(),
|
||||
"configurations": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"service_role": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
"visible_to_all_users": &schema.Schema{
|
||||
Type: schema.TypeBool,
|
||||
Optional: true,
|
||||
Default: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func resourceAwsEMRCreate(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
log.Printf("[DEBUG] Creating EMR cluster")
|
||||
masterInstanceType := d.Get("master_instance_type").(string)
|
||||
coreInstanceType := masterInstanceType
|
||||
if v, ok := d.GetOk("core_instance_type"); ok {
|
||||
coreInstanceType = v.(string)
|
||||
}
|
||||
coreInstanceCount := d.Get("core_instance_count").(int)
|
||||
|
||||
applications := d.Get("applications").(*schema.Set).List()
|
||||
var userKey, subnet, extraMasterSecGrp, extraSlaveSecGrp, emrMasterSecGrp, emrSlaveSecGrp, instanceProfile, serviceRole string
|
||||
instanceProfile = "EMR_EC2_DefaultRole"
|
||||
|
||||
if a, ok := d.GetOk("ec2_attributes"); ok {
|
||||
ec2Attributes := a.([]interface{})
|
||||
attributes := ec2Attributes[0].(map[string]interface{})
|
||||
userKey = attributes["key_name"].(string)
|
||||
subnet = attributes["subnet_id"].(string)
|
||||
extraMasterSecGrp = attributes["additional_master_security_groups"].(string)
|
||||
extraSlaveSecGrp = attributes["additional_slave_security_groups"].(string)
|
||||
emrMasterSecGrp = attributes["emr_managed_master_security_group"].(string)
|
||||
emrSlaveSecGrp = attributes["emr_managed_slave_security_group"].(string)
|
||||
|
||||
if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 {
|
||||
instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string))
|
||||
}
|
||||
}
|
||||
|
||||
if v, ok := d.GetOk("service_role"); ok {
|
||||
serviceRole = v.(string)
|
||||
} else {
|
||||
serviceRole = "EMR_DefaultRole"
|
||||
}
|
||||
|
||||
emrApps := expandApplications(applications)
|
||||
|
||||
params := &emr.RunJobFlowInput{
|
||||
Instances: &emr.JobFlowInstancesConfig{
|
||||
Ec2KeyName: aws.String(userKey),
|
||||
Ec2SubnetId: aws.String(subnet),
|
||||
InstanceCount: aws.Int64(int64(coreInstanceCount + 1)),
|
||||
KeepJobFlowAliveWhenNoSteps: aws.Bool(true),
|
||||
MasterInstanceType: aws.String(masterInstanceType),
|
||||
SlaveInstanceType: aws.String(coreInstanceType),
|
||||
TerminationProtected: aws.Bool(false),
|
||||
AdditionalMasterSecurityGroups: []*string{
|
||||
aws.String(extraMasterSecGrp),
|
||||
},
|
||||
AdditionalSlaveSecurityGroups: []*string{
|
||||
aws.String(extraSlaveSecGrp),
|
||||
},
|
||||
EmrManagedMasterSecurityGroup: aws.String(emrMasterSecGrp),
|
||||
EmrManagedSlaveSecurityGroup: aws.String(emrSlaveSecGrp),
|
||||
},
|
||||
Name: aws.String(d.Get("name").(string)),
|
||||
Applications: emrApps,
|
||||
|
||||
JobFlowRole: aws.String(instanceProfile),
|
||||
ReleaseLabel: aws.String(d.Get("release_label").(string)),
|
||||
ServiceRole: aws.String(serviceRole),
|
||||
VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)),
|
||||
}
|
||||
|
||||
if v, ok := d.GetOk("log_uri"); ok {
|
||||
logUrl := v.(string)
|
||||
params.LogUri = aws.String(logUrl)
|
||||
}
|
||||
if v, ok := d.GetOk("bootstrap_action"); ok {
|
||||
bootstrapActions := v.(*schema.Set).List()
|
||||
log.Printf("[DEBUG] %v\n", bootstrapActions)
|
||||
params.BootstrapActions = expandBootstrapActions(bootstrapActions)
|
||||
}
|
||||
if v, ok := d.GetOk("tags"); ok {
|
||||
tagsIn := v.(map[string]interface{})
|
||||
params.Tags = expandTags(tagsIn)
|
||||
}
|
||||
if v, ok := d.GetOk("configurations"); ok {
|
||||
confUrl := v.(string)
|
||||
params.Configurations = expandConfigures(confUrl)
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] EMR Cluster create options: %s", params)
|
||||
resp, err := conn.RunJobFlow(params)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Created EMR Cluster done...")
|
||||
d.SetId(*resp.JobFlowId)
|
||||
|
||||
log.Println(
|
||||
"[INFO] Waiting for EMR Cluster to be available")
|
||||
|
||||
stateConf := &resource.StateChangeConf{
|
||||
Pending: []string{"STARTING", "BOOTSTRAPPING"},
|
||||
Target: []string{"WAITING", "RUNNING"},
|
||||
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
|
||||
Timeout: 40 * time.Minute,
|
||||
MinTimeout: 10 * time.Second,
|
||||
Delay: 30 * time.Second, // Wait 30 secs before starting
|
||||
}
|
||||
|
||||
_, err = stateConf.WaitForState()
|
||||
if err != nil {
|
||||
return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\": %s", err)
|
||||
}
|
||||
|
||||
return resourceAwsEMRRead(d, meta)
|
||||
}
|
||||
|
||||
func resourceAwsEMRRead(d *schema.ResourceData, meta interface{}) error {
|
||||
emrconn := meta.(*AWSClient).emrconn
|
||||
|
||||
req := &emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(d.Id()),
|
||||
}
|
||||
|
||||
resp, err := emrconn.DescribeCluster(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error reading EMR cluster: %s", err)
|
||||
}
|
||||
|
||||
if resp.Cluster == nil {
|
||||
d.SetId("")
|
||||
log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id())
|
||||
return nil
|
||||
}
|
||||
|
||||
instance := resp.Cluster
|
||||
|
||||
if instance.Status != nil {
|
||||
if *resp.Cluster.Status.State == "TERMINATED" {
|
||||
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id())
|
||||
d.SetId("")
|
||||
return nil
|
||||
}
|
||||
|
||||
if *resp.Cluster.Status.State == "TERMINATED_WITH_ERRORS" {
|
||||
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id())
|
||||
d.SetId("")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
instanceGroups, errGrps := loadGroups(d, meta)
|
||||
if errGrps == nil {
|
||||
coreGroup := findGroup(instanceGroups, "CORE")
|
||||
if coreGroup != nil {
|
||||
d.Set("core_instance_type", coreGroup.InstanceType)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceAwsEMRUpdate(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
if d.HasChange("core_instance_count") {
|
||||
log.Printf("[DEBUG] Modify EMR cluster")
|
||||
req := &emr.ListInstanceGroupsInput{
|
||||
ClusterId: aws.String(d.Id()),
|
||||
}
|
||||
|
||||
respGrps, errGrps := conn.ListInstanceGroups(req)
|
||||
if errGrps != nil {
|
||||
return fmt.Errorf("Error reading EMR cluster: %s", errGrps)
|
||||
}
|
||||
instanceGroups := respGrps.InstanceGroups
|
||||
|
||||
coreInstanceCount := d.Get("core_instance_count").(int)
|
||||
coreGroup := findGroup(instanceGroups, "CORE")
|
||||
|
||||
params := &emr.ModifyInstanceGroupsInput{
|
||||
InstanceGroups: []*emr.InstanceGroupModifyConfig{
|
||||
{
|
||||
InstanceGroupId: aws.String(*coreGroup.Id),
|
||||
InstanceCount: aws.Int64(int64(coreInstanceCount)),
|
||||
},
|
||||
},
|
||||
}
|
||||
_, errModify := conn.ModifyInstanceGroups(params)
|
||||
if errModify != nil {
|
||||
log.Printf("[ERROR] %s", errModify)
|
||||
return errModify
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Modify EMR Cluster done...")
|
||||
}
|
||||
|
||||
return resourceAwsEMRRead(d, meta)
|
||||
}
|
||||
|
||||
func resourceAwsEMRDelete(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
req := &emr.TerminateJobFlowsInput{
|
||||
JobFlowIds: []*string{
|
||||
aws.String(d.Id()),
|
||||
},
|
||||
}
|
||||
|
||||
_, err := conn.TerminateJobFlows(req)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR], %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
d.SetId("")
|
||||
return nil
|
||||
}
|
||||
|
||||
func expandApplications(apps []interface{}) []*emr.Application {
|
||||
appOut := make([]*emr.Application, 0, len(apps))
|
||||
|
||||
for _, appName := range expandStringList(apps) {
|
||||
app := &emr.Application{
|
||||
Name: appName,
|
||||
}
|
||||
appOut = append(appOut, app)
|
||||
}
|
||||
return appOut
|
||||
}
|
||||
|
||||
func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) {
|
||||
emrconn := meta.(*AWSClient).emrconn
|
||||
reqGrps := &emr.ListInstanceGroupsInput{
|
||||
ClusterId: aws.String(d.Id()),
|
||||
}
|
||||
|
||||
respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps)
|
||||
if errGrps != nil {
|
||||
return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps)
|
||||
}
|
||||
return respGrps.InstanceGroups, nil
|
||||
}
|
||||
|
||||
func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup {
|
||||
for _, grp := range grps {
|
||||
if *grp.InstanceGroupType == typ {
|
||||
return grp
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func expandTags(m map[string]interface{}) []*emr.Tag {
|
||||
var result []*emr.Tag
|
||||
for k, v := range m {
|
||||
result = append(result, &emr.Tag{
|
||||
Key: aws.String(k),
|
||||
Value: aws.String(v.(string)),
|
||||
})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig {
|
||||
actionsOut := []*emr.BootstrapActionConfig{}
|
||||
|
||||
for _, raw := range bootstrapActions {
|
||||
actionAttributes := raw.(map[string]interface{})
|
||||
actionName := actionAttributes["name"].(string)
|
||||
actionPath := actionAttributes["path"].(string)
|
||||
actionArgs := actionAttributes["args"].(*schema.Set).List()
|
||||
|
||||
action := &emr.BootstrapActionConfig{
|
||||
Name: aws.String(actionName),
|
||||
ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{
|
||||
Path: aws.String(actionPath),
|
||||
Args: expandStringList(actionArgs),
|
||||
},
|
||||
}
|
||||
actionsOut = append(actionsOut, action)
|
||||
}
|
||||
|
||||
return actionsOut
|
||||
}
|
||||
|
||||
func expandConfigures(input string) []*emr.Configuration {
|
||||
configsOut := []*emr.Configuration{}
|
||||
if strings.HasPrefix(input, "http") {
|
||||
readHttpJson(input, &configsOut)
|
||||
} else if strings.HasSuffix(input, ".json") {
|
||||
readLocalJson(input, &configsOut)
|
||||
} else {
|
||||
readBodyJson(input, &configsOut)
|
||||
}
|
||||
log.Printf("[DEBUG] Configures %v\n", configsOut)
|
||||
|
||||
return configsOut
|
||||
}
|
||||
|
||||
func readHttpJson(url string, target interface{}) error {
|
||||
r, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
return json.NewDecoder(r.Body).Decode(target)
|
||||
}
|
||||
|
||||
func readLocalJson(localFile string, target interface{}) error {
|
||||
file, e := ioutil.ReadFile(localFile)
|
||||
if e != nil {
|
||||
log.Printf("[ERROR] %s", e)
|
||||
return e
|
||||
}
|
||||
|
||||
return json.Unmarshal(file, target)
|
||||
}
|
||||
|
||||
func readBodyJson(body string, target interface{}) error {
|
||||
log.Printf("[DEBUG] Raw Body %s\n", body)
|
||||
err := json.Unmarshal([]byte(body), target)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] parsing JSON %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc {
|
||||
return func() (interface{}, string, error) {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id())
|
||||
params := &emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(d.Id()),
|
||||
}
|
||||
|
||||
resp, err := conn.DescribeCluster(params)
|
||||
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
if "ClusterNotFound" == awsErr.Code() {
|
||||
return 42, "destroyed", nil
|
||||
}
|
||||
}
|
||||
log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err)
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
emrc := resp.Cluster
|
||||
|
||||
if emrc == nil {
|
||||
return 42, "destroyed", nil
|
||||
}
|
||||
|
||||
if resp.Cluster.Status != nil {
|
||||
log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status)
|
||||
}
|
||||
|
||||
return emrc, *emrc.Status.State, nil
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/emr"
|
||||
"github.com/hashicorp/terraform/helper/schema"
|
||||
)
|
||||
|
||||
func resourceAwsEMRTaskGroup() *schema.Resource {
|
||||
return &schema.Resource{
|
||||
Create: resourceAwsEMRTaskGroupCreate,
|
||||
Read: resourceAwsEMRTaskGroupRead,
|
||||
Update: resourceAwsEMRTaskGroupUpdate,
|
||||
Delete: resourceAwsEMRTaskGroupDelete,
|
||||
Schema: map[string]*schema.Schema{
|
||||
"cluster_id": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
ForceNew: true,
|
||||
},
|
||||
"instance_type": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
ForceNew: true,
|
||||
},
|
||||
"instance_count": &schema.Schema{
|
||||
Type: schema.TypeInt,
|
||||
Optional: true,
|
||||
Default: 60,
|
||||
},
|
||||
"name": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Optional: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func resourceAwsEMRTaskGroupCreate(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
clusterId := d.Get("cluster_id").(string)
|
||||
instanceType := d.Get("instance_type").(string)
|
||||
instanceCount := d.Get("instance_count").(int)
|
||||
groupName := d.Get("name").(string)
|
||||
|
||||
log.Printf("[DEBUG] Creating EMR task group")
|
||||
params := &emr.AddInstanceGroupsInput{
|
||||
InstanceGroups: []*emr.InstanceGroupConfig{
|
||||
{
|
||||
InstanceRole: aws.String("TASK"),
|
||||
InstanceCount: aws.Int64(int64(instanceCount)),
|
||||
InstanceType: aws.String(instanceType),
|
||||
Name: aws.String(groupName),
|
||||
},
|
||||
},
|
||||
JobFlowId: aws.String(clusterId),
|
||||
}
|
||||
resp, err := conn.AddInstanceGroups(params)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Created EMR task group finished: %#v", resp)
|
||||
d.SetId(*resp.InstanceGroupIds[0])
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceAwsEMRTaskGroupRead(d *schema.ResourceData, meta interface{}) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceAwsEMRTaskGroupUpdate(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).emrconn
|
||||
|
||||
log.Printf("[DEBUG] Modify EMR task group")
|
||||
instanceCount := d.Get("instance_count").(int)
|
||||
|
||||
if d.HasChange("name") {
|
||||
return fmt.Errorf("[WARN] Error updating task group, change name is not supported by api")
|
||||
}
|
||||
|
||||
if d.HasChange("instance_type") {
|
||||
return fmt.Errorf("[WARN] Error updating task group, change instance_type is not supported by api")
|
||||
}
|
||||
|
||||
params := &emr.ModifyInstanceGroupsInput{
|
||||
InstanceGroups: []*emr.InstanceGroupModifyConfig{
|
||||
{
|
||||
InstanceGroupId: aws.String(d.Id()),
|
||||
InstanceCount: aws.Int64(int64(instanceCount)),
|
||||
},
|
||||
},
|
||||
}
|
||||
resp, err := conn.ModifyInstanceGroups(params)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Modify EMR task group finished: %#v", resp)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceAwsEMRTaskGroupDelete(d *schema.ResourceData, meta interface{}) error {
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/service/emr"
|
||||
"github.com/hashicorp/terraform/helper/acctest"
|
||||
"github.com/hashicorp/terraform/helper/resource"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
"log"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAccAWSEmrTaskGroup_basic(t *testing.T) {
|
||||
var jobFlow emr.RunJobFlowOutput
|
||||
resource.Test(t, resource.TestCase{
|
||||
PreCheck: func() { testAccPreCheck(t) },
|
||||
Providers: testAccProviders,
|
||||
CheckDestroy: testAccCheckAWSEmrTaskGroupDestroy,
|
||||
Steps: []resource.TestStep{
|
||||
resource.TestStep{
|
||||
Config: testAccAWSEmrTaskGroupConfig,
|
||||
Check: testAccCheckAWSEmrTaskGroupExists("aws_emr_task_group.task", &jobFlow),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func testAccCheckAWSEmrTaskGroupDestroy(s *terraform.State) error {
|
||||
conn := testAccProvider.Meta().(*AWSClient).emrconn
|
||||
|
||||
for _, rs := range s.RootModule().Resources {
|
||||
if rs.Type != "aws_emr" {
|
||||
continue
|
||||
}
|
||||
|
||||
params := &emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(rs.Primary.ID),
|
||||
}
|
||||
|
||||
describe, err := conn.DescribeCluster(params)
|
||||
|
||||
if err == nil {
|
||||
if describe.Cluster != nil &&
|
||||
*describe.Cluster.Status.State == "WAITING" {
|
||||
return fmt.Errorf("EMR Cluster still exists")
|
||||
}
|
||||
}
|
||||
|
||||
providerErr, ok := err.(awserr.Error)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[ERROR] %v", providerErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func testAccCheckAWSEmrTaskGroupExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc {
|
||||
return func(s *terraform.State) error {
|
||||
rs, ok := s.RootModule().Resources[n]
|
||||
if !ok {
|
||||
return fmt.Errorf("Not found: %s", n)
|
||||
}
|
||||
if rs.Primary.ID == "" {
|
||||
return fmt.Errorf("No task group id set")
|
||||
}
|
||||
conn := testAccProvider.Meta().(*AWSClient).emrconn
|
||||
_, err := conn.DescribeCluster(&emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(rs.Primary.Attributes["cluster_id"]),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("EMR error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var testAccAWSEmrTaskGroupConfig = fmt.Sprintf(`
|
||||
provider "aws" {
|
||||
region = "ap-southeast-2"
|
||||
}
|
||||
|
||||
resource "aws_emr" "tf-test-cluster" {
|
||||
name = "emr-%s"
|
||||
release_label = "emr-4.6.0"
|
||||
applications = ["Spark"]
|
||||
|
||||
ec2_attributes {
|
||||
subnet_id = "${aws_subnet.main.id}"
|
||||
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
|
||||
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
|
||||
}
|
||||
|
||||
master_instance_type = "m3.xlarge"
|
||||
core_instance_type = "m3.xlarge"
|
||||
core_instance_count = 1
|
||||
|
||||
tags {
|
||||
role = "rolename"
|
||||
dns_zone = "env_zone"
|
||||
env = "env"
|
||||
name = "name-env"
|
||||
}
|
||||
|
||||
bootstrap_action {
|
||||
path ="s3://elasticmapreduce/bootstrap-actions/run-if"
|
||||
name ="runif"
|
||||
args =["instance.isMaster=true","echo running on master node"]
|
||||
}
|
||||
|
||||
configurations = "test-fixtures/emr_configurations.json"
|
||||
}
|
||||
|
||||
resource "aws_emr_task_group" "task" {
|
||||
cluster_id = "${aws_emr.tf-test-cluster.id}"
|
||||
instance_count = 1
|
||||
instance_type = "m3.xlarge"
|
||||
}
|
||||
|
||||
resource "aws_security_group" "allow_all" {
|
||||
name = "allow_all"
|
||||
description = "Allow all inbound traffic"
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
|
||||
ingress {
|
||||
from_port = 0
|
||||
to_port = 0
|
||||
protocol = "-1"
|
||||
cidr_blocks = ["0.0.0.0/0"]
|
||||
}
|
||||
egress {
|
||||
from_port = 0
|
||||
to_port = 0
|
||||
protocol = "-1"
|
||||
cidr_blocks = ["0.0.0.0/0"]
|
||||
}
|
||||
|
||||
depends_on = ["aws_subnet.main"]
|
||||
lifecycle {
|
||||
ignore_changes = ["ingress", "egress"]
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_vpc" "main" {
|
||||
cidr_block = "168.31.0.0/16"
|
||||
enable_dns_hostnames = true
|
||||
}
|
||||
|
||||
resource "aws_subnet" "main" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
cidr_block = "168.31.0.0/20"
|
||||
# map_public_ip_on_launch = true
|
||||
}
|
||||
|
||||
resource "aws_internet_gateway" "gw" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
}
|
||||
|
||||
resource "aws_route_table" "r" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
|
||||
route {
|
||||
cidr_block = "0.0.0.0/0"
|
||||
gateway_id = "${aws_internet_gateway.gw.id}"
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_main_route_table_association" "a" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
route_table_id = "${aws_route_table.r.id}"
|
||||
}
|
||||
`, acctest.RandString(10))
|
|
@ -0,0 +1,184 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/service/emr"
|
||||
"github.com/hashicorp/terraform/helper/acctest"
|
||||
"github.com/hashicorp/terraform/helper/resource"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
"log"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAccAWSEmrCluster_basic(t *testing.T) {
|
||||
var jobFlow emr.RunJobFlowOutput
|
||||
resource.Test(t, resource.TestCase{
|
||||
PreCheck: func() { testAccPreCheck(t) },
|
||||
Providers: testAccProviders,
|
||||
CheckDestroy: testAccCheckAWSEmrDestroy,
|
||||
Steps: []resource.TestStep{
|
||||
resource.TestStep{
|
||||
Config: testAccAWSEmrClusterConfig,
|
||||
Check: testAccCheckAWSEmrClusterExists("aws_emr.tf-test-cluster", &jobFlow),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func testAccCheckAWSEmrDestroy(s *terraform.State) error {
|
||||
conn := testAccProvider.Meta().(*AWSClient).emrconn
|
||||
|
||||
for _, rs := range s.RootModule().Resources {
|
||||
if rs.Type != "aws_emr" {
|
||||
continue
|
||||
}
|
||||
|
||||
params := &emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(rs.Primary.ID),
|
||||
}
|
||||
|
||||
describe, err := conn.DescribeCluster(params)
|
||||
|
||||
if err == nil {
|
||||
if describe.Cluster != nil &&
|
||||
*describe.Cluster.Status.State == "WAITING" {
|
||||
return fmt.Errorf("EMR Cluster still exists")
|
||||
}
|
||||
}
|
||||
|
||||
providerErr, ok := err.(awserr.Error)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[ERROR] %v", providerErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func testAccCheckAWSEmrClusterExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc {
|
||||
return func(s *terraform.State) error {
|
||||
rs, ok := s.RootModule().Resources[n]
|
||||
if !ok {
|
||||
return fmt.Errorf("Not found: %s", n)
|
||||
}
|
||||
if rs.Primary.ID == "" {
|
||||
return fmt.Errorf("No cluster id set")
|
||||
}
|
||||
conn := testAccProvider.Meta().(*AWSClient).emrconn
|
||||
describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{
|
||||
ClusterId: aws.String(rs.Primary.ID),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("EMR error: %v", err)
|
||||
}
|
||||
|
||||
if describe.Cluster != nil &&
|
||||
*describe.Cluster.Id != rs.Primary.ID {
|
||||
return fmt.Errorf("EMR cluser not found")
|
||||
}
|
||||
|
||||
if describe.Cluster != nil &&
|
||||
*describe.Cluster.Status.State != "WAITING" {
|
||||
return fmt.Errorf("EMR cluser is not up yet")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var testAccAWSEmrClusterConfig = fmt.Sprintf(`
|
||||
provider "aws" {
|
||||
region = "ap-southeast-2"
|
||||
}
|
||||
|
||||
resource "aws_emr" "tf-test-cluster" {
|
||||
name = "emr-%s"
|
||||
release_label = "emr-4.6.0"
|
||||
applications = ["Spark"]
|
||||
|
||||
ec2_attributes {
|
||||
subnet_id = "${aws_subnet.main.id}"
|
||||
emr_managed_master_security_group = "${aws_security_group.allow_all.id}"
|
||||
emr_managed_slave_security_group = "${aws_security_group.allow_all.id}"
|
||||
}
|
||||
|
||||
master_instance_type = "m3.xlarge"
|
||||
core_instance_type = "m3.xlarge"
|
||||
core_instance_count = 1
|
||||
|
||||
tags {
|
||||
role = "rolename"
|
||||
dns_zone = "env_zone"
|
||||
env = "env"
|
||||
name = "name-env"
|
||||
}
|
||||
|
||||
bootstrap_action {
|
||||
path ="s3://elasticmapreduce/bootstrap-actions/run-if"
|
||||
name ="runif"
|
||||
args =["instance.isMaster=true","echo running on master node"]
|
||||
}
|
||||
|
||||
configurations = "test-fixtures/emr_configurations.json"
|
||||
|
||||
depends_on = ["aws_main_route_table_association.a"]
|
||||
}
|
||||
|
||||
resource "aws_security_group" "allow_all" {
|
||||
name = "allow_all"
|
||||
description = "Allow all inbound traffic"
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
|
||||
ingress {
|
||||
from_port = 0
|
||||
to_port = 0
|
||||
protocol = "-1"
|
||||
cidr_blocks = ["0.0.0.0/0"]
|
||||
}
|
||||
egress {
|
||||
from_port = 0
|
||||
to_port = 0
|
||||
protocol = "-1"
|
||||
cidr_blocks = ["0.0.0.0/0"]
|
||||
}
|
||||
|
||||
depends_on = ["aws_subnet.main"]
|
||||
lifecycle {
|
||||
ignore_changes = ["ingress", "egress"]
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_vpc" "main" {
|
||||
cidr_block = "168.31.0.0/16"
|
||||
enable_dns_hostnames = true
|
||||
}
|
||||
|
||||
resource "aws_subnet" "main" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
cidr_block = "168.31.0.0/20"
|
||||
# map_public_ip_on_launch = true
|
||||
}
|
||||
|
||||
resource "aws_internet_gateway" "gw" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
}
|
||||
|
||||
resource "aws_route_table" "r" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
|
||||
route {
|
||||
cidr_block = "0.0.0.0/0"
|
||||
gateway_id = "${aws_internet_gateway.gw.id}"
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_main_route_table_association" "a" {
|
||||
vpc_id = "${aws_vpc.main.id}"
|
||||
route_table_id = "${aws_route_table.r.id}"
|
||||
}
|
||||
|
||||
`, acctest.RandString(10))
|
|
@ -0,0 +1,28 @@
|
|||
[
|
||||
{
|
||||
"Classification": "hadoop-env",
|
||||
"Configurations": [
|
||||
{
|
||||
"Classification": "export",
|
||||
"Configurations": [],
|
||||
"Properties": {
|
||||
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"Properties": {}
|
||||
},
|
||||
{
|
||||
"Classification": "spark-env",
|
||||
"Configurations": [
|
||||
{
|
||||
"Classification": "export",
|
||||
"Configurations": [],
|
||||
"Properties": {
|
||||
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"Properties": {}
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue