More tests and coverage for eventbus
This commit is contained in:
1
go.mod
1
go.mod
@@ -49,6 +49,7 @@ require (
|
|||||||
github.com/steinfletcher/apitest-jsonpath v1.3.0
|
github.com/steinfletcher/apitest-jsonpath v1.3.0
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/titpetric/factory v0.0.0-20190806200833-ae4b02b9e034
|
github.com/titpetric/factory v0.0.0-20190806200833-ae4b02b9e034
|
||||||
|
go.uber.org/atomic v1.5.0
|
||||||
go.uber.org/zap v1.13.0
|
go.uber.org/zap v1.13.0
|
||||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5
|
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5
|
||||||
google.golang.org/grpc v1.22.1
|
google.golang.org/grpc v1.22.1
|
||||||
|
|||||||
@@ -21,6 +21,10 @@ type (
|
|||||||
}
|
}
|
||||||
|
|
||||||
eventbus struct {
|
eventbus struct {
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
|
||||||
|
// Read & write locking
|
||||||
|
// prevent event handling during trigger (un)registration
|
||||||
l *sync.RWMutex
|
l *sync.RWMutex
|
||||||
|
|
||||||
// list of registered handlers
|
// list of registered handlers
|
||||||
@@ -37,13 +41,14 @@ func init() {
|
|||||||
gEventBus = New()
|
gEventBus = New()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns
|
// Service returns global event bus service
|
||||||
func Service() *eventbus {
|
func Service() *eventbus {
|
||||||
return gEventBus
|
return gEventBus
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *eventbus {
|
func New() *eventbus {
|
||||||
return &eventbus{
|
return &eventbus{
|
||||||
|
wg: &sync.WaitGroup{},
|
||||||
l: &sync.RWMutex{},
|
l: &sync.RWMutex{},
|
||||||
triggers: make(map[uintptr]*trigger),
|
triggers: make(map[uintptr]*trigger),
|
||||||
}
|
}
|
||||||
@@ -56,8 +61,15 @@ func (b *eventbus) WaitFor(ctx context.Context, ev Event) (err error) {
|
|||||||
b.l.RLock()
|
b.l.RLock()
|
||||||
defer b.l.RUnlock()
|
defer b.l.RUnlock()
|
||||||
for _, t := range b.find(ev) {
|
for _, t := range b.find(ev) {
|
||||||
if err = t.Handle(ctx, ev); err != nil {
|
err = func(ctx context.Context, t *trigger) error {
|
||||||
return err
|
b.wg.Add(1)
|
||||||
|
defer b.wg.Done()
|
||||||
|
return t.Handle(ctx, ev)
|
||||||
|
|
||||||
|
}(ctx, t)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -69,23 +81,33 @@ func (b *eventbus) Dispatch(ctx context.Context, ev Event) {
|
|||||||
b.l.RLock()
|
b.l.RLock()
|
||||||
defer b.l.RUnlock()
|
defer b.l.RUnlock()
|
||||||
for _, t := range b.find(ev) {
|
for _, t := range b.find(ev) {
|
||||||
|
b.wg.Add(1)
|
||||||
go func(ctx context.Context, t *trigger) {
|
go func(ctx context.Context, t *trigger) {
|
||||||
|
defer b.wg.Done()
|
||||||
_ = t.Handle(ctx, ev)
|
_ = t.Handle(ctx, ev)
|
||||||
}(ctx, t)
|
}(ctx, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Waits for all dispatched events
|
||||||
|
//
|
||||||
|
// Should only be used for testing
|
||||||
|
func (b *eventbus) wait() {
|
||||||
|
b.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Finds all registered triggers compatible with given event
|
// Finds all registered triggers compatible with given event
|
||||||
//
|
//
|
||||||
// It returns sorted
|
// It returns sorted triggers
|
||||||
//
|
//
|
||||||
// There is still room for improvement (performance wise) by indexing
|
// There is still room for improvement (performance wise) by indexing
|
||||||
// resources and events of each trigger.
|
// resources and events of each trigger.
|
||||||
func (b *eventbus) find(ev Event) (tt TriggerSet) {
|
func (b *eventbus) find(ev Event) (tt TriggerSet) {
|
||||||
|
if ev == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for _, t := range b.triggers {
|
for _, t := range b.triggers {
|
||||||
if ev == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !t.Match(ev) {
|
if !t.Match(ev) {
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package eventbus
|
package eventbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestEventbusRegUnreg(t *testing.T) {
|
func TestEventbusRegUnreg(t *testing.T) {
|
||||||
@@ -68,3 +71,39 @@ func BenchmarkEventbusRegUnreg(b *testing.B) {
|
|||||||
bus.Unregister(ptr)
|
bus.Unregister(ptr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEventFiring(t *testing.T) {
|
||||||
|
var (
|
||||||
|
a = assert.New(t)
|
||||||
|
|
||||||
|
// Let's make sure our simple tests don't
|
||||||
|
// cause any extra stress with dat race conditions
|
||||||
|
i = &atomic.Int32{}
|
||||||
|
|
||||||
|
ev = func(ev string) Event {
|
||||||
|
return &mockEvent{rType: "resource", eType: ev}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx = context.Background()
|
||||||
|
bus = New()
|
||||||
|
)
|
||||||
|
|
||||||
|
bus.Register(func(ctx context.Context, ev Event) error { i.Inc(); return nil }, On("inc"), For("resource"))
|
||||||
|
bus.Register(func(ctx context.Context, ev Event) error { i.Dec(); return nil }, On("dec"), For("resource"))
|
||||||
|
bus.Register(func(ctx context.Context, ev Event) error { return fmt.Errorf("handl-err") }, On("err"), For("resource"))
|
||||||
|
|
||||||
|
a.Equal(int32(0), i.Load())
|
||||||
|
a.NoError(bus.WaitFor(ctx, ev("inc")))
|
||||||
|
a.Equal(int32(1), i.Load())
|
||||||
|
a.NoError(bus.WaitFor(ctx, ev("inc")))
|
||||||
|
a.Equal(int32(2), i.Load())
|
||||||
|
a.NoError(bus.WaitFor(ctx, ev("dec")))
|
||||||
|
a.Equal(int32(1), i.Load())
|
||||||
|
a.EqualError(bus.WaitFor(ctx, ev("err")), "handl-err")
|
||||||
|
|
||||||
|
bus.Dispatch(ctx, ev("dec"))
|
||||||
|
bus.Dispatch(ctx, ev("inc"))
|
||||||
|
bus.Dispatch(ctx, ev("inc"))
|
||||||
|
bus.wait()
|
||||||
|
a.Equal(int32(2), i.Load())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,28 +1,35 @@
|
|||||||
package eventbus
|
package eventbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
MockEvent struct {
|
mockEvent struct {
|
||||||
rType string
|
rType string
|
||||||
eType string
|
eType string
|
||||||
match func(name string, op string, values ...string) bool
|
match func(name string, op string, values ...string) bool
|
||||||
|
|
||||||
|
identity auth.Identifiable
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e MockEvent) ResourceType() string {
|
func (e mockEvent) ResourceType() string {
|
||||||
return e.rType
|
return e.rType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e MockEvent) EventType() string {
|
func (e mockEvent) EventType() string {
|
||||||
return e.eType
|
return e.eType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e MockEvent) Match(name string, op string, values ...string) bool {
|
func (e mockEvent) Match(name string, op string, values ...string) bool {
|
||||||
if e.match == nil {
|
if e.match == nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -30,6 +37,10 @@ func (e MockEvent) Match(name string, op string, values ...string) bool {
|
|||||||
return e.match(name, op, values...)
|
return e.match(name, op, values...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *mockEvent) SetInvoker(identity auth.Identifiable) {
|
||||||
|
e.identity = identity
|
||||||
|
}
|
||||||
|
|
||||||
func TestTrigger_Match(t *testing.T) {
|
func TestTrigger_Match(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
@@ -44,22 +55,22 @@ func TestTrigger_Match(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{"empty resource",
|
{"empty resource",
|
||||||
[]TriggerRegOp{For("foo"), On("bar")},
|
[]TriggerRegOp{For("foo"), On("bar")},
|
||||||
&MockEvent{rType: "", eType: "bar"},
|
&mockEvent{rType: "", eType: "bar"},
|
||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
{"empty event",
|
{"empty event",
|
||||||
[]TriggerRegOp{For("foo"), On("bar")},
|
[]TriggerRegOp{For("foo"), On("bar")},
|
||||||
&MockEvent{rType: "foo", eType: ""},
|
&mockEvent{rType: "foo", eType: ""},
|
||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
{"simple foo-bar test",
|
{"simple foo-bar test",
|
||||||
[]TriggerRegOp{For("foo"), On("bar")},
|
[]TriggerRegOp{For("foo"), On("bar")},
|
||||||
&MockEvent{rType: "foo", eType: "bar"},
|
&mockEvent{rType: "foo", eType: "bar"},
|
||||||
true,
|
true,
|
||||||
},
|
},
|
||||||
{"constraint match",
|
{"constraint match",
|
||||||
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
||||||
&MockEvent{
|
&mockEvent{
|
||||||
rType: "foo",
|
rType: "foo",
|
||||||
eType: "bar",
|
eType: "bar",
|
||||||
match: func(name string, op string, values ...string) bool {
|
match: func(name string, op string, values ...string) bool {
|
||||||
@@ -69,7 +80,7 @@ func TestTrigger_Match(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{"constraint mismatch",
|
{"constraint mismatch",
|
||||||
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
||||||
&MockEvent{
|
&mockEvent{
|
||||||
rType: "foo",
|
rType: "foo",
|
||||||
eType: "bar",
|
eType: "bar",
|
||||||
match: func(name string, op string, values ...string) bool {
|
match: func(name string, op string, values ...string) bool {
|
||||||
@@ -138,3 +149,50 @@ func TestTrigger_RegOps(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTriggerHandler(t *testing.T) {
|
||||||
|
var (
|
||||||
|
a = assert.New(t)
|
||||||
|
ctx = context.Background()
|
||||||
|
ev = &mockEvent{}
|
||||||
|
passedthrough bool
|
||||||
|
|
||||||
|
trSimple = &trigger{
|
||||||
|
handler: func(ctx context.Context, ev Event) error {
|
||||||
|
passedthrough = true
|
||||||
|
a.True(auth.IsSuperUser(ev.(*mockEvent).identity))
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
a.False(passedthrough)
|
||||||
|
a.False(auth.IsSuperUser(ev.identity))
|
||||||
|
|
||||||
|
trSimple.Handle(auth.SetSuperUserContext(ctx), ev)
|
||||||
|
|
||||||
|
a.True(auth.IsSuperUser(ev.identity))
|
||||||
|
a.True(passedthrough, "expecting to pass through simple handler")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTriggerSorting(t *testing.T) {
|
||||||
|
var (
|
||||||
|
a = assert.New(t)
|
||||||
|
tt = TriggerSet{
|
||||||
|
NewTrigger(nil, Weight(3)),
|
||||||
|
NewTrigger(nil, Weight(1)),
|
||||||
|
NewTrigger(nil, Weight(2)),
|
||||||
|
}
|
||||||
|
|
||||||
|
w2s = func(tt TriggerSet) (out string) {
|
||||||
|
for _, t := range tt {
|
||||||
|
out += fmt.Sprintf("%d,", t.weight)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
a.Equal(w2s(tt), "3,1,2,")
|
||||||
|
sort.Sort(tt)
|
||||||
|
a.Equal(w2s(tt), "1,2,3,")
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||||
|
"github.com/cortezaproject/corteza-server/pkg/sentry"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@@ -64,6 +65,8 @@ func (t trigger) Match(re Event) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t trigger) Handle(ctx context.Context, ev Event) error {
|
func (t trigger) Handle(ctx context.Context, ev Event) error {
|
||||||
|
defer sentry.Recover()
|
||||||
|
|
||||||
if eis, ok := ev.(eventInvokerSettable); ok {
|
if eis, ok := ev.(eventInvokerSettable); ok {
|
||||||
eis.SetInvoker(auth.GetIdentityFromContext(ctx))
|
eis.SetInvoker(auth.GetIdentityFromContext(ctx))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user