vendor: Adding RabbitHole for RabbitMQ Provider
This commit is contained in:
parent
239863d417
commit
a93cead0f6
|
@ -0,0 +1,20 @@
|
||||||
|
## Contributing
|
||||||
|
|
||||||
|
The workflow is pretty standard:
|
||||||
|
|
||||||
|
1. Fork it
|
||||||
|
2. Create your feature branch (`git checkout -b my-new-feature`)
|
||||||
|
3. Commit your changes (`git commit -am 'Add some feature'`)
|
||||||
|
4. Push to the branch (`git push -u origin my-new-feature`)
|
||||||
|
5. Create new Pull Request
|
||||||
|
|
||||||
|
## Running Tests
|
||||||
|
|
||||||
|
First run `bin/ci/before_build.sh` that will create a vhost and user(s) needed
|
||||||
|
by the test suite.
|
||||||
|
|
||||||
|
The project uses [Ginkgo](http://onsi.github.io/ginkgo/) and [Gomega](https://github.com/onsi/gomega).
|
||||||
|
|
||||||
|
To clone dependencies and run tests, use `make`. It is also possible
|
||||||
|
to use the brilliant [Ginkgo CLI runner](http://onsi.github.io/ginkgo/#the-ginkgo-cli) e.g.
|
||||||
|
to only run a subset of tests.
|
|
@ -0,0 +1,32 @@
|
||||||
|
## Changes Between 1.0.0 and 1.1.0 (unreleased)
|
||||||
|
|
||||||
|
### More Complete Message Stats Information
|
||||||
|
|
||||||
|
Message stats now include fields such as `deliver_get` and `redeliver`.
|
||||||
|
|
||||||
|
GH issue: [#73](https://github.com/michaelklishin/rabbit-hole/pull/73).
|
||||||
|
|
||||||
|
Contributed by Edward Wilde.
|
||||||
|
|
||||||
|
|
||||||
|
## 1.0 (first tagged release, Dec 25th, 2015)
|
||||||
|
|
||||||
|
### TLS Support
|
||||||
|
|
||||||
|
`rabbithole.NewTLSClient` is a new function which works
|
||||||
|
much like `NewClient` but additionally accepts a transport.
|
||||||
|
|
||||||
|
Contributed by @[GrimTheReaper](https://github.com/GrimTheReaper).
|
||||||
|
|
||||||
|
### Federation Support
|
||||||
|
|
||||||
|
It is now possible to create federation links
|
||||||
|
over HTTP API.
|
||||||
|
|
||||||
|
Contributed by [Ryan Grenz](https://github.com/grenzr-bskyb).
|
||||||
|
|
||||||
|
### Core Operations Support
|
||||||
|
|
||||||
|
Most common HTTP API operations (listing and management of
|
||||||
|
vhosts, users, permissions, queues, exchanges, and bindings)
|
||||||
|
are supported by the client.
|
|
@ -0,0 +1,23 @@
|
||||||
|
Copyright (c) 2013, Michael Klishin
|
||||||
|
All rights reserved.
|
||||||
|
|
||||||
|
Redistribution and use in source and binary forms, with or without modification,
|
||||||
|
are permitted provided that the following conditions are met:
|
||||||
|
|
||||||
|
Redistributions of source code must retain the above copyright notice, this
|
||||||
|
list of conditions and the following disclaimer.
|
||||||
|
|
||||||
|
Redistributions in binary form must reproduce the above copyright notice, this
|
||||||
|
list of conditions and the following disclaimer in the documentation and/or
|
||||||
|
other materials provided with the distribution.
|
||||||
|
|
||||||
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||||
|
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||||
|
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||||
|
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
|
||||||
|
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||||
|
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||||
|
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
|
||||||
|
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||||
|
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
@ -0,0 +1,26 @@
|
||||||
|
export GOPATH := $(CURDIR)
|
||||||
|
|
||||||
|
COVER_FILE := coverage
|
||||||
|
|
||||||
|
all: test
|
||||||
|
|
||||||
|
.PHONY: test
|
||||||
|
|
||||||
|
test: install-dependencies
|
||||||
|
go test -v
|
||||||
|
|
||||||
|
cover: install-dependencies install-cover
|
||||||
|
go test -v -test.coverprofile="$(COVER_FILE).prof"
|
||||||
|
sed -i.bak 's|_'$(GOPATH)'|.|g' $(COVER_FILE).prof
|
||||||
|
go tool cover -html=$(COVER_FILE).prof -o $(COVER_FILE).html
|
||||||
|
rm $(COVER_FILE).prof*
|
||||||
|
|
||||||
|
install-cover:
|
||||||
|
go get code.google.com/p/go.tools/cmd/cover
|
||||||
|
|
||||||
|
install-dependencies:
|
||||||
|
go get github.com/onsi/ginkgo
|
||||||
|
go get github.com/onsi/gomega
|
||||||
|
go get github.com/streadway/amqp
|
||||||
|
# to get Ginkgo CLI
|
||||||
|
go install github.com/onsi/ginkgo/ginkgo
|
|
@ -0,0 +1,296 @@
|
||||||
|
# Rabbit Hole, a RabbitMQ HTTP API Client for Go
|
||||||
|
|
||||||
|
This library is a [RabbitMQ HTTP API](https://raw.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html) client for the Go language.
|
||||||
|
|
||||||
|
## Supported Go Versions
|
||||||
|
|
||||||
|
Rabbit Hole requires Go 1.3+.
|
||||||
|
|
||||||
|
|
||||||
|
## Supported RabbitMQ Versions
|
||||||
|
|
||||||
|
* RabbitMQ 3.x
|
||||||
|
|
||||||
|
All versions require [RabbitMQ Management UI plugin](http://www.rabbitmq.com/management.html) to be installed and enabled.
|
||||||
|
|
||||||
|
|
||||||
|
## Project Maturity
|
||||||
|
|
||||||
|
Rabbit Hole is a fairly mature library (started in October 2013)
|
||||||
|
designed after a couple of other RabbitMQ HTTP API clients with stable
|
||||||
|
APIs. Breaking API changes are not out of the question but not without
|
||||||
|
a reasonable version bump.
|
||||||
|
|
||||||
|
It is largely (80-90%) feature complete and decently documented.
|
||||||
|
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```
|
||||||
|
go get github.com/michaelklishin/rabbit-hole
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
## Documentation
|
||||||
|
|
||||||
|
### Overview
|
||||||
|
|
||||||
|
To import the package:
|
||||||
|
|
||||||
|
``` go
|
||||||
|
import (
|
||||||
|
"github.com/michaelklishin/rabbit-hole"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
All HTTP API operations are accessible via `rabbithole.Client`, which
|
||||||
|
should be instantiated with `rabbithole.NewClient`:
|
||||||
|
|
||||||
|
``` go
|
||||||
|
// URI, username, password
|
||||||
|
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
|
||||||
|
```
|
||||||
|
|
||||||
|
SSL/TSL is now available, by adding a Transport Layer to the parameters
|
||||||
|
of `rabbithole.NewTLSClient`:
|
||||||
|
``` go
|
||||||
|
transport := &http.Transport{TLSClientConfig: tlsConfig}
|
||||||
|
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
|
||||||
|
```
|
||||||
|
However, RabbitMQ-Management does not have SSL/TLS enabled by default,
|
||||||
|
so you must enable it.
|
||||||
|
|
||||||
|
[API reference](http://godoc.org/github.com/michaelklishin/rabbit-hole) is available on [godoc.org](http://godoc.org).
|
||||||
|
|
||||||
|
|
||||||
|
### Getting Overview
|
||||||
|
|
||||||
|
``` go
|
||||||
|
res, err := rmqc.Overview()
|
||||||
|
```
|
||||||
|
|
||||||
|
### Node and Cluster Status
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListNodes()
|
||||||
|
// => []NodeInfo, err
|
||||||
|
|
||||||
|
node, err := rmqc.GetNode("rabbit@mercurio")
|
||||||
|
// => NodeInfo, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Connections
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListConnections()
|
||||||
|
// => []ConnectionInfo, err
|
||||||
|
|
||||||
|
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
|
||||||
|
// => ConnectionInfo, err
|
||||||
|
|
||||||
|
// Forcefully close connection
|
||||||
|
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Channels
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListChannels()
|
||||||
|
// => []ChannelInfo, err
|
||||||
|
|
||||||
|
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
|
||||||
|
// => ChannelInfo, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Vhosts
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListVhosts()
|
||||||
|
// => []VhostInfo, err
|
||||||
|
|
||||||
|
// information about individual vhost
|
||||||
|
x, err := rmqc.GetVhost("/")
|
||||||
|
// => VhostInfo, err
|
||||||
|
|
||||||
|
// creates or updates individual vhost
|
||||||
|
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual vhost
|
||||||
|
resp, err := rmqc.DeleteVhost("/")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Managing Users
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListUsers()
|
||||||
|
// => []UserInfo, err
|
||||||
|
|
||||||
|
// information about individual user
|
||||||
|
x, err := rmqc.GetUser("my.user")
|
||||||
|
// => UserInfo, err
|
||||||
|
|
||||||
|
// creates or updates individual user
|
||||||
|
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual user
|
||||||
|
resp, err := rmqc.DeleteUser("my.user")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Managing Permissions
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListPermissions()
|
||||||
|
// => []PermissionInfo, err
|
||||||
|
|
||||||
|
// permissions of individual user
|
||||||
|
x, err := rmqc.ListPermissionsOf("my.user")
|
||||||
|
// => []PermissionInfo, err
|
||||||
|
|
||||||
|
// permissions of individual user in vhost
|
||||||
|
x, err := rmqc.GetPermissionsIn("/", "my.user")
|
||||||
|
// => PermissionInfo, err
|
||||||
|
|
||||||
|
// updates permissions of user in vhost
|
||||||
|
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// revokes permissions in vhost
|
||||||
|
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Exchanges
|
||||||
|
|
||||||
|
``` go
|
||||||
|
xs, err := rmqc.ListExchanges()
|
||||||
|
// => []ExchangeInfo, err
|
||||||
|
|
||||||
|
// list exchanges in a vhost
|
||||||
|
xs, err := rmqc.ListExchangesIn("/")
|
||||||
|
// => []ExchangeInfo, err
|
||||||
|
|
||||||
|
// information about individual exchange
|
||||||
|
x, err := rmqc.GetExchange("/", "amq.fanout")
|
||||||
|
// => ExchangeInfo, err
|
||||||
|
|
||||||
|
// declares an exchange
|
||||||
|
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual exchange
|
||||||
|
resp, err := rmqc.DeleteExchange("/", "an.exchange")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Queues
|
||||||
|
|
||||||
|
``` go
|
||||||
|
qs, err := rmqc.ListQueues()
|
||||||
|
// => []QueueInfo, err
|
||||||
|
|
||||||
|
// list queues in a vhost
|
||||||
|
qs, err := rmqc.ListQueuesIn("/")
|
||||||
|
// => []QueueInfo, err
|
||||||
|
|
||||||
|
// information about individual queue
|
||||||
|
q, err := rmqc.GetQueue("/", "a.queue")
|
||||||
|
// => QueueInfo, err
|
||||||
|
|
||||||
|
// declares a queue
|
||||||
|
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual queue
|
||||||
|
resp, err := rmqc.DeleteQueue("/", "a.queue")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// purges all messages in queue
|
||||||
|
resp, err := rmqc.PurgeQueue("/", "a.queue")
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Operations on Bindings
|
||||||
|
|
||||||
|
``` go
|
||||||
|
bs, err := rmqc.ListBindings()
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// list bindings in a vhost
|
||||||
|
bs, err := rmqc.ListBindingsIn("/")
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// list bindings of a queue
|
||||||
|
bs, err := rmqc.ListQueueBindings("/", "a.queue")
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// declare a binding
|
||||||
|
resp, err := rmqc.DeclareBinding("/", BindingInfo{
|
||||||
|
Source: "an.exchange",
|
||||||
|
Destination: "a.queue",
|
||||||
|
DestinationType: "queue",
|
||||||
|
RoutingKey: "#",
|
||||||
|
})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual binding
|
||||||
|
resp, err := rmqc.DeleteBinding("/", BindingInfo{
|
||||||
|
Source: "an.exchange",
|
||||||
|
Destination: "a.queue",
|
||||||
|
DestinationType: "queue",
|
||||||
|
RoutingKey: "#",
|
||||||
|
PropertiesKey: "%23",
|
||||||
|
})
|
||||||
|
// => *http.Response, err
|
||||||
|
```
|
||||||
|
|
||||||
|
### HTTPS Connections
|
||||||
|
|
||||||
|
``` go
|
||||||
|
var tlsConfig *tls.Config
|
||||||
|
|
||||||
|
...
|
||||||
|
|
||||||
|
transport := &http.Transport{TLSClientConfig: tlsConfig}
|
||||||
|
|
||||||
|
rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Changing Transport Layer
|
||||||
|
|
||||||
|
``` go
|
||||||
|
var transport *http.Transport
|
||||||
|
|
||||||
|
...
|
||||||
|
|
||||||
|
rmqc.SetTransport(transport)
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
## CI Status
|
||||||
|
|
||||||
|
[![Build Status](https://travis-ci.org/michaelklishin/rabbit-hole.svg?branch=master)](https://travis-ci.org/michaelklishin/rabbit-hole)
|
||||||
|
|
||||||
|
|
||||||
|
## Contributing
|
||||||
|
|
||||||
|
See [CONTRIBUTING.md](https://github.com/michaelklishin/rabbit-hole/blob/master/CONTRIBUTING.md)
|
||||||
|
|
||||||
|
|
||||||
|
## License & Copyright
|
||||||
|
|
||||||
|
2-clause BSD license.
|
||||||
|
|
||||||
|
(c) Michael S. Klishin, 2013-2016.
|
|
@ -0,0 +1,157 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/bindings
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
//
|
||||||
|
// [
|
||||||
|
// {
|
||||||
|
// "source": "",
|
||||||
|
// "vhost": "\/",
|
||||||
|
// "destination": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
|
||||||
|
// "destination_type": "queue",
|
||||||
|
// "routing_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
|
||||||
|
// "arguments": {
|
||||||
|
//
|
||||||
|
// },
|
||||||
|
// "properties_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w"
|
||||||
|
// }
|
||||||
|
// ]
|
||||||
|
|
||||||
|
type BindingInfo struct {
|
||||||
|
// Binding source (exchange name)
|
||||||
|
Source string `json:"source"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
// Binding destination (queue or exchange name)
|
||||||
|
Destination string `json:"destination"`
|
||||||
|
// Destination type, either "queue" or "exchange"
|
||||||
|
DestinationType string `json:"destination_type"`
|
||||||
|
RoutingKey string `json:"routing_key"`
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
PropertiesKey string `json:"properties_key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns all bindings
|
||||||
|
func (c *Client) ListBindings() (rec []BindingInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "bindings/")
|
||||||
|
if err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/bindings/{vhost}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns all bindings in a virtual host.
|
||||||
|
func (c *Client) ListBindingsIn(vhost string) (rec []BindingInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "bindings/"+url.QueryEscape(vhost))
|
||||||
|
if err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/queues/{vhost}/{queue}/bindings
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
// [
|
||||||
|
// {"source":"",
|
||||||
|
// "vhost":"/",
|
||||||
|
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
|
||||||
|
// "destination_type":"queue",
|
||||||
|
// "routing_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
|
||||||
|
// "arguments":{},
|
||||||
|
// "properties_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA"},
|
||||||
|
// {"source":"temp",
|
||||||
|
// "vhost":"/",
|
||||||
|
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
|
||||||
|
// "destination_type":"queue",
|
||||||
|
// "routing_key":"",
|
||||||
|
// "arguments":{},
|
||||||
|
// "properties_key":"~"}
|
||||||
|
// ]
|
||||||
|
|
||||||
|
// Returns all bindings of individual queue.
|
||||||
|
func (c *Client) ListQueueBindings(vhost, queue string) (rec []BindingInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/bindings")
|
||||||
|
if err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []BindingInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// POST /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}
|
||||||
|
//
|
||||||
|
|
||||||
|
// DeclareBinding updates information about a binding between a source and a target
|
||||||
|
func (c *Client) DeclareBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
|
||||||
|
info.Vhost = vhost
|
||||||
|
|
||||||
|
if info.Arguments == nil {
|
||||||
|
info.Arguments = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "POST", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination), body)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}/{props}
|
||||||
|
//
|
||||||
|
|
||||||
|
// DeleteBinding delets an individual binding
|
||||||
|
func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination)+"/"+url.QueryEscape(info.PropertiesKey), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import "net/url"
|
||||||
|
|
||||||
|
// Brief (very incomplete) connection information.
|
||||||
|
type BriefConnectionDetails struct {
|
||||||
|
// Connection name
|
||||||
|
Name string `json:"name"`
|
||||||
|
// Client port
|
||||||
|
PeerPort Port `json:"peer_port"`
|
||||||
|
// Client host
|
||||||
|
PeerHost string `json:"peer_host"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelInfo struct {
|
||||||
|
// Channel number
|
||||||
|
Number int `json:"number"`
|
||||||
|
// Channel name
|
||||||
|
Name string `json:"name"`
|
||||||
|
|
||||||
|
// basic.qos (prefetch count) value used
|
||||||
|
PrefetchCount int `json:"prefetch_count"`
|
||||||
|
// How many consumers does this channel have
|
||||||
|
ConsumerCount int `json:"consumer_count"`
|
||||||
|
|
||||||
|
// Number of unacknowledged messages on this channel
|
||||||
|
UnacknowledgedMessageCount int `json:"messages_unacknowledged"`
|
||||||
|
// Number of messages on this channel unconfirmed to publishers
|
||||||
|
UnconfirmedMessageCount int `json:"messages_unconfirmed"`
|
||||||
|
// Number of messages on this channel uncommited to message store
|
||||||
|
UncommittedMessageCount int `json:"messages_uncommitted"`
|
||||||
|
// Number of acks on this channel uncommited to message store
|
||||||
|
UncommittedAckCount int `json:"acks_uncommitted"`
|
||||||
|
|
||||||
|
// TODO(mk): custom deserializer to date/time?
|
||||||
|
IdleSince string `json:"idle_since"`
|
||||||
|
|
||||||
|
// True if this channel uses publisher confirms
|
||||||
|
UsesPublisherConfirms bool `json:"confirm"`
|
||||||
|
// True if this channel uses transactions
|
||||||
|
Transactional bool `json:"transactional"`
|
||||||
|
// True if this channel is blocked via channel.flow
|
||||||
|
ClientFlowBlocked bool `json:"client_flow_blocked"`
|
||||||
|
|
||||||
|
User string `json:"user"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
Node string `json:"node"`
|
||||||
|
|
||||||
|
ConnectionDetails BriefConnectionDetails `json:"connection_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/channels
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns information about all open channels.
|
||||||
|
func (c *Client) ListChannels() (rec []ChannelInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "channels")
|
||||||
|
if err != nil {
|
||||||
|
return []ChannelInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []ChannelInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/channels/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns channel information.
|
||||||
|
func (c *Client) GetChannel(name string) (rec *ChannelInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "channels/"+url.QueryEscape(name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
|
@ -0,0 +1,132 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
// URI of a RabbitMQ node to use, not including the path, e.g. http://127.0.0.1:15672.
|
||||||
|
Endpoint string
|
||||||
|
// Username to use. This RabbitMQ user must have the "management" tag.
|
||||||
|
Username string
|
||||||
|
// Password to use.
|
||||||
|
Password string
|
||||||
|
host string
|
||||||
|
transport *http.Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(uri string, username string, password string) (me *Client, err error) {
|
||||||
|
u, err := url.Parse(uri)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
me = &Client{
|
||||||
|
Endpoint: uri,
|
||||||
|
host: u.Host,
|
||||||
|
Username: username,
|
||||||
|
Password: password,
|
||||||
|
}
|
||||||
|
|
||||||
|
return me, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//NewTLSClient Creates a Client with a Transport Layer; it is up to the developer to make that layer Secure.
|
||||||
|
func NewTLSClient(uri string, username string, password string, transport *http.Transport) (me *Client, err error) {
|
||||||
|
u, err := url.Parse(uri)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
me = &Client{
|
||||||
|
Endpoint: uri,
|
||||||
|
host: u.Host,
|
||||||
|
Username: username,
|
||||||
|
Password: password,
|
||||||
|
transport: transport,
|
||||||
|
}
|
||||||
|
|
||||||
|
return me, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//SetTransport changes the Transport Layer that the Client will use.
|
||||||
|
func (c *Client) SetTransport(transport *http.Transport) {
|
||||||
|
c.transport = transport
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGETRequest(client *Client, path string) (*http.Request, error) {
|
||||||
|
s := client.Endpoint + "/api/" + path
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", s, nil)
|
||||||
|
|
||||||
|
req.Close = true
|
||||||
|
req.SetBasicAuth(client.Username, client.Password)
|
||||||
|
// set Opaque to preserve percent-encoded path. MK.
|
||||||
|
req.URL.Opaque = "//" + client.host + "/api/" + path
|
||||||
|
|
||||||
|
return req, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRequestWithBody(client *Client, method string, path string, body []byte) (*http.Request, error) {
|
||||||
|
s := client.Endpoint + "/api/" + path
|
||||||
|
|
||||||
|
req, err := http.NewRequest(method, s, bytes.NewReader(body))
|
||||||
|
|
||||||
|
req.Close = true
|
||||||
|
req.SetBasicAuth(client.Username, client.Password)
|
||||||
|
// set Opaque to preserve percent-encoded path. MK.
|
||||||
|
req.URL.Opaque = "//" + client.host + "/api/" + path
|
||||||
|
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
return req, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeRequest(client *Client, req *http.Request) (res *http.Response, err error) {
|
||||||
|
var httpc *http.Client
|
||||||
|
if client.transport != nil {
|
||||||
|
httpc = &http.Client{Transport: client.transport}
|
||||||
|
} else {
|
||||||
|
httpc = &http.Client{}
|
||||||
|
}
|
||||||
|
res, err = httpc.Do(req)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func executeAndParseRequest(client *Client, req *http.Request, rec interface{}) (err error) {
|
||||||
|
var httpc *http.Client
|
||||||
|
if client.transport != nil {
|
||||||
|
httpc = &http.Client{Transport: client.transport}
|
||||||
|
} else {
|
||||||
|
httpc = &http.Client{}
|
||||||
|
}
|
||||||
|
res, err := httpc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer res.Body.Close() // always close body
|
||||||
|
|
||||||
|
if isNotFound(res) {
|
||||||
|
return errors.New("not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.NewDecoder(res.Body).Decode(&rec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNotFound(res *http.Response) bool {
|
||||||
|
return res.StatusCode == http.StatusNotFound
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import "strconv"
|
||||||
|
|
||||||
|
// Extra arguments as a map (on queues, bindings, etc)
|
||||||
|
type Properties map[string]interface{}
|
||||||
|
|
||||||
|
// Port used by RabbitMQ or clients
|
||||||
|
type Port int
|
||||||
|
|
||||||
|
func (p *Port) UnmarshalJSON(b []byte) error {
|
||||||
|
stringValue := string(b)
|
||||||
|
var parsed int64
|
||||||
|
var err error
|
||||||
|
if stringValue[0] == '"' && stringValue[len(stringValue)-1] == '"' {
|
||||||
|
parsed, err = strconv.ParseInt(stringValue[1:len(stringValue)-1], 10, 32)
|
||||||
|
} else {
|
||||||
|
parsed, err = strconv.ParseInt(stringValue, 10, 32)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
*p = Port(int(parsed))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rate of change of a numerical value
|
||||||
|
type RateDetails struct {
|
||||||
|
Rate float32 `json:"rate"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RabbitMQ context (Erlang app) running on
|
||||||
|
// a node
|
||||||
|
type BrokerContext struct {
|
||||||
|
Node string `json:"node"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
Port Port `json:"port"`
|
||||||
|
Ignore bool `json:"ignore_in_use"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic published messages statistics
|
||||||
|
type MessageStats struct {
|
||||||
|
Publish int `json:"publish"`
|
||||||
|
PublishDetails RateDetails `json:"publish_details"`
|
||||||
|
Deliver int `json:"deliver"`
|
||||||
|
DeliverDetails RateDetails `json:"deliver_details"`
|
||||||
|
DeliverNoAck int `json:"deliver_noack"`
|
||||||
|
DeliverNoAckDetails RateDetails `json:"deliver_noack_details"`
|
||||||
|
DeliverGet int `json:"deliver_get"`
|
||||||
|
DeliverGetDetails RateDetails `json:"deliver_get_details"`
|
||||||
|
Redeliver int `json:"redeliver"`
|
||||||
|
RedeliverDetails RateDetails `json:"redeliver_details"`
|
||||||
|
Get int `json:"get"`
|
||||||
|
GetDetails RateDetails `json:"get_details"`
|
||||||
|
GetNoAck int `json:"get_no_ack"`
|
||||||
|
GetNoAckDetails RateDetails `json:"get_no_ack_details"`
|
||||||
|
}
|
|
@ -0,0 +1,131 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Provides information about connection to a RabbitMQ node.
|
||||||
|
type ConnectionInfo struct {
|
||||||
|
// Connection name
|
||||||
|
Name string `json:"name"`
|
||||||
|
// Node the client is connected to
|
||||||
|
Node string `json:"node"`
|
||||||
|
// Number of open channels
|
||||||
|
Channels int `json:"channels"`
|
||||||
|
// Connection state
|
||||||
|
State string `json:"state"`
|
||||||
|
// Connection type, network (via AMQP client) or direct (via direct Erlang client)
|
||||||
|
Type string `json:"type"`
|
||||||
|
|
||||||
|
// Server port
|
||||||
|
Port Port `json:"port"`
|
||||||
|
// Client port
|
||||||
|
PeerPort Port `json:"peer_port"`
|
||||||
|
|
||||||
|
// Server host
|
||||||
|
Host string `json:"host"`
|
||||||
|
// Client host
|
||||||
|
PeerHost string `json:"peer_host"`
|
||||||
|
|
||||||
|
// Last connection blocking reason, if any
|
||||||
|
LastBlockedBy string `json:"last_blocked_by"`
|
||||||
|
// When connection was last blocked
|
||||||
|
LastBlockedAge string `json:"last_blocked_age"`
|
||||||
|
|
||||||
|
// True if connection uses TLS/SSL
|
||||||
|
UsesTLS bool `json:"ssl"`
|
||||||
|
// Client certificate subject
|
||||||
|
PeerCertSubject string `json:"peer_cert_subject"`
|
||||||
|
// Client certificate validity
|
||||||
|
PeerCertValidity string `json:"peer_cert_validity"`
|
||||||
|
// Client certificate issuer
|
||||||
|
PeerCertIssuer string `json:"peer_cert_issuer"`
|
||||||
|
|
||||||
|
// TLS/SSL protocol and version
|
||||||
|
SSLProtocol string `json:"ssl_protocol"`
|
||||||
|
// Key exchange mechanism
|
||||||
|
SSLKeyExchange string `json:"ssl_key_exchange"`
|
||||||
|
// SSL cipher suite used
|
||||||
|
SSLCipher string `json:"ssl_cipher"`
|
||||||
|
// SSL hash
|
||||||
|
SSLHash string `json:"ssl_hash"`
|
||||||
|
|
||||||
|
// Protocol, e.g. AMQP 0-9-1 or MQTT 3-1
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
|
User string `json:"user"`
|
||||||
|
// Virtual host
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
|
||||||
|
// Heartbeat timeout
|
||||||
|
Timeout int `json:"timeout"`
|
||||||
|
// Maximum frame size (AMQP 0-9-1)
|
||||||
|
FrameMax int `json:"frame_max"`
|
||||||
|
|
||||||
|
// A map of client properties (name, version, capabilities, etc)
|
||||||
|
ClientProperties Properties `json:"client_properties"`
|
||||||
|
|
||||||
|
// Octets received
|
||||||
|
RecvOct uint64 `json:"recv_oct"`
|
||||||
|
// Octets sent
|
||||||
|
SendOct uint64 `json:"send_oct"`
|
||||||
|
RecvCount uint64 `json:"recv_cnt"`
|
||||||
|
SendCount uint64 `json:"send_cnt"`
|
||||||
|
SendPending uint64 `json:"send_pend"`
|
||||||
|
// Ingress data rate
|
||||||
|
RecvOctDetails RateDetails `json:"recv_oct_details"`
|
||||||
|
// Egress data rate
|
||||||
|
SendOctDetails RateDetails `json:"send_oct_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/connections
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) ListConnections() (rec []ConnectionInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "connections")
|
||||||
|
if err != nil {
|
||||||
|
return []ConnectionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []ConnectionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/connections/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) GetConnection(name string) (rec *ConnectionInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "connections/"+url.QueryEscape(name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/connections/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) CloseConnection(name string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "connections/"+url.QueryEscape(name), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,177 @@
|
||||||
|
/*
|
||||||
|
Rabbit Hole is a Go client for the RabbitMQ HTTP API.
|
||||||
|
|
||||||
|
All HTTP API operations are accessible via `rabbithole.Client`, which
|
||||||
|
should be instantiated with `rabbithole.NewClient`.
|
||||||
|
|
||||||
|
// URI, username, password
|
||||||
|
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
|
||||||
|
|
||||||
|
Getting Overview
|
||||||
|
|
||||||
|
res, err := rmqc.Overview()
|
||||||
|
|
||||||
|
Node and Cluster Status
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// => []NodeInfo, err
|
||||||
|
xs, err := rmqc.ListNodes()
|
||||||
|
|
||||||
|
node, err := rmqc.GetNode("rabbit@mercurio")
|
||||||
|
// => NodeInfo, err
|
||||||
|
|
||||||
|
Operations on Connections
|
||||||
|
|
||||||
|
xs, err := rmqc.ListConnections()
|
||||||
|
// => []ConnectionInfo, err
|
||||||
|
|
||||||
|
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
|
||||||
|
// => ConnectionInfo, err
|
||||||
|
|
||||||
|
// Forcefully close connection
|
||||||
|
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Operations on Channels
|
||||||
|
|
||||||
|
xs, err := rmqc.ListChannels()
|
||||||
|
// => []ChannelInfo, err
|
||||||
|
|
||||||
|
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
|
||||||
|
// => ChannelInfo, err
|
||||||
|
|
||||||
|
Operations on Exchanges
|
||||||
|
|
||||||
|
xs, err := rmqc.ListExchanges()
|
||||||
|
// => []ExchangeInfo, err
|
||||||
|
|
||||||
|
// list exchanges in a vhost
|
||||||
|
xs, err := rmqc.ListExchangesIn("/")
|
||||||
|
// => []ExchangeInfo, err
|
||||||
|
|
||||||
|
// information about individual exchange
|
||||||
|
x, err := rmqc.GetExchange("/", "amq.fanout")
|
||||||
|
// => ExchangeInfo, err
|
||||||
|
|
||||||
|
// declares an exchange
|
||||||
|
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual exchange
|
||||||
|
resp, err := rmqc.DeleteExchange("/", "an.exchange")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Operations on Queues
|
||||||
|
|
||||||
|
xs, err := rmqc.ListQueues()
|
||||||
|
// => []QueueInfo, err
|
||||||
|
|
||||||
|
// list queues in a vhost
|
||||||
|
xs, err := rmqc.ListQueuesIn("/")
|
||||||
|
// => []QueueInfo, err
|
||||||
|
|
||||||
|
// information about individual queue
|
||||||
|
x, err := rmqc.GetQueue("/", "a.queue")
|
||||||
|
// => QueueInfo, err
|
||||||
|
|
||||||
|
// declares a queue
|
||||||
|
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual queue
|
||||||
|
resp, err := rmqc.DeleteQueue("/", "a.queue")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// purges all messages in queue
|
||||||
|
resp, err := rmqc.PurgeQueue("/", "a.queue")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Operations on Bindings
|
||||||
|
|
||||||
|
bs, err := rmqc.ListBindings()
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// list bindings in a vhost
|
||||||
|
bs, err := rmqc.ListBindingsIn("/")
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// list bindings of a queue
|
||||||
|
bs, err := rmqc.ListQueueBindings("/", "a.queue")
|
||||||
|
// => []BindingInfo, err
|
||||||
|
|
||||||
|
// declare a binding
|
||||||
|
resp, err := rmqc.DeclareBinding("/", BindingInfo{
|
||||||
|
Source: "an.exchange",
|
||||||
|
Destination: "a.queue",
|
||||||
|
DestinationType: "queue",
|
||||||
|
RoutingKey: "#",
|
||||||
|
})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual binding
|
||||||
|
resp, err := rmqc.DeleteBinding("/", BindingInfo{
|
||||||
|
Source: "an.exchange",
|
||||||
|
Destination: "a.queue",
|
||||||
|
DestinationType: "queue",
|
||||||
|
RoutingKey: "#",
|
||||||
|
PropertiesKey: "%23",
|
||||||
|
})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Operations on Vhosts
|
||||||
|
|
||||||
|
xs, err := rmqc.ListVhosts()
|
||||||
|
// => []VhostInfo, err
|
||||||
|
|
||||||
|
// information about individual vhost
|
||||||
|
x, err := rmqc.GetVhost("/")
|
||||||
|
// => VhostInfo, err
|
||||||
|
|
||||||
|
// creates or updates individual vhost
|
||||||
|
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual vhost
|
||||||
|
resp, err := rmqc.DeleteVhost("/")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Managing Users
|
||||||
|
|
||||||
|
xs, err := rmqc.ListUsers()
|
||||||
|
// => []UserInfo, err
|
||||||
|
|
||||||
|
// information about individual user
|
||||||
|
x, err := rmqc.GetUser("my.user")
|
||||||
|
// => UserInfo, err
|
||||||
|
|
||||||
|
// creates or updates individual user
|
||||||
|
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// deletes individual user
|
||||||
|
resp, err := rmqc.DeleteUser("my.user")
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
Managing Permissions
|
||||||
|
|
||||||
|
xs, err := rmqc.ListPermissions()
|
||||||
|
// => []PermissionInfo, err
|
||||||
|
|
||||||
|
// permissions of individual user
|
||||||
|
x, err := rmqc.ListPermissionsOf("my.user")
|
||||||
|
// => []PermissionInfo, err
|
||||||
|
|
||||||
|
// permissions of individual user in vhost
|
||||||
|
x, err := rmqc.GetPermissionsIn("/", "my.user")
|
||||||
|
// => PermissionInfo, err
|
||||||
|
|
||||||
|
// updates permissions of user in vhost
|
||||||
|
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
|
||||||
|
// => *http.Response, err
|
||||||
|
|
||||||
|
// revokes permissions in vhost
|
||||||
|
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
|
||||||
|
// => *http.Response, err
|
||||||
|
*/
|
||||||
|
package rabbithole
|
|
@ -0,0 +1,219 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/exchanges
|
||||||
|
//
|
||||||
|
|
||||||
|
type IngressEgressStats struct {
|
||||||
|
PublishIn int `json:"publish_in"`
|
||||||
|
PublishInDetails RateDetails `json:"publish_in_details"`
|
||||||
|
|
||||||
|
PublishOut int `json:"publish_out"`
|
||||||
|
PublishOutDetails RateDetails `json:"publish_out_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExchangeInfo struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Durable bool `json:"durable"`
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
Internal bool `json:"internal"`
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
|
||||||
|
MessageStats IngressEgressStats `json:"message_stats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExchangeSettings struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Durable bool `json:"durable"`
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ListExchanges() (rec []ExchangeInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "exchanges")
|
||||||
|
if err != nil {
|
||||||
|
return []ExchangeInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []ExchangeInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/exchanges/{vhost}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost))
|
||||||
|
if err != nil {
|
||||||
|
return []ExchangeInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []ExchangeInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/exchanges/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
//
|
||||||
|
// {
|
||||||
|
// "incoming": [
|
||||||
|
// {
|
||||||
|
// "stats": {
|
||||||
|
// "publish": 2760,
|
||||||
|
// "publish_details": {
|
||||||
|
// "rate": 20
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "channel_details": {
|
||||||
|
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672 (2)",
|
||||||
|
// "number": 2,
|
||||||
|
// "connection_name": "127.0.0.1:46928 -> 127.0.0.1:5672",
|
||||||
|
// "peer_port": 46928,
|
||||||
|
// "peer_host": "127.0.0.1"
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "outgoing": [
|
||||||
|
// {
|
||||||
|
// "stats": {
|
||||||
|
// "publish": 1280,
|
||||||
|
// "publish_details": {
|
||||||
|
// "rate": 20
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "queue": {
|
||||||
|
// "name": "amq.gen-7NhO_yRr4lDdp-8hdnvfuw",
|
||||||
|
// "vhost": "rabbit\/hole"
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "message_stats": {
|
||||||
|
// "publish_in": 2760,
|
||||||
|
// "publish_in_details": {
|
||||||
|
// "rate": 20
|
||||||
|
// },
|
||||||
|
// "publish_out": 1280,
|
||||||
|
// "publish_out_details": {
|
||||||
|
// "rate": 20
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "name": "amq.fanout",
|
||||||
|
// "vhost": "rabbit\/hole",
|
||||||
|
// "type": "fanout",
|
||||||
|
// "durable": true,
|
||||||
|
// "auto_delete": false,
|
||||||
|
// "internal": false,
|
||||||
|
// "arguments": {
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
type ExchangeIngressDetails struct {
|
||||||
|
Stats MessageStats `json:"stats"`
|
||||||
|
ChannelDetails PublishingChannel `json:"channel_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishingChannel struct {
|
||||||
|
Number int `json:"number"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
ConnectionName string `json:"connection_name"`
|
||||||
|
PeerPort Port `json:"peer_port"`
|
||||||
|
PeerHost string `json:"peer_host"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type NameAndVhost struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExchangeEgressDetails struct {
|
||||||
|
Stats MessageStats `json:"stats"`
|
||||||
|
Queue NameAndVhost `json:"queue"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DetailedExchangeInfo struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Durable bool `json:"durable"`
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
Internal bool `json:"internal"`
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
|
||||||
|
Incoming []ExchangeIngressDetails `json:"incoming"`
|
||||||
|
Outgoing []ExchangeEgressDetails `json:"outgoing"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost)+"/"+exchange)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/exchanges/{vhost}/{exchange}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error) {
|
||||||
|
if info.Arguments == nil {
|
||||||
|
info.Arguments = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/exchanges/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Federation definition: additional arguments
|
||||||
|
// added to the entities (queues, exchanges or both)
|
||||||
|
// that match a policy.
|
||||||
|
type FederationDefinition struct {
|
||||||
|
Uri string `json:"uri"`
|
||||||
|
Expires int `json:"expires"`
|
||||||
|
MessageTTL int32 `json:"message-ttl"`
|
||||||
|
MaxHops int `json:"max-hops"`
|
||||||
|
PrefetchCount int `json:"prefetch-count"`
|
||||||
|
ReconnectDelay int `json:"reconnect-delay"`
|
||||||
|
AckMode string `json:"ack-mode"`
|
||||||
|
TrustUserId bool `json:"trust-user-id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Represents a configured Federation upstream.
|
||||||
|
type FederationUpstream struct {
|
||||||
|
Definition FederationDefinition `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Updates a federation upstream
|
||||||
|
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
|
||||||
|
fedUp := FederationUpstream{
|
||||||
|
Definition: fDef,
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(fedUp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Deletes a federation upstream.
|
||||||
|
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/overview
|
||||||
|
//
|
||||||
|
|
||||||
|
type QueueTotals struct {
|
||||||
|
Messages int `json:"messages"`
|
||||||
|
MessagesDetails RateDetails `json:"messages_details"`
|
||||||
|
|
||||||
|
MessagesReady int `json:"messages_ready"`
|
||||||
|
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
|
||||||
|
|
||||||
|
MessagesUnacknowledged int `json:"messages_unacknowledged"`
|
||||||
|
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ObjectTotals struct {
|
||||||
|
Consumers int `json:"consumers"`
|
||||||
|
Queues int `json:"queues"`
|
||||||
|
Exchanges int `json:"exchanges"`
|
||||||
|
Connections int `json:"connections"`
|
||||||
|
Channels int `json:"channels"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Listener struct {
|
||||||
|
Node string `json:"node"`
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
|
IpAddress string `json:"ip_address"`
|
||||||
|
Port Port `json:"port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Overview struct {
|
||||||
|
ManagementVersion string `json:"management_version"`
|
||||||
|
StatisticsLevel string `json:"statistics_level"`
|
||||||
|
RabbitMQVersion string `json:"rabbitmq_version"`
|
||||||
|
ErlangVersion string `json:"erlang_version"`
|
||||||
|
FullErlangVersion string `json:"erlang_full_version"`
|
||||||
|
ExchangeTypes []ExchangeType `json:"exchange_types"`
|
||||||
|
MessageStats MessageStats `json:"message_stats"`
|
||||||
|
QueueTotals QueueTotals `json:"queue_totals"`
|
||||||
|
ObjectTotals ObjectTotals `json:"object_totals"`
|
||||||
|
Node string `json:"node"`
|
||||||
|
StatisticsDBNode string `json:"statistics_db_node"`
|
||||||
|
Listeners []Listener `json:"listeners"`
|
||||||
|
Contexts []BrokerContext `json:"contexts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Overview() (rec *Overview, err error) {
|
||||||
|
req, err := newGETRequest(c, "overview")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/whoami
|
||||||
|
//
|
||||||
|
|
||||||
|
type WhoamiInfo struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags string `json:"tags"`
|
||||||
|
AuthBackend string `json:"auth_backend"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Whoami() (rec *WhoamiInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "whoami")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
|
@ -0,0 +1,301 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: this probably should be fixed in RabbitMQ management plugin
|
||||||
|
type OsPid string
|
||||||
|
|
||||||
|
type NameDescriptionEnabled struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type AuthMechanism NameDescriptionEnabled
|
||||||
|
|
||||||
|
type ExchangeType NameDescriptionEnabled
|
||||||
|
|
||||||
|
type NameDescriptionVersion struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Version string `json:"version"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ErlangApp NameDescriptionVersion
|
||||||
|
|
||||||
|
type NodeInfo struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
NodeType string `json:"type"`
|
||||||
|
IsRunning bool `json:"running"`
|
||||||
|
OsPid OsPid `json:"os_pid"`
|
||||||
|
|
||||||
|
FdUsed int `json:"fd_used"`
|
||||||
|
FdTotal int `json:"fd_total"`
|
||||||
|
SocketsUsed int `json:"sockets_used"`
|
||||||
|
SocketsTotal int `json:"sockets_total"`
|
||||||
|
MemUsed int `json:"mem_used"`
|
||||||
|
MemLimit int `json:"mem_limit"`
|
||||||
|
MemAlarm bool `json:"mem_alarm"`
|
||||||
|
DiskFree int `json:"disk_free"`
|
||||||
|
DiskFreeLimit int `json:"disk_free_limit"`
|
||||||
|
DiskFreeAlarm bool `json:"disk_free_alarm"`
|
||||||
|
|
||||||
|
// Erlang scheduler run queue length
|
||||||
|
RunQueueLength uint32 `json:"run_queue"`
|
||||||
|
Processors uint32 `json:"processors"`
|
||||||
|
Uptime uint64 `json:"uptime"`
|
||||||
|
|
||||||
|
ExchangeTypes []ExchangeType `json:"exchange_types"`
|
||||||
|
AuthMechanisms []AuthMechanism `json:"auth_mechanisms"`
|
||||||
|
ErlangApps []ErlangApp `json:"applications"`
|
||||||
|
Contexts []BrokerContext `json:"contexts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/nodes
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) ListNodes() (rec []NodeInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "nodes")
|
||||||
|
if err != nil {
|
||||||
|
return []NodeInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/nodes/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// {
|
||||||
|
// "partitions": [],
|
||||||
|
// "os_pid": "39292",
|
||||||
|
// "fd_used": 35,
|
||||||
|
// "fd_total": 256,
|
||||||
|
// "sockets_used": 4,
|
||||||
|
// "sockets_total": 138,
|
||||||
|
// "mem_used": 69964432,
|
||||||
|
// "mem_limit": 2960660889,
|
||||||
|
// "mem_alarm": false,
|
||||||
|
// "disk_free_limit": 50000000,
|
||||||
|
// "disk_free": 188362731520,
|
||||||
|
// "disk_free_alarm": false,
|
||||||
|
// "proc_used": 370,
|
||||||
|
// "proc_total": 1048576,
|
||||||
|
// "statistics_level": "fine",
|
||||||
|
// "uptime": 98355255,
|
||||||
|
// "run_queue": 0,
|
||||||
|
// "processors": 8,
|
||||||
|
// "exchange_types": [
|
||||||
|
// {
|
||||||
|
// "name": "topic",
|
||||||
|
// "description": "AMQP topic exchange, as per the AMQP specification",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "x-consistent-hash",
|
||||||
|
// "description": "Consistent Hashing Exchange",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "fanout",
|
||||||
|
// "description": "AMQP fanout exchange, as per the AMQP specification",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "direct",
|
||||||
|
// "description": "AMQP direct exchange, as per the AMQP specification",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "headers",
|
||||||
|
// "description": "AMQP headers exchange, as per the AMQP specification",
|
||||||
|
// "enabled": true
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "auth_mechanisms": [
|
||||||
|
// {
|
||||||
|
// "name": "AMQPLAIN",
|
||||||
|
// "description": "QPid AMQPLAIN mechanism",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "PLAIN",
|
||||||
|
// "description": "SASL PLAIN authentication mechanism",
|
||||||
|
// "enabled": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "RABBIT-CR-DEMO",
|
||||||
|
// "description": "RabbitMQ Demo challenge-response authentication mechanism",
|
||||||
|
// "enabled": false
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "applications": [
|
||||||
|
// {
|
||||||
|
// "name": "amqp_client",
|
||||||
|
// "description": "RabbitMQ AMQP Client",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "asn1",
|
||||||
|
// "description": "The Erlang ASN1 compiler version 2.0.3",
|
||||||
|
// "version": "2.0.3"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "cowboy",
|
||||||
|
// "description": "Small, fast, modular HTTP server.",
|
||||||
|
// "version": "0.5.0-rmq3.2.0-git4b93c2d"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "crypto",
|
||||||
|
// "description": "CRYPTO version 2",
|
||||||
|
// "version": "3.1"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "inets",
|
||||||
|
// "description": "INETS CXC 138 49",
|
||||||
|
// "version": "5.9.6"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "kernel",
|
||||||
|
// "description": "ERTS CXC 138 10",
|
||||||
|
// "version": "2.16.3"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "mnesia",
|
||||||
|
// "description": "MNESIA CXC 138 12",
|
||||||
|
// "version": "4.10"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "mochiweb",
|
||||||
|
// "description": "MochiMedia Web Server",
|
||||||
|
// "version": "2.7.0-rmq3.2.0-git680dba8"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "os_mon",
|
||||||
|
// "description": "CPO CXC 138 46",
|
||||||
|
// "version": "2.2.13"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "public_key",
|
||||||
|
// "description": "Public key infrastructure",
|
||||||
|
// "version": "0.20"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbit",
|
||||||
|
// "description": "RabbitMQ",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_consistent_hash_exchange",
|
||||||
|
// "description": "Consistent Hash Exchange Type",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_management",
|
||||||
|
// "description": "RabbitMQ Management Console",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_management_agent",
|
||||||
|
// "description": "RabbitMQ Management Agent",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_mqtt",
|
||||||
|
// "description": "RabbitMQ MQTT Adapter",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_shovel",
|
||||||
|
// "description": "Data Shovel for RabbitMQ",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_shovel_management",
|
||||||
|
// "description": "Shovel Status",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_stomp",
|
||||||
|
// "description": "Embedded Rabbit Stomp Adapter",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_web_dispatch",
|
||||||
|
// "description": "RabbitMQ Web Dispatcher",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "rabbitmq_web_stomp",
|
||||||
|
// "description": "Rabbit WEB-STOMP - WebSockets to Stomp adapter",
|
||||||
|
// "version": "3.2.0"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "sasl",
|
||||||
|
// "description": "SASL CXC 138 11",
|
||||||
|
// "version": "2.3.3"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "sockjs",
|
||||||
|
// "description": "SockJS",
|
||||||
|
// "version": "0.3.4-rmq3.2.0-git3132eb9"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "ssl",
|
||||||
|
// "description": "Erlang\/OTP SSL application",
|
||||||
|
// "version": "5.3.1"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "stdlib",
|
||||||
|
// "description": "ERTS CXC 138 10",
|
||||||
|
// "version": "1.19.3"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "webmachine",
|
||||||
|
// "description": "webmachine",
|
||||||
|
// "version": "1.10.3-rmq3.2.0-gite9359c7"
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "xmerl",
|
||||||
|
// "description": "XML parser",
|
||||||
|
// "version": "1.3.4"
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "contexts": [
|
||||||
|
// {
|
||||||
|
// "description": "Redirect to port 15672",
|
||||||
|
// "path": "\/",
|
||||||
|
// "port": 55672,
|
||||||
|
// "ignore_in_use": true
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "description": "RabbitMQ Management",
|
||||||
|
// "path": "\/",
|
||||||
|
// "port": 15672
|
||||||
|
// }
|
||||||
|
// ],
|
||||||
|
// "name": "rabbit@mercurio",
|
||||||
|
// "type": "disc",
|
||||||
|
// "running": true
|
||||||
|
// }
|
||||||
|
|
||||||
|
func (c *Client) GetNode(name string) (rec *NodeInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "nodes/"+url.QueryEscape(name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
|
@ -0,0 +1,126 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/permissions
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
//
|
||||||
|
// [{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}]
|
||||||
|
|
||||||
|
type PermissionInfo struct {
|
||||||
|
User string `json:"user"`
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
|
||||||
|
// Configuration permissions
|
||||||
|
Configure string `json:"configure"`
|
||||||
|
// Write permissions
|
||||||
|
Write string `json:"write"`
|
||||||
|
// Read permissions
|
||||||
|
Read string `json:"read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns permissions for all users and virtual hosts.
|
||||||
|
func (c *Client) ListPermissions() (rec []PermissionInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "permissions/")
|
||||||
|
if err != nil {
|
||||||
|
return []PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/users/{user}/permissions
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns permissions of a specific user.
|
||||||
|
func (c *Client) ListPermissionsOf(username string) (rec []PermissionInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "users/"+url.QueryEscape(username)+"/permissions")
|
||||||
|
if err != nil {
|
||||||
|
return []PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/permissions/{vhost}/{user}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns permissions of user in virtual host.
|
||||||
|
func (c *Client) GetPermissionsIn(vhost, username string) (rec PermissionInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username))
|
||||||
|
if err != nil {
|
||||||
|
return PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return PermissionInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/permissions/{vhost}/{user}
|
||||||
|
//
|
||||||
|
|
||||||
|
type Permissions struct {
|
||||||
|
Configure string `json:"configure"`
|
||||||
|
Write string `json:"write"`
|
||||||
|
Read string `json:"read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates permissions of user in virtual host.
|
||||||
|
func (c *Client) UpdatePermissionsIn(vhost, username string, permissions Permissions) (res *http.Response, err error) {
|
||||||
|
body, err := json.Marshal(permissions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/permissions/{vhost}/{user}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Clears (deletes) permissions of user in virtual host.
|
||||||
|
func (c *Client) ClearPermissionsIn(vhost, username string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
func (c *Client) EnabledProtocols() (xs []string, err error) {
|
||||||
|
overview, err := c.Overview()
|
||||||
|
if err != nil {
|
||||||
|
return []string{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// we really need to implement Map/Filter/etc. MK.
|
||||||
|
xs = make([]string, len(overview.Listeners))
|
||||||
|
for i, lnr := range overview.Listeners {
|
||||||
|
xs[i] = lnr.Protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
return xs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ProtocolPorts() (res map[string]Port, err error) {
|
||||||
|
res = map[string]Port{}
|
||||||
|
|
||||||
|
overview, err := c.Overview()
|
||||||
|
if err != nil {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lnr := range overview.Listeners {
|
||||||
|
res[lnr.Protocol] = lnr.Port
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,127 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Policy definition: additional arguments
|
||||||
|
// added to the entities (queues, exchanges or both)
|
||||||
|
// that match a policy.
|
||||||
|
type PolicyDefinition map[string]interface{}
|
||||||
|
|
||||||
|
type NodeNames []string
|
||||||
|
|
||||||
|
// Represents a configured policy.
|
||||||
|
type Policy struct {
|
||||||
|
// Virtual host this policy is in.
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
// Regular expression pattern used to match queues and exchanges,
|
||||||
|
// , e.g. "^ha\..+"
|
||||||
|
Pattern string `json:"pattern"`
|
||||||
|
// What this policy applies to: "queues", "exchanges", etc.
|
||||||
|
ApplyTo string `json:"apply-to"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
// Additional arguments added to the entities (queues,
|
||||||
|
// exchanges or both) that match a policy
|
||||||
|
Definition PolicyDefinition `json:"definition"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/policies
|
||||||
|
//
|
||||||
|
|
||||||
|
// Return all policies (across all virtual hosts).
|
||||||
|
func (c *Client) ListPolicies() (rec []Policy, err error) {
|
||||||
|
req, err := newGETRequest(c, "policies")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/policies/{vhost}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns policies in a specific virtual host.
|
||||||
|
func (c *Client) ListPoliciesIn(vhost string) (rec []Policy, err error) {
|
||||||
|
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/policies/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns individual policy in virtual host.
|
||||||
|
func (c *Client) GetPolicy(vhost, name string) (rec *Policy, err error) {
|
||||||
|
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/policies/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Updates a policy.
|
||||||
|
func (c *Client) PutPolicy(vhost string, name string, policy Policy) (res *http.Response, err error) {
|
||||||
|
body, err := json.Marshal(policy)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/policies/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Deletes a policy.
|
||||||
|
func (c *Client) DeletePolicy(vhost, name string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,283 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Information about backing queue (queue storage engine).
|
||||||
|
type BackingQueueStatus struct {
|
||||||
|
Q1 int `json:"q1"`
|
||||||
|
Q2 int `json:"q2"`
|
||||||
|
Q3 int `json:"q3"`
|
||||||
|
Q4 int `json:"q4"`
|
||||||
|
// Total queue length
|
||||||
|
Length int64 `json:"len"`
|
||||||
|
// Number of pending acks from consumers
|
||||||
|
PendingAcks int64 `json:"pending_acks"`
|
||||||
|
// Number of messages held in RAM
|
||||||
|
RAMMessageCount int64 `json:"ram_msg_count"`
|
||||||
|
// Number of outstanding acks held in RAM
|
||||||
|
RAMAckCount int64 `json:"ram_ack_count"`
|
||||||
|
// Number of persistent messages in the store
|
||||||
|
PersistentCount int64 `json:"persistent_count"`
|
||||||
|
// Average ingress (inbound) rate, not including messages
|
||||||
|
// that straight through to auto-acking consumers.
|
||||||
|
AverageIngressRate float64 `json:"avg_ingress_rate"`
|
||||||
|
// Average egress (outbound) rate, not including messages
|
||||||
|
// that straight through to auto-acking consumers.
|
||||||
|
AverageEgressRate float64 `json:"avg_egress_rate"`
|
||||||
|
// rate at which unacknowledged message records enter RAM,
|
||||||
|
// e.g. because messages are delivered requiring acknowledgement
|
||||||
|
AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
|
||||||
|
// rate at which unacknowledged message records leave RAM,
|
||||||
|
// e.g. because acks arrive or unacked messages are paged out
|
||||||
|
AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type OwnerPidDetails struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
PeerPort Port `json:"peer_port"`
|
||||||
|
PeerHost string `json:"peer_host"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueueInfo struct {
|
||||||
|
// Queue name
|
||||||
|
Name string `json:"name"`
|
||||||
|
// Virtual host this queue belongs to
|
||||||
|
Vhost string `json:"vhost"`
|
||||||
|
// Is this queue durable?
|
||||||
|
Durable bool `json:"durable"`
|
||||||
|
// Is this queue auto-delted?
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
// Extra queue arguments
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
|
||||||
|
// RabbitMQ node that hosts master for this queue
|
||||||
|
Node string `json:"node"`
|
||||||
|
// Queue status
|
||||||
|
Status string `json:"status"`
|
||||||
|
|
||||||
|
// Total amount of RAM used by this queue
|
||||||
|
Memory int64 `json:"memory"`
|
||||||
|
// How many consumers this queue has
|
||||||
|
Consumers int `json:"consumers"`
|
||||||
|
// Utilisation of all the consumers
|
||||||
|
ConsumerUtilisation float64 `json:"consumer_utilisation"`
|
||||||
|
// If there is an exclusive consumer, its consumer tag
|
||||||
|
ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`
|
||||||
|
|
||||||
|
// Policy applied to this queue, if any
|
||||||
|
Policy string `json:"policy"`
|
||||||
|
|
||||||
|
// Total bytes of messages in this queues
|
||||||
|
MessagesBytes int64 `json:"message_bytes"`
|
||||||
|
MessagesBytesPersistent int64 `json:"message_bytes_persistent"`
|
||||||
|
MessagesBytesRAM int64 `json:"message_bytes_ram"`
|
||||||
|
|
||||||
|
// Total number of messages in this queue
|
||||||
|
Messages int `json:"messages"`
|
||||||
|
MessagesDetails RateDetails `json:"messages_details"`
|
||||||
|
MessagesPersistent int `json:"messages_persistent"`
|
||||||
|
MessagesRAM int `json:"messages_ram"`
|
||||||
|
|
||||||
|
// Number of messages ready to be delivered
|
||||||
|
MessagesReady int `json:"messages_ready"`
|
||||||
|
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
|
||||||
|
|
||||||
|
// Number of messages delivered and pending acknowledgements from consumers
|
||||||
|
MessagesUnacknowledged int `json:"messages_unacknowledged"`
|
||||||
|
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
|
||||||
|
|
||||||
|
MessageStats MessageStats `json:"message_stats"`
|
||||||
|
|
||||||
|
OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`
|
||||||
|
|
||||||
|
BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DetailedQueueInfo QueueInfo
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/queues
|
||||||
|
//
|
||||||
|
|
||||||
|
// [
|
||||||
|
// {
|
||||||
|
// "owner_pid_details": {
|
||||||
|
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
|
||||||
|
// "peer_port": 46928,
|
||||||
|
// "peer_host": "127.0.0.1"
|
||||||
|
// },
|
||||||
|
// "message_stats": {
|
||||||
|
// "publish": 19830,
|
||||||
|
// "publish_details": {
|
||||||
|
// "rate": 5
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "messages": 15,
|
||||||
|
// "messages_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "messages_ready": 15,
|
||||||
|
// "messages_ready_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "messages_unacknowledged": 0,
|
||||||
|
// "messages_unacknowledged_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "policy": "",
|
||||||
|
// "exclusive_consumer_tag": "",
|
||||||
|
// "consumers": 0,
|
||||||
|
// "memory": 143112,
|
||||||
|
// "backing_queue_status": {
|
||||||
|
// "q1": 0,
|
||||||
|
// "q2": 0,
|
||||||
|
// "delta": [
|
||||||
|
// "delta",
|
||||||
|
// "undefined",
|
||||||
|
// 0,
|
||||||
|
// "undefined"
|
||||||
|
// ],
|
||||||
|
// "q3": 0,
|
||||||
|
// "q4": 15,
|
||||||
|
// "len": 15,
|
||||||
|
// "pending_acks": 0,
|
||||||
|
// "target_ram_count": "infinity",
|
||||||
|
// "ram_msg_count": 15,
|
||||||
|
// "ram_ack_count": 0,
|
||||||
|
// "next_seq_id": 19830,
|
||||||
|
// "persistent_count": 0,
|
||||||
|
// "avg_ingress_rate": 4.9920127795527,
|
||||||
|
// "avg_egress_rate": 4.9920127795527,
|
||||||
|
// "avg_ack_ingress_rate": 0,
|
||||||
|
// "avg_ack_egress_rate": 0
|
||||||
|
// },
|
||||||
|
// "status": "running",
|
||||||
|
// "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
|
||||||
|
// "vhost": "rabbit\/hole",
|
||||||
|
// "durable": false,
|
||||||
|
// "auto_delete": false,
|
||||||
|
// "arguments": {
|
||||||
|
// "x-message-ttl": 5000
|
||||||
|
// },
|
||||||
|
// "node": "rabbit@marzo"
|
||||||
|
// }
|
||||||
|
// ]
|
||||||
|
|
||||||
|
func (c *Client) ListQueues() (rec []QueueInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "queues")
|
||||||
|
if err != nil {
|
||||||
|
return []QueueInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []QueueInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/queues/{vhost}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost))
|
||||||
|
if err != nil {
|
||||||
|
return []QueueInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []QueueInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/queues/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+queue)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/exchanges/{vhost}/{exchange}
|
||||||
|
//
|
||||||
|
|
||||||
|
type QueueSettings struct {
|
||||||
|
Durable bool `json:"durable"`
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
Arguments map[string]interface{} `json:"arguments"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) {
|
||||||
|
if info.Arguments == nil {
|
||||||
|
info.Arguments = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/queues/{vhost}/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/queues/{vhost}/{name}/contents
|
||||||
|
//
|
||||||
|
|
||||||
|
func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/contents", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type UserInfo struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
PasswordHash string `json:"password_hash"`
|
||||||
|
// Tags control permissions. Built-in tags: administrator, management, policymaker.
|
||||||
|
Tags string `json:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Settings used to create users. Tags must be comma-separated.
|
||||||
|
type UserSettings struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
// Tags control permissions. Administrator grants full
|
||||||
|
// permissions, management grants management UI and HTTP API
|
||||||
|
// access, policymaker grants policy management permissions.
|
||||||
|
Tags string `json:"tags"`
|
||||||
|
|
||||||
|
// *never* returned by RabbitMQ. Set by the client
|
||||||
|
// to create/update a user. MK.
|
||||||
|
Password string `json:"password"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/users
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
// [{"name":"guest","password_hash":"8LYTIFbVUwi8HuV/dGlp2BYsD1I=","tags":"administrator"}]
|
||||||
|
|
||||||
|
// Returns a list of all users in a cluster.
|
||||||
|
func (c *Client) ListUsers() (rec []UserInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "users/")
|
||||||
|
if err != nil {
|
||||||
|
return []UserInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []UserInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/users/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns information about individual user.
|
||||||
|
func (c *Client) GetUser(username string) (rec *UserInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "users/"+url.QueryEscape(username))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/users/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Updates information about individual user.
|
||||||
|
func (c *Client) PutUser(username string, info UserSettings) (res *http.Response, err error) {
|
||||||
|
body, err := json.Marshal(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "users/"+url.QueryEscape(username), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/users/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Deletes user.
|
||||||
|
func (c *Client) DeleteUser(username string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "users/"+url.QueryEscape(username), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
package rabbithole
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/vhosts
|
||||||
|
//
|
||||||
|
|
||||||
|
// Example response:
|
||||||
|
|
||||||
|
// [
|
||||||
|
// {
|
||||||
|
// "message_stats": {
|
||||||
|
// "publish": 78,
|
||||||
|
// "publish_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "messages": 0,
|
||||||
|
// "messages_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "messages_ready": 0,
|
||||||
|
// "messages_ready_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "messages_unacknowledged": 0,
|
||||||
|
// "messages_unacknowledged_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "recv_oct": 16653,
|
||||||
|
// "recv_oct_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "send_oct": 40495,
|
||||||
|
// "send_oct_details": {
|
||||||
|
// "rate": 0
|
||||||
|
// },
|
||||||
|
// "name": "\/",
|
||||||
|
// "tracing": false
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// "name": "29dd51888b834698a8b5bc3e7f8623aa1c9671f5",
|
||||||
|
// "tracing": false
|
||||||
|
// }
|
||||||
|
// ]
|
||||||
|
|
||||||
|
type VhostInfo struct {
|
||||||
|
// Virtual host name
|
||||||
|
Name string `json:"name"`
|
||||||
|
// True if tracing is enabled for this virtual host
|
||||||
|
Tracing bool `json:"tracing"`
|
||||||
|
|
||||||
|
// Total number of messages in queues of this virtual host
|
||||||
|
Messages int `json:"messages"`
|
||||||
|
MessagesDetails RateDetails `json:"messages_details"`
|
||||||
|
|
||||||
|
// Total number of messages ready to be delivered in queues of this virtual host
|
||||||
|
MessagesReady int `json:"messages_ready"`
|
||||||
|
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
|
||||||
|
|
||||||
|
// Total number of messages pending acknowledgement from consumers in this virtual host
|
||||||
|
MessagesUnacknowledged int `json:"messages_unacknowledged"`
|
||||||
|
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
|
||||||
|
|
||||||
|
// Octets received
|
||||||
|
RecvOct uint64 `json:"recv_oct"`
|
||||||
|
// Octets sent
|
||||||
|
SendOct uint64 `json:"send_oct"`
|
||||||
|
RecvCount uint64 `json:"recv_cnt"`
|
||||||
|
SendCount uint64 `json:"send_cnt"`
|
||||||
|
SendPending uint64 `json:"send_pend"`
|
||||||
|
RecvOctDetails RateDetails `json:"recv_oct_details"`
|
||||||
|
SendOctDetails RateDetails `json:"send_oct_details"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns a list of virtual hosts.
|
||||||
|
func (c *Client) ListVhosts() (rec []VhostInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "vhosts")
|
||||||
|
if err != nil {
|
||||||
|
return []VhostInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return []VhostInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// GET /api/vhosts/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Returns information about a specific virtual host.
|
||||||
|
func (c *Client) GetVhost(vhostname string) (rec *VhostInfo, err error) {
|
||||||
|
req, err := newGETRequest(c, "vhosts/"+url.QueryEscape(vhostname))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// PUT /api/vhosts/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Settings used to create or modify virtual hosts.
|
||||||
|
type VhostSettings struct {
|
||||||
|
// True if tracing should be enabled.
|
||||||
|
Tracing bool `json:"tracing"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates or updates a virtual host.
|
||||||
|
func (c *Client) PutVhost(vhostname string, settings VhostSettings) (res *http.Response, err error) {
|
||||||
|
body, err := json.Marshal(settings)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := newRequestWithBody(c, "PUT", "vhosts/"+url.QueryEscape(vhostname), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// DELETE /api/vhosts/{name}
|
||||||
|
//
|
||||||
|
|
||||||
|
// Deletes a virtual host.
|
||||||
|
func (c *Client) DeleteVhost(vhostname string) (res *http.Response, err error) {
|
||||||
|
req, err := newRequestWithBody(c, "DELETE", "vhosts/"+url.QueryEscape(vhostname), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = executeRequest(c, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -1425,6 +1425,12 @@
|
||||||
"path": "github.com/maximilien/softlayer-go/softlayer",
|
"path": "github.com/maximilien/softlayer-go/softlayer",
|
||||||
"revision": "85659debe44fab5792fc92cf755c04b115b9dc19"
|
"revision": "85659debe44fab5792fc92cf755c04b115b9dc19"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"checksumSHA1": "GSum+utW01N3KeMdvAPnsW0TemM=",
|
||||||
|
"path": "github.com/michaelklishin/rabbit-hole",
|
||||||
|
"revision": "88550829bcdcf614361c73459c903578eb44074e",
|
||||||
|
"revisionTime": "2016-07-06T11:10:56Z"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "7niW29CvYceZ6zbia6b/LT+yD/M=",
|
"checksumSHA1": "7niW29CvYceZ6zbia6b/LT+yD/M=",
|
||||||
"path": "github.com/mitchellh/cli",
|
"path": "github.com/mitchellh/cli",
|
||||||
|
|
Loading…
Reference in New Issue