Add eventbus pkg
This commit is contained in:
52
pkg/eventbus/README.adoc
Normal file
52
pkg/eventbus/README.adoc
Normal file
@@ -0,0 +1,52 @@
|
||||
# pkg/eventbus
|
||||
|
||||
Package handles event dispatching and trigger registration
|
||||
|
||||
## Event
|
||||
|
||||
Event (in context of eventbus package) contains of resource type, event type and a matcher.
|
||||
|
||||
### Resource type
|
||||
|
||||
Resource types are identifiers of internal resources, like user, role, request, ...
|
||||
|
||||
### Event type
|
||||
|
||||
Type describes direct or indirect user action or some other event in the system.
|
||||
|
||||
### Event matcher
|
||||
|
||||
Matcher is a function on event that helps filtering fired events.
|
||||
It decides (returns true or false) if fired event is a match for any of a registered triggers
|
||||
|
||||
Bus has basic internal filtering for resource & event type.
|
||||
Other constraints are passed to event's matcher function.
|
||||
|
||||
.Matcher will receive 2+ string parameters:
|
||||
- name
|
||||
- operator
|
||||
- zero or more values
|
||||
|
||||
It's matcher responsibility to handle contents of name, operator and values parameters.
|
||||
|
||||
Constraint checker procedure will call matcher for each constraint.
|
||||
All constraints must match.
|
||||
First non-match will break constraint checking procedure.
|
||||
|
||||
## Triggers
|
||||
|
||||
Trigger is combination of (event matching) rules/constraints and event handler.
|
||||
Handler is a callback function called when a compatible event is fired.
|
||||
|
||||
Trigger can respond to multiple resource/event combinations.
|
||||
|
||||
### Constraints
|
||||
|
||||
Matcher function is called multiple times, once per each trigger constraint.
|
||||
If any of the calls result in a non-match, check
|
||||
|
||||
Trigger without constraint is considered a match.
|
||||
|
||||
### Weight
|
||||
|
||||
Weight controls order of trigger execution.
|
||||
146
pkg/eventbus/eventbus.go
Normal file
146
pkg/eventbus/eventbus.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type (
|
||||
Event interface {
|
||||
// ResourceType from resource that fired the event
|
||||
ResourceType() string
|
||||
|
||||
// Type of event fired
|
||||
EventType() string
|
||||
|
||||
// Match tests if given constraints match
|
||||
// event's internal values
|
||||
Match(name string, op string, values ...string) bool
|
||||
}
|
||||
|
||||
eventbus struct {
|
||||
l *sync.RWMutex
|
||||
|
||||
// list of registered handlers
|
||||
triggers map[uintptr]*trigger
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
// Global eventbus
|
||||
gEventBus *eventbus
|
||||
)
|
||||
|
||||
func init() {
|
||||
gEventBus = New()
|
||||
}
|
||||
|
||||
// WaitFor is a package level wrapper for global instance of eventbus
|
||||
func WaitFor(ctx context.Context, ev Event) (err error) {
|
||||
return gEventBus.WaitFor(ctx, ev)
|
||||
}
|
||||
|
||||
// Dispatch is a package level wrapper for global instance of eventbus
|
||||
func Dispatch(ctx context.Context, ev Event) {
|
||||
gEventBus.Dispatch(ctx, ev)
|
||||
}
|
||||
|
||||
// Register is a package level wrapper for global instance of eventbus
|
||||
func Register(handler Handler, ops ...TriggerRegOp) uintptr {
|
||||
return gEventBus.Register(handler, ops...)
|
||||
}
|
||||
|
||||
// Unregister is a package level wrapper for global instance of eventbus
|
||||
func Unregister(ptrs ...uintptr) {
|
||||
gEventBus.Unregister(ptrs...)
|
||||
}
|
||||
|
||||
// Returns
|
||||
func Default() *eventbus {
|
||||
return gEventBus
|
||||
}
|
||||
|
||||
func New() *eventbus {
|
||||
return &eventbus{
|
||||
l: &sync.RWMutex{},
|
||||
triggers: make(map[uintptr]*trigger),
|
||||
}
|
||||
}
|
||||
|
||||
// WaitFor is synchronous event dispatcher
|
||||
//
|
||||
// It waits for all handlers and fails on first error
|
||||
func (b *eventbus) WaitFor(ctx context.Context, ev Event) (err error) {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
for _, t := range b.find(ev) {
|
||||
if err = t.Handle(ctx, ev); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Dispatch runs events asynchronously
|
||||
func (b *eventbus) Dispatch(ctx context.Context, ev Event) {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
for _, t := range b.find(ev) {
|
||||
go func(ctx context.Context, t *trigger) {
|
||||
_ = t.Handle(ctx, ev)
|
||||
}(ctx, t)
|
||||
}
|
||||
}
|
||||
|
||||
// Finds all registered triggers compatible with given event
|
||||
//
|
||||
// It returns sorted
|
||||
//
|
||||
// There is still room for improvement (performance wise) by indexing
|
||||
// resources and events of each trigger.
|
||||
func (b *eventbus) find(ev Event) (tt TriggerSet) {
|
||||
for _, t := range b.triggers {
|
||||
if ev == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !t.Match(ev) {
|
||||
continue
|
||||
}
|
||||
|
||||
tt = append(tt, t)
|
||||
}
|
||||
|
||||
sort.Sort(tt)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Register creates a new trigger with given handler, resource, event with other options and constraints
|
||||
//
|
||||
// It returns a trigger identifier that can be used to remove (unregister) trigger later
|
||||
func (b *eventbus) Register(h Handler, ops ...TriggerRegOp) uintptr {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
var (
|
||||
trigger = NewTrigger(h, ops...)
|
||||
ptr = uintptr(unsafe.Pointer(trigger))
|
||||
)
|
||||
|
||||
b.triggers[ptr] = trigger
|
||||
return ptr
|
||||
}
|
||||
|
||||
// Unregister removes one or more registered triggers
|
||||
func (b *eventbus) Unregister(ptrs ...uintptr) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
for _, ptr := range ptrs {
|
||||
delete(b.triggers, ptr)
|
||||
}
|
||||
}
|
||||
70
pkg/eventbus/eventbus_test.go
Normal file
70
pkg/eventbus/eventbus_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestEventbusRegUnreg(t *testing.T) {
|
||||
var (
|
||||
a = assert.New(t)
|
||||
|
||||
bus = New()
|
||||
)
|
||||
|
||||
a.Empty(bus.triggers)
|
||||
h1 := bus.Register(nil)
|
||||
a.NotZero(h1)
|
||||
h2 := bus.Register(nil)
|
||||
a.NotZero(h2)
|
||||
h3 := bus.Register(nil)
|
||||
a.NotZero(h3)
|
||||
a.Len(bus.triggers, 3)
|
||||
bus.Unregister(h1)
|
||||
a.Len(bus.triggers, 2)
|
||||
bus.Unregister(h2)
|
||||
a.Len(bus.triggers, 1)
|
||||
bus.Unregister(h3)
|
||||
a.Empty(bus.triggers)
|
||||
}
|
||||
|
||||
func BenchmarkEventbusTriggerLookup(b *testing.B) {
|
||||
var (
|
||||
bus = New()
|
||||
ptrs []uintptr
|
||||
)
|
||||
|
||||
// Register handlers
|
||||
for n := 0; n < b.N; n++ {
|
||||
ptrs = append(ptrs, bus.Register(nil))
|
||||
}
|
||||
|
||||
// Register find & fire handlers
|
||||
for n := 0; n < b.N; n++ {
|
||||
bus.find(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEventbusRegUnreg(b *testing.B) {
|
||||
var (
|
||||
bus = New()
|
||||
ptrs []uintptr
|
||||
)
|
||||
|
||||
// Register handlers
|
||||
for n := 0; n < b.N; n++ {
|
||||
ptrs = append(ptrs, bus.Register(nil))
|
||||
}
|
||||
|
||||
// Shuffle stored pinters
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
rand.Shuffle(len(ptrs), func(i, j int) { ptrs[i], ptrs[j] = ptrs[j], ptrs[i] })
|
||||
|
||||
// Unregister all pointers
|
||||
for _, ptr := range ptrs {
|
||||
bus.Unregister(ptr)
|
||||
}
|
||||
}
|
||||
140
pkg/eventbus/trigger_test.go
Normal file
140
pkg/eventbus/trigger_test.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type (
|
||||
MockEvent struct {
|
||||
rType string
|
||||
eType string
|
||||
match func(name string, op string, values ...string) bool
|
||||
}
|
||||
)
|
||||
|
||||
func (e MockEvent) ResourceType() string {
|
||||
return e.rType
|
||||
}
|
||||
|
||||
func (e MockEvent) EventType() string {
|
||||
return e.eType
|
||||
}
|
||||
|
||||
func (e MockEvent) Match(name string, op string, values ...string) bool {
|
||||
if e.match == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return e.match(name, op, values...)
|
||||
}
|
||||
|
||||
func TestTrigger_Match(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
ops []TriggerRegOp
|
||||
ev Event
|
||||
match bool
|
||||
}{
|
||||
{"nil event",
|
||||
nil,
|
||||
nil,
|
||||
false,
|
||||
},
|
||||
{"empty resource",
|
||||
[]TriggerRegOp{For("foo"), On("bar")},
|
||||
&MockEvent{rType: "", eType: "bar"},
|
||||
false,
|
||||
},
|
||||
{"empty event",
|
||||
[]TriggerRegOp{For("foo"), On("bar")},
|
||||
&MockEvent{rType: "foo", eType: ""},
|
||||
false,
|
||||
},
|
||||
{"simple foo-bar test",
|
||||
[]TriggerRegOp{For("foo"), On("bar")},
|
||||
&MockEvent{rType: "foo", eType: "bar"},
|
||||
true,
|
||||
},
|
||||
{"constraint match",
|
||||
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
||||
&MockEvent{
|
||||
rType: "foo",
|
||||
eType: "bar",
|
||||
match: func(name string, op string, values ...string) bool {
|
||||
return len(values) > 0 && name == values[0]
|
||||
}},
|
||||
true,
|
||||
},
|
||||
{"constraint mismatch",
|
||||
[]TriggerRegOp{For("foo"), On("bar"), Constraint("baz", "", "baz")},
|
||||
&MockEvent{
|
||||
rType: "foo",
|
||||
eType: "bar",
|
||||
match: func(name string, op string, values ...string) bool {
|
||||
return false
|
||||
}},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
var trigger = NewTrigger(nil, c.ops...)
|
||||
if c.match {
|
||||
assert.True(t, trigger.Match(c.ev), "Expecting to match")
|
||||
|
||||
} else {
|
||||
assert.False(t, trigger.Match(c.ev), "Expecting to not match")
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrigger_RegOps(t *testing.T) {
|
||||
makeTestTrigger := func(t *trigger) *trigger {
|
||||
if t.resourceTypes == nil {
|
||||
t.resourceTypes = make(map[string]bool)
|
||||
}
|
||||
|
||||
if t.eventTypes == nil {
|
||||
t.eventTypes = make(map[string]bool)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
exp *trigger
|
||||
ops []TriggerRegOp
|
||||
}{
|
||||
{
|
||||
"empty",
|
||||
makeTestTrigger(&trigger{}),
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"resource types",
|
||||
makeTestTrigger(&trigger{resourceTypes: map[string]bool{"foo": true, "bar": true}}),
|
||||
[]TriggerRegOp{For("foo", "bar")},
|
||||
},
|
||||
{
|
||||
"event types",
|
||||
makeTestTrigger(&trigger{eventTypes: map[string]bool{"foo": true, "bar": true}}),
|
||||
[]TriggerRegOp{On("foo", "bar")},
|
||||
},
|
||||
{
|
||||
"weight",
|
||||
makeTestTrigger(&trigger{weight: 42}),
|
||||
[]TriggerRegOp{Weight(42)},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
assert.Equal(t, c.exp, NewTrigger(nil, c.ops...))
|
||||
})
|
||||
}
|
||||
}
|
||||
124
pkg/eventbus/triggers.go
Normal file
124
pkg/eventbus/triggers.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
)
|
||||
|
||||
type (
|
||||
constraint struct {
|
||||
name string
|
||||
op string
|
||||
value []string
|
||||
}
|
||||
|
||||
constraintSet []constraint
|
||||
|
||||
Handler func(ctx context.Context, ev Event) error
|
||||
|
||||
trigger struct {
|
||||
handler Handler
|
||||
resourceTypes map[string]bool
|
||||
eventTypes map[string]bool
|
||||
constraints constraintSet
|
||||
weight int
|
||||
}
|
||||
|
||||
// @todo add sorting interface
|
||||
TriggerSet []*trigger
|
||||
|
||||
TriggerRegOp func(t *trigger)
|
||||
|
||||
eventInvokerSettable interface {
|
||||
SetInvoker(auth.Identifiable)
|
||||
}
|
||||
)
|
||||
|
||||
// Match matches trigger with resource event
|
||||
func (t trigger) Match(re Event) bool {
|
||||
if re == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(re.ResourceType()) == 0 || !t.resourceTypes[re.ResourceType()] {
|
||||
// Expecting to have valid resource type and match at least one
|
||||
// defined resource on the trigger
|
||||
return false
|
||||
}
|
||||
|
||||
if len(re.EventType()) == 0 || !t.eventTypes[re.EventType()] {
|
||||
// Expecting to have valid event type and match at least one
|
||||
// defined event on the trigger
|
||||
return false
|
||||
}
|
||||
|
||||
for _, c := range t.constraints {
|
||||
// Should match all constraints
|
||||
if !re.Match(c.name, c.op, c.value...) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (t trigger) Handle(ctx context.Context, ev Event) error {
|
||||
if eis, ok := ev.(eventInvokerSettable); ok {
|
||||
eis.SetInvoker(auth.GetIdentityFromContext(ctx))
|
||||
}
|
||||
|
||||
return t.handler(ctx, ev)
|
||||
}
|
||||
|
||||
func NewTrigger(h Handler, ops ...TriggerRegOp) *trigger {
|
||||
var t = &trigger{
|
||||
resourceTypes: make(map[string]bool),
|
||||
eventTypes: make(map[string]bool),
|
||||
handler: h,
|
||||
}
|
||||
|
||||
for _, op := range ops {
|
||||
op(t)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func For(rr ...string) TriggerRegOp {
|
||||
return func(t *trigger) {
|
||||
for _, r := range rr {
|
||||
t.resourceTypes[r] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func On(ee ...string) TriggerRegOp {
|
||||
return func(t *trigger) {
|
||||
for _, e := range ee {
|
||||
t.eventTypes[e] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Constraint(name, op string, vv ...string) TriggerRegOp {
|
||||
return func(t *trigger) {
|
||||
t.constraints = append(t.constraints, constraint{
|
||||
name: name,
|
||||
op: op,
|
||||
value: vv,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Weight(weight int) TriggerRegOp {
|
||||
return func(t *trigger) {
|
||||
t.weight = weight
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger-set sorting:
|
||||
|
||||
func (set TriggerSet) Len() int { return len(set) }
|
||||
func (set TriggerSet) Swap(i, j int) { set[i], set[j] = set[j], set[i] }
|
||||
func (set TriggerSet) Less(i, j int) bool { return set[i].weight < set[j].weight }
|
||||
Reference in New Issue
Block a user