Merge pull request #7694 from jtopjian/provider-rabbitmq

RabbitMQ Provider
This commit is contained in:
Paul Stack 2016-09-02 08:08:18 +01:00 committed by GitHub
commit 05994cef31
58 changed files with 5692 additions and 0 deletions

View File

@ -0,0 +1,26 @@
#!/bin/bash
set -e
cd
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y git make mercurial
sudo apt-get install -y rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo wget -O /usr/local/bin/gimme https://raw.githubusercontent.com/travis-ci/gimme/master/gimme
sudo chmod +x /usr/local/bin/gimme
gimme 1.6 >> .bashrc
mkdir ~/go
eval "$(/usr/local/bin/gimme 1.6)"
echo 'export GOPATH=$HOME/go' >> .bashrc
export GOPATH=$HOME/go
export PATH=$PATH:$HOME/terraform:$HOME/go/bin
echo 'export PATH=$PATH:$HOME/terraform:$HOME/go/bin' >> .bashrc
source .bashrc
go get -u github.com/kardianos/govendor
go get github.com/hashicorp/terraform

View File

@ -0,0 +1,34 @@
package rabbitmq
import (
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccBinding_importBasic(t *testing.T) {
resourceName := "rabbitmq_binding.test"
var bindingInfo rabbithole.BindingInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccBindingCheckDestroy(bindingInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccBindingConfig_basic,
Check: testAccBindingCheck(
resourceName, &bindingInfo,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,34 @@
package rabbitmq
import (
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccExchange_importBasic(t *testing.T) {
resourceName := "rabbitmq_exchange.test"
var exchangeInfo rabbithole.ExchangeInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccExchangeCheckDestroy(&exchangeInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccExchangeConfig_basic,
Check: testAccExchangeCheck(
resourceName, &exchangeInfo,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,34 @@
package rabbitmq
import (
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccPermissions_importBasic(t *testing.T) {
resourceName := "rabbitmq_permissions.test"
var permissionInfo rabbithole.PermissionInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccPermissionsCheckDestroy(&permissionInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccPermissionsConfig_basic,
Check: testAccPermissionsCheck(
resourceName, &permissionInfo,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,34 @@
package rabbitmq
import (
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccPolicy_importBasic(t *testing.T) {
resourceName := "rabbitmq_policy.test"
var policy rabbithole.Policy
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccPolicyCheckDestroy(&policy),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccPolicyConfig_basic,
Check: testAccPolicyCheck(
resourceName, &policy,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,34 @@
package rabbitmq
import (
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccQueue_importBasic(t *testing.T) {
resourceName := "rabbitmq_queue.test"
var queue rabbithole.QueueInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccQueueCheckDestroy(&queue),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccQueueConfig_basic,
Check: testAccQueueCheck(
resourceName, &queue,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,33 @@
package rabbitmq
import (
"testing"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccUser_importBasic(t *testing.T) {
resourceName := "rabbitmq_user.test"
var user string
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccUserCheckDestroy(user),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccUserConfig_basic,
Check: testAccUserCheck(
resourceName, &user,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"password"},
},
},
})
}

View File

@ -0,0 +1,32 @@
package rabbitmq
import (
"testing"
"github.com/hashicorp/terraform/helper/resource"
)
func TestAccVhost_importBasic(t *testing.T) {
resourceName := "rabbitmq_vhost.test"
var vhost string
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccVhostCheckDestroy(vhost),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccVhostConfig_basic,
Check: testAccVhostCheck(
resourceName, &vhost,
),
},
resource.TestStep{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

View File

@ -0,0 +1,123 @@
package rabbitmq
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
)
func Provider() terraform.ResourceProvider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"endpoint": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("RABBITMQ_ENDPOINT", nil),
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value == "" {
errors = append(errors, fmt.Errorf("Endpoint must not be an empty string"))
}
return
},
},
"username": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("RABBITMQ_USERNAME", nil),
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value == "" {
errors = append(errors, fmt.Errorf("Username must not be an empty string"))
}
return
},
},
"password": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("RABBITMQ_PASSWORD", nil),
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value == "" {
errors = append(errors, fmt.Errorf("Password must not be an empty string"))
}
return
},
},
"insecure": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("RABBITMQ_INSECURE", nil),
},
"cacert_file": &schema.Schema{
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("RABBITMQ_CACERT", ""),
},
},
ResourcesMap: map[string]*schema.Resource{
"rabbitmq_binding": resourceBinding(),
"rabbitmq_exchange": resourceExchange(),
"rabbitmq_permissions": resourcePermissions(),
"rabbitmq_policy": resourcePolicy(),
"rabbitmq_queue": resourceQueue(),
"rabbitmq_user": resourceUser(),
"rabbitmq_vhost": resourceVhost(),
},
ConfigureFunc: providerConfigure,
}
}
func providerConfigure(d *schema.ResourceData) (interface{}, error) {
var username = d.Get("username").(string)
var password = d.Get("password").(string)
var endpoint = d.Get("endpoint").(string)
var insecure = d.Get("insecure").(bool)
var cacertFile = d.Get("cacert_file").(string)
// Configure TLS/SSL:
// Ignore self-signed cert warnings
// Specify a custom CA / intermediary cert
// Specify a certificate and key
tlsConfig := &tls.Config{}
if cacertFile != "" {
caCert, err := ioutil.ReadFile(cacertFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
if insecure {
tlsConfig.InsecureSkipVerify = true
}
// Connect to RabbitMQ management interface
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, err := rabbithole.NewTLSClient(endpoint, username, password, transport)
if err != nil {
return nil, err
}
return rmqc, nil
}

View File

@ -0,0 +1,46 @@
package rabbitmq
import (
"os"
"testing"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
)
// To run these acceptance tests, you will need access to a RabbitMQ server
// with the management plugin enabled.
//
// Set the RABBITMQ_ENDPOINT, RABBITMQ_USERNAME, and RABBITMQ_PASSWORD
// environment variables before running the tests.
//
// You can run the tests like this:
// make testacc TEST=./builtin/providers/rabbitmq
var testAccProviders map[string]terraform.ResourceProvider
var testAccProvider *schema.Provider
func init() {
testAccProvider = Provider().(*schema.Provider)
testAccProviders = map[string]terraform.ResourceProvider{
"rabbitmq": testAccProvider,
}
}
func TestProvider(t *testing.T) {
if err := Provider().(*schema.Provider).InternalValidate(); err != nil {
t.Fatalf("err: %s", err)
}
}
func TestProvider_impl(t *testing.T) {
var _ terraform.ResourceProvider = Provider()
}
func testAccPreCheck(t *testing.T) {
for _, name := range []string{"RABBITMQ_ENDPOINT", "RABBITMQ_USERNAME", "RABBITMQ_PASSWORD"} {
if v := os.Getenv(name); v == "" {
t.Fatal("RABBITMQ_ENDPOINT, RABBITMQ_USERNAME and RABBITMQ_PASSWORD must be set for acceptance tests")
}
}
}

View File

@ -0,0 +1,195 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceBinding() *schema.Resource {
return &schema.Resource{
Create: CreateBinding,
Read: ReadBinding,
Delete: DeleteBinding,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"source": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"vhost": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"destination": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"destination_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"properties_key": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"routing_key": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"arguments": &schema.Schema{
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},
},
}
}
func CreateBinding(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
vhost := d.Get("vhost").(string)
bindingInfo := rabbithole.BindingInfo{
Source: d.Get("source").(string),
Destination: d.Get("destination").(string),
DestinationType: d.Get("destination_type").(string),
RoutingKey: d.Get("routing_key").(string),
PropertiesKey: d.Get("properties_key").(string),
Arguments: d.Get("arguments").(map[string]interface{}),
}
if err := declareBinding(rmqc, vhost, bindingInfo); err != nil {
return err
}
name := fmt.Sprintf("%s/%s/%s/%s/%s", vhost, bindingInfo.Source, bindingInfo.Destination, bindingInfo.DestinationType, bindingInfo.PropertiesKey)
d.SetId(name)
return ReadBinding(d, meta)
}
func ReadBinding(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
bindingId := strings.Split(d.Id(), "/")
if len(bindingId) < 5 {
return fmt.Errorf("Unable to determine binding ID")
}
vhost := bindingId[0]
source := bindingId[1]
destination := bindingId[2]
destinationType := bindingId[3]
propertiesKey := bindingId[4]
bindings, err := rmqc.ListBindingsIn(vhost)
if err != nil {
return err
}
log.Printf("[DEBUG] RabbitMQ: Bindings retrieved: %#v", bindings)
bindingFound := false
for _, binding := range bindings {
if binding.Source == source && binding.Destination == destination && binding.DestinationType == destinationType && binding.PropertiesKey == propertiesKey {
log.Printf("[DEBUG] RabbitMQ: Found Binding: %#v", binding)
bindingFound = true
d.Set("vhost", binding.Vhost)
d.Set("source", binding.Source)
d.Set("destination", binding.Destination)
d.Set("destination_type", binding.DestinationType)
d.Set("routing_key", binding.RoutingKey)
d.Set("properties_key", binding.PropertiesKey)
d.Set("arguments", binding.Arguments)
}
}
// The binding could not be found,
// so consider it deleted and remove from state
if !bindingFound {
d.SetId("")
}
return nil
}
func DeleteBinding(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
bindingId := strings.Split(d.Id(), "/")
if len(bindingId) < 5 {
return fmt.Errorf("Unable to determine binding ID")
}
vhost := bindingId[0]
source := bindingId[1]
destination := bindingId[2]
destinationType := bindingId[3]
propertiesKey := bindingId[4]
bindingInfo := rabbithole.BindingInfo{
Vhost: vhost,
Source: source,
Destination: destination,
DestinationType: destinationType,
PropertiesKey: propertiesKey,
}
log.Printf("[DEBUG] RabbitMQ: Attempting to delete binding for %s/%s/%s/%s/%s",
vhost, source, destination, destinationType, propertiesKey)
resp, err := rmqc.DeleteBinding(vhost, bindingInfo)
if err != nil {
return err
}
log.Printf("[DEBUG] RabbitMQ: Binding delete response: %#v", resp)
if resp.StatusCode == 404 {
// The binding was already deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ binding: %s", resp.Status)
}
return nil
}
func declareBinding(rmqc *rabbithole.Client, vhost string, bindingInfo rabbithole.BindingInfo) error {
log.Printf("[DEBUG] RabbitMQ: Attempting to declare binding for %s/%s/%s/%s/%s",
vhost, bindingInfo.Source, bindingInfo.Destination, bindingInfo.DestinationType, bindingInfo.PropertiesKey)
resp, err := rmqc.DeclareBinding(vhost, bindingInfo)
log.Printf("[DEBUG] RabbitMQ: Binding declare response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error declaring RabbitMQ binding: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,121 @@
package rabbitmq
import (
"fmt"
"strings"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccBinding(t *testing.T) {
var bindingInfo rabbithole.BindingInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccBindingCheckDestroy(bindingInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccBindingConfig_basic,
Check: testAccBindingCheck(
"rabbitmq_binding.test", &bindingInfo,
),
},
},
})
}
func testAccBindingCheck(rn string, bindingInfo *rabbithole.BindingInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("binding id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
bindingParts := strings.Split(rs.Primary.ID, "/")
bindings, err := rmqc.ListBindingsIn(bindingParts[0])
if err != nil {
return fmt.Errorf("Error retrieving exchange: %s", err)
}
for _, binding := range bindings {
if binding.Source == bindingParts[1] && binding.Destination == bindingParts[2] && binding.DestinationType == bindingParts[3] && binding.PropertiesKey == bindingParts[4] {
bindingInfo = &binding
return nil
}
}
return fmt.Errorf("Unable to find binding %s", rn)
}
}
func testAccBindingCheckDestroy(bindingInfo rabbithole.BindingInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
bindings, err := rmqc.ListBindingsIn(bindingInfo.Vhost)
if err != nil {
return fmt.Errorf("Error retrieving exchange: %s", err)
}
for _, binding := range bindings {
if binding.Source == bindingInfo.Source && binding.Destination == bindingInfo.Destination && binding.DestinationType == bindingInfo.DestinationType && binding.PropertiesKey == bindingInfo.PropertiesKey {
return fmt.Errorf("Binding still exists")
}
}
return nil
}
}
const testAccBindingConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_exchange" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
type = "fanout"
durable = false
auto_delete = true
}
}
resource "rabbitmq_queue" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
durable = true
auto_delete = false
}
}
resource "rabbitmq_binding" "test" {
source = "${rabbitmq_exchange.test.name}"
vhost = "${rabbitmq_vhost.test.name}"
destination = "${rabbitmq_queue.test.name}"
destination_type = "queue"
routing_key = "#"
properties_key = "%23"
}`

View File

@ -0,0 +1,189 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceExchange() *schema.Resource {
return &schema.Resource{
Create: CreateExchange,
Read: ReadExchange,
Delete: DeleteExchange,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"vhost": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "/",
ForceNew: true,
},
"settings": &schema.Schema{
Type: schema.TypeList,
Required: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"type": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"durable": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"auto_delete": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"arguments": &schema.Schema{
Type: schema.TypeMap,
Optional: true,
},
},
},
},
},
}
}
func CreateExchange(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Get("name").(string)
vhost := d.Get("vhost").(string)
settingsList := d.Get("settings").([]interface{})
settingsMap, ok := settingsList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse settings")
}
if err := declareExchange(rmqc, vhost, name, settingsMap); err != nil {
return err
}
id := fmt.Sprintf("%s@%s", name, vhost)
d.SetId(id)
return ReadExchange(d, meta)
}
func ReadExchange(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
exchangeId := strings.Split(d.Id(), "@")
if len(exchangeId) < 2 {
return fmt.Errorf("Unable to determine exchange ID")
}
name := exchangeId[0]
vhost := exchangeId[1]
exchangeSettings, err := rmqc.GetExchange(vhost, name)
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: Exchange retrieved %s: %#v", d.Id(), exchangeSettings)
d.Set("name", exchangeSettings.Name)
d.Set("vhost", exchangeSettings.Vhost)
exchange := make([]map[string]interface{}, 1)
e := make(map[string]interface{})
e["type"] = exchangeSettings.Type
e["durable"] = exchangeSettings.Durable
e["auto_delete"] = exchangeSettings.AutoDelete
e["arguments"] = exchangeSettings.Arguments
exchange[0] = e
d.Set("settings", exchange)
return nil
}
func DeleteExchange(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
exchangeId := strings.Split(d.Id(), "@")
if len(exchangeId) < 2 {
return fmt.Errorf("Unable to determine exchange ID")
}
name := exchangeId[0]
vhost := exchangeId[1]
log.Printf("[DEBUG] RabbitMQ: Attempting to delete exchange %s", d.Id())
resp, err := rmqc.DeleteExchange(vhost, name)
log.Printf("[DEBUG] RabbitMQ: Exchange delete response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// The exchange was automatically deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ exchange: %s", resp.Status)
}
return nil
}
func declareExchange(rmqc *rabbithole.Client, vhost string, name string, settingsMap map[string]interface{}) error {
exchangeSettings := rabbithole.ExchangeSettings{}
if v, ok := settingsMap["type"].(string); ok {
exchangeSettings.Type = v
}
if v, ok := settingsMap["durable"].(bool); ok {
exchangeSettings.Durable = v
}
if v, ok := settingsMap["auto_delete"].(bool); ok {
exchangeSettings.AutoDelete = v
}
if v, ok := settingsMap["arguments"].(map[string]interface{}); ok {
exchangeSettings.Arguments = v
}
log.Printf("[DEBUG] RabbitMQ: Attempting to declare exchange %s@%s: %#v", name, vhost, exchangeSettings)
resp, err := rmqc.DeclareExchange(vhost, name, exchangeSettings)
log.Printf("[DEBUG] RabbitMQ: Exchange declare response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error declaring RabbitMQ exchange: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,103 @@
package rabbitmq
import (
"fmt"
"strings"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccExchange(t *testing.T) {
var exchangeInfo rabbithole.ExchangeInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccExchangeCheckDestroy(&exchangeInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccExchangeConfig_basic,
Check: testAccExchangeCheck(
"rabbitmq_exchange.test", &exchangeInfo,
),
},
},
})
}
func testAccExchangeCheck(rn string, exchangeInfo *rabbithole.ExchangeInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("exchange id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
exchParts := strings.Split(rs.Primary.ID, "@")
exchanges, err := rmqc.ListExchangesIn(exchParts[1])
if err != nil {
return fmt.Errorf("Error retrieving exchange: %s", err)
}
for _, exchange := range exchanges {
if exchange.Name == exchParts[0] && exchange.Vhost == exchParts[1] {
exchangeInfo = &exchange
return nil
}
}
return fmt.Errorf("Unable to find exchange %s", rn)
}
}
func testAccExchangeCheckDestroy(exchangeInfo *rabbithole.ExchangeInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
exchanges, err := rmqc.ListExchangesIn(exchangeInfo.Vhost)
if err != nil {
return fmt.Errorf("Error retrieving exchange: %s", err)
}
for _, exchange := range exchanges {
if exchange.Name == exchangeInfo.Name && exchange.Vhost == exchangeInfo.Vhost {
return fmt.Errorf("Exchange %s@%s still exist", exchangeInfo.Name, exchangeInfo.Vhost)
}
}
return nil
}
}
const testAccExchangeConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_exchange" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
type = "fanout"
durable = false
auto_delete = true
}
}`

