Fix handling large states in the Consul backend
This commit is contained in:
parent
e5fb84a01a
commit
5f444a6862
|
@ -198,72 +198,92 @@ func (c *RemoteClient) Put(data []byte) error {
|
||||||
verb = consulapi.KVSet
|
verb = consulapi.KVSet
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the payload is too large we first write the chunks and replace it
|
// The payload may be too large to store in a single KV entry in Consul. We
|
||||||
// 524288 is the default value, we just hope the user did not set a smaller
|
// could try to determine whether it will fit or not before sending the
|
||||||
// one but there is really no reason for them to do so, if they changed it
|
// request but since we are using the Transaction API and not the KV API,
|
||||||
// it is certainly to set a larger value.
|
// it grows by about a 1/3 when it is base64 encoded plus the overhead of
|
||||||
limit := 524288
|
// the fields specific to the Transaction API.
|
||||||
if len(payload) > limit {
|
// Rather than trying to calculate the overhead (which could change from
|
||||||
md5 := md5.Sum(data)
|
// one version of Consul to another, and between Consul Community Edition
|
||||||
chunks := split(payload, limit)
|
// and Consul Enterprise), we try to send the whole state in one request, if
|
||||||
chunkPaths := make([]string, 0)
|
// it fails because it is too big we then split it in chunks and send each
|
||||||
|
// chunk separately.
|
||||||
|
// When splitting in chunks, we make each chunk 524288 bits, which is the
|
||||||
|
// default max size for raft. If the user changed it, we still may send
|
||||||
|
// chunks too big and fail but this is not a setting that should be fiddled
|
||||||
|
// with anyway.
|
||||||
|
|
||||||
// First we write the new chunks
|
store := func(payload []byte) error {
|
||||||
for i, p := range chunks {
|
// KV.Put doesn't return the new index, so we use a single operation
|
||||||
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
|
// transaction to get the new index with a single request.
|
||||||
chunkPaths = append(chunkPaths, path)
|
txOps := consulapi.KVTxnOps{
|
||||||
_, err := kv.Put(&consulapi.KVPair{
|
&consulapi.KVTxnOp{
|
||||||
Key: path,
|
Verb: verb,
|
||||||
Value: p,
|
Key: c.Path,
|
||||||
}, nil)
|
Value: payload,
|
||||||
|
Index: c.modifyIndex,
|
||||||
if err != nil {
|
},
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We update the link to point to the new chunks
|
ok, resp, _, err := kv.Txn(txOps, nil)
|
||||||
payload, err = json.Marshal(map[string]interface{}{
|
if err != nil {
|
||||||
"current-hash": fmt.Sprintf("%x", md5),
|
return err
|
||||||
"chunks": chunkPaths,
|
}
|
||||||
})
|
// transaction was rolled back
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Results) != 1 {
|
||||||
|
// this probably shouldn't happen
|
||||||
|
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.modifyIndex = resp.Results[0].ModifyIndex
|
||||||
|
|
||||||
|
// We remove all the old chunks
|
||||||
|
cleanupOldChunks()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = store(payload); err == nil {
|
||||||
|
// The payload was small enough to be stored
|
||||||
|
return nil
|
||||||
|
} else if !strings.Contains(err.Error(), "too large") {
|
||||||
|
// We failed for some other reason, report this to the user
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// The payload was too large so we split it in multiple chunks
|
||||||
|
|
||||||
|
md5 := md5.Sum(data)
|
||||||
|
chunks := split(payload, 524288)
|
||||||
|
chunkPaths := make([]string, 0)
|
||||||
|
|
||||||
|
// First we write the new chunks
|
||||||
|
for i, p := range chunks {
|
||||||
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
|
||||||
|
chunkPaths = append(chunkPaths, path)
|
||||||
|
_, err := kv.Put(&consulapi.KVPair{
|
||||||
|
Key: path,
|
||||||
|
Value: p,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var txOps consulapi.KVTxnOps
|
// Then we update the link to point to the new chunks
|
||||||
// KV.Put doesn't return the new index, so we use a single operation
|
payload, err = json.Marshal(map[string]interface{}{
|
||||||
// transaction to get the new index with a single request.
|
"current-hash": fmt.Sprintf("%x", md5),
|
||||||
txOps = consulapi.KVTxnOps{
|
"chunks": chunkPaths,
|
||||||
&consulapi.KVTxnOp{
|
})
|
||||||
Verb: verb,
|
|
||||||
Key: c.Path,
|
|
||||||
Value: payload,
|
|
||||||
Index: c.modifyIndex,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ok, resp, _, err := kv.Txn(txOps, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// transaction was rolled back
|
return store(payload)
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(resp.Results) != 1 {
|
|
||||||
// this probably shouldn't happen
|
|
||||||
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
|
||||||
}
|
|
||||||
|
|
||||||
c.modifyIndex = resp.Results[0].ModifyIndex
|
|
||||||
|
|
||||||
// We remove all the old chunks
|
|
||||||
cleanupOldChunks()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Delete() error {
|
func (c *RemoteClient) Delete() error {
|
||||||
|
|
Loading…
Reference in New Issue