From 48d47b95e8b3e155c00b28fbd61910c7ce79fcc4 Mon Sep 17 00:00:00 2001 From: Tit Petric Date: Tue, 11 Sep 2018 20:35:45 +0200 Subject: [PATCH] upd(sam): refactor pubsub api --- config/pubsub.go | 7 ++++-- sam/repository/pubsub.go | 37 ++++++++++++---------------- sam/repository/pubsub_memory.go | 4 +-- sam/repository/pubsub_memory_test.go | 8 ++---- sam/repository/pubsub_redis.go | 4 +-- sam/service/pubsub.go | 10 +++----- sam/websocket/event_queue.go | 7 +----- 7 files changed, 31 insertions(+), 46 deletions(-) diff --git a/config/pubsub.go b/config/pubsub.go index 0f311ca8e..c4170375a 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -1,9 +1,11 @@ package config import ( + "log" + "time" + "github.com/namsral/flag" "github.com/pkg/errors" - "time" ) type ( @@ -24,7 +26,8 @@ func (c *PubSub) Validate() error { switch c.Mode { case "redis": if c.Mode == "redis" && c.RedisAddr == "" { - return errors.New("No host defined for mode=redis, PubSub.Redis is empty") + log.Println("[pubsub] No Redis Address defined for mode=redis, falling back to polling") + c.Mode = "poll" } case "poll": default: diff --git a/sam/repository/pubsub.go b/sam/repository/pubsub.go index 7676f56fe..bcf88b094 100644 --- a/sam/repository/pubsub.go +++ b/sam/repository/pubsub.go @@ -2,15 +2,19 @@ package repository import ( "context" - "log" ) type ( PubSub struct { - client PubSubClient + client pubSubModule } PubSubClient interface { + pubSubModule + Event(ctx context.Context, message string) error + } + + pubSubModule interface { Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, message []byte) error) error Publish(ctx context.Context, channel string, message string) error } @@ -21,31 +25,18 @@ type ( } ) -var pubsub PubSubClient +var pubsub *PubSub -func (PubSub) New() (PubSubClient, error) { +func (PubSub) New() *PubSub { // return singleton client if pubsub != nil { - return pubsub, nil - } - - // validate configs and fall back to poll mode on error - if err := flags.PubSub.Validate(); err != nil { - log.Printf("[pubsub] An error occured when creating PubSub instance: %+v", err) - log.Println("[pubsub] Reverting back to 'poll' and trying again") - flags.PubSub.Mode = "poll" - if err := flags.PubSub.Validate(); err != nil { - return nil, err - } + return pubsub } // store the singleton instance - save := func(client PubSubClient, err error) (PubSubClient, error) { - if err != nil { - return nil, err - } - pubsub = client - return pubsub, nil + save := func(client pubSubModule) *PubSub { + pubsub = &PubSub{client} + return pubsub } // create isntances based on mode @@ -59,6 +50,10 @@ func (ps *PubSub) Subscribe(ctx context.Context, channel string, onStart func() return ps.client.Subscribe(ctx, channel, onStart, onMessage) } +func (ps *PubSub) Event(ctx context.Context, message string) error { + return ps.Publish(ctx, "events", message) +} + func (ps *PubSub) Publish(ctx context.Context, channel, message string) error { return ps.client.Publish(ctx, channel, message) } diff --git a/sam/repository/pubsub_memory.go b/sam/repository/pubsub_memory.go index e342bd443..15ff2656d 100644 --- a/sam/repository/pubsub_memory.go +++ b/sam/repository/pubsub_memory.go @@ -14,8 +14,8 @@ type PubSubMemory struct { input chan *PubSubPayload } -func (PubSubMemory) New(config *config.PubSub) (*PubSubMemory, error) { - return &PubSubMemory{config, make(chan *PubSubPayload, 512)}, nil +func (PubSubMemory) New(config *config.PubSub) *PubSubMemory { + return &PubSubMemory{config, make(chan *PubSubPayload, 512)} } func (ps *PubSubMemory) Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, payload []byte) error) error { diff --git a/sam/repository/pubsub_memory_test.go b/sam/repository/pubsub_memory_test.go index 877e99fdf..1e1ce90f7 100644 --- a/sam/repository/pubsub_memory_test.go +++ b/sam/repository/pubsub_memory_test.go @@ -9,12 +9,9 @@ import ( ) func TestPubSubMemory(t *testing.T) { - p, err := PubSubMemory{}.New(&config.PubSub{ + p := PubSubMemory{}.New(&config.PubSub{ PollingInterval: time.Second, }) - if err != nil { - t.Fatalf("Unexpected error when creating new PubSubMemory object: %+v", err) - } calledOnConnect := false calledOnMessage := 0 @@ -56,8 +53,7 @@ func TestPubSubMemory(t *testing.T) { t.Fatalf("Expected PubSub channel exit after context cancellation") } - err = p.Publish(ctx, "events", "new message event") - if err == nil { + if err := p.Publish(ctx, "events", "new message event"); err == nil { t.Fatalf("Expected error from sending message on closed channel") } } diff --git a/sam/repository/pubsub_redis.go b/sam/repository/pubsub_redis.go index 7acb44c32..f617560ad 100644 --- a/sam/repository/pubsub_redis.go +++ b/sam/repository/pubsub_redis.go @@ -13,8 +13,8 @@ type PubSubRedis struct { config *config.PubSub } -func (PubSubRedis) New(config *config.PubSub) (*PubSubRedis, error) { - return &PubSubRedis{config}, nil +func (PubSubRedis) New(config *config.PubSub) *PubSubRedis { + return &PubSubRedis{config} } func (ps *PubSubRedis) dial() (redis.Conn, error) { diff --git a/sam/service/pubsub.go b/sam/service/pubsub.go index 6be6e809e..59c21438e 100644 --- a/sam/service/pubsub.go +++ b/sam/service/pubsub.go @@ -4,14 +4,10 @@ import ( "github.com/crusttech/crust/sam/repository" ) -type PubSub struct { +type pubSub struct { repository.PubSubClient } -func (PubSub) New() (*PubSub, error) { - rpo, err := repository.PubSub{}.New() - if err != nil { - return nil, err - } - return &PubSub{rpo}, nil +func PubSub() *pubSub { + return &pubSub{repository.PubSub{}.New()} } diff --git a/sam/websocket/event_queue.go b/sam/websocket/event_queue.go index 8d7316938..eb059df47 100644 --- a/sam/websocket/event_queue.go +++ b/sam/websocket/event_queue.go @@ -25,7 +25,6 @@ type ( eventQueue struct { origin uint64 - pubsub *service.PubSub queue chan *types.EventQueueItem } ) @@ -63,11 +62,7 @@ func (eq *eventQueue) feedSessions(ctx context.Context, config *repository.Flags newMessageEvent := make(chan struct{}, eventQueueBacklog) done := make(chan error, 1) - pubsub, err := service.PubSub{}.New() - if err != nil { - return err - } - + pubsub := service.PubSub() go func() { onConnect := func() error { return nil