View File

@ -0,0 +1,205 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourcePermissions() *schema.Resource {
return &schema.Resource{
Create: CreatePermissions,
Update: UpdatePermissions,
Read: ReadPermissions,
Delete: DeletePermissions,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"user": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"vhost": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "/",
ForceNew: true,
},
"permissions": &schema.Schema{
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"configure": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"write": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"read": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
},
},
},
},
}
}
func CreatePermissions(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
user := d.Get("user").(string)
vhost := d.Get("vhost").(string)
permsList := d.Get("permissions").([]interface{})
permsMap, ok := permsList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse permissions")
}
if err := setPermissionsIn(rmqc, vhost, user, permsMap); err != nil {
return err
}
id := fmt.Sprintf("%s@%s", user, vhost)
d.SetId(id)
return ReadPermissions(d, meta)
}
func ReadPermissions(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
permissionId := strings.Split(d.Id(), "@")
if len(permissionId) < 2 {
return fmt.Errorf("Unable to determine Permission ID")
}
user := permissionId[0]
vhost := permissionId[1]
userPerms, err := rmqc.GetPermissionsIn(vhost, user)
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: Permission retrieved for %s: %#v", d.Id(), userPerms)
d.Set("user", userPerms.User)
d.Set("vhost", userPerms.Vhost)
perms := make([]map[string]interface{}, 1)
p := make(map[string]interface{})
p["configure"] = userPerms.Configure
p["write"] = userPerms.Write
p["read"] = userPerms.Read
perms[0] = p
d.Set("permissions", perms)
return nil
}
func UpdatePermissions(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
permissionId := strings.Split(d.Id(), "@")
if len(permissionId) < 2 {
return fmt.Errorf("Unable to determine Permission ID")
}
user := permissionId[0]
vhost := permissionId[1]
if d.HasChange("permissions") {
_, newPerms := d.GetChange("permissions")
newPermsList := newPerms.([]interface{})
permsMap, ok := newPermsList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse permissions")
}
if err := setPermissionsIn(rmqc, vhost, user, permsMap); err != nil {
return err
}
}
return ReadPermissions(d, meta)
}
func DeletePermissions(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
permissionId := strings.Split(d.Id(), "@")
if len(permissionId) < 2 {
return fmt.Errorf("Unable to determine Permission ID")
}
user := permissionId[0]
vhost := permissionId[1]
log.Printf("[DEBUG] RabbitMQ: Attempting to delete permission for %s", d.Id())
resp, err := rmqc.ClearPermissionsIn(vhost, user)
log.Printf("[DEBUG] RabbitMQ: Permission delete response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// The permissions were already deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ permission: %s", resp.Status)
}
return nil
}
func setPermissionsIn(rmqc *rabbithole.Client, vhost string, user string, permsMap map[string]interface{}) error {
perms := rabbithole.Permissions{}
if v, ok := permsMap["configure"].(string); ok {
perms.Configure = v
}
if v, ok := permsMap["write"].(string); ok {
perms.Write = v
}
if v, ok := permsMap["read"].(string); ok {
perms.Read = v
}
log.Printf("[DEBUG] RabbitMQ: Attempting to set permissions for %s@%s: %#v", user, vhost, perms)
resp, err := rmqc.UpdatePermissionsIn(vhost, user, perms)
log.Printf("[DEBUG] RabbitMQ: Permission response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error setting permissions: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,124 @@
package rabbitmq
import (
"fmt"
"strings"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccPermissions(t *testing.T) {
var permissionInfo rabbithole.PermissionInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccPermissionsCheckDestroy(&permissionInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccPermissionsConfig_basic,
Check: testAccPermissionsCheck(
"rabbitmq_permissions.test", &permissionInfo,
),
},
resource.TestStep{
Config: testAccPermissionsConfig_update,
Check: testAccPermissionsCheck(
"rabbitmq_permissions.test", &permissionInfo,
),
},
},
})
}
func testAccPermissionsCheck(rn string, permissionInfo *rabbithole.PermissionInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("permission id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
perms, err := rmqc.ListPermissions()
if err != nil {
return fmt.Errorf("Error retrieving permissions: %s", err)
}
userParts := strings.Split(rs.Primary.ID, "@")
for _, perm := range perms {
if perm.User == userParts[0] && perm.Vhost == userParts[1] {
permissionInfo = &perm
return nil
}
}
return fmt.Errorf("Unable to find permissions for user %s", rn)
}
}
func testAccPermissionsCheckDestroy(permissionInfo *rabbithole.PermissionInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
perms, err := rmqc.ListPermissions()
if err != nil {
return fmt.Errorf("Error retrieving permissions: %s", err)
}
for _, perm := range perms {
if perm.User == permissionInfo.User && perm.Vhost == permissionInfo.Vhost {
return fmt.Errorf("Permissions still exist for user %s@%s", permissionInfo.User, permissionInfo.Vhost)
}
}
return nil
}
}
const testAccPermissionsConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobar"
tags = ["administrator"]
}
resource "rabbitmq_permissions" "test" {
user = "${rabbitmq_user.test.name}"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}`
const testAccPermissionsConfig_update = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobar"
tags = ["administrator"]
}
resource "rabbitmq_permissions" "test" {
user = "${rabbitmq_user.test.name}"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ""
}
}`

