upd(sam): events queue over channel
This commit is contained in:
parent
0b687269dc
commit
74d9365123
5
Makefile
5
Makefile
@ -82,6 +82,11 @@ test.pubsub: $(GOTEST)
|
||||
perl -pi -e 's/command-line-arguments/.\/sam\/repository/g' .cover.out
|
||||
$(GO) tool cover -func=.cover.out | grep --color "^\|[^0-9]0.0%"
|
||||
|
||||
test.events: $(GOTEST)
|
||||
$(GOTEST) -run Events -covermode count -coverprofile .cover.out -v ./sam/repository/events*.go ./sam/repository/flags*.go ./sam/repository/error*.go
|
||||
perl -pi -e 's/command-line-arguments/.\/sam\/repository/g' .cover.out
|
||||
$(GO) tool cover -func=.cover.out | grep --color "^\|[^0-9]0.0%"
|
||||
|
||||
test.crm: $(GOTEST)
|
||||
$(GOTEST) -covermode count -coverprofile .cover.out -v ./crm/repository/...
|
||||
$(GO) tool cover -func=.cover.out | grep --color "^\|[^0-9]0.0%"
|
||||
|
||||
@ -9,9 +9,10 @@ type (
|
||||
)
|
||||
|
||||
const (
|
||||
ErrDatabaseError = repositoryError("DatabaseError")
|
||||
ErrNotImplemented = repositoryError("NotImplemented")
|
||||
ErrConfigError = repositoryError("ConfigError")
|
||||
ErrDatabaseError = repositoryError("DatabaseError")
|
||||
ErrNotImplemented = repositoryError("NotImplemented")
|
||||
ErrConfigError = repositoryError("ConfigError")
|
||||
ErrEventsPullClosed = repositoryError("EventsPullClosed")
|
||||
)
|
||||
|
||||
func (e repositoryError) Error() string {
|
||||
|
||||
@ -32,66 +32,43 @@ The reading of the event queue table is triggered by pubsub.
|
||||
*/
|
||||
|
||||
type (
|
||||
Events interface {
|
||||
With(ctx context.Context, db *factory.DB) Events
|
||||
|
||||
Pull(origin uint64) ([]*types.EventQueueItem, error)
|
||||
Push(eqi *types.EventQueueItem) error
|
||||
Sync(origin, id uint64) error
|
||||
EventsRepository interface {
|
||||
Pull(ctx context.Context) (*types.EventQueueItem, error)
|
||||
Push(ctx context.Context, item *types.EventQueueItem) error
|
||||
}
|
||||
|
||||
events struct {
|
||||
*repository
|
||||
pipe chan *types.EventQueueItem
|
||||
}
|
||||
)
|
||||
|
||||
func NewEvents(ctx context.Context, db *factory.DB) Events {
|
||||
return (&events{}).With(ctx, db)
|
||||
var eventsPipe chan *types.EventQueueItem
|
||||
|
||||
func Events() EventsRepository {
|
||||
if eventsPipe == nil {
|
||||
eventsPipe = make(chan *types.EventQueueItem, 512)
|
||||
}
|
||||
return &events{eventsPipe}
|
||||
}
|
||||
|
||||
func (r *events) With(ctx context.Context, db *factory.DB) Events {
|
||||
return &events{
|
||||
repository: r.repository.With(ctx, db),
|
||||
func (r *events) Pull(ctx context.Context) (*types.EventQueueItem, error) {
|
||||
select {
|
||||
case res, ok := <-r.pipe:
|
||||
if !ok {
|
||||
return res, ErrEventsPullClosed.New()
|
||||
}
|
||||
return res, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *events) Pull(origin uint64) ([]*types.EventQueueItem, error) {
|
||||
var ee = make([]*types.EventQueueItem, 0)
|
||||
|
||||
return ee, r.db().Quiet().Select(&ee, `
|
||||
SELECT *
|
||||
FROM event_queue
|
||||
WHERE origin <> ?
|
||||
AND id > GREATEST(COALESCE((SELECT rel_last FROM event_queue_synced WHERE origin = ?), 0), ?)
|
||||
LIMIT 50`, origin, origin, origin)
|
||||
}
|
||||
|
||||
func (r *events) Push(eqi *types.EventQueueItem) error {
|
||||
eqi.ID = factory.Sonyflake.NextID()
|
||||
return r.db().Quiet().Insert("event_queue", eqi)
|
||||
}
|
||||
|
||||
func (r *events) Sync(origin, id uint64) error {
|
||||
type evqs struct {
|
||||
Origin uint64 `db:"origin"`
|
||||
LastEvent uint64 `db:"rel_last"`
|
||||
func (r *events) Push(ctx context.Context, item *types.EventQueueItem) error {
|
||||
item.ID = factory.Sonyflake.NextID()
|
||||
select {
|
||||
case r.pipe <- item:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// @todo do we even need this?
|
||||
return r.db().Quiet().Replace("event_queue_synced", evqs{
|
||||
Origin: origin,
|
||||
LastEvent: id,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *events) Cleanup() error {
|
||||
return exec(r.db().Exec("DELETE FROM event_queue WHERE id < (SELECT MIN(rel_last) FROM event_queue_synced)"))
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
do we need event_queue_synced??
|
||||
do we need stable server id or can it be regenerad on each run?
|
||||
|
||||
|
||||
*/
|
||||
|
||||
@ -1,8 +1,34 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
)
|
||||
|
||||
func TestEvents(t *testing.T) {
|
||||
assert := func(ok bool, format string, args ...interface{}) {
|
||||
if !ok {
|
||||
t.Fatalf(format, args...)
|
||||
}
|
||||
}
|
||||
queue := Events()
|
||||
|
||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
|
||||
defer cancel()
|
||||
|
||||
queue.Push(ctx, &types.EventQueueItem{Subscriber: "test1"})
|
||||
queue.Push(ctx, &types.EventQueueItem{Subscriber: "test2"})
|
||||
queue.Push(ctx, &types.EventQueueItem{Subscriber: "test3"})
|
||||
|
||||
for i := 1; i <= 3; i++ {
|
||||
item, err := queue.Pull(ctx)
|
||||
assert(err == nil, "Expected non-error queue return, got %+v", err)
|
||||
assert(item != nil, "Expected non-empty queue item")
|
||||
expected := fmt.Sprintf("test%d", i)
|
||||
assert(item.Subscriber == expected, "Expected subscriber value doesn't match: %s != %s", expected, item.Subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,7 +6,6 @@ import (
|
||||
"github.com/titpetric/factory"
|
||||
|
||||
"github.com/crusttech/crust/sam/repository"
|
||||
"github.com/crusttech/crust/sam/service"
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
)
|
||||
|
||||
@ -38,82 +37,39 @@ func EventQueue(origin uint64) *eventQueue {
|
||||
}
|
||||
}
|
||||
|
||||
func (eq *eventQueue) store(ctx context.Context, qp repository.Events) {
|
||||
// @todo: retire this function, use Events().Push(ctx, item) directly.
|
||||
func (eq *eventQueue) store(ctx context.Context, qp repository.EventsRepository) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case eqi := <-eq.queue:
|
||||
qp.Push(eqi)
|
||||
qp.Push(ctx, eqi)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (eq *eventQueue) feedSessions(ctx context.Context, config *repository.Flags, qp repository.Events, store eventQueueWalker) error {
|
||||
newMessageEvent := make(chan struct{}, eventQueueBacklog)
|
||||
done := make(chan error, 1)
|
||||
|
||||
pubsub := service.PubSub()
|
||||
go func() {
|
||||
onConnect := func() error {
|
||||
return nil
|
||||
}
|
||||
onMessage := func(message string, payload []byte) error {
|
||||
newMessageEvent <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
done <- pubsub.Subscribe(ctx, "events", onConnect, onMessage)
|
||||
}()
|
||||
|
||||
poll := func() error {
|
||||
for {
|
||||
items, err := qp.Pull(eq.origin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var lastSyncedId uint64
|
||||
|
||||
for _, item := range items {
|
||||
if item.Subscriber == "" {
|
||||
// Distribute payload to all connected sessions
|
||||
store.Walk(func(s *Session) {
|
||||
s.sendBytes(item.Payload)
|
||||
})
|
||||
} else {
|
||||
// Distribute payload to specific subscribers
|
||||
store.Walk(func(s *Session) {
|
||||
if s.subs.Get(item.Subscriber) != nil {
|
||||
s.sendBytes(item.Payload)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
lastSyncedId = item.ID
|
||||
|
||||
}
|
||||
|
||||
if lastSyncedId > 0 {
|
||||
qp.Sync(eq.origin, lastSyncedId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (eq *eventQueue) feedSessions(ctx context.Context, config *repository.Flags, qp repository.EventsRepository, store eventQueueWalker) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-newMessageEvent:
|
||||
if err := poll(); err != nil {
|
||||
return err
|
||||
}
|
||||
case err := <-done:
|
||||
item, err := qp.Pull(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if item.Subscriber == "" {
|
||||
// Distribute payload to all connected sessions
|
||||
store.Walk(func(s *Session) {
|
||||
s.sendBytes(item.Payload)
|
||||
})
|
||||
} else {
|
||||
// Distribute payload to specific subscribers
|
||||
store.Walk(func(s *Session) {
|
||||
if s.subs.Get(item.Subscriber) != nil {
|
||||
s.sendBytes(item.Payload)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -11,12 +11,12 @@ import (
|
||||
|
||||
func MountRoutes(ctx context.Context, config *repository.Flags) func(chi.Router) {
|
||||
return func(r chi.Router) {
|
||||
events := repository.NewEvents(ctx, nil)
|
||||
events := repository.Events()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if err := eq.feedSessions(ctx, config, events, store); err != nil {
|
||||
log.Printf("Error when starting sessions event feed: %+v", err)
|
||||
log.Printf("Error in sessions event feed: %+v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
"github.com/crusttech/crust/sam/websocket/outgoing"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/crusttech/crust/sam/repository"
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
"github.com/crusttech/crust/sam/websocket/outgoing"
|
||||
)
|
||||
|
||||
// Sends message to subscribers
|
||||
@ -14,16 +16,7 @@ func (s *Session) sendToAllSubscribers(p outgoing.MessageEncoder, channelID stri
|
||||
return err
|
||||
}
|
||||
|
||||
eq.push(s.ctx, &types.EventQueueItem{Payload: pb, Subscriber: channelID})
|
||||
|
||||
store.Walk(func(sess *Session) {
|
||||
// send message only to users with subscribed channels
|
||||
if sess.subs.Get(channelID) != nil {
|
||||
sess.sendBytes(pb)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
return repository.Events().Push(s.ctx, &types.EventQueueItem{Payload: pb, Subscriber: channelID})
|
||||
}
|
||||
|
||||
// Sends message to all connected clients
|
||||
@ -33,16 +26,12 @@ func (s *Session) sendToAll(p outgoing.MessageEncoder) error {
|
||||
return err
|
||||
}
|
||||
|
||||
eq.push(s.ctx, &types.EventQueueItem{Payload: pb})
|
||||
|
||||
store.Walk(func(sess *Session) {
|
||||
// send message only to users with subscribed channels
|
||||
sess.sendBytes(pb)
|
||||
})
|
||||
|
||||
return nil
|
||||
return repository.Events().Push(s.ctx, &types.EventQueueItem{Payload: pb})
|
||||
}
|
||||
|
||||
// @todo: this isn't going to be correct - a user may have open multiple clients,
|
||||
// that will connect to different edge SAM servers. It should also go
|
||||
// through a repository.Events().Push (EventQueueItem) path.
|
||||
func (s *Session) sendReply(p outgoing.MessageEncoder) error {
|
||||
pb, err := p.EncodeMessage()
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user