Merge pull request #25856 from remilapeyre/consul-split-state

Split the state in chunks when they outgrow the limit of Consul KV store
This commit is contained in:
Alisdair McDiarmid 2020-09-15 10:00:25 -04:00 committed by GitHub
commit e183163739
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 316 additions and 12 deletions

View File

@ -72,7 +72,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
c.mu.Lock()
defer c.mu.Unlock()
pair, _, err := c.Client.KV().Get(c.Path, nil)
kv := c.Client.KV()
chunked, hash, chunks, pair, err := c.chunkedMode()
if err != nil {
return nil, err
}
@ -82,17 +84,36 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
c.modifyIndex = pair.ModifyIndex
payload := pair.Value
var payload []byte
if chunked {
for _, c := range chunks {
pair, _, err := kv.Get(c, nil)
if err != nil {
return nil, err
}
if pair == nil {
return nil, fmt.Errorf("Key %q could not be found", c)
}
payload = append(payload, pair.Value[:]...)
}
} else {
payload = pair.Value
}
// If the payload starts with 0x1f, it's gzip, not json
if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' {
if data, err := uncompressState(pair.Value); err == nil {
payload = data
} else {
if len(payload) >= 1 && payload[0] == '\x1f' {
payload, err = uncompressState(payload)
if err != nil {
return nil, err
}
}
md5 := md5.Sum(pair.Value)
md5 := md5.Sum(payload)
if hash != "" && fmt.Sprintf("%x", md5) != hash {
return nil, fmt.Errorf("The remote state does not match the expected hash")
}
return &remote.Payload{
Data: payload,
MD5: md5[:],
@ -100,9 +121,65 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
}
func (c *RemoteClient) Put(data []byte) error {
// The state can be stored in 4 different ways, based on the payload size
// and whether the user enabled gzip:
// - single entry mode with plain JSON: a single JSON is stored at
// "tfstate/my_project"
// - single entry mode gzip: the JSON payload is first gziped and stored at
// "tfstate/my_project"
// - chunked mode with plain JSON: the JSON payload is split in pieces and
// stored like so:
// - "tfstate/my_project" -> a JSON payload that contains the path of
// the chunks and an MD5 sum like so:
// {
// "current-hash": "abcdef1234",
// "chunks": [
// "tfstate/my_project/tfstate.abcdef1234/0",
// "tfstate/my_project/tfstate.abcdef1234/1",
// "tfstate/my_project/tfstate.abcdef1234/2",
// ]
// }
// - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk
// - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one
// - ...
// - chunked mode with gzip: the same system but we gziped the JSON payload
// before splitting it in chunks
//
// When overwritting the current state, we need to clean the old chunks if
// we were in chunked mode (no matter whether we need to use chunks for the
// new one). To do so based on the 4 possibilities above we look at the
// value at "tfstate/my_project" and if it is:
// - absent then it's a new state and there will be nothing to cleanup,
// - not a JSON payload we were in single entry mode with gzip so there will
// be nothing to cleanup
// - a JSON payload, then we were either single entry mode with plain JSON
// or in chunked mode. To differentiate between the two we look whether a
// "current-hash" key is present in the payload. If we find one we were
// in chunked mode and we will need to remove the old chunks (whether or
// not we were using gzip does not matter in that case).
c.mu.Lock()
defer c.mu.Unlock()
kv := c.Client.KV()
// First we determine what mode we were using and to prepare the cleanup
chunked, hash, _, _, err := c.chunkedMode()
if err != nil {
return err
}
cleanupOldChunks := func() {}
if chunked {
cleanupOldChunks = func() {
// We ignore all errors that can happen here because we already
// saved the new state and there is no way to return a warning to
// the user. We may end up with dangling chunks but there is no way
// to be sure we won't.
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
kv.DeleteTree(path, nil)
}
}
payload := data
if c.GZip {
if compressedState, err := compressState(data); err == nil {
@ -112,8 +189,6 @@ func (c *RemoteClient) Put(data []byte) error {
}
}
kv := c.Client.KV()
// default to doing a CAS
verb := consulapi.KVCAS
@ -123,9 +198,44 @@ func (c *RemoteClient) Put(data []byte) error {
verb = consulapi.KVSet
}
// If the payload is too large we first write the chunks and replace it
// 524288 is the default value, we just hope the user did not set a smaller
// one but there is really no reason for them to do so, if they changed it
// it is certainly to set a larger value.
limit := 524288
if len(payload) > limit {
md5 := md5.Sum(data)
chunks := split(payload, limit)
chunkPaths := make([]string, 0)
// First we write the new chunks
for i, p := range chunks {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
chunkPaths = append(chunkPaths, path)
_, err := kv.Put(&consulapi.KVPair{
Key: path,
Value: p,
}, nil)
if err != nil {
return err
}
}
// We update the link to point to the new chunks
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
if err != nil {
return err
}
}
var txOps consulapi.KVTxnOps
// KV.Put doesn't return the new index, so we use a single operation
// transaction to get the new index with a single request.
txOps := consulapi.KVTxnOps{
txOps = consulapi.KVTxnOps{
&consulapi.KVTxnOp{
Verb: verb,
Key: c.Path,
@ -138,7 +248,6 @@ func (c *RemoteClient) Put(data []byte) error {
if err != nil {
return err
}
// transaction was rolled back
if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
@ -150,6 +259,10 @@ func (c *RemoteClient) Put(data []byte) error {
}
c.modifyIndex = resp.Results[0].ModifyIndex
// We remove all the old chunks
cleanupOldChunks()
return nil
}
@ -158,7 +271,20 @@ func (c *RemoteClient) Delete() error {
defer c.mu.Unlock()
kv := c.Client.KV()
_, err := kv.Delete(c.Path, nil)
chunked, hash, _, _, err := c.chunkedMode()
if err != nil {
return err
}
_, err = kv.Delete(c.Path, nil)
// If there were chunks we need to remove them
if chunked {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
kv.DeleteTree(path, nil)
}
return err
}
@ -473,3 +599,42 @@ func uncompressState(data []byte) ([]byte, error) {
}
return b.Bytes(), nil
}
func split(payload []byte, limit int) [][]byte {
var chunk []byte
chunks := make([][]byte, 0, len(payload)/limit+1)
for len(payload) >= limit {
chunk, payload = payload[:limit], payload[limit:]
chunks = append(chunks, chunk)
}
if len(payload) > 0 {
chunks = append(chunks, payload[:])
}
return chunks
}
func (c *RemoteClient) chunkedMode() (bool, string, []string, *consulapi.KVPair, error) {
kv := c.Client.KV()
pair, _, err := kv.Get(c.Path, nil)
if err != nil {
return false, "", nil, pair, err
}
if pair != nil {
var d map[string]interface{}
err = json.Unmarshal(pair.Value, &d)
// If there is an error when unmarshaling the payload, the state has
// probably been gziped in single entry mode.
if err == nil {
// If we find the "current-hash" key we were in chunked mode
hash, ok := d["current-hash"]
if ok {
chunks := make([]string, 0)
for _, c := range d["chunks"].([]interface{}) {
chunks = append(chunks, c.(string))
}
return true, hash.(string), chunks, pair, nil
}
}
}
return false, "", nil, pair, nil
}

View File

@ -1,9 +1,14 @@
package consul
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"net"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -80,6 +85,140 @@ func TestRemoteClient_gzipUpgrade(t *testing.T) {
remote.TestClient(t, state.(*remote.State).Client)
}
// TestConsul_largeState tries to write a large payload using the Consul state
// manager, as there is a limit to the size of the values in the KV store it
// will need to be split up before being saved and put back together when read.
func TestConsul_largeState(t *testing.T) {
path := "tf-unit/test-large-state"
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
"address": srv.HTTPAddr,
"path": path,
}))
s, err := b.StateMgr(backend.DefaultStateName)
if err != nil {
t.Fatal(err)
}
c := s.(*remote.State).Client.(*RemoteClient)
c.Path = path
// testPaths fails the test if the keys found at the prefix don't match
// what is expected
testPaths := func(t *testing.T, expected []string) {
kv := c.Client.KV()
pairs, _, err := kv.List(c.Path, nil)
if err != nil {
t.Fatal(err)
}
res := make([]string, 0)
for _, p := range pairs {
res = append(res, p.Key)
}
if !reflect.DeepEqual(res, expected) {
t.Fatalf("Wrong keys: %#v", res)
}
}
testPayload := func(t *testing.T, data map[string]string, keys []string) {
payload, err := json.Marshal(data)
if err != nil {
t.Fatal(err)
}
err = c.Put(payload)
if err != nil {
t.Fatal("could not put payload", err)
}
remote, err := c.Get()
if err != nil {
t.Fatal(err)
}
// md5 := md5.Sum(payload)
// if !bytes.Equal(md5[:], remote.MD5) {
// t.Fatal("the md5 sums do not match")
// }
if !bytes.Equal(payload, remote.Data) {
t.Fatal("the data do not match")
}
testPaths(t, keys)
}
// The default limit for the size of the value in Consul is 524288 bytes
testPayload(
t,
map[string]string{
"foo": strings.Repeat("a", 524288+2),
},
[]string{
"tf-unit/test-large-state",
"tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/0",
"tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/1",
},
)
// We try to replace the payload with a small one, the old chunks should be removed
testPayload(
t,
map[string]string{"var": "a"},
[]string{"tf-unit/test-large-state"},
)
// Test with gzip and chunks
b = backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
"address": srv.HTTPAddr,
"path": path,
"gzip": true,
}))
s, err = b.StateMgr(backend.DefaultStateName)
if err != nil {
t.Fatal(err)
}
c = s.(*remote.State).Client.(*RemoteClient)
c.Path = path
// We need a long random string so it results in multiple chunks even after
// being gziped
// We use a fixed seed so the test can be reproductible
rand.Seed(1234)
RandStringRunes := func(n int) string {
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
testPayload(
t,
map[string]string{
"bar": RandStringRunes(5 * (524288 + 2)),
},
[]string{
"tf-unit/test-large-state",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/0",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/1",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/2",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/3",
},
)
// Deleting the state should remove all chunks
err = c.Delete()
if err != nil {
t.Fatal(err)
}
testPaths(t, []string{})
}
func TestConsul_stateLock(t *testing.T) {
testCases := []string{
fmt.Sprintf("tf-unit/%s", time.Now().String()),