add redis-breaker

This commit is contained in:
41666 2020-12-02 17:57:59 -05:00
parent d3394412db
commit 00f0741e8b
2 changed files with 286 additions and 0 deletions

View file

@ -0,0 +1,156 @@
// Package redisbreaker provides a go-redis v8 instance designed for resilient caching via circuit breakers.
// tl;dr: If redis is lost, it can either cache objects in memory using sync.Map or dropping gracefully.
// As a side benefit, it means we don't need a redis server to develop locally, unless we want one :)
package redisbreaker
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/sony/gobreaker"
)
type RedisBreaker struct {
redisClient *redis.Client
breaker *gobreaker.CircuitBreaker
config *RedisBreakerConfig
inMemoryCache sync.Map
}
type RedisBreakerConfig struct {
Redis *redis.Options
Breaker gobreaker.Settings
UseInMemoryCache bool
DefaultTTL time.Duration
}
type inmemoryCacheObject struct {
expiresAt time.Time
object []byte
}
func NewRedisBreaker(config *RedisBreakerConfig) *RedisBreaker {
if config == nil {
config = &RedisBreakerConfig{
UseInMemoryCache: true,
}
}
if config.DefaultTTL == 0 {
config.DefaultTTL = 2 * time.Minute
}
breaker := &RedisBreaker{
config: config,
redisClient: redis.NewClient(config.Redis),
breaker: gobreaker.NewCircuitBreaker(config.Breaker),
}
return breaker
}
func (rb *RedisBreaker) doOr(
ctx context.Context,
func1 func(context.Context, string, interface{}) (interface{}, error),
func2 func(context.Context, string, interface{}) (interface{}, error),
key string,
object interface{},
) (interface{}, error) {
val, err := rb.breaker.Execute(func() (interface{}, error) {
return func1(ctx, key, object)
})
if err == gobreaker.ErrOpenState || err == gobreaker.ErrTooManyRequests {
return func2(ctx, key, object)
}
return val, err
}
// Set pushes an object into the cache with the specified default TTL, using SetEX
func (rb *RedisBreaker) Set(ctx context.Context, key string, object interface{}) error {
_, err := rb.doOr(ctx, rb.setRedis, rb.setInmemory, key, object)
return err
}
func (rb *RedisBreaker) setRedis(ctx context.Context, key string, object interface{}) (interface{}, error) {
objectJSON, err := json.Marshal(object)
if err != nil {
return nil, err
}
return rb.redisClient.SetEX(ctx, key, objectJSON, rb.config.DefaultTTL).Result()
}
func (rb *RedisBreaker) setInmemory(ctx context.Context, key string, object interface{}) (interface{}, error) {
if rb.config.UseInMemoryCache {
objectJSON, err := json.Marshal(object)
if err != nil {
return nil, err
}
rb.inMemoryCache.Store(key, inmemoryCacheObject{
expiresAt: time.Now().Add(rb.config.DefaultTTL),
object: objectJSON,
})
}
return nil, nil
}
// Get pulls an object from cache, returning ok = true if it succeeded.
func (rb *RedisBreaker) Get(ctx context.Context, key string, object interface{}) (bool, error) {
ok, err := rb.doOr(ctx, rb.getRedis, rb.getInmemory, key, object)
return ok.(bool), err
}
func (rb *RedisBreaker) getRedis(ctx context.Context, key string, object interface{}) (interface{}, error) {
result := rb.redisClient.Get(ctx, key)
if result.Err() != nil {
if result.Err() == redis.Nil {
return false, nil
}
return false, result.Err()
}
objectJSON, err := result.Bytes()
if err != nil {
return false, err
}
err = json.Unmarshal(objectJSON, object)
return true, err
}
func (rb *RedisBreaker) getInmemory(ctx context.Context, key string, object interface{}) (interface{}, error) {
if !rb.config.UseInMemoryCache {
return false, nil
}
cacheObjIntf, ok := rb.inMemoryCache.Load(key)
if !ok {
return false, nil
}
cacheObj, ok := cacheObjIntf.(inmemoryCacheObject)
if !ok {
return false, nil
}
if time.Now().After(cacheObj.expiresAt) {
return false, nil
}
err := json.Unmarshal(cacheObj.object, object)
if err != nil {
return false, err
}
return true, nil
}

