diff --git a/builtin/providers/azurerm/resource_arm_storage_blob.go b/builtin/providers/azurerm/resource_arm_storage_blob.go
index 80a3aed92..67a3900b0 100644
--- a/builtin/providers/azurerm/resource_arm_storage_blob.go
+++ b/builtin/providers/azurerm/resource_arm_storage_blob.go
@@ -1,10 +1,18 @@
 package azurerm
 
 import (
+	"bytes"
+	"crypto/rand"
+	"encoding/base64"
 	"fmt"
+	"io"
 	"log"
+	"os"
+	"runtime"
 	"strings"
+	"sync"
 
+	"github.com/Azure/azure-sdk-for-go/storage"
 	"github.com/hashicorp/terraform/helper/schema"
 )
 
@@ -49,14 +57,53 @@ func resourceArmStorageBlob() *schema.Resource {
 				Default:      0,
 				ValidateFunc: validateArmStorageBlobSize,
 			},
+			"source": {
+				Type:     schema.TypeString,
+				Optional: true,
+				ForceNew: true,
+			},
 			"url": {
 				Type:     schema.TypeString,
 				Computed: true,
 			},
+			"parallelism": {
+				Type:         schema.TypeInt,
+				Optional:     true,
+				Default:      8,
+				ForceNew:     true,
+				ValidateFunc: validateArmStorageBlobParallelism,
+			},
+			"attempts": {
+				Type:         schema.TypeInt,
+				Optional:     true,
+				Default:      1,
+				ForceNew:     true,
+				ValidateFunc: validateArmStorageBlobAttempts,
+			},
 		},
 	}
 }
 
+func validateArmStorageBlobParallelism(v interface{}, k string) (ws []string, errors []error) {
+	value := v.(int)
+
+	if value <= 0 {
+		errors = append(errors, fmt.Errorf("Blob Parallelism %d is invalid, must be greater than 0", value))
+	}
+
+	return
+}
+
+func validateArmStorageBlobAttempts(v interface{}, k string) (ws []string, errors []error) {
+	value := v.(int)
+
+	if value <= 0 {
+		errors = append(errors, fmt.Errorf("Blob Attempts %d is invalid, must be greater than 0", value))
+	}
+
+	return
+}
+
 func validateArmStorageBlobSize(v interface{}, k string) (ws []string, errors []error) {
 	value := v.(int)
 
@@ -101,19 +148,338 @@ func resourceArmStorageBlobCreate(d *schema.ResourceData, meta interface{}) erro
 	log.Printf("[INFO] Creating blob %q in storage account %q", name, storageAccountName)
 	switch strings.ToLower(blobType) {
 	case "block":
-		err = blobClient.CreateBlockBlob(cont, name)
+		if err := blobClient.CreateBlockBlob(cont, name); err != nil {
+			return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+		}
+
+		source := d.Get("source").(string)
+		if source != "" {
+			parallelism := d.Get("parallelism").(int)
+			attempts := d.Get("attempts").(int)
+			if err := resourceArmStorageBlobBlockUploadFromSource(cont, name, source, blobClient, parallelism, attempts); err != nil {
+				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+			}
+		}
 	case "page":
-		size := int64(d.Get("size").(int))
-		err = blobClient.PutPageBlob(cont, name, size, map[string]string{})
-	}
-	if err != nil {
-		return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+		source := d.Get("source").(string)
+		if source != "" {
+			parallelism := d.Get("parallelism").(int)
+			attempts := d.Get("attempts").(int)
+			if err := resourceArmStorageBlobPageUploadFromSource(cont, name, source, blobClient, parallelism, attempts); err != nil {
+				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+			}
+		} else {
+			size := int64(d.Get("size").(int))
+			if err := blobClient.PutPageBlob(cont, name, size, map[string]string{}); err != nil {
+				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+			}
+		}
 	}
 
 	d.SetId(name)
 	return resourceArmStorageBlobRead(d, meta)
 }
 
+type resourceArmStorageBlobPage struct {
+	offset  int64
+	section *io.SectionReader
+}
+
+func resourceArmStorageBlobPageUploadFromSource(container, name, source string, client *storage.BlobStorageClient, parallelism, attempts int) error {
+	workerCount := parallelism * runtime.NumCPU()
+
+	file, err := os.Open(source)
+	if err != nil {
+		return fmt.Errorf("Error opening source file for upload %q: %s", source, err)
+	}
+	defer file.Close()
+
+	blobSize, pageList, err := resourceArmStorageBlobPageSplit(file)
+	if err != nil {
+		return fmt.Errorf("Error splitting source file %q into pages: %s", source, err)
+	}
+
+	if err := client.PutPageBlob(container, name, blobSize, map[string]string{}); err != nil {
+		return fmt.Errorf("Error creating storage blob on Azure: %s", err)
+	}
+
+	pages := make(chan resourceArmStorageBlobPage, len(pageList))
+	errors := make(chan error, len(pageList))
+	wg := &sync.WaitGroup{}
+	wg.Add(len(pageList))
+
+	total := int64(0)
+	for _, page := range pageList {
+		total += page.section.Size()
+		pages <- page
+	}
+	close(pages)
+
+	for i := 0; i < workerCount; i++ {
+		go resourceArmStorageBlobPageUploadWorker(resourceArmStorageBlobPageUploadContext{
+			container: container,
+			name:      name,
+			source:    source,
+			blobSize:  blobSize,
+			client:    client,
+			pages:     pages,
+			errors:    errors,
+			wg:        wg,
+			attempts:  attempts,
+		})
+	}
+
+	wg.Wait()
+
+	if len(errors) > 0 {
+		return fmt.Errorf("Error while uploading source file %q: %s", source, <-errors)
+	}
+
+	return nil
+}
+
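+// resourceArmStorageBlobPageSplit scans the source file in 4KB pages and
+// coalesces consecutive non-zero pages into ranges of at most 4MB (the
+// largest payload a single PutPage call accepts). All-zero pages are
+// skipped entirely, so the sparse regions of a VHD are never transferred.
+// It returns the file size and a SectionReader per non-empty range.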
+func resourceArmStorageBlobPageSplit(file *os.File) (int64, []resourceArmStorageBlobPage, error) {
+	const (
+		minPageSize int64 = 4 * 1024
+		maxPageSize int64 = 4 * 1024 * 1024
+	)
+
+	info, err := file.Stat()
+	if err != nil {
+		return int64(0), nil, fmt.Errorf("Could not stat file %q: %s", file.Name(), err)
+	}
+
+	blobSize := info.Size()
+	if info.Size()%minPageSize != 0 {
+		blobSize = info.Size() + (minPageSize - (info.Size() % minPageSize))
+	}
+
+	emptyPage := make([]byte, minPageSize)
+
+	type byteRange struct {
+		offset int64
+		length int64
+	}
+
+	var nonEmptyRanges []byteRange
+	var currentRange byteRange
+	for i := int64(0); i < blobSize; i += minPageSize {
+		pageBuf := make([]byte, minPageSize)
+		_, err = file.ReadAt(pageBuf, i)
+		if err != nil && err != io.EOF {
+			return int64(0), nil, fmt.Errorf("Could not read chunk at %d: %s", i, err)
+		}
+
+		if bytes.Equal(pageBuf, emptyPage) {
+			if currentRange.length != 0 {
+				nonEmptyRanges = append(nonEmptyRanges, currentRange)
+			}
+			currentRange = byteRange{
+				offset: i + minPageSize,
+			}
+		} else {
+			currentRange.length += minPageSize
+			if currentRange.length == maxPageSize || (currentRange.offset+currentRange.length == blobSize) {
+				nonEmptyRanges = append(nonEmptyRanges, currentRange)
+				currentRange = byteRange{
+					offset: i + minPageSize,
+				}
+			}
+		}
+	}
+
+	var pages []resourceArmStorageBlobPage
+	for _, nonEmptyRange := range nonEmptyRanges {
+		pages = append(pages, resourceArmStorageBlobPage{
+			offset:  nonEmptyRange.offset,
+			section: io.NewSectionReader(file, nonEmptyRange.offset, nonEmptyRange.length),
+		})
+	}
+
+	return info.Size(), pages, nil
+}
+
+type resourceArmStorageBlobPageUploadContext struct {
+	container string
+	name      string
+	source    string
+	blobSize  int64
+	client    *storage.BlobStorageClient
+	pages     chan resourceArmStorageBlobPage
+	errors    chan error
+	wg        *sync.WaitGroup
+	attempts  int
+}
+
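+// resourceArmStorageBlobPageUploadWorker drains the shared pages channel,
+// writing each non-empty range with PutPage and retrying a failed write up
+// to ctx.attempts times. Failures are reported on ctx.errors, and the
+// WaitGroup is decremented exactly once per page regardless of outcome.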
fmt.Errorf("Error reading source file %q at offset %d: %s", ctx.source, page.offset, err) + ctx.wg.Done() + continue + } + + for x := 0; x < ctx.attempts; x++ { + err = ctx.client.PutPage(ctx.container, ctx.name, start, end, storage.PageWriteTypeUpdate, chunk, map[string]string{}) + if err == nil { + break + } + } + if err != nil { + ctx.errors <- fmt.Errorf("Error writing page at offset %d for file %q: %s", page.offset, ctx.source, err) + ctx.wg.Done() + continue + } + + ctx.wg.Done() + } +} + +type resourceArmStorageBlobBlock struct { + section *io.SectionReader + id string +} + +func resourceArmStorageBlobBlockUploadFromSource(container, name, source string, client *storage.BlobStorageClient, parallelism, attempts int) error { + workerCount := parallelism * runtime.NumCPU() + + file, err := os.Open(source) + if err != nil { + return fmt.Errorf("Error opening source file for upload %q: %s", source, err) + } + defer file.Close() + + blockList, parts, err := resourceArmStorageBlobBlockSplit(file) + if err != nil { + return fmt.Errorf("Error reading and splitting source file for upload %q: %s", source, err) + } + + wg := &sync.WaitGroup{} + blocks := make(chan resourceArmStorageBlobBlock, len(parts)) + errors := make(chan error, len(parts)) + + wg.Add(len(parts)) + for _, p := range parts { + blocks <- p + } + close(blocks) + + for i := 0; i < workerCount; i++ { + go resourceArmStorageBlobBlockUploadWorker(resourceArmStorageBlobBlockUploadContext{ + client: client, + source: source, + container: container, + name: name, + blocks: blocks, + errors: errors, + wg: wg, + attempts: attempts, + }) + } + + wg.Wait() + + if len(errors) > 0 { + return fmt.Errorf("Error while uploading source file %q: %s", source, <-errors) + } + + err = client.PutBlockList(container, name, blockList) + if err != nil { + return fmt.Errorf("Error updating block list for source file %q: %s", source, err) + } + + return nil +} + +func resourceArmStorageBlobBlockSplit(file *os.File) ([]storage.Block, []resourceArmStorageBlobBlock, error) { + const ( + idSize = 64 + blockSize int64 = 4 * 1024 * 1024 + ) + var parts []resourceArmStorageBlobBlock + var blockList []storage.Block + + info, err := file.Stat() + if err != nil { + return nil, nil, fmt.Errorf("Error stating source file %q: %s", file.Name(), err) + } + + for i := int64(0); i < info.Size(); i = i + blockSize { + entropy := make([]byte, idSize) + _, err = rand.Read(entropy) + if err != nil { + return nil, nil, fmt.Errorf("Error generating a random block ID for source file %q: %s", file.Name(), err) + } + + sectionSize := blockSize + remainder := info.Size() - i + if remainder < blockSize { + sectionSize = remainder + } + + block := storage.Block{ + ID: base64.StdEncoding.EncodeToString(entropy), + Status: storage.BlockStatusUncommitted, + } + + blockList = append(blockList, block) + + parts = append(parts, resourceArmStorageBlobBlock{ + id: block.ID, + section: io.NewSectionReader(file, i, sectionSize), + }) + } + + return blockList, parts, nil +} + +type resourceArmStorageBlobBlockUploadContext struct { + client *storage.BlobStorageClient + container string + name string + source string + attempts int + blocks chan resourceArmStorageBlobBlock + errors chan error + wg *sync.WaitGroup +} + +func resourceArmStorageBlobBlockUploadWorker(ctx resourceArmStorageBlobBlockUploadContext) { + for block := range ctx.blocks { + buffer := make([]byte, block.section.Size()) + + _, err := block.section.Read(buffer) + if err != nil { + ctx.errors <- fmt.Errorf("Error reading source 
file %q: %s", ctx.source, err) + ctx.wg.Done() + continue + } + + for i := 0; i < ctx.attempts; i++ { + err = ctx.client.PutBlock(ctx.container, ctx.name, block.id, buffer) + if err == nil { + break + } + } + if err != nil { + ctx.errors <- fmt.Errorf("Error uploading block %q for source file %q: %s", block.id, ctx.source, err) + ctx.wg.Done() + continue + } + + ctx.wg.Done() + } +} + func resourceArmStorageBlobRead(d *schema.ResourceData, meta interface{}) error { armClient := meta.(*ArmClient) diff --git a/builtin/providers/azurerm/resource_arm_storage_blob_test.go b/builtin/providers/azurerm/resource_arm_storage_blob_test.go index d4fc5dc74..ecf768fe7 100644 --- a/builtin/providers/azurerm/resource_arm_storage_blob_test.go +++ b/builtin/providers/azurerm/resource_arm_storage_blob_test.go @@ -1,11 +1,15 @@ package azurerm import ( + "crypto/rand" "fmt" + "io" + "io/ioutil" "testing" "strings" + "github.com/Azure/azure-sdk-for-go/storage" "github.com/hashicorp/terraform/helper/acctest" "github.com/hashicorp/terraform/helper/resource" "github.com/hashicorp/terraform/terraform" @@ -83,6 +87,62 @@ func TestResourceAzureRMStorageBlobSize_validation(t *testing.T) { } } +func TestResourceAzureRMStorageBlobParallelism_validation(t *testing.T) { + cases := []struct { + Value int + ErrCount int + }{ + { + Value: 1, + ErrCount: 0, + }, + { + Value: 0, + ErrCount: 1, + }, + { + Value: -1, + ErrCount: 1, + }, + } + + for _, tc := range cases { + _, errors := validateArmStorageBlobParallelism(tc.Value, "azurerm_storage_blob") + + if len(errors) != tc.ErrCount { + t.Fatalf("Expected the Azure RM Storage Blob parallelism to trigger a validation error") + } + } +} + +func TestResourceAzureRMStorageBlobAttempts_validation(t *testing.T) { + cases := []struct { + Value int + ErrCount int + }{ + { + Value: 1, + ErrCount: 0, + }, + { + Value: 0, + ErrCount: 1, + }, + { + Value: -1, + ErrCount: 1, + }, + } + + for _, tc := range cases { + _, errors := validateArmStorageBlobAttempts(tc.Value, "azurerm_storage_blob") + + if len(errors) != tc.ErrCount { + t.Fatalf("Expected the Azure RM Storage Blob attempts to trigger a validation error") + } + } +} + func TestAccAzureRMStorageBlob_basic(t *testing.T) { ri := acctest.RandInt() rs := strings.ToLower(acctest.RandString(11)) @@ -103,6 +163,100 @@ func TestAccAzureRMStorageBlob_basic(t *testing.T) { }) } +func TestAccAzureRMStorageBlobBlock_source(t *testing.T) { + ri := acctest.RandInt() + rs1 := strings.ToLower(acctest.RandString(11)) + sourceBlob, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("Failed to create local source blob file") + } + + _, err = io.CopyN(sourceBlob, rand.Reader, 25*1024*1024) + if err != nil { + t.Fatalf("Failed to write random test to source blob") + } + + err = sourceBlob.Close() + if err != nil { + t.Fatalf("Failed to close source blob") + } + + config := fmt.Sprintf(testAccAzureRMStorageBlobBlock_source, ri, rs1, sourceBlob.Name()) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testCheckAzureRMStorageBlobDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: config, + Check: resource.ComposeTestCheckFunc( + testCheckAzureRMStorageBlobMatchesFile("azurerm_storage_blob.source", storage.BlobTypeBlock, sourceBlob.Name()), + ), + }, + }, + }) +} + +func TestAccAzureRMStorageBlobPage_source(t *testing.T) { + ri := acctest.RandInt() + rs1 := strings.ToLower(acctest.RandString(11)) + sourceBlob, err := ioutil.TempFile("", "") + if err 
+func TestAccAzureRMStorageBlobPage_source(t *testing.T) {
+	ri := acctest.RandInt()
+	rs1 := strings.ToLower(acctest.RandString(11))
+	sourceBlob, err := ioutil.TempFile("", "")
+	if err != nil {
+		t.Fatalf("Failed to create local source blob file")
+	}
+
+	err = sourceBlob.Truncate(25*1024*1024 + 512)
+	if err != nil {
+		t.Fatalf("Failed to truncate file to 25M")
+	}
+
+	for i := int64(0); i < 20; i = i + 2 {
+		randomBytes := make([]byte, 1*1024*1024)
+		_, err = rand.Read(randomBytes)
+		if err != nil {
+			t.Fatalf("Failed to read random bytes")
+		}
+
+		_, err = sourceBlob.WriteAt(randomBytes, i*1024*1024)
+		if err != nil {
+			t.Fatalf("Failed to write random bytes to file")
+		}
+	}
+
+	randomBytes := make([]byte, 5*1024*1024)
+	_, err = rand.Read(randomBytes)
+	if err != nil {
+		t.Fatalf("Failed to read random bytes")
+	}
+
+	_, err = sourceBlob.WriteAt(randomBytes, 20*1024*1024)
+	if err != nil {
+		t.Fatalf("Failed to write random bytes to file")
+	}
+
+	err = sourceBlob.Close()
+	if err != nil {
+		t.Fatalf("Failed to close source blob")
+	}
+
+	config := fmt.Sprintf(testAccAzureRMStorageBlobPage_source, ri, rs1, sourceBlob.Name())
+
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testCheckAzureRMStorageBlobDestroy,
+		Steps: []resource.TestStep{
+			resource.TestStep{
+				Config: config,
+				Check: resource.ComposeTestCheckFunc(
+					testCheckAzureRMStorageBlobMatchesFile("azurerm_storage_blob.source", storage.BlobTypePage, sourceBlob.Name()),
+				),
+			},
+		},
+	})
+}
+
 func testCheckAzureRMStorageBlobExists(name string) resource.TestCheckFunc {
 	return func(s *terraform.State) error {
 
@@ -141,6 +295,64 @@ func testCheckAzureRMStorageBlobExists(name string) resource.TestCheckFunc {
 	}
 }
 
+func testCheckAzureRMStorageBlobMatchesFile(name string, kind storage.BlobType, filePath string) resource.TestCheckFunc {
+	return func(s *terraform.State) error {
+
+		rs, ok := s.RootModule().Resources[name]
+		if !ok {
+			return fmt.Errorf("Not found: %s", name)
+		}
+
+		name := rs.Primary.Attributes["name"]
+		storageAccountName := rs.Primary.Attributes["storage_account_name"]
+		storageContainerName := rs.Primary.Attributes["storage_container_name"]
+		resourceGroup, hasResourceGroup := rs.Primary.Attributes["resource_group_name"]
+		if !hasResourceGroup {
+			return fmt.Errorf("Bad: no resource group found in state for storage blob: %s", name)
+		}
+
+		armClient := testAccProvider.Meta().(*ArmClient)
+		blobClient, accountExists, err := armClient.getBlobStorageClientForStorageAccount(resourceGroup, storageAccountName)
+		if err != nil {
+			return err
+		}
+		if !accountExists {
+			return fmt.Errorf("Bad: Storage Account %q does not exist", storageAccountName)
+		}
+
+		properties, err := blobClient.GetBlobProperties(storageContainerName, name)
+		if err != nil {
+			return err
+		}
+
+		if properties.BlobType != kind {
+			return fmt.Errorf("Bad: blob type %q does not match expected type %q", properties.BlobType, kind)
+		}
+
+		blob, err := blobClient.GetBlob(storageContainerName, name)
+		if err != nil {
+			return err
+		}
+		defer blob.Close()
+
+		contents, err := ioutil.ReadAll(blob)
+		if err != nil {
+			return err
+		}
+
+		expectedContents, err := ioutil.ReadFile(filePath)
+		if err != nil {
+			return err
+		}
+
+		if string(contents) != string(expectedContents) {
+			return fmt.Errorf("Bad: Storage Blob %q (storage container: %q) does not match contents", name, storageContainerName)
+		}
+
+		return nil
+	}
+}
+
 func testCheckAzureRMStorageBlobDestroy(s *terraform.State) error {
 	for _, rs := range s.RootModule().Resources {
 		if rs.Type != "azurerm_storage_blob" {
@@ -212,3 +424,79 @@ resource "azurerm_storage_blob" "test" {
     size = 5120
 }
 `
+
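+// In the fixtures below, fmt.Sprintf fills %d with a random integer for the
+// resource group name and the two %s verbs with the storage account suffix
+// and the local source file path.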
+var testAccAzureRMStorageBlobBlock_source = `
+resource "azurerm_resource_group" "test" {
+    name     = "acctestrg-%d"
+    location = "westus"
+}
+
+resource "azurerm_storage_account" "source" {
+    name                = "acctestacc%s"
+    resource_group_name = "${azurerm_resource_group.test.name}"
+    location            = "westus"
+    account_type        = "Standard_LRS"
+
+    tags {
+        environment = "staging"
+    }
+}
+
+resource "azurerm_storage_container" "source" {
+    name                  = "source"
+    resource_group_name   = "${azurerm_resource_group.test.name}"
+    storage_account_name  = "${azurerm_storage_account.source.name}"
+    container_access_type = "blob"
+}
+
+resource "azurerm_storage_blob" "source" {
+    name = "source.vhd"
+
+    resource_group_name    = "${azurerm_resource_group.test.name}"
+    storage_account_name   = "${azurerm_storage_account.source.name}"
+    storage_container_name = "${azurerm_storage_container.source.name}"
+
+    type        = "block"
+    source      = "%s"
+    parallelism = 4
+    attempts    = 2
+}
+`
+
+var testAccAzureRMStorageBlobPage_source = `
+resource "azurerm_resource_group" "test" {
+    name     = "acctestrg-%d"
+    location = "westus"
+}
+
+resource "azurerm_storage_account" "source" {
+    name                = "acctestacc%s"
+    resource_group_name = "${azurerm_resource_group.test.name}"
+    location            = "westus"
+    account_type        = "Standard_LRS"
+
+    tags {
+        environment = "staging"
+    }
+}
+
+resource "azurerm_storage_container" "source" {
+    name                  = "source"
+    resource_group_name   = "${azurerm_resource_group.test.name}"
+    storage_account_name  = "${azurerm_storage_account.source.name}"
+    container_access_type = "blob"
+}
+
+resource "azurerm_storage_blob" "source" {
+    name = "source.vhd"
+
+    resource_group_name    = "${azurerm_resource_group.test.name}"
+    storage_account_name   = "${azurerm_storage_account.source.name}"
+    storage_container_name = "${azurerm_storage_container.source.name}"
+
+    type        = "page"
+    source      = "%s"
+    parallelism = 3
+    attempts    = 3
+}
+`
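
Usage sketch (commentary, not part of the patch): with the new arguments a configuration can stream a local VHD into a page blob instead of allocating an empty one. The resource names and file path below are illustrative; parallelism and attempts are optional, defaulting to 8 and 1 respectively (the worker count is parallelism multiplied by the CPU count, and attempts of 1 means no retries).

resource "azurerm_storage_blob" "image" {
    name = "image.vhd"

    resource_group_name    = "${azurerm_resource_group.example.name}"
    storage_account_name   = "${azurerm_storage_account.example.name}"
    storage_container_name = "${azurerm_storage_container.example.name}"

    type   = "page"
    source = "/path/to/image.vhd"
}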