Split the state in chunks when they outgrow the limit of Consul KV store

The Consul KV store limits the size of the values in the KV store to 524288
bytes. Once the state reaches this limit Consul will refuse to save it. It is
currently possible to try to bypass this limitation by enabling Gzip but the issue
will manifest itself later. This is particularly inconvenient as it is possible
for the state to reach this limit without changing the Terraform configuration
as datasources or computed attributes can suddenly return more data than they
used to. Several users already had issues with this.

To fix the problem once and for all we now split the payload in chunks of 524288
bytes when it is too large and store them separately in the KV store. A small
JSON payload that references all the chunks is stored at the original path so
we can retrieve them later and concatenate them to reconstruct the payload.

While this has the caveat of requiring multiple calls to Consul that cannot be
done as a single transaction as those have the same size limit, we use unique
paths for the chunks and CAS when setting the last payload so possible issues
during calls to Put() should not result in unreadable states.

Closes https://github.com/hashicorp/terraform/issues/19182
This commit is contained in:
Rémi Lapeyre 2020-08-14 17:38:18 +02:00
parent f5c8ef19ad
commit e680211bc0
3 changed files with 322 additions and 12 deletions

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -39,6 +40,10 @@ func newConsulTestServer() (*testutil.TestServer, error) {
srv, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) { srv, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
c.LogLevel = "warn" c.LogLevel = "warn"
if !flag.Parsed() {
flag.Parse()
}
if !testing.Verbose() { if !testing.Verbose() {
c.Stdout = ioutil.Discard c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard c.Stderr = ioutil.Discard

View File

@ -9,6 +9,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"time" "time"
@ -71,7 +72,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
pair, _, err := c.Client.KV().Get(c.Path, nil) kv := c.Client.KV()
chunked, hash, chunks, pair, err := c.chunkedMode()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -81,17 +84,36 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
c.modifyIndex = pair.ModifyIndex c.modifyIndex = pair.ModifyIndex
payload := pair.Value var payload []byte
if chunked {
for _, c := range chunks {
pair, _, err := kv.Get(c, nil)
if err != nil {
return nil, err
}
if pair == nil {
return nil, fmt.Errorf("Key %q could not be found", c)
}
payload = append(payload, pair.Value[:]...)
}
} else {
payload = pair.Value
}
// If the payload starts with 0x1f, it's gzip, not json // If the payload starts with 0x1f, it's gzip, not json
if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' { if len(payload) >= 1 && payload[0] == '\x1f' {
if data, err := uncompressState(pair.Value); err == nil { payload, err = uncompressState(payload)
payload = data if err != nil {
} else {
return nil, err return nil, err
} }
} }
md5 := md5.Sum(pair.Value) md5 := md5.Sum(payload)
if hash != "" && fmt.Sprintf("%x", md5) != hash {
return nil, fmt.Errorf("The remote state does not match the expected hash")
}
return &remote.Payload{ return &remote.Payload{
Data: payload, Data: payload,
MD5: md5[:], MD5: md5[:],
@ -99,9 +121,65 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
} }
func (c *RemoteClient) Put(data []byte) error { func (c *RemoteClient) Put(data []byte) error {
// The state can be stored in 4 different ways, based on the payload size
// and whether the user enabled gzip:
// - single entry mode with plain JSON: a single JSON is stored at
// "tfstate/my_project"
// - single entry mode gzip: the JSON payload is first gziped and stored at
// "tfstate/my_project"
// - chunked mode with plain JSON: the JSON payload is split in pieces and
// stored like so:
// - "tfstate/my_project" -> a JSON payload that contains the path of
// the chunks and an MD5 sum like so:
// {
// "current-hash": "abcdef1234",
// "chunks": [
// "tfstate/my_project/tfstate.abcdef1234/0",
// "tfstate/my_project/tfstate.abcdef1234/1",
// "tfstate/my_project/tfstate.abcdef1234/2",
// ]
// }
// - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk
// - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one
// - ...
// - chunked mode with gzip: the same system but we gziped the JSON payload
// before splitting it in chunks
//
// When overwriting the current state, we need to clean the old chunks if
// we were in chunked mode (no matter whether we need to use chunks for the
// new one). To do so based on the 4 possibilities above we look at the
// value at "tfstate/my_project" and if it is:
// - absent then it's a new state and there will be nothing to cleanup,
// - not a JSON payload we were in single entry mode with gzip so there will
// be nothing to cleanup
// - a JSON payload, then we were either single entry mode with plain JSON
// or in chunked mode. To differentiate between the two we look whether a
// "current-hash" key is present in the payload. If we find one we were
// in chunked mode and we will need to remove the old chunks (whether or
// not we were using gzip does not matter in that case).
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
kv := c.Client.KV()
// First we determine what mode we were using in order to prepare the cleanup
chunked, hash, _, _, err := c.chunkedMode()
if err != nil {
return err
}
cleanupOldChunks := func() {}
if chunked {
cleanupOldChunks = func() {
// We ignore all errors that can happen here because we already
// saved the new state and there is no way to return a warning to
// the user. We may end up with dangling chunks but there is no way
// to be sure we won't.
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
kv.DeleteTree(path, nil)
}
}
payload := data payload := data
if c.GZip { if c.GZip {
if compressedState, err := compressState(data); err == nil { if compressedState, err := compressState(data); err == nil {
@ -111,8 +189,6 @@ func (c *RemoteClient) Put(data []byte) error {
} }
} }
kv := c.Client.KV()
// default to doing a CAS // default to doing a CAS
verb := consulapi.KVCAS verb := consulapi.KVCAS
@ -122,9 +198,44 @@ func (c *RemoteClient) Put(data []byte) error {
verb = consulapi.KVSet verb = consulapi.KVSet
} }
// If the payload is too large we first write the chunks and replace it
// 524288 is the default value, we just hope the user did not set a smaller
// one but there is really no reason for them to do so, if they changed it
// it is certainly to set a larger value.
limit := 524288
if len(payload) > limit {
md5 := md5.Sum(data)
chunks := split(payload, limit)
chunkPaths := make([]string, 0)
// First we write the new chunks
for i, p := range chunks {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
chunkPaths = append(chunkPaths, path)
_, err := kv.Put(&consulapi.KVPair{
Key: path,
Value: p,
}, nil)
if err != nil {
return err
}
}
// We update the link to point to the new chunks
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
if err != nil {
return err
}
}
var txOps consulapi.KVTxnOps
// KV.Put doesn't return the new index, so we use a single operation // KV.Put doesn't return the new index, so we use a single operation
// transaction to get the new index with a single request. // transaction to get the new index with a single request.
txOps := consulapi.KVTxnOps{ txOps = consulapi.KVTxnOps{
&consulapi.KVTxnOp{ &consulapi.KVTxnOp{
Verb: verb, Verb: verb,
Key: c.Path, Key: c.Path,
@ -137,7 +248,6 @@ func (c *RemoteClient) Put(data []byte) error {
if err != nil { if err != nil {
return err return err
} }
// transaction was rolled back // transaction was rolled back
if !ok { if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors) return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
@ -149,6 +259,10 @@ func (c *RemoteClient) Put(data []byte) error {
} }
c.modifyIndex = resp.Results[0].ModifyIndex c.modifyIndex = resp.Results[0].ModifyIndex
// We remove all the old chunks
cleanupOldChunks()
return nil return nil
} }
@ -157,7 +271,20 @@ func (c *RemoteClient) Delete() error {
defer c.mu.Unlock() defer c.mu.Unlock()
kv := c.Client.KV() kv := c.Client.KV()
_, err := kv.Delete(c.Path, nil)
chunked, hash, _, _, err := c.chunkedMode()
if err != nil {
return err
}
_, err = kv.Delete(c.Path, nil)
// If there were chunks we need to remove them
if chunked {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
kv.DeleteTree(path, nil)
}
return err return err
} }
@ -466,3 +593,42 @@ func uncompressState(data []byte) ([]byte, error) {
} }
return b.Bytes(), nil return b.Bytes(), nil
} }
// split cuts payload into consecutive chunks of at most limit bytes each.
//
// Every chunk except possibly the last is exactly limit bytes long, and
// concatenating the chunks in order yields the original payload. The chunks
// alias payload's backing array (no bytes are copied), but each one has its
// capacity clipped to its length so that appending to a chunk can never
// overwrite the bytes of the chunk that follows it.
//
// An empty payload yields an empty, non-nil slice. A non-positive limit means
// "do not split": the payload is returned as a single chunk instead of
// dividing by zero or looping forever.
func split(payload []byte, limit int) [][]byte {
	if len(payload) == 0 {
		return [][]byte{}
	}
	if limit <= 0 {
		return [][]byte{payload[:len(payload):len(payload)]}
	}

	// Ceiling division gives the exact number of chunks.
	chunks := make([][]byte, 0, (len(payload)+limit-1)/limit)
	for len(payload) > limit {
		chunks = append(chunks, payload[:limit:limit])
		payload = payload[limit:]
	}
	// What remains is between 1 and limit bytes long.
	return append(chunks, payload[:len(payload):len(payload)])
}
// chunkedMode reads the entry stored at c.Path and reports whether the state
// is currently stored as a chunk index.
//
// It returns, in order:
//   - true when the entry is a JSON object containing a "current-hash" key,
//   - the hash recorded under "current-hash",
//   - the chunk keys listed under "chunks", in their stored order,
//   - the KV pair found at c.Path (nil when the key does not exist),
//   - an error when the Consul request fails or when the entry has a
//     "current-hash" key but is otherwise not a well-formed chunk index.
//
// An absent key, a value that does not decode as a JSON object, or a JSON
// object without "current-hash" are all reported as "not chunked" with a nil
// error.
func (c *RemoteClient) chunkedMode() (bool, string, []string, *consulapi.KVPair, error) {
	pair, _, err := c.Client.KV().Get(c.Path, nil)
	if err != nil {
		return false, "", nil, pair, err
	}
	if pair == nil {
		return false, "", nil, nil, nil
	}

	var index map[string]interface{}
	if err := json.Unmarshal(pair.Value, &index); err != nil {
		// If there is an error when unmarshaling the payload, the state has
		// probably been gzipped in single entry mode.
		return false, "", nil, pair, nil
	}

	// If we find the "current-hash" key we were in chunked mode.
	rawHash, ok := index["current-hash"]
	if !ok {
		return false, "", nil, pair, nil
	}

	// From here on the entry claims to be a chunk index, so anything that
	// does not have the expected shape is reported as an error rather than
	// causing a panic on a failed type assertion.
	hash, ok := rawHash.(string)
	if !ok {
		return false, "", nil, nil, fmt.Errorf("malformed chunk index at %q: \"current-hash\" is not a string", c.Path)
	}

	rawChunks, ok := index["chunks"].([]interface{})
	if !ok {
		return false, "", nil, nil, fmt.Errorf("malformed chunk index at %q: \"chunks\" is missing or not a list", c.Path)
	}

	chunks := make([]string, 0, len(rawChunks))
	for i, raw := range rawChunks {
		key, ok := raw.(string)
		if !ok {
			return false, "", nil, nil, fmt.Errorf("malformed chunk index at %q: chunk %d is not a string", c.Path, i)
		}
		chunks = append(chunks, key)
	}

	return true, hash, chunks, pair, nil
}

View File

@ -1,9 +1,14 @@
package consul package consul
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"math/rand"
"net" "net"
"reflect"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -71,6 +76,140 @@ func TestRemoteClient_gzipUpgrade(t *testing.T) {
remote.TestClient(t, state.(*remote.State).Client) remote.TestClient(t, state.(*remote.State).Client)
} }
// TestConsul_largeState writes payloads larger than the 524288-byte chunking
// threshold through the Consul remote client and checks that:
//   - the data read back with Get equals the data passed to Put,
//   - the keys present under the state path are exactly the expected index
//     key plus the expected chunk keys,
//   - replacing a chunked state with a small one removes the old chunks,
//   - the same holds when gzip is enabled,
//   - Delete removes the index key and every chunk.
func TestConsul_largeState(t *testing.T) {
path := "tf-unit/test-large-state"
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
"address": srv.HTTPAddr,
"path": path,
}))
s, err := b.StateMgr(backend.DefaultStateName)
if err != nil {
t.Fatal(err)
}
c := s.(*remote.State).Client.(*RemoteClient)
// Force the client to use the bare path so the expected keys below are
// predictable.
c.Path = path
// testPaths fails the test if the keys found at the prefix don't match
// what is expected. The comparison uses reflect.DeepEqual, so the expected
// keys must be given in the same order as kv.List returns them, and an
// empty expectation must be a non-nil empty slice.
testPaths := func(t *testing.T, expected []string) {
kv := c.Client.KV()
pairs, _, err := kv.List(c.Path, nil)
if err != nil {
t.Fatal(err)
}
res := make([]string, 0)
for _, p := range pairs {
res = append(res, p.Key)
}
if !reflect.DeepEqual(res, expected) {
t.Fatalf("Wrong keys: %#v", res)
}
}
// testPayload JSON-encodes data, stores it with Put, reads it back with Get
// and checks both the round-tripped bytes and the resulting set of keys.
testPayload := func(t *testing.T, data map[string]string, keys []string) {
payload, err := json.Marshal(data)
if err != nil {
t.Fatal(err)
}
err = c.Put(payload)
if err != nil {
t.Fatal("could not put payload", err)
}
remote, err := c.Get()
if err != nil {
t.Fatal(err)
}
// NOTE(review): the MD5 comparison below is disabled; the reason is not
// visible here. Confirm whether it can be re-enabled or removed.
// md5 := md5.Sum(payload)
// if !bytes.Equal(md5[:], remote.MD5) {
// t.Fatal("the md5 sums do not match")
// }
if !bytes.Equal(payload, remote.Data) {
t.Fatal("the data do not match")
}
testPaths(t, keys)
}
// The default limit for the size of the value in Consul is 524288 bytes.
// This payload is slightly larger, so two chunk keys are expected next to
// the index key.
testPayload(
t,
map[string]string{
"foo": strings.Repeat("a", 524288+2),
},
[]string{
"tf-unit/test-large-state",
"tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/0",
"tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/1",
},
)
// We try to replace the payload with a small one, the old chunks should be removed
testPayload(
t,
map[string]string{"var": "a"},
[]string{"tf-unit/test-large-state"},
)
// Test with gzip and chunks: build a second client on the same path with
// gzip enabled.
b = backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
"address": srv.HTTPAddr,
"path": path,
"gzip": true,
}))
s, err = b.StateMgr(backend.DefaultStateName)
if err != nil {
t.Fatal(err)
}
c = s.(*remote.State).Client.(*RemoteClient)
c.Path = path
// We need a long random string so it results in multiple chunks even after
// being gzipped.
// We use a fixed seed so the test is reproducible; the hash in the expected
// keys below depends on the exact sequence of random values drawn.
rand.Seed(1234)
RandStringRunes := func(n int) string {
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
testPayload(
t,
map[string]string{
"bar": RandStringRunes(5 * (524288 + 2)),
},
[]string{
"tf-unit/test-large-state",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/0",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/1",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/2",
"tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/3",
},
)
// Deleting the state should remove all chunks
err = c.Delete()
if err != nil {
t.Fatal(err)
}
testPaths(t, []string{})
}
func TestConsul_stateLock(t *testing.T) { func TestConsul_stateLock(t *testing.T) {
path := fmt.Sprintf("tf-unit/%s", time.Now().String()) path := fmt.Sprintf("tf-unit/%s", time.Now().String())