View file

@ -0,0 +1,130 @@
package redisbreaker_test
import (
"context"
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis/v8"
"github.com/onsi/gomega"
"github.com/sony/gobreaker"
redisbreaker "github.com/roleypoly/roleypoly/src/redis-breaker"
)
func getBreaker(breakerOpen bool) (*redisbreaker.RedisBreaker, *miniredis.Miniredis) {
redisServer, err := miniredis.Run()
if err != nil {
panic(err)
}
config := &redisbreaker.RedisBreakerConfig{
Redis: &redis.Options{
Addr: redisServer.Addr(),
},
UseInMemoryCache: true,
DefaultTTL: 1 * time.Second,
}
if breakerOpen {
config.Breaker.ReadyToTrip = func(gobreaker.Counts) bool {
return true
}
redisServer.Close()
}
rb := redisbreaker.NewRedisBreaker(config)
if breakerOpen {
// forcibly open the breaker
rb.Set(context.Background(), "@@@breaker@@@", nil)
}
return rb, redisServer
}
type TestData struct {
IAmAField1 string
IAmAField2 int
IAmAField3 map[string]interface{}
}
var testData = TestData{
IAmAField1: "hello world!",
IAmAField2: 420 * 69,
IAmAField3: map[string]interface{}{
"foxes": "are so heckin cute",
},
}
func getSet(t *testing.T, openCircuit bool) {
g := gomega.NewGomegaWithT(t)
rb, rds := getBreaker(openCircuit)
defer rds.Close()
err := rb.Set(context.Background(), "test-data", testData)
g.Expect(err).To(gomega.BeNil())
output := TestData{}
ok, err := rb.Get(context.Background(), "test-data", &output)
g.Expect(err).To(gomega.BeNil())
g.Expect(ok).To(gomega.BeTrue(), "ok should be true")
g.Expect(output).To(gomega.Equal(testData), "testData should match output data")
}
func TestGetSet(t *testing.T) {
getSet(t, false)
}
func TestGetSetOpenCircuit(t *testing.T) {
getSet(t, true)
}
func getNotInCache(t *testing.T, openCircuit bool) {
g := gomega.NewGomegaWithT(t)
rb, rds := getBreaker(openCircuit)
defer rds.Close()
output := TestData{}
ok, err := rb.Get(context.Background(), "not-test-data", &output)
g.Expect(err).To(gomega.BeNil())
g.Expect(ok).To(gomega.BeFalse(), "ok should be false")
g.Expect(output).To(gomega.BeZero(), "output should be 'zero'")
}
func TestGetNotInCache(t *testing.T) {
getNotInCache(t, false)
}
func TestGetNotInCacheOpenCircuit(t *testing.T) {
getNotInCache(t, true)
}
func getAfterTTL(t *testing.T, openCircuit bool) {
g := gomega.NewGomegaWithT(t)
rb, rds := getBreaker(openCircuit)
defer rds.Close()
err := rb.Set(context.Background(), "test-expired", testData)
g.Expect(err).To(gomega.BeNil())
rds.FastForward(1 * time.Second)
time.Sleep(1 * time.Second)
output := TestData{}
ok, err := rb.Get(context.Background(), "test-expired", &output)
g.Expect(ok).To(gomega.BeFalse(), "ok should be false")
g.Expect(output).To(gomega.BeZero(), "output should be 'zero'")
}
func TestGetAfterTTL(t *testing.T) {
getAfterTTL(t, false)
}
func TestGetAfterTTLOpenCircuit(t *testing.T) {
getAfterTTL(t, true)
}