View File

@ -0,0 +1,239 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourcePolicy() *schema.Resource {
return &schema.Resource{
Create: CreatePolicy,
Update: UpdatePolicy,
Read: ReadPolicy,
Delete: DeletePolicy,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"vhost": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"policy": &schema.Schema{
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"pattern": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"priority": &schema.Schema{
Type: schema.TypeInt,
Required: true,
},
"apply_to": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"definition": &schema.Schema{
Type: schema.TypeMap,
Required: true,
},
},
},
},
},
}
}
func CreatePolicy(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Get("name").(string)
vhost := d.Get("vhost").(string)
policyList := d.Get("policy").([]interface{})
policyMap, ok := policyList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse policy")
}
if err := putPolicy(rmqc, vhost, name, policyMap); err != nil {
return err
}
id := fmt.Sprintf("%s@%s", name, vhost)
d.SetId(id)
return ReadPolicy(d, meta)
}
func ReadPolicy(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
policyId := strings.Split(d.Id(), "@")
if len(policyId) < 2 {
return fmt.Errorf("Unable to determine policy ID")
}
user := policyId[0]
vhost := policyId[1]
policy, err := rmqc.GetPolicy(vhost, user)
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: Policy retrieved for %s: %#v", d.Id(), policy)
d.Set("name", policy.Name)
d.Set("vhost", policy.Vhost)
setPolicy := make([]map[string]interface{}, 1)
p := make(map[string]interface{})
p["pattern"] = policy.Pattern
p["priority"] = policy.Priority
p["apply_to"] = policy.ApplyTo
policyDefinition := make(map[string]interface{})
for key, value := range policy.Definition {
if v, ok := value.([]interface{}); ok {
var nodes []string
for _, node := range v {
if n, ok := node.(string); ok {
nodes = append(nodes, n)
}
}
value = strings.Join(nodes, ",")
}
policyDefinition[key] = value
}
p["definition"] = policyDefinition
setPolicy[0] = p
d.Set("policy", setPolicy)
return nil
}
func UpdatePolicy(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
policyId := strings.Split(d.Id(), "@")
if len(policyId) < 2 {
return fmt.Errorf("Unable to determine policy ID")
}
user := policyId[0]
vhost := policyId[1]
if d.HasChange("policy") {
_, newPolicy := d.GetChange("policy")
policyList := newPolicy.([]interface{})
policyMap, ok := policyList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse policy")
}
if err := putPolicy(rmqc, user, vhost, policyMap); err != nil {
return err
}
}
return ReadPolicy(d, meta)
}
func DeletePolicy(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
policyId := strings.Split(d.Id(), "@")
if len(policyId) < 2 {
return fmt.Errorf("Unable to determine policy ID")
}
user := policyId[0]
vhost := policyId[1]
log.Printf("[DEBUG] RabbitMQ: Attempting to delete policy for %s", d.Id())
resp, err := rmqc.DeletePolicy(vhost, user)
log.Printf("[DEBUG] RabbitMQ: Policy delete response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// the policy was automatically deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ policy: %s", resp.Status)
}
return nil
}
func putPolicy(rmqc *rabbithole.Client, vhost string, name string, policyMap map[string]interface{}) error {
policy := rabbithole.Policy{}
policy.Vhost = vhost
policy.Name = name
if v, ok := policyMap["pattern"].(string); ok {
policy.Pattern = v
}
if v, ok := policyMap["priority"].(int); ok {
policy.Priority = v
}
if v, ok := policyMap["apply_to"].(string); ok {
policy.ApplyTo = v
}
if v, ok := policyMap["definition"].(map[string]interface{}); ok {
// special case for ha-mode = nodes
if x, ok := v["ha-mode"]; ok && x == "nodes" {
var nodes rabbithole.NodeNames
nodes = strings.Split(v["ha-params"].(string), ",")
v["ha-params"] = nodes
}
policyDefinition := rabbithole.PolicyDefinition{}
policyDefinition = v
policy.Definition = policyDefinition
}
log.Printf("[DEBUG] RabbitMQ: Attempting to declare policy for %s@%s: %#v", name, vhost, policy)
resp, err := rmqc.PutPolicy(vhost, name, policy)
log.Printf("[DEBUG] RabbitMQ: Policy declare response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error declaring RabbitMQ policy: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,141 @@
package rabbitmq
import (
"fmt"
"strings"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccPolicy(t *testing.T) {
var policy rabbithole.Policy
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccPolicyCheckDestroy(&policy),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccPolicyConfig_basic,
Check: testAccPolicyCheck(
"rabbitmq_policy.test", &policy,
),
},
resource.TestStep{
Config: testAccPolicyConfig_update,
Check: testAccPolicyCheck(
"rabbitmq_policy.test", &policy,
),
},
},
})
}
func testAccPolicyCheck(rn string, policy *rabbithole.Policy) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("policy id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
policyParts := strings.Split(rs.Primary.ID, "@")
policies, err := rmqc.ListPolicies()
if err != nil {
return fmt.Errorf("Error retrieving policies: %s", err)
}
for _, p := range policies {
if p.Name == policyParts[0] && p.Vhost == policyParts[1] {
policy = &p
return nil
}
}
return fmt.Errorf("Unable to find policy %s", rn)
}
}
func testAccPolicyCheckDestroy(policy *rabbithole.Policy) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
policies, err := rmqc.ListPolicies()
if err != nil {
return fmt.Errorf("Error retrieving policies: %s", err)
}
for _, p := range policies {
if p.Name == policy.Name && p.Vhost == policy.Vhost {
return fmt.Errorf("Policy %s@%s still exist", policy.Name, policy.Vhost)
}
}
return nil
}
}
const testAccPolicyConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_policy" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
policy {
pattern = ".*"
priority = 0
apply_to = "all"
definition {
ha-mode = "nodes"
ha-params = "a,b,c"
}
}
}`
const testAccPolicyConfig_update = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_policy" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
policy {
pattern = ".*"
priority = 0
apply_to = "all"
definition {
ha-mode = "all"
}
}
}`

View File

