package azurerm

import (
	"bytes"
	"crypto/rand"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"os"
	"runtime"
	"strings"
	"sync"

	"github.com/Azure/azure-sdk-for-go/storage"
	"github.com/hashicorp/terraform/helper/schema"
)

func resourceArmStorageBlob() *schema.Resource {
	return &schema.Resource{
		Create: resourceArmStorageBlobCreate,
		Read:   resourceArmStorageBlobRead,
		Exists: resourceArmStorageBlobExists,
		Delete: resourceArmStorageBlobDelete,

		Schema: map[string]*schema.Schema{
			"name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},
			"resource_group_name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},
			"storage_account_name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},
			"storage_container_name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},
			"type": {
				Type:         schema.TypeString,
				Required:     true,
				ForceNew:     true,
				ValidateFunc: validateArmStorageBlobType,
			},
			"size": {
				Type:         schema.TypeInt,
				Optional:     true,
				ForceNew:     true,
				Default:      0,
				ValidateFunc: validateArmStorageBlobSize,
			},
			"source": {
				Type:     schema.TypeString,
				Optional: true,
				ForceNew: true,
			},
			"url": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"parallelism": {
				Type:         schema.TypeInt,
				Optional:     true,
				Default:      8,
				ForceNew:     true,
				ValidateFunc: validateArmStorageBlobParallelism,
			},
			"attempts": {
				Type:         schema.TypeInt,
				Optional:     true,
				Default:      1,
				ForceNew:     true,
				ValidateFunc: validateArmStorageBlobAttempts,
			},
		},
	}
}

func validateArmStorageBlobParallelism(v interface{}, k string) (ws []string, errors []error) {
	value := v.(int)
	if value <= 0 {
		errors = append(errors, fmt.Errorf("Blob Parallelism %d is invalid, must be greater than 0", value))
	}
	return
}

func validateArmStorageBlobAttempts(v interface{}, k string) (ws []string, errors []error) {
	value := v.(int)
	if value <= 0 {
		errors = append(errors, fmt.Errorf("Blob Attempts %d is invalid, must be greater than 0", value))
	}
	return
}

func validateArmStorageBlobSize(v interface{}, k string) (ws []string, errors []error) {
	value := v.(int)
	if value%512 != 0 {
		errors = append(errors, fmt.Errorf("Blob Size %d is invalid, must be a multiple of 512", value))
	}
	return
}

func validateArmStorageBlobType(v interface{}, k string) (ws []string, errors []error) {
	value := strings.ToLower(v.(string))
	validTypes := map[string]struct{}{
		"block": {},
		"page":  {},
	}

	if _, ok := validTypes[value]; !ok {
		errors = append(errors, fmt.Errorf("Blob type %q is invalid, must be %q or %q", value, "block", "page"))
	}
	return
}

func resourceArmStorageBlobCreate(d *schema.ResourceData, meta interface{}) error {
	armClient := meta.(*ArmClient)

	resourceGroupName := d.Get("resource_group_name").(string)
	storageAccountName := d.Get("storage_account_name").(string)

	blobClient, accountExists, err := armClient.getBlobStorageClientForStorageAccount(resourceGroupName, storageAccountName)
	if err != nil {
		return err
	}
	if !accountExists {
		return fmt.Errorf("Storage Account %q Not Found", storageAccountName)
	}

	name := d.Get("name").(string)
	blobType := d.Get("type").(string)
	cont := d.Get("storage_container_name").(string)

	log.Printf("[INFO] Creating blob %q in storage account %q", name, storageAccountName)
	switch strings.ToLower(blobType) {
	case "block":
		// Block blobs are created empty, then optionally filled from the
		// source file.
		if err := blobClient.CreateBlockBlob(cont, name); err != nil {
			return fmt.Errorf("Error creating storage blob on Azure: %s", err)
		}

		source := d.Get("source").(string)
		if source != "" {
			parallelism := d.Get("parallelism").(int)
			attempts := d.Get("attempts").(int)
			if err := resourceArmStorageBlobBlockUploadFromSource(cont, name, source, blobClient, parallelism, attempts); err != nil {
				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
			}
		}
	case "page":
		// Page blobs are either uploaded from a source file, or created
		// empty at the requested size.
		source := d.Get("source").(string)
		if source != "" {
			parallelism := d.Get("parallelism").(int)
			attempts := d.Get("attempts").(int)
			if err := resourceArmStorageBlobPageUploadFromSource(cont, name, source, blobClient, parallelism, attempts); err != nil {
				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
			}
		} else {
			size := int64(d.Get("size").(int))
			if err := blobClient.PutPageBlob(cont, name, size, map[string]string{}); err != nil {
				return fmt.Errorf("Error creating storage blob on Azure: %s", err)
			}
		}
	}

	d.SetId(name)

	return resourceArmStorageBlobRead(d, meta)
}
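
// resourceArmStorageBlobPage pairs an offset into the page blob with an
// io.SectionReader over the matching region of the source file. Pages are
// produced by resourceArmStorageBlobPageSplit and drained from a channel by
// the upload workers below.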
type resourceArmStorageBlobPage struct {
	offset  int64
	section *io.SectionReader
}

func resourceArmStorageBlobPageUploadFromSource(container, name, source string, client *storage.BlobStorageClient, parallelism, attempts int) error {
	workerCount := parallelism * runtime.NumCPU()

	file, err := os.Open(source)
	if err != nil {
		return fmt.Errorf("Error opening source file for upload %q: %s", source, err)
	}
	defer file.Close()

	blobSize, pageList, err := resourceArmStorageBlobPageSplit(file)
	if err != nil {
		return fmt.Errorf("Error splitting source file %q into pages: %s", source, err)
	}

	if err := client.PutPageBlob(container, name, blobSize, map[string]string{}); err != nil {
		return fmt.Errorf("Error creating storage blob on Azure: %s", err)
	}

	pages := make(chan resourceArmStorageBlobPage, len(pageList))
	errors := make(chan error, len(pageList))
	wg := &sync.WaitGroup{}
	wg.Add(len(pageList))

	total := int64(0)
	for _, page := range pageList {
		total += page.section.Size()
		pages <- page
	}
	close(pages)

	for i := 0; i < workerCount; i++ {
		go resourceArmStorageBlobPageUploadWorker(resourceArmStorageBlobPageUploadContext{
			container: container,
			name:      name,
			source:    source,
			blobSize:  blobSize,
			client:    client,
			pages:     pages,
			errors:    errors,
			wg:        wg,
			attempts:  attempts,
		})
	}

	wg.Wait()

	if len(errors) > 0 {
		return fmt.Errorf("Error while uploading source file %q: %s", source, <-errors)
	}

	return nil
}
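
// resourceArmStorageBlobPageSplit scans the source file in 4KB steps and
// coalesces consecutive non-zero pages into ranges of at most 4MB, the
// largest range this code writes in a single PutPage call. All-zero pages
// are skipped, so sparse files (VHD images, for example) only upload the
// regions that actually contain data. It returns the blob size, rounded up
// to the nearest 4KB page, along with the ranges to upload.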
func resourceArmStorageBlobPageSplit(file *os.File) (int64, []resourceArmStorageBlobPage, error) {
	const (
		minPageSize int64 = 4 * 1024
		maxPageSize int64 = 4 * 1024 * 1024
	)

	info, err := file.Stat()
	if err != nil {
		return int64(0), nil, fmt.Errorf("Could not stat file %q: %s", file.Name(), err)
	}

	// Page blob sizes must be a multiple of 512 bytes, so pad unaligned
	// source files up to the next 4KB page boundary.
	blobSize := info.Size()
	if info.Size()%minPageSize != 0 {
		blobSize = info.Size() + (minPageSize - (info.Size() % minPageSize))
	}

	emptyPage := make([]byte, minPageSize)

	type byteRange struct {
		offset int64
		length int64
	}

	var nonEmptyRanges []byteRange
	var currentRange byteRange
	for i := int64(0); i < blobSize; i += minPageSize {
		pageBuf := make([]byte, minPageSize)
		_, err = file.ReadAt(pageBuf, i)
		if err != nil && err != io.EOF {
			return int64(0), nil, fmt.Errorf("Could not read chunk at %d: %s", i, err)
		}

		if bytes.Equal(pageBuf, emptyPage) {
			// An all-zero page ends the current range; the blob's pages are
			// zero-initialized, so empty pages need not be uploaded at all.
			if currentRange.length != 0 {
				nonEmptyRanges = append(nonEmptyRanges, currentRange)
			}
			currentRange = byteRange{
				offset: i + minPageSize,
			}
		} else {
			currentRange.length += minPageSize
			if currentRange.length == maxPageSize || (currentRange.offset+currentRange.length == blobSize) {
				nonEmptyRanges = append(nonEmptyRanges, currentRange)
				currentRange = byteRange{
					offset: i + minPageSize,
				}
			}
		}
	}

	var pages []resourceArmStorageBlobPage
	for _, nonEmptyRange := range nonEmptyRanges {
		pages = append(pages, resourceArmStorageBlobPage{
			offset:  nonEmptyRange.offset,
			section: io.NewSectionReader(file, nonEmptyRange.offset, nonEmptyRange.length),
		})
	}

	// Return the padded blobSize rather than the raw file size, since that
	// is the size the page blob must be created with.
	return blobSize, pages, nil
}

type resourceArmStorageBlobPageUploadContext struct {
	container string
	name      string
	source    string
	blobSize  int64
	client    *storage.BlobStorageClient
	pages     chan resourceArmStorageBlobPage
	errors    chan error
	wg        *sync.WaitGroup
	attempts  int
}

func resourceArmStorageBlobPageUploadWorker(ctx resourceArmStorageBlobPageUploadContext) {
	for page := range ctx.pages {
		// Clamp the final page's range to the end of the blob.
		start := page.offset
		end := page.offset + page.section.Size() - 1
		if end > ctx.blobSize-1 {
			end = ctx.blobSize - 1
		}
		size := end - start + 1

		chunk := make([]byte, size)
		_, err := page.section.Read(chunk)
		if err != nil && err != io.EOF {
			ctx.errors <- fmt.Errorf("Error reading source file %q at offset %d: %s", ctx.source, page.offset, err)
			ctx.wg.Done()
			continue
		}

		// Retry the write up to the configured number of attempts.
		for x := 0; x < ctx.attempts; x++ {
			err = ctx.client.PutPage(ctx.container, ctx.name, start, end, storage.PageWriteTypeUpdate, chunk, map[string]string{})
			if err == nil {
				break
			}
		}
		if err != nil {
			ctx.errors <- fmt.Errorf("Error writing page at offset %d for file %q: %s", page.offset, ctx.source, err)
			ctx.wg.Done()
			continue
		}

		ctx.wg.Done()
	}
}
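
// resourceArmStorageBlobBlock pairs a randomly generated block ID with an
// io.SectionReader over one 4MB chunk of the source file. The blocks are
// uploaded as uncommitted blocks and then committed in order with a single
// PutBlockList call.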
type resourceArmStorageBlobBlock struct {
	section *io.SectionReader
	id      string
}

func resourceArmStorageBlobBlockUploadFromSource(container, name, source string, client *storage.BlobStorageClient, parallelism, attempts int) error {
	workerCount := parallelism * runtime.NumCPU()

	file, err := os.Open(source)
	if err != nil {
		return fmt.Errorf("Error opening source file for upload %q: %s", source, err)
	}
	defer file.Close()

	blockList, parts, err := resourceArmStorageBlobBlockSplit(file)
	if err != nil {
		return fmt.Errorf("Error reading and splitting source file for upload %q: %s", source, err)
	}

	wg := &sync.WaitGroup{}
	blocks := make(chan resourceArmStorageBlobBlock, len(parts))
	errors := make(chan error, len(parts))

	wg.Add(len(parts))
	for _, p := range parts {
		blocks <- p
	}
	close(blocks)

	for i := 0; i < workerCount; i++ {
		go resourceArmStorageBlobBlockUploadWorker(resourceArmStorageBlobBlockUploadContext{
			client:    client,
			source:    source,
			container: container,
			name:      name,
			blocks:    blocks,
			errors:    errors,
			wg:        wg,
			attempts:  attempts,
		})
	}

	wg.Wait()

	if len(errors) > 0 {
		return fmt.Errorf("Error while uploading source file %q: %s", source, <-errors)
	}

	// Commit the uploaded blocks, in order, to assemble the blob.
	err = client.PutBlockList(container, name, blockList)
	if err != nil {
		return fmt.Errorf("Error updating block list for source file %q: %s", source, err)
	}

	return nil
}

func resourceArmStorageBlobBlockSplit(file *os.File) ([]storage.Block, []resourceArmStorageBlobBlock, error) {
	const (
		idSize          = 64
		blockSize int64 = 4 * 1024 * 1024
	)

	var parts []resourceArmStorageBlobBlock
	var blockList []storage.Block

	info, err := file.Stat()
	if err != nil {
		return nil, nil, fmt.Errorf("Error stat-ing source file %q: %s", file.Name(), err)
	}

	for i := int64(0); i < info.Size(); i = i + blockSize {
		entropy := make([]byte, idSize)
		_, err = rand.Read(entropy)
		if err != nil {
			return nil, nil, fmt.Errorf("Error generating a random block ID for source file %q: %s", file.Name(), err)
		}

		// The final block may be shorter than the 4MB block size.
		sectionSize := blockSize
		remainder := info.Size() - i
		if remainder < blockSize {
			sectionSize = remainder
		}

		block := storage.Block{
			ID:     base64.StdEncoding.EncodeToString(entropy),
			Status: storage.BlockStatusUncommitted,
		}

		blockList = append(blockList, block)

		parts = append(parts, resourceArmStorageBlobBlock{
			id:      block.ID,
			section: io.NewSectionReader(file, i, sectionSize),
		})
	}

	return blockList, parts, nil
}

type resourceArmStorageBlobBlockUploadContext struct {
	client    *storage.BlobStorageClient
	container string
	name      string
	source    string
	attempts  int
	blocks    chan resourceArmStorageBlobBlock
	errors    chan error
	wg        *sync.WaitGroup
}

func resourceArmStorageBlobBlockUploadWorker(ctx resourceArmStorageBlobBlockUploadContext) {
	for block := range ctx.blocks {
		buffer := make([]byte, block.section.Size())

		_, err := block.section.Read(buffer)
		if err != nil {
			ctx.errors <- fmt.Errorf("Error reading source file %q: %s", ctx.source, err)
			ctx.wg.Done()
			continue
		}

		// Retry the upload up to the configured number of attempts.
		for i := 0; i < ctx.attempts; i++ {
			err = ctx.client.PutBlock(ctx.container, ctx.name, block.id, buffer)
			if err == nil {
				break
			}
		}
		if err != nil {
			ctx.errors <- fmt.Errorf("Error uploading block %q for source file %q: %s", block.id, ctx.source, err)
			ctx.wg.Done()
			continue
		}

		ctx.wg.Done()
	}
}

func resourceArmStorageBlobRead(d *schema.ResourceData, meta interface{}) error {
	armClient := meta.(*ArmClient)

	resourceGroupName := d.Get("resource_group_name").(string)
	storageAccountName := d.Get("storage_account_name").(string)

	blobClient, accountExists, err := armClient.getBlobStorageClientForStorageAccount(resourceGroupName, storageAccountName)
	if err != nil {
		return err
	}
	if !accountExists {
		log.Printf("[DEBUG] Storage account %q not found, removing blob %q from state", storageAccountName, d.Id())
		d.SetId("")
		return nil
	}

	exists, err := resourceArmStorageBlobExists(d, meta)
	if err != nil {
		return err
	}
	if !exists {
		// Exists already removed this from state
		return nil
	}

	name := d.Get("name").(string)
	storageContainerName := d.Get("storage_container_name").(string)

	url := blobClient.GetBlobURL(storageContainerName, name)
	if url == "" {
		log.Printf("[INFO] URL for %q is empty", name)
	}
	d.Set("url", url)

	return nil
}

func resourceArmStorageBlobExists(d *schema.ResourceData, meta interface{}) (bool, error) {
	armClient := meta.(*ArmClient)

	resourceGroupName := d.Get("resource_group_name").(string)
	storageAccountName := d.Get("storage_account_name").(string)

	blobClient, accountExists, err := armClient.getBlobStorageClientForStorageAccount(resourceGroupName, storageAccountName)
	if err != nil {
		return false, err
	}
	if !accountExists {
		log.Printf("[DEBUG] Storage account %q not found, removing blob %q from state", storageAccountName, d.Id())
		d.SetId("")
		return false, nil
	}

	name := d.Get("name").(string)
	storageContainerName := d.Get("storage_container_name").(string)

	log.Printf("[INFO] Checking for existence of storage blob %q.", name)
	exists, err := blobClient.BlobExists(storageContainerName, name)
	if err != nil {
		return false, fmt.Errorf("error testing existence of storage blob %q: %s", name, err)
	}

	if !exists {
		log.Printf("[INFO] Storage blob %q no longer exists, removing from state...", name)
		d.SetId("")
	}

	return exists, nil
}
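
// resourceArmStorageBlobDelete removes the blob via DeleteBlobIfExists, so
// deleting a blob that has already been removed out-of-band does not fail.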
blob %q: %s", name, err) } d.SetId("") return nil }