upd(sam): refactor pubsub api
This commit is contained in:
parent
cef9af9903
commit
48d47b95e8
@ -1,9 +1,11 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/namsral/flag"
|
||||
"github.com/pkg/errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -24,7 +26,8 @@ func (c *PubSub) Validate() error {
|
||||
switch c.Mode {
|
||||
case "redis":
|
||||
if c.Mode == "redis" && c.RedisAddr == "" {
|
||||
return errors.New("No host defined for mode=redis, PubSub.Redis is empty")
|
||||
log.Println("[pubsub] No Redis Address defined for mode=redis, falling back to polling")
|
||||
c.Mode = "poll"
|
||||
}
|
||||
case "poll":
|
||||
default:
|
||||
|
||||
@ -2,15 +2,19 @@ package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
)
|
||||
|
||||
type (
|
||||
PubSub struct {
|
||||
client PubSubClient
|
||||
client pubSubModule
|
||||
}
|
||||
|
||||
PubSubClient interface {
|
||||
pubSubModule
|
||||
Event(ctx context.Context, message string) error
|
||||
}
|
||||
|
||||
pubSubModule interface {
|
||||
Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, message []byte) error) error
|
||||
Publish(ctx context.Context, channel string, message string) error
|
||||
}
|
||||
@ -21,31 +25,18 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
var pubsub PubSubClient
|
||||
var pubsub *PubSub
|
||||
|
||||
func (PubSub) New() (PubSubClient, error) {
|
||||
func (PubSub) New() *PubSub {
|
||||
// return singleton client
|
||||
if pubsub != nil {
|
||||
return pubsub, nil
|
||||
}
|
||||
|
||||
// validate configs and fall back to poll mode on error
|
||||
if err := flags.PubSub.Validate(); err != nil {
|
||||
log.Printf("[pubsub] An error occured when creating PubSub instance: %+v", err)
|
||||
log.Println("[pubsub] Reverting back to 'poll' and trying again")
|
||||
flags.PubSub.Mode = "poll"
|
||||
if err := flags.PubSub.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pubsub
|
||||
}
|
||||
|
||||
// store the singleton instance
|
||||
save := func(client PubSubClient, err error) (PubSubClient, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pubsub = client
|
||||
return pubsub, nil
|
||||
save := func(client pubSubModule) *PubSub {
|
||||
pubsub = &PubSub{client}
|
||||
return pubsub
|
||||
}
|
||||
|
||||
// create isntances based on mode
|
||||
@ -59,6 +50,10 @@ func (ps *PubSub) Subscribe(ctx context.Context, channel string, onStart func()
|
||||
return ps.client.Subscribe(ctx, channel, onStart, onMessage)
|
||||
}
|
||||
|
||||
func (ps *PubSub) Event(ctx context.Context, message string) error {
|
||||
return ps.Publish(ctx, "events", message)
|
||||
}
|
||||
|
||||
func (ps *PubSub) Publish(ctx context.Context, channel, message string) error {
|
||||
return ps.client.Publish(ctx, channel, message)
|
||||
}
|
||||
|
||||
@ -14,8 +14,8 @@ type PubSubMemory struct {
|
||||
input chan *PubSubPayload
|
||||
}
|
||||
|
||||
func (PubSubMemory) New(config *config.PubSub) (*PubSubMemory, error) {
|
||||
return &PubSubMemory{config, make(chan *PubSubPayload, 512)}, nil
|
||||
func (PubSubMemory) New(config *config.PubSub) *PubSubMemory {
|
||||
return &PubSubMemory{config, make(chan *PubSubPayload, 512)}
|
||||
}
|
||||
|
||||
func (ps *PubSubMemory) Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, payload []byte) error) error {
|
||||
|
||||
@ -9,12 +9,9 @@ import (
|
||||
)
|
||||
|
||||
func TestPubSubMemory(t *testing.T) {
|
||||
p, err := PubSubMemory{}.New(&config.PubSub{
|
||||
p := PubSubMemory{}.New(&config.PubSub{
|
||||
PollingInterval: time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error when creating new PubSubMemory object: %+v", err)
|
||||
}
|
||||
|
||||
calledOnConnect := false
|
||||
calledOnMessage := 0
|
||||
@ -56,8 +53,7 @@ func TestPubSubMemory(t *testing.T) {
|
||||
t.Fatalf("Expected PubSub channel exit after context cancellation")
|
||||
}
|
||||
|
||||
err = p.Publish(ctx, "events", "new message event")
|
||||
if err == nil {
|
||||
if err := p.Publish(ctx, "events", "new message event"); err == nil {
|
||||
t.Fatalf("Expected error from sending message on closed channel")
|
||||
}
|
||||
}
|
||||
|
||||
@ -13,8 +13,8 @@ type PubSubRedis struct {
|
||||
config *config.PubSub
|
||||
}
|
||||
|
||||
func (PubSubRedis) New(config *config.PubSub) (*PubSubRedis, error) {
|
||||
return &PubSubRedis{config}, nil
|
||||
func (PubSubRedis) New(config *config.PubSub) *PubSubRedis {
|
||||
return &PubSubRedis{config}
|
||||
}
|
||||
|
||||
func (ps *PubSubRedis) dial() (redis.Conn, error) {
|
||||
|
||||
@ -4,14 +4,10 @@ import (
|
||||
"github.com/crusttech/crust/sam/repository"
|
||||
)
|
||||
|
||||
type PubSub struct {
|
||||
type pubSub struct {
|
||||
repository.PubSubClient
|
||||
}
|
||||
|
||||
func (PubSub) New() (*PubSub, error) {
|
||||
rpo, err := repository.PubSub{}.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &PubSub{rpo}, nil
|
||||
func PubSub() *pubSub {
|
||||
return &pubSub{repository.PubSub{}.New()}
|
||||
}
|
||||
|
||||
@ -25,7 +25,6 @@ type (
|
||||
|
||||
eventQueue struct {
|
||||
origin uint64
|
||||
pubsub *service.PubSub
|
||||
queue chan *types.EventQueueItem
|
||||
}
|
||||
)
|
||||
@ -63,11 +62,7 @@ func (eq *eventQueue) feedSessions(ctx context.Context, config *repository.Flags
|
||||
newMessageEvent := make(chan struct{}, eventQueueBacklog)
|
||||
done := make(chan error, 1)
|
||||
|
||||
pubsub, err := service.PubSub{}.New()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pubsub := service.PubSub()
|
||||
go func() {
|
||||
onConnect := func() error {
|
||||
return nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user