helper/shadow: ComparedValue

This commit is contained in:
Mitchell Hashimoto 2016-10-06 12:15:41 -07:00
parent 548a585762
commit 50e0647c53
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
3 changed files with 307 additions and 0 deletions

View File

@ -0,0 +1,128 @@
package shadow
import (
"sync"
)
// ComparedValue is a struct that finds a value by comparing some key
// to the list of stored values. This is useful when there is no easy
// uniquely identifying key that works in a map (for that, use KeyedValue).
//
// ComparedValue is very expensive, relative to other Value types. Try to
// limit the number of values stored in a ComparedValue by potentially
// nesting it within a KeyedValue (a keyed value points to a compared value,
// for example).
type ComparedValue struct {
// Func is a function that is given the lookup key and a single
// stored value. If it matches, it returns true.
Func func(k, v interface{}) bool
lock sync.Mutex
once sync.Once
closed bool
values []interface{}
waiters map[interface{}]*Value
}
// Close closes the value. This can never fail. For a definition of
// "close" see the ErrClosed docs.
func (w *ComparedValue) Close() error {
w.lock.Lock()
defer w.lock.Unlock()
// Set closed to true always
w.closed = true
// For all waiters, complete with ErrClosed
for k, val := range w.waiters {
val.SetValue(ErrClosed)
delete(w.waiters, k)
}
return nil
}
// Value returns the value that was set for the given key, or blocks
// until one is available.
func (w *ComparedValue) Value(k interface{}) interface{} {
v, val := w.valueWaiter(k)
if val == nil {
return v
}
return val.Value()
}
// ValueOk gets the value for the given key, returning immediately if the
// value doesn't exist. The second return argument is true if the value exists.
func (w *ComparedValue) ValueOk(k interface{}) (interface{}, bool) {
v, val := w.valueWaiter(k)
return v, val == nil
}
func (w *ComparedValue) SetValue(v interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
w.once.Do(w.init)
// Check if we already have this exact value (by simply comparing
// with == directly). If we do, then we don't insert it again.
found := false
for _, v2 := range w.values {
if v == v2 {
found = true
break
}
}
if !found {
// Set the value, always
w.values = append(w.values, v)
}
// Go through the waiters
for k, val := range w.waiters {
if w.Func(k, v) {
val.SetValue(v)
delete(w.waiters, k)
}
}
}
func (w *ComparedValue) valueWaiter(k interface{}) (interface{}, *Value) {
w.lock.Lock()
w.once.Do(w.init)
// Look for a pre-existing value
for _, v := range w.values {
if w.Func(k, v) {
w.lock.Unlock()
return v, nil
}
}
// If we're closed, return that
if w.closed {
w.lock.Unlock()
return ErrClosed, nil
}
// Pre-existing value doesn't exist, create a waiter
val := w.waiters[k]
if val == nil {
val = new(Value)
w.waiters[k] = val
}
w.lock.Unlock()
// Return the waiter
return nil, val
}
// Must be called with w.lock held.
func (w *ComparedValue) init() {
w.waiters = make(map[interface{}]*Value)
if w.Func == nil {
w.Func = func(k, v interface{}) bool { return k == v }
}
}

View File

@ -0,0 +1,178 @@
package shadow
import (
"testing"
"time"
)
func TestComparedValue(t *testing.T) {
v := &ComparedValue{
Func: func(k, v interface{}) bool { return k == v },
}
// Start trying to get the value
valueCh := make(chan interface{})
go func() {
valueCh <- v.Value("foo")
}()
// We should not get the value
select {
case <-valueCh:
t.Fatal("shouldn't receive value")
case <-time.After(10 * time.Millisecond):
}
// Set the value
v.SetValue("foo")
val := <-valueCh
// Verify
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
// We should get the next value
val = v.Value("foo")
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValue_setFirst(t *testing.T) {
v := &ComparedValue{
Func: func(k, v interface{}) bool { return k == v },
}
// Set the value
v.SetValue("foo")
val := v.Value("foo")
// Verify
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValueOk(t *testing.T) {
v := &ComparedValue{
Func: func(k, v interface{}) bool { return k == v },
}
// Try
val, ok := v.ValueOk("foo")
if ok {
t.Fatal("should not be ok")
}
// Set
v.SetValue("foo")
// Try again
val, ok = v.ValueOk("foo")
if !ok {
t.Fatal("should be ok")
}
// Verify
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValueClose(t *testing.T) {
v := &ComparedValue{
Func: func(k, v interface{}) bool { return k == v },
}
// Close
v.Close()
// Try again
val, ok := v.ValueOk("foo")
if !ok {
t.Fatal("should be ok")
}
// Verify
if val != ErrClosed {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValueClose_blocked(t *testing.T) {
var v ComparedValue
// Start reading this should be blocking
valueCh := make(chan interface{})
go func() {
valueCh <- v.Value("foo")
}()
// We should not get the value
select {
case <-valueCh:
t.Fatal("shouldn't receive value")
case <-time.After(10 * time.Millisecond):
}
// Close
v.Close()
// Verify
val := <-valueCh
if val != ErrClosed {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValueClose_existing(t *testing.T) {
var v ComparedValue
// Set a value
v.SetValue("foo")
// Close
v.Close()
// Try again
val, ok := v.ValueOk("foo")
if !ok {
t.Fatal("should be ok")
}
// Verify
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
}
func TestComparedValueClose_existingBlocked(t *testing.T) {
var v ComparedValue
// Start reading this should be blocking
valueCh := make(chan interface{})
go func() {
valueCh <- v.Value("foo")
}()
// Wait
time.Sleep(10 * time.Millisecond)
// Set a value
v.SetValue("foo")
// Close
v.Close()
// Try again
val, ok := v.ValueOk("foo")
if !ok {
t.Fatal("should be ok")
}
// Verify
if val != "foo" {
t.Fatalf("bad: %#v", val)
}
}

View File

@ -108,6 +108,7 @@ func (w *KeyedValue) valueWaiter(k string) (interface{}, *Value) {
// If we're closed, return that // If we're closed, return that
if w.closed { if w.closed {
w.lock.Unlock()
return ErrClosed, nil return ErrClosed, nil
} }