From 00f0741e8bc7b970197927c9e66f4bb5fd4bdecc Mon Sep 17 00:00:00 2001 From: Katalina Okano Date: Wed, 2 Dec 2020 17:57:59 -0500 Subject: [PATCH] add redis-breaker --- src/redis-breaker/redisbreaker.go | 156 +++++++++++++++++++++++++ src/redis-breaker/redisbreaker_test.go | 130 +++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 src/redis-breaker/redisbreaker.go create mode 100644 src/redis-breaker/redisbreaker_test.go diff --git a/src/redis-breaker/redisbreaker.go b/src/redis-breaker/redisbreaker.go new file mode 100644 index 0000000..63e63a2 --- /dev/null +++ b/src/redis-breaker/redisbreaker.go @@ -0,0 +1,156 @@ +// Package redisbreaker provides a go-redis v8 instance designed for resilient caching via circuit breakers. +// tl;dr: If redis is lost, it can either cache objects in memory using sync.Map or dropping gracefully. +// As a side benefit, it means we don't need a redis server to develop locally, unless we want one :) +package redisbreaker + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/sony/gobreaker" +) + +type RedisBreaker struct { + redisClient *redis.Client + breaker *gobreaker.CircuitBreaker + + config *RedisBreakerConfig + inMemoryCache sync.Map +} + +type RedisBreakerConfig struct { + Redis *redis.Options + Breaker gobreaker.Settings + UseInMemoryCache bool + DefaultTTL time.Duration +} + +type inmemoryCacheObject struct { + expiresAt time.Time + object []byte +} + +func NewRedisBreaker(config *RedisBreakerConfig) *RedisBreaker { + if config == nil { + config = &RedisBreakerConfig{ + UseInMemoryCache: true, + } + } + + if config.DefaultTTL == 0 { + config.DefaultTTL = 2 * time.Minute + } + + breaker := &RedisBreaker{ + config: config, + redisClient: redis.NewClient(config.Redis), + breaker: gobreaker.NewCircuitBreaker(config.Breaker), + } + + return breaker +} + +func (rb *RedisBreaker) doOr( + ctx context.Context, + func1 func(context.Context, string, interface{}) (interface{}, error), + func2 func(context.Context, string, interface{}) (interface{}, error), + key string, + object interface{}, +) (interface{}, error) { + val, err := rb.breaker.Execute(func() (interface{}, error) { + return func1(ctx, key, object) + }) + if err == gobreaker.ErrOpenState || err == gobreaker.ErrTooManyRequests { + return func2(ctx, key, object) + } + + return val, err +} + +// Set pushes an object into the cache with the specified default TTL, using SetEX +func (rb *RedisBreaker) Set(ctx context.Context, key string, object interface{}) error { + _, err := rb.doOr(ctx, rb.setRedis, rb.setInmemory, key, object) + return err +} + +func (rb *RedisBreaker) setRedis(ctx context.Context, key string, object interface{}) (interface{}, error) { + objectJSON, err := json.Marshal(object) + if err != nil { + return nil, err + } + + return rb.redisClient.SetEX(ctx, key, objectJSON, rb.config.DefaultTTL).Result() +} + +func (rb *RedisBreaker) setInmemory(ctx context.Context, key string, object interface{}) (interface{}, error) { + if rb.config.UseInMemoryCache { + objectJSON, err := json.Marshal(object) + if err != nil { + return nil, err + } + + rb.inMemoryCache.Store(key, inmemoryCacheObject{ + expiresAt: time.Now().Add(rb.config.DefaultTTL), + object: objectJSON, + }) + } + + return nil, nil +} + +// Get pulls an object from cache, returning ok = true if it succeeded. +func (rb *RedisBreaker) Get(ctx context.Context, key string, object interface{}) (bool, error) { + ok, err := rb.doOr(ctx, rb.getRedis, rb.getInmemory, key, object) + + return ok.(bool), err +} + +func (rb *RedisBreaker) getRedis(ctx context.Context, key string, object interface{}) (interface{}, error) { + result := rb.redisClient.Get(ctx, key) + if result.Err() != nil { + if result.Err() == redis.Nil { + return false, nil + } + + return false, result.Err() + } + + objectJSON, err := result.Bytes() + if err != nil { + return false, err + } + + err = json.Unmarshal(objectJSON, object) + + return true, err +} + +func (rb *RedisBreaker) getInmemory(ctx context.Context, key string, object interface{}) (interface{}, error) { + if !rb.config.UseInMemoryCache { + return false, nil + } + + cacheObjIntf, ok := rb.inMemoryCache.Load(key) + if !ok { + return false, nil + } + + cacheObj, ok := cacheObjIntf.(inmemoryCacheObject) + if !ok { + return false, nil + } + + if time.Now().After(cacheObj.expiresAt) { + return false, nil + } + + err := json.Unmarshal(cacheObj.object, object) + if err != nil { + return false, err + } + + return true, nil +} diff --git a/src/redis-breaker/redisbreaker_test.go b/src/redis-breaker/redisbreaker_test.go new file mode 100644 index 0000000..da17065 --- /dev/null +++ b/src/redis-breaker/redisbreaker_test.go @@ -0,0 +1,130 @@ +package redisbreaker_test + +import ( + "context" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redis/v8" + "github.com/onsi/gomega" + "github.com/sony/gobreaker" + + redisbreaker "github.com/roleypoly/roleypoly/src/redis-breaker" +) + +func getBreaker(breakerOpen bool) (*redisbreaker.RedisBreaker, *miniredis.Miniredis) { + redisServer, err := miniredis.Run() + if err != nil { + panic(err) + } + + config := &redisbreaker.RedisBreakerConfig{ + Redis: &redis.Options{ + Addr: redisServer.Addr(), + }, + UseInMemoryCache: true, + DefaultTTL: 1 * time.Second, + } + + if breakerOpen { + config.Breaker.ReadyToTrip = func(gobreaker.Counts) bool { + return true + } + + redisServer.Close() + } + + rb := redisbreaker.NewRedisBreaker(config) + + if breakerOpen { + // forcibly open the breaker + rb.Set(context.Background(), "@@@breaker@@@", nil) + } + + return rb, redisServer +} + +type TestData struct { + IAmAField1 string + IAmAField2 int + IAmAField3 map[string]interface{} +} + +var testData = TestData{ + IAmAField1: "hello world!", + IAmAField2: 420 * 69, + IAmAField3: map[string]interface{}{ + "foxes": "are so heckin cute", + }, +} + +func getSet(t *testing.T, openCircuit bool) { + g := gomega.NewGomegaWithT(t) + rb, rds := getBreaker(openCircuit) + defer rds.Close() + + err := rb.Set(context.Background(), "test-data", testData) + g.Expect(err).To(gomega.BeNil()) + + output := TestData{} + ok, err := rb.Get(context.Background(), "test-data", &output) + g.Expect(err).To(gomega.BeNil()) + + g.Expect(ok).To(gomega.BeTrue(), "ok should be true") + g.Expect(output).To(gomega.Equal(testData), "testData should match output data") +} + +func TestGetSet(t *testing.T) { + getSet(t, false) +} + +func TestGetSetOpenCircuit(t *testing.T) { + getSet(t, true) +} + +func getNotInCache(t *testing.T, openCircuit bool) { + g := gomega.NewGomegaWithT(t) + rb, rds := getBreaker(openCircuit) + defer rds.Close() + + output := TestData{} + ok, err := rb.Get(context.Background(), "not-test-data", &output) + g.Expect(err).To(gomega.BeNil()) + + g.Expect(ok).To(gomega.BeFalse(), "ok should be false") + g.Expect(output).To(gomega.BeZero(), "output should be 'zero'") +} + +func TestGetNotInCache(t *testing.T) { + getNotInCache(t, false) +} + +func TestGetNotInCacheOpenCircuit(t *testing.T) { + getNotInCache(t, true) +} + +func getAfterTTL(t *testing.T, openCircuit bool) { + g := gomega.NewGomegaWithT(t) + rb, rds := getBreaker(openCircuit) + defer rds.Close() + + err := rb.Set(context.Background(), "test-expired", testData) + g.Expect(err).To(gomega.BeNil()) + + rds.FastForward(1 * time.Second) + time.Sleep(1 * time.Second) + + output := TestData{} + ok, err := rb.Get(context.Background(), "test-expired", &output) + g.Expect(ok).To(gomega.BeFalse(), "ok should be false") + g.Expect(output).To(gomega.BeZero(), "output should be 'zero'") +} + +func TestGetAfterTTL(t *testing.T) { + getAfterTTL(t, false) +} + +func TestGetAfterTTLOpenCircuit(t *testing.T) { + getAfterTTL(t, true) +}