@ -0,0 +1,180 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceQueue() *schema.Resource {
return &schema.Resource{
Create: CreateQueue,
Read: ReadQueue,
Delete: DeleteQueue,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"vhost": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "/",
ForceNew: true,
},
"settings": &schema.Schema{
Type: schema.TypeList,
Required: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"durable": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"auto_delete": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"arguments": &schema.Schema{
Type: schema.TypeMap,
Optional: true,
},
},
},
},
},
}
}
func CreateQueue(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Get("name").(string)
vhost := d.Get("vhost").(string)
settingsList := d.Get("settings").([]interface{})
settingsMap, ok := settingsList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse settings")
}
if err := declareQueue(rmqc, vhost, name, settingsMap); err != nil {
return err
}
id := fmt.Sprintf("%s@%s", name, vhost)
d.SetId(id)
return ReadQueue(d, meta)
}
func ReadQueue(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
queueId := strings.Split(d.Id(), "@")
if len(queueId) < 2 {
return fmt.Errorf("Unable to determine Queue ID")
}
user := queueId[0]
vhost := queueId[1]
queueSettings, err := rmqc.GetQueue(vhost, user)
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: Queue retrieved for %s: %#v", d.Id(), queueSettings)
d.Set("name", queueSettings.Name)
d.Set("vhost", queueSettings.Vhost)
queue := make([]map[string]interface{}, 1)
e := make(map[string]interface{})
e["durable"] = queueSettings.Durable
e["auto_delete"] = queueSettings.AutoDelete
e["arguments"] = queueSettings.Arguments
queue[0] = e
d.Set("settings", queue)
return nil
}
func DeleteQueue(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
queueId := strings.Split(d.Id(), "@")
if len(queueId) < 2 {
return fmt.Errorf("Unable to determine Queue ID")
}
user := queueId[0]
vhost := queueId[1]
log.Printf("[DEBUG] RabbitMQ: Attempting to delete queue for %s", d.Id())
resp, err := rmqc.DeleteQueue(vhost, user)
log.Printf("[DEBUG] RabbitMQ: Queue delete response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// the queue was automatically deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ queue: %s", resp.Status)
}
return nil
}
func declareQueue(rmqc *rabbithole.Client, vhost string, name string, settingsMap map[string]interface{}) error {
queueSettings := rabbithole.QueueSettings{}
if v, ok := settingsMap["durable"].(bool); ok {
queueSettings.Durable = v
}
if v, ok := settingsMap["auto_delete"].(bool); ok {
queueSettings.AutoDelete = v
}
if v, ok := settingsMap["arguments"].(map[string]interface{}); ok {
queueSettings.Arguments = v
}
log.Printf("[DEBUG] RabbitMQ: Attempting to declare queue for %s@%s: %#v", name, vhost, queueSettings)
resp, err := rmqc.DeclareQueue(vhost, name, queueSettings)
log.Printf("[DEBUG] RabbitMQ: Queue declare response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error declaring RabbitMQ queue: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,102 @@
package rabbitmq
import (
"fmt"
"strings"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccQueue(t *testing.T) {
var queueInfo rabbithole.QueueInfo
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccQueueCheckDestroy(&queueInfo),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccQueueConfig_basic,
Check: testAccQueueCheck(
"rabbitmq_queue.test", &queueInfo,
),
},
},
})
}
func testAccQueueCheck(rn string, queueInfo *rabbithole.QueueInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("queue id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
queueParts := strings.Split(rs.Primary.ID, "@")
queues, err := rmqc.ListQueuesIn(queueParts[1])
if err != nil {
return fmt.Errorf("Error retrieving queue: %s", err)
}
for _, queue := range queues {
if queue.Name == queueParts[0] && queue.Vhost == queueParts[1] {
queueInfo = &queue
return nil
}
}
return fmt.Errorf("Unable to find queue %s", rn)
}
}
func testAccQueueCheckDestroy(queueInfo *rabbithole.QueueInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
queues, err := rmqc.ListQueuesIn(queueInfo.Vhost)
if err != nil {
return fmt.Errorf("Error retrieving queue: %s", err)
}
for _, queue := range queues {
if queue.Name == queueInfo.Name && queue.Vhost == queueInfo.Vhost {
return fmt.Errorf("Queue %s@%s still exist", queueInfo.Name, queueInfo.Vhost)
}
}
return nil
}
}
const testAccQueueConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_queue" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
durable = false
auto_delete = true
}
}`

View File

@ -0,0 +1,174 @@
package rabbitmq
import (
"fmt"
"log"
"strings"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceUser() *schema.Resource {
return &schema.Resource{
Create: CreateUser,
Update: UpdateUser,
Read: ReadUser,
Delete: DeleteUser,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"password": &schema.Schema{
Type: schema.TypeString,
Required: true,
Sensitive: true,
},
"tags": &schema.Schema{
Type: schema.TypeList,
Required: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
},
}
}
func CreateUser(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Get("name").(string)
var tagList []string
for _, v := range d.Get("tags").([]interface{}) {
tagList = append(tagList, v.(string))
}
tags := strings.Join(tagList, ",")
userSettings := rabbithole.UserSettings{
Password: d.Get("password").(string),
Tags: tags,
}
log.Printf("[DEBUG] RabbitMQ: Attempting to create user %s", name)
resp, err := rmqc.PutUser(name, userSettings)
log.Printf("[DEBUG] RabbitMQ: user creation response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error creating RabbitMQ user: %s", resp.Status)
}
d.SetId(name)
return ReadUser(d, meta)
}
func ReadUser(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
user, err := rmqc.GetUser(d.Id())
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: User retrieved: %#v", user)
d.Set("name", user.Name)
tags := strings.Split(user.Tags, ",")
d.Set("tags", tags)
return nil
}
func UpdateUser(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Id()
if d.HasChange("password") {
_, newPassword := d.GetChange("password")
userSettings := rabbithole.UserSettings{
Password: newPassword.(string),
}
log.Printf("[DEBUG] RabbitMQ: Attempting to update password for %s", name)
resp, err := rmqc.PutUser(name, userSettings)
log.Printf("[DEBUG] RabbitMQ: Password update response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error updating RabbitMQ user: %s", resp.Status)
}
}
if d.HasChange("tags") {
_, newTags := d.GetChange("tags")
var tagList []string
for _, v := range newTags.([]interface{}) {
tagList = append(tagList, v.(string))
}
tags := strings.Join(tagList, ",")
userSettings := rabbithole.UserSettings{
Tags: tags,
}
log.Printf("[DEBUG] RabbitMQ: Attempting to update tags for %s", name)
resp, err := rmqc.PutUser(name, userSettings)
log.Printf("[DEBUG] RabbitMQ: Tags update response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error updating RabbitMQ user: %s", resp.Status)
}
}
return ReadUser(d, meta)
}
func DeleteUser(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
name := d.Id()
log.Printf("[DEBUG] RabbitMQ: Attempting to delete user %s", name)
resp, err := rmqc.DeleteUser(name)
log.Printf("[DEBUG] RabbitMQ: User delete response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// the user was automatically deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ user: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,94 @@
package rabbitmq
import (
"fmt"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccUser(t *testing.T) {
var user string
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccUserCheckDestroy(user),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccUserConfig_basic,
Check: testAccUserCheck(
"rabbitmq_user.test", &user,
),
},
resource.TestStep{
Config: testAccUserConfig_update,
Check: testAccUserCheck(
"rabbitmq_user.test", &user,
),
},
},
})
}
func testAccUserCheck(rn string, name *string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("user id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
users, err := rmqc.ListUsers()
if err != nil {
return fmt.Errorf("Error retrieving users: %s", err)
}
for _, user := range users {
if user.Name == rs.Primary.ID {
*name = rs.Primary.ID
return nil
}
}
return fmt.Errorf("Unable to find user %s", rn)
}
}
func testAccUserCheckDestroy(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
users, err := rmqc.ListUsers()
if err != nil {
return fmt.Errorf("Error retrieving users: %s", err)
}
for _, user := range users {
if user.Name == name {
return fmt.Errorf("user still exists: %s", name)
}
}
return nil
}
}
const testAccUserConfig_basic = `
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobar"
tags = ["administrator", "management"]
}`
const testAccUserConfig_update = `
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobarry"
tags = ["management"]
}`

View File

@ -0,0 +1,85 @@
package rabbitmq
import (
"fmt"
"log"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/schema"
)
func resourceVhost() *schema.Resource {
return &schema.Resource{
Create: CreateVhost,
Read: ReadVhost,
Delete: DeleteVhost,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
}
}
func CreateVhost(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
vhost := d.Get("name").(string)
log.Printf("[DEBUG] RabbitMQ: Attempting to create vhost %s", vhost)
resp, err := rmqc.PutVhost(vhost, rabbithole.VhostSettings{})
log.Printf("[DEBUG] RabbitMQ: vhost creation response: %#v", resp)
if err != nil {
return err
}
d.SetId(vhost)
return ReadVhost(d, meta)
}
func ReadVhost(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
vhost, err := rmqc.GetVhost(d.Id())
if err != nil {
return checkDeleted(d, err)
}
log.Printf("[DEBUG] RabbitMQ: Vhost retrieved: %#v", vhost)
d.Set("name", vhost.Name)
return nil
}
func DeleteVhost(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)
log.Printf("[DEBUG] RabbitMQ: Attempting to delete vhost %s", d.Id())
resp, err := rmqc.DeleteVhost(d.Id())
log.Printf("[DEBUG] RabbitMQ: vhost deletion response: %#v", resp)
if err != nil {
return err
}
if resp.StatusCode == 404 {
// the vhost was automatically deleted
return nil
}
if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ user: %s", resp.Status)
}
return nil
}

View File

@ -0,0 +1,79 @@
package rabbitmq
import (
"fmt"
"testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccVhost(t *testing.T) {
var vhost string
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccVhostCheckDestroy(vhost),
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccVhostConfig_basic,
Check: testAccVhostCheck(
"rabbitmq_vhost.test", &vhost,
),
},
},
})
}
func testAccVhostCheck(rn string, name *string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[rn]
if !ok {
return fmt.Errorf("resource not found: %s", rn)
}
if rs.Primary.ID == "" {
return fmt.Errorf("vhost id not set")
}
rmqc := testAccProvider.Meta().(*rabbithole.Client)
vhosts, err := rmqc.ListVhosts()
if err != nil {
return fmt.Errorf("Error retrieving vhosts: %s", err)
}
for _, vhost := range vhosts {
if vhost.Name == rs.Primary.ID {
*name = rs.Primary.ID
return nil
}
}
return fmt.Errorf("Unable to find vhost %s", rn)
}
}
func testAccVhostCheckDestroy(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rmqc := testAccProvider.Meta().(*rabbithole.Client)
vhosts, err := rmqc.ListVhosts()
if err != nil {
return fmt.Errorf("Error retrieving vhosts: %s", err)
}
for _, vhost := range vhosts {
if vhost.Name == name {
return fmt.Errorf("vhost still exists: %s", vhost)
}
}
return nil
}
}
const testAccVhostConfig_basic = `
resource "rabbitmq_vhost" "test" {
name = "test"
}`

View File

@ -0,0 +1,14 @@
package rabbitmq
import (
"github.com/hashicorp/terraform/helper/schema"
)
func checkDeleted(d *schema.ResourceData, err error) error {
if err.Error() == "not found" {
d.SetId("")
return nil
}
return err
}

View File

@ -38,6 +38,7 @@ import (
packetprovider "github.com/hashicorp/terraform/builtin/providers/packet"
postgresqlprovider "github.com/hashicorp/terraform/builtin/providers/postgresql"
powerdnsprovider "github.com/hashicorp/terraform/builtin/providers/powerdns"
rabbitmqprovider "github.com/hashicorp/terraform/builtin/providers/rabbitmq"
randomprovider "github.com/hashicorp/terraform/builtin/providers/random"
rundeckprovider "github.com/hashicorp/terraform/builtin/providers/rundeck"
scalewayprovider "github.com/hashicorp/terraform/builtin/providers/scaleway"
@ -93,6 +94,7 @@ var InternalProviders = map[string]plugin.ProviderFunc{
"packet": packetprovider.Provider,
"postgresql": postgresqlprovider.Provider,
"powerdns": powerdnsprovider.Provider,
"rabbitmq": rabbitmqprovider.Provider,
"random": randomprovider.Provider,
"rundeck": rundeckprovider.Provider,
"scaleway": scalewayprovider.Provider,

View File

@ -0,0 +1,20 @@
## Contributing
The workflow is pretty standard:
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push -u origin my-new-feature`)
5. Create new Pull Request
## Running Tests
First run `bin/ci/before_build.sh` that will create a vhost and user(s) needed
by the test suite.
The project uses [Ginkgo](http://onsi.github.io/ginkgo/) and [Gomega](https://github.com/onsi/gomega).
To clone dependencies and run tests, use `make`. It is also possible
to use the brilliant [Ginkgo CLI runner](http://onsi.github.io/ginkgo/#the-ginkgo-cli) e.g.
to only run a subset of tests.

View File

@ -0,0 +1,32 @@
## Changes Between 1.0.0 and 1.1.0 (unreleased)
### More Complete Message Stats Information
Message stats now include fields such as `deliver_get` and `redeliver`.
GH issue: [#73](https://github.com/michaelklishin/rabbit-hole/pull/73).
Contributed by Edward Wilde.
## 1.0 (first tagged release, Dec 25th, 2015)
### TLS Support
`rabbithole.NewTLSClient` is a new function which works
much like `NewClient` but additionally accepts a transport.
Contributed by @[GrimTheReaper](https://github.com/GrimTheReaper).
### Federation Support
It is now possible to create federation links
over HTTP API.
Contributed by [Ryan Grenz](https://github.com/grenzr-bskyb).
### Core Operations Support
Most common HTTP API operations (listing and management of
vhosts, users, permissions, queues, exchanges, and bindings)
are supported by the client.

23
vendor/github.com/michaelklishin/rabbit-hole/LICENSE generated vendored Normal file
View File

@ -0,0 +1,23 @@
Copyright (c) 2013, Michael Klishin
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

26
vendor/github.com/michaelklishin/rabbit-hole/Makefile generated vendored Normal file
View File

@ -0,0 +1,26 @@
export GOPATH := $(CURDIR)
COVER_FILE := coverage
all: test
.PHONY: test
test: install-dependencies
go test -v
cover: install-dependencies install-cover
go test -v -test.coverprofile="$(COVER_FILE).prof"
sed -i.bak 's|_'$(GOPATH)'|.|g' $(COVER_FILE).prof
go tool cover -html=$(COVER_FILE).prof -o $(COVER_FILE).html
rm $(COVER_FILE).prof*
install-cover:
go get code.google.com/p/go.tools/cmd/cover
install-dependencies:
go get github.com/onsi/ginkgo
go get github.com/onsi/gomega
go get github.com/streadway/amqp
# to get Ginkgo CLI
go install github.com/onsi/ginkgo/ginkgo

296
vendor/github.com/michaelklishin/rabbit-hole/README.md generated vendored Normal file
View File

@ -0,0 +1,296 @@
# Rabbit Hole, a RabbitMQ HTTP API Client for Go
This library is a [RabbitMQ HTTP API](https://raw.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html) client for the Go language.
## Supported Go Versions
Rabbit Hole requires Go 1.3+.
## Supported RabbitMQ Versions
* RabbitMQ 3.x
All versions require [RabbitMQ Management UI plugin](http://www.rabbitmq.com/management.html) to be installed and enabled.
## Project Maturity
Rabbit Hole is a fairly mature library (started in October 2013)
designed after a couple of other RabbitMQ HTTP API clients with stable
APIs. Breaking API changes are not out of the question but not without
a reasonable version bump.
It is largely (80-90%) feature complete and decently documented.
## Installation
```
go get github.com/michaelklishin/rabbit-hole
```
## Documentation
### Overview
To import the package:
``` go
import (
"github.com/michaelklishin/rabbit-hole"
)
```
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`:
``` go
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
```
SSL/TSL is now available, by adding a Transport Layer to the parameters
of `rabbithole.NewTLSClient`:
``` go
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
However, RabbitMQ-Management does not have SSL/TLS enabled by default,
so you must enable it.
[API reference](http://godoc.org/github.com/michaelklishin/rabbit-hole) is available on [godoc.org](http://godoc.org).
### Getting Overview
``` go
res, err := rmqc.Overview()
```
### Node and Cluster Status
``` go
xs, err := rmqc.ListNodes()
// => []NodeInfo, err
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
```
### Operations on Connections
``` go
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
```
### Operations on Channels
``` go
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
```
### Operations on Vhosts
``` go
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
```
### Managing Users
``` go
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
```
### Managing Permissions
``` go
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
```
### Operations on Exchanges
``` go
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
```
### Operations on Queues
``` go
qs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
qs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
q, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err
```
### Operations on Bindings
``` go
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
```
### HTTPS Connections
``` go
var tlsConfig *tls.Config
...
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
### Changing Transport Layer
``` go
var transport *http.Transport
...
rmqc.SetTransport(transport)
```
## CI Status
[![Build Status](https://travis-ci.org/michaelklishin/rabbit-hole.svg?branch=master)](https://travis-ci.org/michaelklishin/rabbit-hole)
## Contributing
See [CONTRIBUTING.md](https://github.com/michaelklishin/rabbit-hole/blob/master/CONTRIBUTING.md)
## License & Copyright
2-clause BSD license.
(c) Michael S. Klishin, 2013-2016.

View File

@ -0,0 +1,157 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/bindings
//
// Example response:
//
// [
// {
// "source": "",
// "vhost": "\/",
// "destination": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "destination_type": "queue",
// "routing_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "arguments": {
//
// },
// "properties_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w"
// }
// ]
type BindingInfo struct {
// Binding source (exchange name)
Source string `json:"source"`
Vhost string `json:"vhost"`
// Binding destination (queue or exchange name)
Destination string `json:"destination"`
// Destination type, either "queue" or "exchange"
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
Arguments map[string]interface{} `json:"arguments"`
PropertiesKey string `json:"properties_key"`
}
// Returns all bindings
func (c *Client) ListBindings() (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/bindings/{vhost}
//
// Returns all bindings in a virtual host.
func (c *Client) ListBindingsIn(vhost string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/"+url.QueryEscape(vhost))
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{queue}/bindings
//
// Example response:
// [
// {"source":"",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "arguments":{},
// "properties_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA"},
// {"source":"temp",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"",
// "arguments":{},
// "properties_key":"~"}
// ]
// Returns all bindings of individual queue.
func (c *Client) ListQueueBindings(vhost, queue string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/bindings")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// POST /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}
//
// DeclareBinding updates information about a binding between a source and a target
func (c *Client) DeclareBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
info.Vhost = vhost
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "POST", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}/{props}
//
// DeleteBinding delets an individual binding
func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination)+"/"+url.QueryEscape(info.PropertiesKey), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,86 @@
package rabbithole
import "net/url"
// Brief (very incomplete) connection information.
type BriefConnectionDetails struct {
// Connection name
Name string `json:"name"`
// Client port
PeerPort Port `json:"peer_port"`
// Client host
PeerHost string `json:"peer_host"`
}
type ChannelInfo struct {
// Channel number
Number int `json:"number"`
// Channel name
Name string `json:"name"`
// basic.qos (prefetch count) value used
PrefetchCount int `json:"prefetch_count"`
// How many consumers does this channel have
ConsumerCount int `json:"consumer_count"`
// Number of unacknowledged messages on this channel
UnacknowledgedMessageCount int `json:"messages_unacknowledged"`
// Number of messages on this channel unconfirmed to publishers
UnconfirmedMessageCount int `json:"messages_unconfirmed"`
// Number of messages on this channel uncommited to message store
UncommittedMessageCount int `json:"messages_uncommitted"`
// Number of acks on this channel uncommited to message store
UncommittedAckCount int `json:"acks_uncommitted"`
// TODO(mk): custom deserializer to date/time?
IdleSince string `json:"idle_since"`
// True if this channel uses publisher confirms
UsesPublisherConfirms bool `json:"confirm"`
// True if this channel uses transactions
Transactional bool `json:"transactional"`
// True if this channel is blocked via channel.flow
ClientFlowBlocked bool `json:"client_flow_blocked"`
User string `json:"user"`
Vhost string `json:"vhost"`
Node string `json:"node"`
ConnectionDetails BriefConnectionDetails `json:"connection_details"`
}
//
// GET /api/channels
//
// Returns information about all open channels.
func (c *Client) ListChannels() (rec []ChannelInfo, err error) {
req, err := newGETRequest(c, "channels")
if err != nil {
return []ChannelInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ChannelInfo{}, err
}
return rec, nil
}
//
// GET /api/channels/{name}
//
// Returns channel information.
func (c *Client) GetChannel(name string) (rec *ChannelInfo, err error) {
req, err := newGETRequest(c, "channels/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

132
vendor/github.com/michaelklishin/rabbit-hole/client.go generated vendored Normal file
View File

@ -0,0 +1,132 @@
package rabbithole
import (
"bytes"
"encoding/json"
"errors"
"net/http"
"net/url"
)
type Client struct {
// URI of a RabbitMQ node to use, not including the path, e.g. http://127.0.0.1:15672.
Endpoint string
// Username to use. This RabbitMQ user must have the "management" tag.
Username string
// Password to use.
Password string
host string
transport *http.Transport
}
func NewClient(uri string, username string, password string) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
}
return me, nil
}
//NewTLSClient Creates a Client with a Transport Layer; it is up to the developer to make that layer Secure.
func NewTLSClient(uri string, username string, password string, transport *http.Transport) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
transport: transport,
}
return me, nil
}
//SetTransport changes the Transport Layer that the Client will use.
func (c *Client) SetTransport(transport *http.Transport) {
c.transport = transport
}
func newGETRequest(client *Client, path string) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest("GET", s, nil)
req.Close = true
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
return req, err
}
func newRequestWithBody(client *Client, method string, path string, body []byte) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest(method, s, bytes.NewReader(body))
req.Close = true
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
req.Header.Add("Content-Type", "application/json")
return req, err
}
func executeRequest(client *Client, req *http.Request) (res *http.Response, err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err = httpc.Do(req)
if err != nil {
return nil, err
}
return res, nil
}
func executeAndParseRequest(client *Client, req *http.Request, rec interface{}) (err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err := httpc.Do(req)
if err != nil {
return err
}
defer res.Body.Close() // always close body
if isNotFound(res) {
return errors.New("not found")
}
err = json.NewDecoder(res.Body).Decode(&rec)
if err != nil {
return err
}
return nil
}
func isNotFound(res *http.Response) bool {
return res.StatusCode == http.StatusNotFound
}

57
vendor/github.com/michaelklishin/rabbit-hole/common.go generated vendored Normal file
View File

@ -0,0 +1,57 @@
package rabbithole
import "strconv"
// Extra arguments as a map (on queues, bindings, etc)
type Properties map[string]interface{}
// Port used by RabbitMQ or clients
type Port int
func (p *Port) UnmarshalJSON(b []byte) error {
stringValue := string(b)
var parsed int64
var err error
if stringValue[0] == '"' && stringValue[len(stringValue)-1] == '"' {
parsed, err = strconv.ParseInt(stringValue[1:len(stringValue)-1], 10, 32)
} else {
parsed, err = strconv.ParseInt(stringValue, 10, 32)
}
if err == nil {
*p = Port(int(parsed))
}
return err
}
// Rate of change of a numerical value
type RateDetails struct {
Rate float32 `json:"rate"`
}
// RabbitMQ context (Erlang app) running on
// a node
type BrokerContext struct {
Node string `json:"node"`
Description string `json:"description"`
Path string `json:"path"`
Port Port `json:"port"`
Ignore bool `json:"ignore_in_use"`
}
// Basic published messages statistics
type MessageStats struct {
Publish int `json:"publish"`
PublishDetails RateDetails `json:"publish_details"`
Deliver int `json:"deliver"`
DeliverDetails RateDetails `json:"deliver_details"`
DeliverNoAck int `json:"deliver_noack"`
DeliverNoAckDetails RateDetails `json:"deliver_noack_details"`
DeliverGet int `json:"deliver_get"`
DeliverGetDetails RateDetails `json:"deliver_get_details"`
Redeliver int `json:"redeliver"`
RedeliverDetails RateDetails `json:"redeliver_details"`
Get int `json:"get"`
GetDetails RateDetails `json:"get_details"`
GetNoAck int `json:"get_no_ack"`
GetNoAckDetails RateDetails `json:"get_no_ack_details"`
}

View File

@ -0,0 +1,131 @@
package rabbithole
import (
"net/http"
"net/url"
)
// Provides information about connection to a RabbitMQ node.
type ConnectionInfo struct {
// Connection name
Name string `json:"name"`
// Node the client is connected to
Node string `json:"node"`
// Number of open channels
Channels int `json:"channels"`
// Connection state
State string `json:"state"`
// Connection type, network (via AMQP client) or direct (via direct Erlang client)
Type string `json:"type"`
// Server port
Port Port `json:"port"`
// Client port
PeerPort Port `json:"peer_port"`
// Server host
Host string `json:"host"`
// Client host
PeerHost string `json:"peer_host"`
// Last connection blocking reason, if any
LastBlockedBy string `json:"last_blocked_by"`
// When connection was last blocked
LastBlockedAge string `json:"last_blocked_age"`
// True if connection uses TLS/SSL
UsesTLS bool `json:"ssl"`
// Client certificate subject
PeerCertSubject string `json:"peer_cert_subject"`
// Client certificate validity
PeerCertValidity string `json:"peer_cert_validity"`
// Client certificate issuer
PeerCertIssuer string `json:"peer_cert_issuer"`
// TLS/SSL protocol and version
SSLProtocol string `json:"ssl_protocol"`
// Key exchange mechanism
SSLKeyExchange string `json:"ssl_key_exchange"`
// SSL cipher suite used
SSLCipher string `json:"ssl_cipher"`
// SSL hash
SSLHash string `json:"ssl_hash"`
// Protocol, e.g. AMQP 0-9-1 or MQTT 3-1
Protocol string `json:"protocol"`
User string `json:"user"`
// Virtual host
Vhost string `json:"vhost"`
// Heartbeat timeout
Timeout int `json:"timeout"`
// Maximum frame size (AMQP 0-9-1)
FrameMax int `json:"frame_max"`
// A map of client properties (name, version, capabilities, etc)
ClientProperties Properties `json:"client_properties"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
// Ingress data rate
RecvOctDetails RateDetails `json:"recv_oct_details"`
// Egress data rate
SendOctDetails RateDetails `json:"send_oct_details"`
}
//
// GET /api/connections
//
func (c *Client) ListConnections() (rec []ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections")
if err != nil {
return []ConnectionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ConnectionInfo{}, err
}
return rec, nil
}
//
// GET /api/connections/{name}
//
func (c *Client) GetConnection(name string) (rec *ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// DELETE /api/connections/{name}
//
func (c *Client) CloseConnection(name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "connections/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

177
vendor/github.com/michaelklishin/rabbit-hole/doc.go generated vendored Normal file
View File

@ -0,0 +1,177 @@
/*
Rabbit Hole is a Go client for the RabbitMQ HTTP API.
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`.
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
Getting Overview
res, err := rmqc.Overview()
Node and Cluster Status
var err error
// => []NodeInfo, err
xs, err := rmqc.ListNodes()
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
Operations on Connections
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
Operations on Channels
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
Operations on Exchanges
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
Operations on Queues
xs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
xs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
x, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err
Operations on Bindings
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
Operations on Vhosts
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
Managing Users
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
Managing Permissions
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
*/
package rabbithole

View File

@ -0,0 +1,219 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/exchanges
//
type IngressEgressStats struct {
PublishIn int `json:"publish_in"`
PublishInDetails RateDetails `json:"publish_in_details"`
PublishOut int `json:"publish_out"`
PublishOutDetails RateDetails `json:"publish_out_details"`
}
type ExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
MessageStats IngressEgressStats `json:"message_stats"`
}
type ExchangeSettings struct {
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) ListExchanges() (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges")
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}
//
func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost))
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}/{name}
//
// Example response:
//
// {
// "incoming": [
// {
// "stats": {
// "publish": 2760,
// "publish_details": {
// "rate": 20
// }
// },
// "channel_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672 (2)",
// "number": 2,
// "connection_name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// }
// }
// ],
// "outgoing": [
// {
// "stats": {
// "publish": 1280,
// "publish_details": {
// "rate": 20
// }
// },
// "queue": {
// "name": "amq.gen-7NhO_yRr4lDdp-8hdnvfuw",
// "vhost": "rabbit\/hole"
// }
// }
// ],
// "message_stats": {
// "publish_in": 2760,
// "publish_in_details": {
// "rate": 20
// },
// "publish_out": 1280,
// "publish_out_details": {
// "rate": 20
// }
// },
// "name": "amq.fanout",
// "vhost": "rabbit\/hole",
// "type": "fanout",
// "durable": true,
// "auto_delete": false,
// "internal": false,
// "arguments": {
// }
// }
type ExchangeIngressDetails struct {
Stats MessageStats `json:"stats"`
ChannelDetails PublishingChannel `json:"channel_details"`
}
type PublishingChannel struct {
Number int `json:"number"`
Name string `json:"name"`
ConnectionName string `json:"connection_name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type NameAndVhost struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
}
type ExchangeEgressDetails struct {
Stats MessageStats `json:"stats"`
Queue NameAndVhost `json:"queue"`
}
type DetailedExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
Incoming []ExchangeIngressDetails `json:"incoming"`
Outgoing []ExchangeEgressDetails `json:"outgoing"`
}
func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost)+"/"+exchange)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/exchanges/{vhost}/{name}
//
func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,72 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Federation definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type FederationDefinition struct {
Uri string `json:"uri"`
Expires int `json:"expires"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops"`
PrefetchCount int `json:"prefetch-count"`
ReconnectDelay int `json:"reconnect-delay"`
AckMode string `json:"ack-mode"`
TrustUserId bool `json:"trust-user-id"`
}
// Represents a configured Federation upstream.
type FederationUpstream struct {
Definition FederationDefinition `json:"value"`
}
//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
//
// Updates a federation upstream
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
fedUp := FederationUpstream{
Definition: fDef,
}
body, err := json.Marshal(fedUp)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
//
// Deletes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

83
vendor/github.com/michaelklishin/rabbit-hole/misc.go generated vendored Normal file
View File

@ -0,0 +1,83 @@
package rabbithole
//
// GET /api/overview
//
type QueueTotals struct {
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
}
type ObjectTotals struct {
Consumers int `json:"consumers"`
Queues int `json:"queues"`
Exchanges int `json:"exchanges"`
Connections int `json:"connections"`
Channels int `json:"channels"`
}
type Listener struct {
Node string `json:"node"`
Protocol string `json:"protocol"`
IpAddress string `json:"ip_address"`
Port Port `json:"port"`
}
type Overview struct {
ManagementVersion string `json:"management_version"`
StatisticsLevel string `json:"statistics_level"`
RabbitMQVersion string `json:"rabbitmq_version"`
ErlangVersion string `json:"erlang_version"`
FullErlangVersion string `json:"erlang_full_version"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
MessageStats MessageStats `json:"message_stats"`
QueueTotals QueueTotals `json:"queue_totals"`
ObjectTotals ObjectTotals `json:"object_totals"`
Node string `json:"node"`
StatisticsDBNode string `json:"statistics_db_node"`
Listeners []Listener `json:"listeners"`
Contexts []BrokerContext `json:"contexts"`
}
func (c *Client) Overview() (rec *Overview, err error) {
req, err := newGETRequest(c, "overview")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/whoami
//
type WhoamiInfo struct {
Name string `json:"name"`
Tags string `json:"tags"`
AuthBackend string `json:"auth_backend"`
}
func (c *Client) Whoami() (rec *WhoamiInfo, err error) {
req, err := newGETRequest(c, "whoami")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

301
vendor/github.com/michaelklishin/rabbit-hole/nodes.go generated vendored Normal file
View File

@ -0,0 +1,301 @@
package rabbithole
import (
"net/url"
)
// TODO: this probably should be fixed in RabbitMQ management plugin
type OsPid string
type NameDescriptionEnabled struct {
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
}
type AuthMechanism NameDescriptionEnabled
type ExchangeType NameDescriptionEnabled
type NameDescriptionVersion struct {
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
}
type ErlangApp NameDescriptionVersion
type NodeInfo struct {
Name string `json:"name"`
NodeType string `json:"type"`
IsRunning bool `json:"running"`
OsPid OsPid `json:"os_pid"`
FdUsed int `json:"fd_used"`
FdTotal int `json:"fd_total"`
SocketsUsed int `json:"sockets_used"`
SocketsTotal int `json:"sockets_total"`
MemUsed int `json:"mem_used"`
MemLimit int `json:"mem_limit"`
MemAlarm bool `json:"mem_alarm"`
DiskFree int `json:"disk_free"`
DiskFreeLimit int `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
// Erlang scheduler run queue length
RunQueueLength uint32 `json:"run_queue"`
Processors uint32 `json:"processors"`
Uptime uint64 `json:"uptime"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
AuthMechanisms []AuthMechanism `json:"auth_mechanisms"`
ErlangApps []ErlangApp `json:"applications"`
Contexts []BrokerContext `json:"contexts"`
}
//
// GET /api/nodes
//
func (c *Client) ListNodes() (rec []NodeInfo, err error) {
req, err := newGETRequest(c, "nodes")
if err != nil {
return []NodeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/nodes/{name}
//
// {
// "partitions": [],
// "os_pid": "39292",
// "fd_used": 35,
// "fd_total": 256,
// "sockets_used": 4,
// "sockets_total": 138,
// "mem_used": 69964432,
// "mem_limit": 2960660889,
// "mem_alarm": false,
// "disk_free_limit": 50000000,
// "disk_free": 188362731520,
// "disk_free_alarm": false,
// "proc_used": 370,
// "proc_total": 1048576,
// "statistics_level": "fine",
// "uptime": 98355255,
// "run_queue": 0,
// "processors": 8,
// "exchange_types": [
// {
// "name": "topic",
// "description": "AMQP topic exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "x-consistent-hash",
// "description": "Consistent Hashing Exchange",
// "enabled": true
// },
// {
// "name": "fanout",
// "description": "AMQP fanout exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "direct",
// "description": "AMQP direct exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "headers",
// "description": "AMQP headers exchange, as per the AMQP specification",
// "enabled": true
// }
// ],
// "auth_mechanisms": [
// {
// "name": "AMQPLAIN",
// "description": "QPid AMQPLAIN mechanism",
// "enabled": true
// },
// {
// "name": "PLAIN",
// "description": "SASL PLAIN authentication mechanism",
// "enabled": true
// },
// {
// "name": "RABBIT-CR-DEMO",
// "description": "RabbitMQ Demo challenge-response authentication mechanism",
// "enabled": false
// }
// ],
// "applications": [
// {
// "name": "amqp_client",
// "description": "RabbitMQ AMQP Client",
// "version": "3.2.0"
// },
// {
// "name": "asn1",
// "description": "The Erlang ASN1 compiler version 2.0.3",
// "version": "2.0.3"
// },
// {
// "name": "cowboy",
// "description": "Small, fast, modular HTTP server.",
// "version": "0.5.0-rmq3.2.0-git4b93c2d"
// },
// {
// "name": "crypto",
// "description": "CRYPTO version 2",
// "version": "3.1"
// },
// {
// "name": "inets",
// "description": "INETS CXC 138 49",
// "version": "5.9.6"
// },
// {
// "name": "kernel",
// "description": "ERTS CXC 138 10",
// "version": "2.16.3"
// },
// {
// "name": "mnesia",
// "description": "MNESIA CXC 138 12",
// "version": "4.10"
// },
// {
// "name": "mochiweb",
// "description": "MochiMedia Web Server",
// "version": "2.7.0-rmq3.2.0-git680dba8"
// },
// {
// "name": "os_mon",
// "description": "CPO CXC 138 46",
// "version": "2.2.13"
// },
// {
// "name": "public_key",
// "description": "Public key infrastructure",
// "version": "0.20"
// },
// {
// "name": "rabbit",
// "description": "RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_consistent_hash_exchange",
// "description": "Consistent Hash Exchange Type",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management",
// "description": "RabbitMQ Management Console",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management_agent",
// "description": "RabbitMQ Management Agent",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_mqtt",
// "description": "RabbitMQ MQTT Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel",
// "description": "Data Shovel for RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel_management",
// "description": "Shovel Status",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_stomp",
// "description": "Embedded Rabbit Stomp Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_dispatch",
// "description": "RabbitMQ Web Dispatcher",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_stomp",
// "description": "Rabbit WEB-STOMP - WebSockets to Stomp adapter",
// "version": "3.2.0"
// },
// {
// "name": "sasl",
// "description": "SASL CXC 138 11",
// "version": "2.3.3"
// },
// {
// "name": "sockjs",
// "description": "SockJS",
// "version": "0.3.4-rmq3.2.0-git3132eb9"
// },
// {
// "name": "ssl",
// "description": "Erlang\/OTP SSL application",
// "version": "5.3.1"
// },
// {
// "name": "stdlib",
// "description": "ERTS CXC 138 10",
// "version": "1.19.3"
// },
// {
// "name": "webmachine",
// "description": "webmachine",
// "version": "1.10.3-rmq3.2.0-gite9359c7"
// },
// {
// "name": "xmerl",
// "description": "XML parser",
// "version": "1.3.4"
// }
// ],
// "contexts": [
// {
// "description": "Redirect to port 15672",
// "path": "\/",
// "port": 55672,
// "ignore_in_use": true
// },
// {
// "description": "RabbitMQ Management",
// "path": "\/",
// "port": 15672
// }
// ],
// "name": "rabbit@mercurio",
// "type": "disc",
// "running": true
// }
func (c *Client) GetNode(name string) (rec *NodeInfo, err error) {
req, err := newGETRequest(c, "nodes/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View File

@ -0,0 +1,126 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/permissions
//
// Example response:
//
// [{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}]
type PermissionInfo struct {
User string `json:"user"`
Vhost string `json:"vhost"`
// Configuration permissions
Configure string `json:"configure"`
// Write permissions
Write string `json:"write"`
// Read permissions
Read string `json:"read"`
}
// Returns permissions for all users and virtual hosts.
func (c *Client) ListPermissions() (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{user}/permissions
//
// Returns permissions of a specific user.
func (c *Client) ListPermissionsOf(username string) (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username)+"/permissions")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/permissions/{vhost}/{user}
//
// Returns permissions of user in virtual host.
func (c *Client) GetPermissionsIn(vhost, username string) (rec PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username))
if err != nil {
return PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return PermissionInfo{}, err
}
return rec, nil
}
//
// PUT /api/permissions/{vhost}/{user}
//
type Permissions struct {
Configure string `json:"configure"`
Write string `json:"write"`
Read string `json:"read"`
}
// Updates permissions of user in virtual host.
func (c *Client) UpdatePermissionsIn(vhost, username string, permissions Permissions) (res *http.Response, err error) {
body, err := json.Marshal(permissions)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/permissions/{vhost}/{user}
//
// Clears (deletes) permissions of user in virtual host.
func (c *Client) ClearPermissionsIn(vhost, username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,31 @@
package rabbithole
func (c *Client) EnabledProtocols() (xs []string, err error) {
overview, err := c.Overview()
if err != nil {
return []string{}, err
}
// we really need to implement Map/Filter/etc. MK.
xs = make([]string, len(overview.Listeners))
for i, lnr := range overview.Listeners {
xs[i] = lnr.Protocol
}
return xs, nil
}
func (c *Client) ProtocolPorts() (res map[string]Port, err error) {
res = map[string]Port{}
overview, err := c.Overview()
if err != nil {
return res, err
}
for _, lnr := range overview.Listeners {
res[lnr.Protocol] = lnr.Port
}
return res, nil
}

View File

@ -0,0 +1,127 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Policy definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type PolicyDefinition map[string]interface{}
type NodeNames []string
// Represents a configured policy.
type Policy struct {
// Virtual host this policy is in.
Vhost string `json:"vhost"`
// Regular expression pattern used to match queues and exchanges,
// , e.g. "^ha\..+"
Pattern string `json:"pattern"`
// What this policy applies to: "queues", "exchanges", etc.
ApplyTo string `json:"apply-to"`
Name string `json:"name"`
Priority int `json:"priority"`
// Additional arguments added to the entities (queues,
// exchanges or both) that match a policy
Definition PolicyDefinition `json:"definition"`
}
//
// GET /api/policies
//
// Return all policies (across all virtual hosts).
func (c *Client) ListPolicies() (rec []Policy, err error) {
req, err := newGETRequest(c, "policies")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}
//
// Returns policies in a specific virtual host.
func (c *Client) ListPoliciesIn(vhost string) (rec []Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}/{name}
//
// Returns individual policy in virtual host.
func (c *Client) GetPolicy(vhost, name string) (rec *Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/policies/{vhost}/{name}
//
// Updates a policy.
func (c *Client) PutPolicy(vhost string, name string, policy Policy) (res *http.Response, err error) {
body, err := json.Marshal(policy)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/policies/{vhost}/{name}
//
// Deletes a policy.
func (c *Client) DeletePolicy(vhost, name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

283
vendor/github.com/michaelklishin/rabbit-hole/queues.go generated vendored Normal file
View File

@ -0,0 +1,283 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Information about backing queue (queue storage engine).
type BackingQueueStatus struct {
Q1 int `json:"q1"`
Q2 int `json:"q2"`
Q3 int `json:"q3"`
Q4 int `json:"q4"`
// Total queue length
Length int64 `json:"len"`
// Number of pending acks from consumers
PendingAcks int64 `json:"pending_acks"`
// Number of messages held in RAM
RAMMessageCount int64 `json:"ram_msg_count"`
// Number of outstanding acks held in RAM
RAMAckCount int64 `json:"ram_ack_count"`
// Number of persistent messages in the store
PersistentCount int64 `json:"persistent_count"`
// Average ingress (inbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageIngressRate float64 `json:"avg_ingress_rate"`
// Average egress (outbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageEgressRate float64 `json:"avg_egress_rate"`
// rate at which unacknowledged message records enter RAM,
// e.g. because messages are delivered requiring acknowledgement
AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
// rate at which unacknowledged message records leave RAM,
// e.g. because acks arrive or unacked messages are paged out
AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}
type OwnerPidDetails struct {
Name string `json:"name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type QueueInfo struct {
// Queue name
Name string `json:"name"`
// Virtual host this queue belongs to
Vhost string `json:"vhost"`
// Is this queue durable?
Durable bool `json:"durable"`
// Is this queue auto-delted?
AutoDelete bool `json:"auto_delete"`
// Extra queue arguments
Arguments map[string]interface{} `json:"arguments"`
// RabbitMQ node that hosts master for this queue
Node string `json:"node"`
// Queue status
Status string `json:"status"`
// Total amount of RAM used by this queue
Memory int64 `json:"memory"`
// How many consumers this queue has
Consumers int `json:"consumers"`
// Utilisation of all the consumers
ConsumerUtilisation float64 `json:"consumer_utilisation"`
// If there is an exclusive consumer, its consumer tag
ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`
// Policy applied to this queue, if any
Policy string `json:"policy"`
// Total bytes of messages in this queues
MessagesBytes int64 `json:"message_bytes"`
MessagesBytesPersistent int64 `json:"message_bytes_persistent"`
MessagesBytesRAM int64 `json:"message_bytes_ram"`
// Total number of messages in this queue
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
MessagesPersistent int `json:"messages_persistent"`
MessagesRAM int `json:"messages_ram"`
// Number of messages ready to be delivered
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Number of messages delivered and pending acknowledgements from consumers
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
MessageStats MessageStats `json:"message_stats"`
OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`
BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`
}
type DetailedQueueInfo QueueInfo
//
// GET /api/queues
//
// [
// {
// "owner_pid_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// },
// "message_stats": {
// "publish": 19830,
// "publish_details": {
// "rate": 5
// }
// },
// "messages": 15,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 15,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "policy": "",
// "exclusive_consumer_tag": "",
// "consumers": 0,
// "memory": 143112,
// "backing_queue_status": {
// "q1": 0,
// "q2": 0,
// "delta": [
// "delta",
// "undefined",
// 0,
// "undefined"
// ],
// "q3": 0,
// "q4": 15,
// "len": 15,
// "pending_acks": 0,
// "target_ram_count": "infinity",
// "ram_msg_count": 15,
// "ram_ack_count": 0,
// "next_seq_id": 19830,
// "persistent_count": 0,
// "avg_ingress_rate": 4.9920127795527,
// "avg_egress_rate": 4.9920127795527,
// "avg_ack_ingress_rate": 0,
// "avg_ack_egress_rate": 0
// },
// "status": "running",
// "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
// "vhost": "rabbit\/hole",
// "durable": false,
// "auto_delete": false,
// "arguments": {
// "x-message-ttl": 5000
// },
// "node": "rabbit@marzo"
// }
// ]
func (c *Client) ListQueues() (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues")
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}
//
func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost))
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{name}
//
func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+queue)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
type QueueSettings struct {
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/queues/{vhost}/{name}
//
func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/queues/{vhost}/{name}/contents
//
func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/contents", nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

109
vendor/github.com/michaelklishin/rabbit-hole/users.go generated vendored Normal file
View File

@ -0,0 +1,109 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
type UserInfo struct {
Name string `json:"name"`
PasswordHash string `json:"password_hash"`
// Tags control permissions. Built-in tags: administrator, management, policymaker.
Tags string `json:"tags"`
}
// Settings used to create users. Tags must be comma-separated.
type UserSettings struct {
Name string `json:"name"`
// Tags control permissions. Administrator grants full
// permissions, management grants management UI and HTTP API
// access, policymaker grants policy management permissions.
Tags string `json:"tags"`
// *never* returned by RabbitMQ. Set by the client
// to create/update a user. MK.
Password string `json:"password"`
}
//
// GET /api/users
//
// Example response:
// [{"name":"guest","password_hash":"8LYTIFbVUwi8HuV/dGlp2BYsD1I=","tags":"administrator"}]
// Returns a list of all users in a cluster.
func (c *Client) ListUsers() (rec []UserInfo, err error) {
req, err := newGETRequest(c, "users/")
if err != nil {
return []UserInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []UserInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{name}
//
// Returns information about individual user.
func (c *Client) GetUser(username string) (rec *UserInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/users/{name}
//
// Updates information about individual user.
func (c *Client) PutUser(username string, info UserSettings) (res *http.Response, err error) {
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "users/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/users/{name}
//
// Deletes user.
func (c *Client) DeleteUser(username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "users/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

160
vendor/github.com/michaelklishin/rabbit-hole/vhosts.go generated vendored Normal file
View File

@ -0,0 +1,160 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/vhosts
//
// Example response:
// [
// {
// "message_stats": {
// "publish": 78,
// "publish_details": {
// "rate": 0
// }
// },
// "messages": 0,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 0,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "recv_oct": 16653,
// "recv_oct_details": {
// "rate": 0
// },
// "send_oct": 40495,
// "send_oct_details": {
// "rate": 0
// },
// "name": "\/",
// "tracing": false
// },
// {
// "name": "29dd51888b834698a8b5bc3e7f8623aa1c9671f5",
// "tracing": false
// }
// ]
type VhostInfo struct {
// Virtual host name
Name string `json:"name"`
// True if tracing is enabled for this virtual host
Tracing bool `json:"tracing"`
// Total number of messages in queues of this virtual host
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
// Total number of messages ready to be delivered in queues of this virtual host
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Total number of messages pending acknowledgement from consumers in this virtual host
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
RecvOctDetails RateDetails `json:"recv_oct_details"`
SendOctDetails RateDetails `json:"send_oct_details"`
}
// Returns a list of virtual hosts.
func (c *Client) ListVhosts() (rec []VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts")
if err != nil {
return []VhostInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []VhostInfo{}, err
}
return rec, nil
}
//
// GET /api/vhosts/{name}
//
// Returns information about a specific virtual host.
func (c *Client) GetVhost(vhostname string) (rec *VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts/"+url.QueryEscape(vhostname))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/vhosts/{name}
//
// Settings used to create or modify virtual hosts.
type VhostSettings struct {
// True if tracing should be enabled.
Tracing bool `json:"tracing"`
}
// Creates or updates a virtual host.
func (c *Client) PutVhost(vhostname string, settings VhostSettings) (res *http.Response, err error) {
body, err := json.Marshal(settings)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "vhosts/"+url.QueryEscape(vhostname), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/vhosts/{name}
//
// Deletes a virtual host.
func (c *Client) DeleteVhost(vhostname string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "vhosts/"+url.QueryEscape(vhostname), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

6
vendor/vendor.json vendored
View File

@ -1448,6 +1448,12 @@
"path": "github.com/maximilien/softlayer-go/softlayer",
"revision": "85659debe44fab5792fc92cf755c04b115b9dc19"
},
{
"checksumSHA1": "GSum+utW01N3KeMdvAPnsW0TemM=",
"path": "github.com/michaelklishin/rabbit-hole",
"revision": "88550829bcdcf614361c73459c903578eb44074e",
"revisionTime": "2016-07-06T11:10:56Z"
},
{
"checksumSHA1": "7niW29CvYceZ6zbia6b/LT+yD/M=",
"path": "github.com/mitchellh/cli",

View File

@ -0,0 +1,55 @@
---
layout: "rabbitmq"
page_title: "Provider: RabbitMQ"
sidebar_current: "docs-rabbitmq-index"
description: |-
A provider for a RabbitMQ Server.
---
# RabbitMQ Provider
[RabbitMQ](http://www.rabbitmq.com) is an AMQP message broker server. The
RabbitMQ provider exposes resources used to manage the configuration of
resources in a RabbitMQ server.
Use the navigation to the left to read about the available resources.
## Example Usage
The following is a minimal example:
```
# Configure the RabbitMQ provider
provider "rabbitmq" {
endpoint = "http://127.0.0.1"
username = "guest"
password = "guest"
}
# Create a virtual host
resource "rabbitmq_vhost" "vhost_1" {
name = "vhost_1"
}
```
## Requirements
The RabbitMQ management plugin must be enabled to use this provider. You can
enable the plugin by doing something similar to:
```
$ sudo rabbitmq-plugins enable rabbitmq_management
```
## Argument Reference
The following arguments are supported:
* `endpoint` - (Required) The HTTP URL of the management plugin on the
RabbitMQ server. The RabbitMQ management plugin *must* be enabled in order
to use this provder. _Note_: This is not the IP address or hostname of the
RabbitMQ server that you would use to access RabbitMQ directly.
* `username` - (Required) Username to use to authenticate with the server.
* `password` - (Optional) Password for the given user.
* `insecure` - (Optional) Trust self-signed certificates.
* `cacert_file` - (Optional) The path to a custom CA / intermediate certificate.

View File

@ -0,0 +1,89 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_binding"
sidebar_current: "docs-rabbitmq-resource-binding"
description: |-
Creates and manages a binding on a RabbitMQ server.
---
# rabbitmq\_binding
The ``rabbitmq_binding`` resource creates and manages a binding relationship
between a queue an exchange.
## Example Usage
```
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_exchange" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
type = "fanout"
durable = false
auto_delete = true
}
}
resource "rabbitmq_queue" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
durable = true
auto_delete = false
}
}
resource "rabbitmq_binding" "test" {
source = "${rabbitmq_exchange.test.name}"
vhost = "${rabbitmq_vhost.test.name}"
destination = "${rabbitmq_queue.test.name}"
destination_type = "queue"
routing_key = "#"
properties_key = "%23"
}
```
## Argument Reference
The following arguments are supported:
* `source` - (Required) The source exchange.
* `vhost` - (Required) The vhost to create the resource in.
* `destination` - (Required) The destination queue or exchange.
* `destination_type` - (Required) The type of destination (queue or exchange).
* `properties_key` - (Required) A unique key to refer to the binding.
* `routing_key` - (Optional) A routing key for the binding.
* `arguments` - (Optional) Additional key/value arguments for the binding.
## Attributes Reference
No further attributes are exported.
## Import
Bindings can be imported using the `id` which is composed of
`vhost/source/destination/destination_type/properties_key`. E.g.
```
terraform import rabbitmq_binding.test test/test/test/queue/%23
```

View File

@ -0,0 +1,75 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_exchange"
sidebar_current: "docs-rabbitmq-resource-exchange"
description: |-
Creates and manages an exchange on a RabbitMQ server.
---
# rabbitmq\_exchange
The ``rabbitmq_exchange`` resource creates and manages an exchange.
## Example Usage
```
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_exchange" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
type = "fanout"
durable = false
auto_delete = true
}
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the exchange.
* `vhost` - (Required) The vhost to create the resource in.
* `settings` - (Required) The settings of the exchange. The structure is
described below.
The `settings` block supports:
* `type` - (Required) The type of exchange.
* `durable` - (Optional) Whether the exchange survives server restarts.
Defaults to `false`.
* `auto_delete` - (Optional) Whether the exchange will self-delete when all
queues have finished using it.
* `arguments` - (Optional) Additional key/value settings for the exchange.
## Attributes Reference
No further attributes are exported.
## Import
Exchanges can be imported using the `id` which is composed of `name@vhost`.
E.g.
```
terraform import rabbitmq_exchange.test test@vhost
```

View File

@ -0,0 +1,66 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_permissions"
sidebar_current: "docs-rabbitmq-resource-permissions"
description: |-
Creates and manages a user's permissions on a RabbitMQ server.
---
# rabbitmq\_permissions
The ``rabbitmq_permissions`` resource creates and manages a user's set of
permissions.
## Example Usage
```
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobar"
tags = ["administrator"]
}
resource "rabbitmq_permissions" "test" {
user = "${rabbitmq_user.test.name}"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
```
## Argument Reference
The following arguments are supported:
* `user` - (Required) The user to apply the permissions to.
* `vhost` - (Required) The vhost to create the resource in.
* `permissions` - (Required) The settings of the permissions. The structure is
described below.
The `permissions` block supports:
* `configure` - (Required) The "configure" ACL.
* `write` - (Required) The "write" ACL.
* `read` - (Required) The "read" ACL.
## Attributes Reference
No further attributes are exported.
## Import
Permissions can be imported using the `id` which is composed of `user@vhost`.
E.g.
```
terraform import rabbitmq_permissions.test user@vhost
```

View File

@ -0,0 +1,75 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_policy"
sidebar_current: "docs-rabbitmq-resource-policy"
description: |-
Creates and manages a policy on a RabbitMQ server.
---
# rabbitmq\_policy
The ``rabbitmq_policy`` resource creates and manages policies for exchanges
and queues.
## Example Usage
```
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_policy" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
policy {
pattern = ".*"
priority = 0
apply_to = "all"
definition {
ha-mode = "all"
}
}
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the policy.
* `vhost` - (Required) The vhost to create the resource in.
* `policy` - (Required) The settings of the policy. The structure is
described below.
The `policy` block supports:
* `pattern` - (Required) A pattern to match an exchange or queue name.
* `priority` - (Required) The policy with the greater priority is applied first.
* `apply_to` - (Required) Can either be "exchange", "queues", or "all".
* `definition` - (Required) Key/value pairs of the policy definition. See the
RabbitMQ documentation for definition references and examples.
## Attributes Reference
No further attributes are exported.
## Import
Policies can be imported using the `id` which is composed of `name@vhost`.
E.g.
```
terraform import rabbitmq_policy.test name@vhost
```

View File

@ -0,0 +1,71 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_queue"
sidebar_current: "docs-rabbitmq-resource-queue"
description: |-
Creates and manages a queue on a RabbitMQ server.
---
# rabbitmq\_queue
The ``rabbitmq_queue`` resource creates and manages a queue.
## Example Usage
```
resource "rabbitmq_vhost" "test" {
name = "test"
}
resource "rabbitmq_permissions" "guest" {
user = "guest"
vhost = "${rabbitmq_vhost.test.name}"
permissions {
configure = ".*"
write = ".*"
read = ".*"
}
}
resource "rabbitmq_queue" "test" {
name = "test"
vhost = "${rabbitmq_permissions.guest.vhost}"
settings {
durable = false
auto_delete = true
}
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the queue.
* `vhost` - (Required) The vhost to create the resource in.
* `settings` - (Required) The settings of the queue. The structure is
described below.
The `settings` block supports:
* `durable` - (Optional) Whether the queue survives server restarts.
Defaults to `false`.
* `auto_delete` - (Optional) Whether the queue will self-delete when all
consumers have unsubscribed.
* `arguments` - (Optional) Additional key/value settings for the queue.
## Attributes Reference
No further attributes are exported.
## Import
Queues can be imported using the `id` which is composed of `name@vhost`. E.g.
```
terraform import rabbitmq_queue.test name@vhost
```

View File

@ -0,0 +1,45 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_user"
sidebar_current: "docs-rabbitmq-resource-user"
description: |-
Creates and manages a user on a RabbitMQ server.
---
# rabbitmq\_user
The ``rabbitmq_user`` resource creates and manages a user.
## Example Usage
```
resource "rabbitmq_user" "test" {
name = "mctest"
password = "foobar"
tags = ["administrator", "management"]
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the user.
* `password` - (Required) The password of the user. The value of this argument
is plain-text so make sure to secure where this is defined.
* `tags` - (Required) Which permission model to apply to the user. Valid
options are: management, policymaker, monitoring, and administrator.
## Attributes Reference
No further attributes are exported.
## Import
Users can be imported using the `name`, e.g.
```
terraform import rabbitmq_user.test mctest
```

View File

@ -0,0 +1,37 @@
---
layout: "rabbitmq"
page_title: "RabbitMQ: rabbitmq_vhost"
sidebar_current: "docs-rabbitmq-resource-vhost"
description: |-
Creates and manages a vhost on a RabbitMQ server.
---
# rabbitmq\_vhost
The ``rabbitmq_vhost`` resource creates and manages a vhost.
## Example Usage
```
resource "rabbitmq_vhost" "my_vhost" {
name = "my_vhost"
}
```
## Argument Reference
The following arguments are supported:
* `name` - (Required) The name of the vhost.
## Attributes Reference
No further attributes are exported.
## Import
Vhosts can be imported using the `name`, e.g.
```
terraform import rabbitmq_vhost.my_vhost my_vhost
```

View File

@ -286,6 +286,10 @@
<a href="/docs/providers/powerdns/index.html">PowerDNS</a>
</li>
<li<%= sidebar_current("docs-providers-rabbitmq") %>>
<a href="/docs/providers/rabbitmq/index.html">RabbitMQ</a>
</li>
<li<%= sidebar_current("docs-providers-random") %>>
<a href="/docs/providers/random/index.html">Random</a>
</li>

View File

@ -0,0 +1,44 @@
<% wrap_layout :inner do %>
<% content_for :sidebar do %>
<div class="docs-sidebar hidden-print affix-top" role="complementary">
<ul class="nav docs-sidenav">
<li<%= sidebar_current("docs-home") %>>
<a href="/docs/providers/index.html">&laquo; Documentation Home</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-index") %>>
<a href="/docs/providers/rabbitmq/index.html">RabbitMQ Provider</a>
</li>
<li<%= sidebar_current(/^docs-rabbitmq-resource/) %>>
<a href="#">Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-rabbitmq-resource-binding") %>>
<a href="/docs/providers/rabbitmq/r/binding.html">rabbitmq_binding</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-exchange") %>>
<a href="/docs/providers/rabbitmq/r/exchange.html">rabbitmq_exchange</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-permissions") %>>
<a href="/docs/providers/rabbitmq/r/permissions.html">rabbitmq_permissions</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-policy") %>>
<a href="/docs/providers/rabbitmq/r/policy.html">rabbitmq_policy</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-queue") %>>
<a href="/docs/providers/rabbitmq/r/queue.html">rabbitmq_queue</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-user") %>>
<a href="/docs/providers/rabbitmq/r/user.html">rabbitmq_user</a>
</li>
<li<%= sidebar_current("docs-rabbitmq-resource-vhost") %>>
<a href="/docs/providers/rabbitmq/r/vhost.html">rabbitmq_vhost</a>
</li>
</ul>
</li>
</ul>
</div>
<% end %>
<%= yield %>
<% end %>