provider/influxdb: vendor dependencies
This commit is contained in:
parent
7061448d74
commit
f450584617
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/helper/schema"
|
"github.com/hashicorp/terraform/helper/schema"
|
||||||
"github.com/hashicorp/terraform/terraform"
|
"github.com/hashicorp/terraform/terraform"
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdata/influxdb/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var quoteReplacer = strings.NewReplacer(`"`, `\"`)
|
var quoteReplacer = strings.NewReplacer(`"`, `\"`)
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/helper/schema"
|
"github.com/hashicorp/terraform/helper/schema"
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdata/influxdb/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ResourceDatabase() *schema.Resource {
|
func ResourceDatabase() *schema.Resource {
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
Copyright (c) 2013-2016 Errplane Inc.
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
this software and associated documentation files (the "Software"), to deal in
|
||||||
|
the Software without restriction, including without limitation the rights to
|
||||||
|
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
@ -0,0 +1,27 @@
|
||||||
|
# List
|
||||||
|
- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE)
|
||||||
|
- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
|
||||||
|
- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
|
||||||
|
- github.com/BurntSushi/toml [WTFPL LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING)
|
||||||
|
- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license)
|
||||||
|
- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
|
||||||
|
- github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE)
|
||||||
|
- github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE)
|
||||||
|
- github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE)
|
||||||
|
- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE)
|
||||||
|
- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
|
||||||
|
- github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE)
|
||||||
|
- github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE)
|
||||||
|
- github.com/hashicorp/raft-boltdb [MOZILLA PUBLIC LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE)
|
||||||
|
- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
|
||||||
|
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
|
||||||
|
- github.com/kimor79/gollectd [BSD LICENSE](https://github.com/kimor79/gollectd/blob/master/LICENSE)
|
||||||
|
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
|
||||||
|
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
|
||||||
|
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
|
||||||
|
- glyphicons [LICENSE](http://glyphicons.com/license/)
|
||||||
|
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
|
||||||
|
- golang.org/x/tools [BSD LICENSE](https://github.com/golang/tools/blob/master/LICENSE)
|
||||||
|
- gopkg.in/fatih/pool.v2 [MIT LICENSE](https://github.com/fatih/pool/blob/v2.0.0/LICENSE)
|
||||||
|
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
|
||||||
|
- react 0.13.3 [BSD LICENSE](https://github.com/facebook/react/blob/master/LICENSE)
|
|
@ -0,0 +1,257 @@
|
||||||
|
# InfluxDB Client
|
||||||
|
|
||||||
|
[![GoDoc](https://godoc.org/github.com/influxdata/influxdb?status.svg)](http://godoc.org/github.com/influxdata/influxdb/client/v2)
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
**NOTE:** The Go client library now has a "v2" version, with the old version
|
||||||
|
being deprecated. The new version can be imported at
|
||||||
|
`import "github.com/influxdata/influxdb/client/v2"`. It is not backwards-compatible.
|
||||||
|
|
||||||
|
A Go client library written and maintained by the **InfluxDB** team.
|
||||||
|
This package provides convenience functions to read and write time series data.
|
||||||
|
It uses the HTTP protocol to communicate with your **InfluxDB** cluster.
|
||||||
|
|
||||||
|
|
||||||
|
## Getting Started
|
||||||
|
|
||||||
|
### Connecting To Your Database
|
||||||
|
|
||||||
|
Connecting to an **InfluxDB** database is straightforward. You will need a host
|
||||||
|
name, a port and the cluster user credentials if applicable. The default port is
|
||||||
|
8086. You can customize these settings to your specific installation via the
|
||||||
|
**InfluxDB** configuration file.
|
||||||
|
|
||||||
|
Though not necessary for experimentation, you may want to create a new user
|
||||||
|
and authenticate the connection to your database.
|
||||||
|
|
||||||
|
For more information please check out the
|
||||||
|
[Admin Docs](https://docs.influxdata.com/influxdb/v0.10/administration).
|
||||||
|
|
||||||
|
For the impatient, you can create a new admin user _bubba_ by firing off the
|
||||||
|
[InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go).
|
||||||
|
|
||||||
|
```shell
|
||||||
|
influx
|
||||||
|
> create user bubba with password 'bumblebeetuna'
|
||||||
|
> grant all privileges to bubba
|
||||||
|
```
|
||||||
|
|
||||||
|
And now for good measure set the credentials in you shell environment.
|
||||||
|
In the example below we will use $INFLUX_USER and $INFLUX_PWD
|
||||||
|
|
||||||
|
Now with the administrivia out of the way, let's connect to our database.
|
||||||
|
|
||||||
|
NOTE: If you've opted out of creating a user, you can omit Username and Password in
|
||||||
|
the configuration below.
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/client/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MyDB = "square_holes"
|
||||||
|
username = "bubba"
|
||||||
|
password = "bumblebeetuna"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Make client
|
||||||
|
c := client.NewHTTPClient(client.HTTPConfig{
|
||||||
|
Addr: "http://localhost:8086",
|
||||||
|
Username: username,
|
||||||
|
Password: password,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create a new point batch
|
||||||
|
bp := client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
|
Database: MyDB,
|
||||||
|
Precision: "s",
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create a point and add to batch
|
||||||
|
tags := map[string]string{"cpu": "cpu-total"}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"idle": 10.1,
|
||||||
|
"system": 53.3,
|
||||||
|
"user": 46.6,
|
||||||
|
}
|
||||||
|
pt := client.NewPoint("cpu_usage", tags, fields, time.Now())
|
||||||
|
bp.AddPoint(pt)
|
||||||
|
|
||||||
|
// Write the batch
|
||||||
|
c.Write(bp)
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
### Inserting Data
|
||||||
|
|
||||||
|
Time series data aka *points* are written to the database using batch inserts.
|
||||||
|
The mechanism is to create one or more points and then create a batch aka
|
||||||
|
*batch points* and write these to a given database and series. A series is a
|
||||||
|
combination of a measurement (time/values) and a set of tags.
|
||||||
|
|
||||||
|
In this sample we will create a batch of a 1,000 points. Each point has a time and
|
||||||
|
a single value as well as 2 tags indicating a shape and color. We write these points
|
||||||
|
to a database called _square_holes_ using a measurement named _shapes_.
|
||||||
|
|
||||||
|
NOTE: You can specify a RetentionPolicy as part of the batch points. If not
|
||||||
|
provided InfluxDB will use the database _default_ retention policy.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func writePoints(clnt client.Client) {
|
||||||
|
sampleSize := 1000
|
||||||
|
rand.Seed(42)
|
||||||
|
|
||||||
|
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
|
Database: "systemstats",
|
||||||
|
Precision: "us",
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := 0; i < sampleSize; i++ {
|
||||||
|
regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "cpu-total",
|
||||||
|
"host": fmt.Sprintf("host%d", rand.Intn(1000)),
|
||||||
|
"region": regions[rand.Intn(len(regions))],
|
||||||
|
}
|
||||||
|
|
||||||
|
idle := rand.Float64() * 100.0
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"idle": idle,
|
||||||
|
"busy": 100.0 - idle,
|
||||||
|
}
|
||||||
|
|
||||||
|
bp.AddPoint(client.NewPoint(
|
||||||
|
"cpu_usage",
|
||||||
|
tags,
|
||||||
|
fields,
|
||||||
|
time.Now(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
err := clnt.Write(bp)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### Querying Data
|
||||||
|
|
||||||
|
One nice advantage of using **InfluxDB** the ability to query your data using familiar
|
||||||
|
SQL constructs. In this example we can create a convenience function to query the database
|
||||||
|
as follows:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// queryDB convenience function to query the database
|
||||||
|
func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
|
||||||
|
q := client.Query{
|
||||||
|
Command: cmd,
|
||||||
|
Database: MyDB,
|
||||||
|
}
|
||||||
|
if response, err := clnt.Query(q); err == nil {
|
||||||
|
if response.Error() != nil {
|
||||||
|
return res, response.Error()
|
||||||
|
}
|
||||||
|
res = response.Results
|
||||||
|
} else {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Creating a Database
|
||||||
|
|
||||||
|
```go
|
||||||
|
_, err := queryDB(clnt, fmt.Sprintf("CREATE DATABASE %s", MyDB))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Count Records
|
||||||
|
|
||||||
|
```go
|
||||||
|
q := fmt.Sprintf("SELECT count(%s) FROM %s", "value", MyMeasurement)
|
||||||
|
res, err := queryDB(clnt, q)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
count := res[0].Series[0].Values[0][1]
|
||||||
|
log.Printf("Found a total of %v records\n", count)
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Find the last 10 _shapes_ records
|
||||||
|
|
||||||
|
```go
|
||||||
|
q := fmt.Sprintf("SELECT * FROM %s LIMIT %d", MyMeasurement, 20)
|
||||||
|
res, err = queryDB(clnt, q)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, row := range res[0].Series[0].Values {
|
||||||
|
t, err := time.Parse(time.RFC3339, row[0].(string))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
val := row[1].(string)
|
||||||
|
log.Printf("[%2d] %s: %s\n", i, t.Format(time.Stamp), val)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Using the UDP Client
|
||||||
|
|
||||||
|
The **InfluxDB** client also supports writing over UDP.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func WriteUDP() {
|
||||||
|
// Make client
|
||||||
|
c := client.NewUDPClient("localhost:8089")
|
||||||
|
|
||||||
|
// Create a new point batch
|
||||||
|
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
|
Precision: "s",
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create a point and add to batch
|
||||||
|
tags := map[string]string{"cpu": "cpu-total"}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"idle": 10.1,
|
||||||
|
"system": 53.3,
|
||||||
|
"user": 46.6,
|
||||||
|
}
|
||||||
|
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
bp.AddPoint(pt)
|
||||||
|
|
||||||
|
// Write the batch
|
||||||
|
c.Write(bp)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Go Docs
|
||||||
|
|
||||||
|
Please refer to
|
||||||
|
[http://godoc.org/github.com/influxdata/influxdb/client/v2](http://godoc.org/github.com/influxdata/influxdb/client/v2)
|
||||||
|
for documentation.
|
||||||
|
|
||||||
|
## See Also
|
||||||
|
|
||||||
|
You can also examine how the client library is used by the
|
||||||
|
[InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go).
|
|
@ -0,0 +1,715 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultHost is the default host used to connect to an InfluxDB instance
|
||||||
|
DefaultHost = "localhost"
|
||||||
|
|
||||||
|
// DefaultPort is the default port used to connect to an InfluxDB instance
|
||||||
|
DefaultPort = 8086
|
||||||
|
|
||||||
|
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
|
||||||
|
DefaultTimeout = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
// Query is used to send a command to the server. Both Command and Database are required.
|
||||||
|
type Query struct {
|
||||||
|
Command string
|
||||||
|
Database string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseConnectionString will parse a string to create a valid connection URL
|
||||||
|
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
|
||||||
|
var host string
|
||||||
|
var port int
|
||||||
|
|
||||||
|
h, p, err := net.SplitHostPort(path)
|
||||||
|
if err != nil {
|
||||||
|
if path == "" {
|
||||||
|
host = DefaultHost
|
||||||
|
} else {
|
||||||
|
host = path
|
||||||
|
}
|
||||||
|
// If they didn't specify a port, always use the default port
|
||||||
|
port = DefaultPort
|
||||||
|
} else {
|
||||||
|
host = h
|
||||||
|
port, err = strconv.Atoi(p)
|
||||||
|
if err != nil {
|
||||||
|
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
}
|
||||||
|
if ssl {
|
||||||
|
u.Scheme = "https"
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
|
||||||
|
|
||||||
|
return u, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config is used to specify what server to connect to.
|
||||||
|
// URL: The URL of the server connecting to.
|
||||||
|
// Username/Password are optional. They will be passed via basic auth if provided.
|
||||||
|
// UserAgent: If not provided, will default "InfluxDBClient",
|
||||||
|
// Timeout: If not provided, will default to 0 (no timeout)
|
||||||
|
type Config struct {
|
||||||
|
URL url.URL
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
UserAgent string
|
||||||
|
Timeout time.Duration
|
||||||
|
Precision string
|
||||||
|
UnsafeSsl bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConfig will create a config to be used in connecting to the client
|
||||||
|
func NewConfig() Config {
|
||||||
|
return Config{
|
||||||
|
Timeout: DefaultTimeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is used to make calls to the server.
|
||||||
|
type Client struct {
|
||||||
|
url url.URL
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
httpClient *http.Client
|
||||||
|
userAgent string
|
||||||
|
precision string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ConsistencyOne requires at least one data node acknowledged a write.
|
||||||
|
ConsistencyOne = "one"
|
||||||
|
|
||||||
|
// ConsistencyAll requires all data nodes to acknowledge a write.
|
||||||
|
ConsistencyAll = "all"
|
||||||
|
|
||||||
|
// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
|
||||||
|
ConsistencyQuorum = "quorum"
|
||||||
|
|
||||||
|
// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
|
||||||
|
ConsistencyAny = "any"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewClient will instantiate and return a connected client to issue commands to the server.
|
||||||
|
func NewClient(c Config) (*Client, error) {
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: c.UnsafeSsl,
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := &http.Transport{
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := Client{
|
||||||
|
url: c.URL,
|
||||||
|
username: c.Username,
|
||||||
|
password: c.Password,
|
||||||
|
httpClient: &http.Client{Timeout: c.Timeout, Transport: tr},
|
||||||
|
userAgent: c.UserAgent,
|
||||||
|
precision: c.Precision,
|
||||||
|
}
|
||||||
|
if client.userAgent == "" {
|
||||||
|
client.userAgent = "InfluxDBClient"
|
||||||
|
}
|
||||||
|
return &client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetAuth will update the username and passwords
|
||||||
|
func (c *Client) SetAuth(u, p string) {
|
||||||
|
c.username = u
|
||||||
|
c.password = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPrecision will update the precision
|
||||||
|
func (c *Client) SetPrecision(precision string) {
|
||||||
|
c.precision = precision
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query sends a command to the server and returns the Response
|
||||||
|
func (c *Client) Query(q Query) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
|
||||||
|
u.Path = "query"
|
||||||
|
values := u.Query()
|
||||||
|
values.Set("q", q.Command)
|
||||||
|
values.Set("db", q.Database)
|
||||||
|
if c.precision != "" {
|
||||||
|
values.Set("epoch", c.precision)
|
||||||
|
}
|
||||||
|
u.RawQuery = values.Encode()
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
dec.UseNumber()
|
||||||
|
decErr := dec.Decode(&response)
|
||||||
|
|
||||||
|
// ignore this error if we got an invalid status code
|
||||||
|
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
|
||||||
|
decErr = nil
|
||||||
|
}
|
||||||
|
// If we got a valid decode error, send that back
|
||||||
|
if decErr != nil {
|
||||||
|
return nil, decErr
|
||||||
|
}
|
||||||
|
// If we don't have an error in our json response, and didn't get StatusOK, then send back an error
|
||||||
|
if resp.StatusCode != http.StatusOK && response.Error() == nil {
|
||||||
|
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write takes BatchPoints and allows for writing of multiple points with defaults
|
||||||
|
// If successful, error is nil and Response is nil
|
||||||
|
// If an error occurs, Response may contain additional information if populated.
|
||||||
|
func (c *Client) Write(bp BatchPoints) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = "write"
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
for _, p := range bp.Points {
|
||||||
|
err := checkPointTypes(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if p.Raw != "" {
|
||||||
|
if _, err := b.WriteString(p.Raw); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for k, v := range bp.Tags {
|
||||||
|
if p.Tags == nil {
|
||||||
|
p.Tags = make(map[string]string, len(bp.Tags))
|
||||||
|
}
|
||||||
|
p.Tags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := b.WriteString(p.MarshalString()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.WriteByte('\n'); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), &b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
precision := bp.Precision
|
||||||
|
if precision == "" {
|
||||||
|
precision = c.precision
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", bp.Database)
|
||||||
|
params.Set("rp", bp.RetentionPolicy)
|
||||||
|
params.Set("precision", precision)
|
||||||
|
params.Set("consistency", bp.WriteConsistency)
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
var err = fmt.Errorf(string(body))
|
||||||
|
response.Err = err
|
||||||
|
return &response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteLineProtocol takes a string with line returns to delimit each write
|
||||||
|
// If successful, error is nil and Response is nil
|
||||||
|
// If an error occurs, Response may contain additional information if populated.
|
||||||
|
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = "write"
|
||||||
|
|
||||||
|
r := strings.NewReader(data)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", database)
|
||||||
|
params.Set("rp", retentionPolicy)
|
||||||
|
params.Set("precision", precision)
|
||||||
|
params.Set("consistency", writeConsistency)
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
err := fmt.Errorf(string(body))
|
||||||
|
response.Err = err
|
||||||
|
return &response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping will check to see if the server is up
|
||||||
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
|
||||||
|
func (c *Client) Ping() (time.Duration, string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
u := c.url
|
||||||
|
u.Path = "ping"
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
version := resp.Header.Get("X-Influxdb-Version")
|
||||||
|
return time.Since(now), version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Structs
|
||||||
|
|
||||||
|
// Result represents a resultset returned from a single statement.
|
||||||
|
type Result struct {
|
||||||
|
Series []models.Row
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON encodes the result into JSON.
|
||||||
|
func (r *Result) MarshalJSON() ([]byte, error) {
|
||||||
|
// Define a struct that outputs "error" as a string.
|
||||||
|
var o struct {
|
||||||
|
Series []models.Row `json:"series,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy fields to output struct.
|
||||||
|
o.Series = r.Series
|
||||||
|
if r.Err != nil {
|
||||||
|
o.Err = r.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Result struct
|
||||||
|
func (r *Result) UnmarshalJSON(b []byte) error {
|
||||||
|
var o struct {
|
||||||
|
Series []models.Row `json:"series,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
err := dec.Decode(&o)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Series = o.Series
|
||||||
|
if o.Err != "" {
|
||||||
|
r.Err = errors.New(o.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []Result
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON encodes the response into JSON.
|
||||||
|
func (r *Response) MarshalJSON() ([]byte, error) {
|
||||||
|
// Define a struct that outputs "error" as a string.
|
||||||
|
var o struct {
|
||||||
|
Results []Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy fields to output struct.
|
||||||
|
o.Results = r.Results
|
||||||
|
if r.Err != nil {
|
||||||
|
o.Err = r.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Response struct
|
||||||
|
func (r *Response) UnmarshalJSON(b []byte) error {
|
||||||
|
var o struct {
|
||||||
|
Results []Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
err := dec.Decode(&o)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Results = o.Results
|
||||||
|
if o.Err != "" {
|
||||||
|
r.Err = errors.New(o.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// Returns nil if no errors occurred on any statements.
|
||||||
|
func (r Response) Error() error {
|
||||||
|
if r.Err != nil {
|
||||||
|
return r.Err
|
||||||
|
}
|
||||||
|
for _, result := range r.Results {
|
||||||
|
if result.Err != nil {
|
||||||
|
return result.Err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point defines the fields that will be written to the database
|
||||||
|
// Measurement, Time, and Fields are required
|
||||||
|
// Precision can be specified if the time is in epoch format (integer).
|
||||||
|
// Valid values for Precision are n, u, ms, s, m, and h
|
||||||
|
type Point struct {
|
||||||
|
Measurement string
|
||||||
|
Tags map[string]string
|
||||||
|
Time time.Time
|
||||||
|
Fields map[string]interface{}
|
||||||
|
Precision string
|
||||||
|
Raw string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON will format the time in RFC3339Nano
|
||||||
|
// Precision is also ignored as it is only used for writing, not reading
|
||||||
|
// Or another way to say it is we always send back in nanosecond precision
|
||||||
|
func (p *Point) MarshalJSON() ([]byte, error) {
|
||||||
|
point := struct {
|
||||||
|
Measurement string `json:"measurement,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Time string `json:"time,omitempty"`
|
||||||
|
Fields map[string]interface{} `json:"fields,omitempty"`
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
|
}{
|
||||||
|
Measurement: p.Measurement,
|
||||||
|
Tags: p.Tags,
|
||||||
|
Fields: p.Fields,
|
||||||
|
Precision: p.Precision,
|
||||||
|
}
|
||||||
|
// Let it omit empty if it's really zero
|
||||||
|
if !p.Time.IsZero() {
|
||||||
|
point.Time = p.Time.UTC().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
|
return json.Marshal(&point)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalString renders string representation of a Point with specified
|
||||||
|
// precision. The default precision is nanoseconds.
|
||||||
|
func (p *Point) MarshalString() string {
|
||||||
|
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
|
||||||
|
if err != nil {
|
||||||
|
return "# ERROR: " + err.Error() + " " + p.Measurement
|
||||||
|
}
|
||||||
|
if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" {
|
||||||
|
return pt.String()
|
||||||
|
}
|
||||||
|
return pt.PrecisionString(p.Precision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Point struct
|
||||||
|
func (p *Point) UnmarshalJSON(b []byte) error {
|
||||||
|
var normal struct {
|
||||||
|
Measurement string `json:"measurement"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
Fields map[string]interface{} `json:"fields"`
|
||||||
|
}
|
||||||
|
var epoch struct {
|
||||||
|
Measurement string `json:"measurement"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time *int64 `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
Fields map[string]interface{} `json:"fields"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := func() error {
|
||||||
|
var err error
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
if err = dec.Decode(&epoch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Convert from epoch to time.Time, but only if Time
|
||||||
|
// was actually set.
|
||||||
|
var ts time.Time
|
||||||
|
if epoch.Time != nil {
|
||||||
|
ts, err = EpochToTime(*epoch.Time, epoch.Precision)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.Measurement = epoch.Measurement
|
||||||
|
p.Tags = epoch.Tags
|
||||||
|
p.Time = ts
|
||||||
|
p.Precision = epoch.Precision
|
||||||
|
p.Fields = normalizeFields(epoch.Fields)
|
||||||
|
return nil
|
||||||
|
}(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
if err := dec.Decode(&normal); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
normal.Time = SetPrecision(normal.Time, normal.Precision)
|
||||||
|
p.Measurement = normal.Measurement
|
||||||
|
p.Tags = normal.Tags
|
||||||
|
p.Time = normal.Time
|
||||||
|
p.Precision = normal.Precision
|
||||||
|
p.Fields = normalizeFields(normal.Fields)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove any notion of json.Number
|
||||||
|
func normalizeFields(fields map[string]interface{}) map[string]interface{} {
|
||||||
|
newFields := map[string]interface{}{}
|
||||||
|
|
||||||
|
for k, v := range fields {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case json.Number:
|
||||||
|
jv, e := v.Float64()
|
||||||
|
if e != nil {
|
||||||
|
panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e))
|
||||||
|
}
|
||||||
|
newFields[k] = jv
|
||||||
|
default:
|
||||||
|
newFields[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newFields
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPoints is used to send batched data in a single write.
|
||||||
|
// Database and Points are required
|
||||||
|
// If no retention policy is specified, it will use the databases default retention policy.
|
||||||
|
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored.
|
||||||
|
// If time is specified, it will be applied to any point with an empty time.
|
||||||
|
// Precision can be specified if the time is in epoch format (integer).
|
||||||
|
// Valid values for Precision are n, u, ms, s, m, and h
|
||||||
|
type BatchPoints struct {
|
||||||
|
Points []Point `json:"points,omitempty"`
|
||||||
|
Database string `json:"database,omitempty"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Time time.Time `json:"time,omitempty"`
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
|
WriteConsistency string `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the BatchPoints struct
|
||||||
|
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
|
||||||
|
var normal struct {
|
||||||
|
Points []Point `json:"points"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
}
|
||||||
|
var epoch struct {
|
||||||
|
Points []Point `json:"points"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time *int64 `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := func() error {
|
||||||
|
var err error
|
||||||
|
if err = json.Unmarshal(b, &epoch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Convert from epoch to time.Time
|
||||||
|
var ts time.Time
|
||||||
|
if epoch.Time != nil {
|
||||||
|
ts, err = EpochToTime(*epoch.Time, epoch.Precision)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bp.Points = epoch.Points
|
||||||
|
bp.Database = epoch.Database
|
||||||
|
bp.RetentionPolicy = epoch.RetentionPolicy
|
||||||
|
bp.Tags = epoch.Tags
|
||||||
|
bp.Time = ts
|
||||||
|
bp.Precision = epoch.Precision
|
||||||
|
return nil
|
||||||
|
}(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(b, &normal); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
normal.Time = SetPrecision(normal.Time, normal.Precision)
|
||||||
|
bp.Points = normal.Points
|
||||||
|
bp.Database = normal.Database
|
||||||
|
bp.RetentionPolicy = normal.RetentionPolicy
|
||||||
|
bp.Tags = normal.Tags
|
||||||
|
bp.Time = normal.Time
|
||||||
|
bp.Precision = normal.Precision
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// utility functions
|
||||||
|
|
||||||
|
// Addr provides the current url as a string of the server the client is connected to.
|
||||||
|
func (c *Client) Addr() string {
|
||||||
|
return c.url.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkPointTypes ensures no unsupported types are submitted to influxdb, returning error if they are found.
|
||||||
|
func checkPointTypes(p Point) error {
|
||||||
|
for _, v := range p.Fields {
|
||||||
|
switch v.(type) {
|
||||||
|
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, float32, float64, bool, string, nil:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported point type: %T", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper functions
|
||||||
|
|
||||||
|
// EpochToTime takes a unix epoch time and uses precision to return back a time.Time
|
||||||
|
func EpochToTime(epoch int64, precision string) (time.Time, error) {
|
||||||
|
if precision == "" {
|
||||||
|
precision = "s"
|
||||||
|
}
|
||||||
|
var t time.Time
|
||||||
|
switch precision {
|
||||||
|
case "h":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Hour))
|
||||||
|
case "m":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Minute))
|
||||||
|
case "s":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Second))
|
||||||
|
case "ms":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Millisecond))
|
||||||
|
case "u":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Microsecond))
|
||||||
|
case "n":
|
||||||
|
t = time.Unix(0, epoch)
|
||||||
|
default:
|
||||||
|
return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPrecision will round a time to the specified precision
|
||||||
|
func SetPrecision(t time.Time, precision string) time.Time {
|
||||||
|
switch precision {
|
||||||
|
case "n":
|
||||||
|
case "u":
|
||||||
|
return t.Round(time.Microsecond)
|
||||||
|
case "ms":
|
||||||
|
return t.Round(time.Millisecond)
|
||||||
|
case "s":
|
||||||
|
return t.Round(time.Second)
|
||||||
|
case "m":
|
||||||
|
return t.Round(time.Minute)
|
||||||
|
case "h":
|
||||||
|
return t.Round(time.Hour)
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConsistencyLevel represent a required replication criteria before a write can
|
||||||
|
// be returned as successful
|
||||||
|
type ConsistencyLevel int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ConsistencyLevelAny allows for hinted hand off, potentially no write happened yet
|
||||||
|
ConsistencyLevelAny ConsistencyLevel = iota
|
||||||
|
|
||||||
|
// ConsistencyLevelOne requires at least one data node acknowledged a write
|
||||||
|
ConsistencyLevelOne
|
||||||
|
|
||||||
|
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write
|
||||||
|
ConsistencyLevelQuorum
|
||||||
|
|
||||||
|
// ConsistencyLevelAll requires all data nodes to acknowledge a write
|
||||||
|
ConsistencyLevelAll
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrInvalidConsistencyLevel is returned when parsing the string version
|
||||||
|
// of a consistency level.
|
||||||
|
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
|
||||||
|
)
|
||||||
|
|
||||||
|
// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const
|
||||||
|
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
|
||||||
|
switch strings.ToLower(level) {
|
||||||
|
case "any":
|
||||||
|
return ConsistencyLevelAny, nil
|
||||||
|
case "one":
|
||||||
|
return ConsistencyLevelOne, nil
|
||||||
|
case "quorum":
|
||||||
|
return ConsistencyLevelQuorum, nil
|
||||||
|
case "all":
|
||||||
|
return ConsistencyLevelAll, nil
|
||||||
|
default:
|
||||||
|
return 0, ErrInvalidConsistencyLevel
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,60 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"hash/fnv"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Row represents a single row returned from the execution of a statement.
|
||||||
|
type Row struct {
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Columns []string `json:"columns,omitempty"`
|
||||||
|
Values [][]interface{} `json:"values,omitempty"`
|
||||||
|
Err error `json:"err,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SameSeries returns true if r contains values for the same series as o.
|
||||||
|
func (r *Row) SameSeries(o *Row) bool {
|
||||||
|
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagsHash returns a hash of tag key/value pairs.
|
||||||
|
func (r *Row) tagsHash() uint64 {
|
||||||
|
h := fnv.New64a()
|
||||||
|
keys := r.tagsKeys()
|
||||||
|
for _, k := range keys {
|
||||||
|
h.Write([]byte(k))
|
||||||
|
h.Write([]byte(r.Tags[k]))
|
||||||
|
}
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagKeys returns a sorted list of tag keys.
|
||||||
|
func (r *Row) tagsKeys() []string {
|
||||||
|
a := make([]string, 0, len(r.Tags))
|
||||||
|
for k := range r.Tags {
|
||||||
|
a = append(a, k)
|
||||||
|
}
|
||||||
|
sort.Strings(a)
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rows represents a collection of rows. Rows implements sort.Interface.
|
||||||
|
type Rows []*Row
|
||||||
|
|
||||||
|
func (p Rows) Len() int { return len(p) }
|
||||||
|
|
||||||
|
func (p Rows) Less(i, j int) bool {
|
||||||
|
// Sort by name first.
|
||||||
|
if p[i].Name != p[j].Name {
|
||||||
|
return p[i].Name < p[j].Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by tag set hash. Tags don't have a meaningful sort order so we
|
||||||
|
// just compute a hash and sort by that instead. This allows the tests
|
||||||
|
// to receive rows in a predictable order every time.
|
||||||
|
return p[i].tagsHash() < p[j].tagsHash()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
@ -0,0 +1,51 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
// Helper time methods since parsing time can easily overflow and we only support a
|
||||||
|
// specific time range.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// MaxNanoTime is the maximum time that can be represented via int64 nanoseconds since the epoch.
|
||||||
|
MaxNanoTime = time.Unix(0, math.MaxInt64).UTC()
|
||||||
|
// MinNanoTime is the minumum time that can be represented via int64 nanoseconds since the epoch.
|
||||||
|
MinNanoTime = time.Unix(0, math.MinInt64).UTC()
|
||||||
|
|
||||||
|
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
|
||||||
|
ErrTimeOutOfRange = fmt.Errorf("time outside range %s - %s", MinNanoTime, MaxNanoTime)
|
||||||
|
)
|
||||||
|
|
||||||
|
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
|
||||||
|
// supported range.
|
||||||
|
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
|
||||||
|
mult := GetPrecisionMultiplier(precision)
|
||||||
|
if t, ok := safeSignedMult(timestamp, mult); ok {
|
||||||
|
return time.Unix(0, t).UTC(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Time{}, ErrTimeOutOfRange
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckTime checks that a time is within the safe range.
|
||||||
|
func CheckTime(t time.Time) error {
|
||||||
|
if t.Before(MinNanoTime) || t.After(MaxNanoTime) {
|
||||||
|
return ErrTimeOutOfRange
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the multiplication and check to make sure it didn't overflow.
|
||||||
|
func safeSignedMult(a, b int64) (int64, bool) {
|
||||||
|
if a == 0 || b == 0 || a == 1 || b == 1 {
|
||||||
|
return a * b, true
|
||||||
|
}
|
||||||
|
if a == math.MinInt64 || b == math.MaxInt64 {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
c := a * b
|
||||||
|
return c, c/b == a
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package escape
|
||||||
|
|
||||||
|
import "bytes"
|
||||||
|
|
||||||
|
func Bytes(in []byte) []byte {
|
||||||
|
for b, esc := range Codes {
|
||||||
|
in = bytes.Replace(in, []byte{b}, esc, -1)
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func Unescape(in []byte) []byte {
|
||||||
|
i := 0
|
||||||
|
inLen := len(in)
|
||||||
|
var out []byte
|
||||||
|
|
||||||
|
for {
|
||||||
|
if i >= inLen {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if in[i] == '\\' && i+1 < inLen {
|
||||||
|
switch in[i+1] {
|
||||||
|
case ',':
|
||||||
|
out = append(out, ',')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case '"':
|
||||||
|
out = append(out, '"')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case ' ':
|
||||||
|
out = append(out, ' ')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case '=':
|
||||||
|
out = append(out, '=')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out = append(out, in[i])
|
||||||
|
i += 1
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package escape
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
var (
|
||||||
|
Codes = map[byte][]byte{
|
||||||
|
',': []byte(`\,`),
|
||||||
|
'"': []byte(`\"`),
|
||||||
|
' ': []byte(`\ `),
|
||||||
|
'=': []byte(`\=`),
|
||||||
|
}
|
||||||
|
|
||||||
|
codesStr = map[string]string{}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
for k, v := range Codes {
|
||||||
|
codesStr[string(k)] = string(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnescapeString(in string) string {
|
||||||
|
for b, esc := range codesStr {
|
||||||
|
in = strings.Replace(in, esc, b, -1)
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func String(in string) string {
|
||||||
|
for b, esc := range codesStr {
|
||||||
|
in = strings.Replace(in, b, esc, -1)
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
Loading…
Reference in New Issue