From e9b2fe49ae86f159badd7a90805b26de87e60cd2 Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Sun, 29 Dec 2019 18:54:55 +0100 Subject: [PATCH] More tests and coverage for eventbus --- go.mod | 1 + pkg/eventbus/eventbus.go | 36 +++++++++++++---- pkg/eventbus/eventbus_test.go | 39 ++++++++++++++++++ pkg/eventbus/trigger_test.go | 76 ++++++++++++++++++++++++++++++----- pkg/eventbus/triggers.go | 3 ++ 5 files changed, 139 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 01f75a595..b055df08d 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/steinfletcher/apitest-jsonpath v1.3.0 github.com/stretchr/testify v1.4.0 github.com/titpetric/factory v0.0.0-20190806200833-ae4b02b9e034 + go.uber.org/atomic v1.5.0 go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 google.golang.org/grpc v1.22.1 diff --git a/pkg/eventbus/eventbus.go b/pkg/eventbus/eventbus.go index 181c319e6..b8f32de0c 100644 --- a/pkg/eventbus/eventbus.go +++ b/pkg/eventbus/eventbus.go @@ -21,6 +21,10 @@ type ( } eventbus struct { + wg *sync.WaitGroup + + // Read & write locking + // prevent event handling during trigger (un)registration l *sync.RWMutex // list of registered handlers @@ -37,13 +41,14 @@ func init() { gEventBus = New() } -// Returns +// Service returns global event bus service func Service() *eventbus { return gEventBus } func New() *eventbus { return &eventbus{ + wg: &sync.WaitGroup{}, l: &sync.RWMutex{}, triggers: make(map[uintptr]*trigger), } @@ -56,8 +61,15 @@ func (b *eventbus) WaitFor(ctx context.Context, ev Event) (err error) { b.l.RLock() defer b.l.RUnlock() for _, t := range b.find(ev) { - if err = t.Handle(ctx, ev); err != nil { - return err + err = func(ctx context.Context, t *trigger) error { + b.wg.Add(1) + defer b.wg.Done() + return t.Handle(ctx, ev) + + }(ctx, t) + + if err != nil { + return } } @@ -69,23 +81,33 @@ func (b *eventbus) Dispatch(ctx context.Context, ev Event) { b.l.RLock() defer b.l.RUnlock() for _, t := range b.find(ev) { + b.wg.Add(1) go func(ctx context.Context, t *trigger) { + defer b.wg.Done() _ = t.Handle(ctx, ev) }(ctx, t) } } +// Waits for all dispatched events +// +// Should only be used for testing +func (b *eventbus) wait() { + b.wg.Wait() +} + // Finds all registered triggers compatible with given event // -// It returns sorted +// It returns sorted triggers // // There is still room for improvement (performance wise) by indexing // resources and events of each trigger. func (b *eventbus) find(ev Event) (tt TriggerSet) { + if ev == nil { + return + } + for _, t := range b.triggers { - if ev == nil { - continue - } if !t.Match(ev) { continue diff --git a/pkg/eventbus/eventbus_test.go b/pkg/eventbus/eventbus_test.go index f8f2b28e7..08a39ed58 100644 --- a/pkg/eventbus/eventbus_test.go +++ b/pkg/eventbus/eventbus_test.go @@ -1,11 +1,14 @@ package eventbus import ( + "context" + "fmt" "math/rand" "testing" "time" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) func TestEventbusRegUnreg(t *testing.T) { @@ -68,3 +71,39 @@ func BenchmarkEventbusRegUnreg(b *testing.B) { bus.Unregister(ptr) } } + +func TestEventFiring(t *testing.T) { + var ( + a = assert.New(t) + + // Let's make sure our simple tests don't + // cause any extra stress with dat race conditions + i = &atomic.Int32{} + + ev = func(ev string) Event { + return &mockEvent{rType: "resource", eType: ev} + } + + ctx = context.Background() + bus = New() + ) + + bus.Register(func(ctx context.Context, ev Event) error { i.Inc(); return nil }, On("inc"), For("resource")) + bus.Register(func(ctx context.Context, ev Event) error { i.Dec(); return nil }, On("dec"), For("resource")) + bus.Register(func(ctx context.Context, ev Event) error { return fmt.Errorf("handl-err") }, On("err"), For("resource")) + + a.Equal(int32(0), i.Load()) + a.NoError(bus.WaitFor(ctx, ev("inc"))) + a.Equal(int32(1), i.Load()) + a.NoError(bus.WaitFor(ctx, ev("inc"))) + a.Equal(int32(2), i.Load()) + a.NoError(bus.WaitFor(ctx, ev("dec"))) + a.Equal(int32(1), i.Load()) + a.EqualError(bus.WaitFor(ctx, ev("err")), "handl-err") + + bus.Dispatch(ctx, ev("dec")) + bus.Dispatch(ctx, ev("inc")) + bus.Dispatch(ctx, ev("inc")) + bus.wait() + a.Equal(int32(2), i.Load()) +} diff --git a/pkg/eventbus/trigger_test.go b/pkg/eventbus/trigger_test.go index d58304c27..7aa6fd5a5 100644 --- a/pkg/eventbus/trigger_test.go +++ b/pkg/eventbus/trigger_test.go @@ -1,28 +1,35 @@ package eventbus import ( + "context" + "fmt" + "sort" "testing" "github.com/stretchr/testify/assert" + + "github.com/cortezaproject/corteza-server/pkg/auth" ) type ( - MockEvent struct { + mockEvent struct { rType string eType string match func(name string, op string, values ...string) bool + + identity auth.Identifiable } ) -func (e MockEvent) ResourceType() string { +func (e mockEvent) ResourceType() string { return e.rType } -func (e MockEvent) EventType() string { +func (e mockEvent) EventType() string { return e.eType } -func (e MockEvent) Match(name string, op string, values ...string) bool { +func (e mockEvent) Match(name string, op string, values ...string) bool { if e.match == nil { return true } @@ -30,6 +37,10 @@ func (e MockEvent) Match(name string, op string, values ...string) bool { return e.match(name, op, values...) } +func (e *mockEvent) SetInvoker(identity auth.Identifiable) { + e.identity = identity +} + func TestTrigger_Match(t *testing.T) { cases := []struct { name string @@ -44,22 +55,22 @@ func TestTrigger_Match(t *testing.T) { }, {"empty resource", []TriggerRegOp{For("foo"), On("bar")}, - &MockEvent{rType: "", eType: "bar"}, + &mockEvent{rType: "", eType: "bar"}, false, }, {"empty event", []TriggerRegOp{For("foo"), On("bar")}, - &MockEvent{rType: "foo", eType: ""}, + &mockEvent{rType: "foo", eType: ""}, false, }, {"simple foo-bar test", []TriggerRegOp{For("foo"), On("bar")}, - &MockEvent{rType: "foo", eType: "bar"}, + &mockEvent{rType: "foo", eType: "bar"}, true, }, {"constraint match", []TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")}, - &MockEvent{ + &mockEvent{ rType: "foo", eType: "bar", match: func(name string, op string, values ...string) bool { @@ -69,7 +80,7 @@ func TestTrigger_Match(t *testing.T) { }, {"constraint mismatch", []TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")}, - &MockEvent{ + &mockEvent{ rType: "foo", eType: "bar", match: func(name string, op string, values ...string) bool { @@ -138,3 +149,50 @@ func TestTrigger_RegOps(t *testing.T) { }) } } + +func TestTriggerHandler(t *testing.T) { + var ( + a = assert.New(t) + ctx = context.Background() + ev = &mockEvent{} + passedthrough bool + + trSimple = &trigger{ + handler: func(ctx context.Context, ev Event) error { + passedthrough = true + a.True(auth.IsSuperUser(ev.(*mockEvent).identity)) + return nil + }, + } + ) + + a.False(passedthrough) + a.False(auth.IsSuperUser(ev.identity)) + + trSimple.Handle(auth.SetSuperUserContext(ctx), ev) + + a.True(auth.IsSuperUser(ev.identity)) + a.True(passedthrough, "expecting to pass through simple handler") +} + +func TestTriggerSorting(t *testing.T) { + var ( + a = assert.New(t) + tt = TriggerSet{ + NewTrigger(nil, Weight(3)), + NewTrigger(nil, Weight(1)), + NewTrigger(nil, Weight(2)), + } + + w2s = func(tt TriggerSet) (out string) { + for _, t := range tt { + out += fmt.Sprintf("%d,", t.weight) + } + return + } + ) + + a.Equal(w2s(tt), "3,1,2,") + sort.Sort(tt) + a.Equal(w2s(tt), "1,2,3,") +} diff --git a/pkg/eventbus/triggers.go b/pkg/eventbus/triggers.go index 55a55b7ad..1579b1423 100644 --- a/pkg/eventbus/triggers.go +++ b/pkg/eventbus/triggers.go @@ -4,6 +4,7 @@ import ( "context" "github.com/cortezaproject/corteza-server/pkg/auth" + "github.com/cortezaproject/corteza-server/pkg/sentry" ) type ( @@ -64,6 +65,8 @@ func (t trigger) Match(re Event) bool { } func (t trigger) Handle(ctx context.Context, ev Event) error { + defer sentry.Recover() + if eis, ok := ev.(eventInvokerSettable); ok { eis.SetInvoker(auth.GetIdentityFromContext(ctx)) }