Refactor corredor, add tests
This commit is contained in:
parent
e9b2fe49ae
commit
8a1d808466
94
pkg/corredor/conn_test.go
Normal file
94
pkg/corredor/conn_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package corredor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/app/options"
|
||||
"github.com/cortezaproject/corteza-server/pkg/logger"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func TestNewConnectionWithDisabled(t *testing.T) {
|
||||
c, err := NewConnection(nil, options.CorredorOpt{Enabled: false}, nil)
|
||||
assert.Nil(t, c)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestNewConnection(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
|
||||
dbgLog = logger.MakeDebugLogger()
|
||||
|
||||
a = assert.New(t)
|
||||
wg = &sync.WaitGroup{}
|
||||
|
||||
lstnr = openListener(t)
|
||||
grpcServer = grpc.NewServer()
|
||||
|
||||
opt = options.CorredorOpt{
|
||||
Enabled: true,
|
||||
Log: true,
|
||||
MaxBackoffDelay: 1,
|
||||
Addr: lstnr.Addr().String(),
|
||||
}
|
||||
)
|
||||
|
||||
a.NotNil(lstnr)
|
||||
defer lstnr.Close()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
a.NoError(grpcServer.Serve(lstnr))
|
||||
}()
|
||||
|
||||
grpcClientConn, err := NewConnection(ctx, opt, dbgLog)
|
||||
a.NoError(err)
|
||||
|
||||
// Go and
|
||||
NewService(grpcClientConn, nil, dbgLog, opt)
|
||||
|
||||
grpcClientConn.WaitForStateChange(ctx, connectivity.Ready)
|
||||
grpcServer.GracefulStop()
|
||||
lstnr.Close()
|
||||
|
||||
t.Log("waiting for connection to close")
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func openListener(t *testing.T) (ln net.Listener) {
|
||||
var (
|
||||
tries = 1000
|
||||
minPort = 50000
|
||||
maxPort = 63000
|
||||
port int
|
||||
err error
|
||||
)
|
||||
|
||||
for tries > 0 {
|
||||
port = minPort + rand.Intn(maxPort-minPort)
|
||||
t.Logf("trying to find available port for gRPC connection: %d", port)
|
||||
ln, err = net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
|
||||
if err == nil {
|
||||
return ln
|
||||
}
|
||||
|
||||
t.Errorf("error: %s", err)
|
||||
tries--
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
17
pkg/corredor/filter_test.go
Normal file
17
pkg/corredor/filter_test.go
Normal file
@ -0,0 +1,17 @@
|
||||
package corredor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestManualScriptFilterResourcePrefixing(t *testing.T) {
|
||||
f := &ManualScriptFilter{
|
||||
ResourceTypes: []string{"system", "system:one", "two"},
|
||||
}
|
||||
|
||||
f.PrefixResource("system")
|
||||
|
||||
assert.New(t).Equal(f.ResourceTypes, []string{"system", "system:one", "system:two"})
|
||||
}
|
||||
@ -2,8 +2,6 @@ package corredor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
@ -28,7 +26,7 @@ type (
|
||||
registered map[string][]uintptr
|
||||
|
||||
// list of all registered onManual triggers & scripts
|
||||
// map[resource][script-name]bool
|
||||
// map[script-name][resource]bool
|
||||
manual map[string]map[string]bool
|
||||
|
||||
// Combined list of client and server scripts
|
||||
@ -45,15 +43,7 @@ type (
|
||||
}
|
||||
|
||||
Event interface {
|
||||
// ResourceType from resource that fired the event
|
||||
ResourceType() string
|
||||
|
||||
// Event that was fired
|
||||
EventType() string
|
||||
|
||||
// Match tests if given constraints match
|
||||
// event's internal values
|
||||
Match(name string, op string, values ...string) bool
|
||||
eventbus.Event
|
||||
|
||||
// Encode (event) to arguments passed to
|
||||
// event handlers ([automation ]script runner)
|
||||
@ -97,18 +87,18 @@ func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (er
|
||||
return
|
||||
}
|
||||
|
||||
gCorredor = NewService(conn, logger, opt)
|
||||
gCorredor = NewService(conn, eventbus.Service(), logger, opt)
|
||||
return
|
||||
}
|
||||
|
||||
func NewService(conn *grpc.ClientConn, logger *zap.Logger, opt options.CorredorOpt) *service {
|
||||
func NewService(conn *grpc.ClientConn, er eventRegistrator, logger *zap.Logger, opt options.CorredorOpt) *service {
|
||||
return &service{
|
||||
ssClient: NewServerScriptsClient(conn),
|
||||
csClient: NewClientScriptsClient(conn),
|
||||
log: logger.Named("corredor"),
|
||||
registered: make(map[string][]uintptr),
|
||||
manual: make(map[string]map[string]bool),
|
||||
eventbus: eventbus.Service(),
|
||||
eventbus: er,
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
@ -156,11 +146,11 @@ func (svc service) ExecOnManual(ctx context.Context, script string, event Event)
|
||||
return errors.Errorf("triggered event type is not onManual (%q)", evt)
|
||||
}
|
||||
|
||||
if _, ok := svc.manual[res]; !ok {
|
||||
return errors.Errorf("unregistered onManual resource %q", res)
|
||||
if _, ok := svc.manual[script]; !ok {
|
||||
return errors.Errorf("unregistered onManual script %q", script)
|
||||
}
|
||||
|
||||
if _, ok := svc.manual[res][script]; !ok {
|
||||
if _, ok := svc.manual[script][res]; !ok {
|
||||
return errors.Errorf("unregistered onManual script %q for resource %q", script, res)
|
||||
}
|
||||
|
||||
@ -185,10 +175,25 @@ func (svc *service) loadServerScripts(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
svc.manual = make(map[string]map[string]bool)
|
||||
svc.sScripts = make([]*Script, len(rsp.Scripts))
|
||||
svc.registerServerScripts(rsp.Scripts...)
|
||||
}
|
||||
|
||||
for i, script := range rsp.Scripts {
|
||||
// Registers Corredor scripts to eventbus and list of manual scripts
|
||||
func (svc *service) registerServerScripts(ss ...*ServerScript) {
|
||||
svc.sScripts = make([]*Script, len(ss))
|
||||
|
||||
// Remove all previously registered triggers
|
||||
for _, ptrs := range svc.registered {
|
||||
if len(ptrs) > 0 {
|
||||
svc.eventbus.Unregister(ptrs...)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset indexes
|
||||
svc.registered = make(map[string][]uintptr)
|
||||
svc.manual = make(map[string]map[string]bool)
|
||||
|
||||
for i, script := range ss {
|
||||
svc.sScripts[i] = &Script{
|
||||
Name: script.Name,
|
||||
Label: script.Label,
|
||||
@ -198,125 +203,59 @@ func (svc *service) loadServerScripts(ctx context.Context) {
|
||||
}
|
||||
|
||||
if len(script.Errors) == 0 {
|
||||
svc.registerTriggers(script)
|
||||
svc.manual[script.Name] = pluckManualTriggers(script)
|
||||
svc.registered[script.Name] = svc.registerTriggers(script)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *service) loadClientScripts(ctx context.Context) {
|
||||
var (
|
||||
err error
|
||||
rsp *ClientScriptListResponse
|
||||
)
|
||||
|
||||
svc.log.Debug("reloading client scripts")
|
||||
rsp, err = svc.csClient.List(ctx, &ClientScriptListRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
svc.log.Error("could not load corredor client scripts", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
svc.cScripts = make([]*Script, len(rsp.Scripts))
|
||||
|
||||
for i, script := range rsp.Scripts {
|
||||
svc.cScripts[i] = &Script{
|
||||
Name: script.Name,
|
||||
Label: script.Label,
|
||||
Description: script.Description,
|
||||
Errors: script.Errors,
|
||||
Triggers: script.Triggers,
|
||||
Bundle: script.Bundle,
|
||||
Type: script.Type,
|
||||
}
|
||||
svc.log.Debug(
|
||||
"registered",
|
||||
zap.String("script", script.Name),
|
||||
zap.Int("manual", len(svc.manual[script.Name])),
|
||||
zap.Int("triggers", len(svc.registered[script.Name])),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Creates handler function for eventbus subsystem
|
||||
//
|
||||
// If trigger has "onManual"
|
||||
func (svc service) registerTriggers(script *ServerScript) {
|
||||
// If trigger has "onManual" event type, it removes it and
|
||||
// registers that script to the list of manual triggers
|
||||
func (svc *service) registerTriggers(script *ServerScript) []uintptr {
|
||||
var (
|
||||
ops []eventbus.TriggerRegOp
|
||||
handler eventbus.Handler
|
||||
err error
|
||||
ops []eventbus.TriggerRegOp
|
||||
err error
|
||||
ptrs = make([]uintptr, 0, len(script.Triggers))
|
||||
|
||||
log = svc.log.With(zap.String("scriptName", script.Name))
|
||||
log = svc.log.With(zap.String("script", script.Name))
|
||||
)
|
||||
|
||||
if ptrs, has := svc.registered[script.Name]; has && len(ptrs) > 0 {
|
||||
// Unregister previously registered triggers
|
||||
svc.eventbus.Unregister(ptrs...)
|
||||
log.Debug(
|
||||
"triggers unregistered",
|
||||
zap.Uintptrs("triggers", ptrs),
|
||||
)
|
||||
}
|
||||
|
||||
// Make room for new
|
||||
svc.registered[script.Name] = make([]uintptr, 0)
|
||||
|
||||
for i := range script.Triggers {
|
||||
// We're modifying trigger in the loop,
|
||||
// so let's make a copy we can play with
|
||||
trigger := *script.Triggers[i]
|
||||
|
||||
if popOnManualEventType(&trigger) {
|
||||
for _, res := range trigger.Resources {
|
||||
if svc.manual[res] == nil {
|
||||
svc.manual[res] = make(map[string]bool)
|
||||
}
|
||||
|
||||
svc.manual[res][script.Name] = true
|
||||
}
|
||||
|
||||
log.Debug("manual trigger registered", zap.Strings("resources", trigger.Resources))
|
||||
|
||||
if len(trigger.Events) == 0 {
|
||||
// We've removed the last event
|
||||
//
|
||||
// break now to prevent code below to
|
||||
// complain about missing event types
|
||||
continue
|
||||
}
|
||||
if len(trigger.Events) == 0 {
|
||||
// We've removed the last event
|
||||
//
|
||||
// break now to prevent code below to
|
||||
// complain about missing event types
|
||||
continue
|
||||
}
|
||||
|
||||
if ops, err = svc.makeTriggerRegOpts(&trigger); err != nil {
|
||||
if ops, err = makeTriggerOpts(&trigger); err != nil {
|
||||
log.Warn(
|
||||
"trigger could not be registered",
|
||||
"could not make trigger options",
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
var runAs = trigger.RunAs
|
||||
|
||||
handler = func(ctx context.Context, ev eventbus.Event) error {
|
||||
// Is this compatible event?
|
||||
|
||||
if ce, ok := ev.(Event); ok {
|
||||
if len(runAs) > 0 {
|
||||
jwt, err := svc.jwtMaker(runAs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx = auth.SetJwtToContext(ctx, jwt)
|
||||
}
|
||||
|
||||
return svc.exec(ctx, script.Name, ce)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
ptr := svc.eventbus.Register(handler, ops...)
|
||||
|
||||
log.Debug(
|
||||
"trigger registered",
|
||||
zap.Uintptr("triggers", ptr),
|
||||
)
|
||||
ptr := svc.eventbus.Register(makeEventHandler(svc, script.Name, trigger.RunAs), ops...)
|
||||
ptrs = append(ptrs, ptr)
|
||||
}
|
||||
|
||||
return ptrs
|
||||
}
|
||||
|
||||
// Exec finds and runs specific script with given event
|
||||
@ -438,35 +377,34 @@ func (svc service) exec(ctx context.Context, script string, event Event) (err er
|
||||
return
|
||||
}
|
||||
|
||||
func (svc service) makeTriggerRegOpts(t *Trigger) (oo []eventbus.TriggerRegOp, err error) {
|
||||
if len(t.Events) == 0 {
|
||||
return nil, fmt.Errorf("can not generate trigger without at least one events")
|
||||
}
|
||||
if len(t.Resources) == 0 {
|
||||
return nil, fmt.Errorf("can not generate trigger without at least one resource")
|
||||
}
|
||||
func (svc *service) loadClientScripts(ctx context.Context) {
|
||||
var (
|
||||
err error
|
||||
rsp *ClientScriptListResponse
|
||||
)
|
||||
|
||||
oo = append(oo, eventbus.On(t.Events...))
|
||||
oo = append(oo, eventbus.For(t.Resources...))
|
||||
|
||||
for i := range t.Constraints {
|
||||
oo = append(oo, eventbus.Constraint(
|
||||
t.Constraints[i].Name,
|
||||
t.Constraints[i].Op,
|
||||
t.Constraints[i].Value...,
|
||||
))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func encodeArguments(args map[string]string, key string, val interface{}) (err error) {
|
||||
var tmp []byte
|
||||
|
||||
if tmp, err = json.Marshal(val); err != nil {
|
||||
svc.log.Debug("reloading client scripts")
|
||||
rsp, err = svc.csClient.List(ctx, &ClientScriptListRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
svc.log.Error("could not load corredor client scripts", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
args[key] = string(tmp)
|
||||
return
|
||||
svc.registerClientScripts(rsp.Scripts...)
|
||||
}
|
||||
|
||||
func (svc *service) registerClientScripts(ss ...*ClientScript) {
|
||||
svc.cScripts = make([]*Script, len(ss))
|
||||
|
||||
for i, script := range ss {
|
||||
svc.cScripts[i] = &Script{
|
||||
Name: script.Name,
|
||||
Label: script.Label,
|
||||
Description: script.Description,
|
||||
Errors: script.Errors,
|
||||
Triggers: script.Triggers,
|
||||
Bundle: script.Bundle,
|
||||
Type: script.Type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
63
pkg/corredor/service_test.go
Normal file
63
pkg/corredor/service_test.go
Normal file
@ -0,0 +1,63 @@
|
||||
package corredor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestFindOnManual(t *testing.T) {
|
||||
var (
|
||||
svc = &service{
|
||||
sScripts: ScriptSet{
|
||||
&Script{
|
||||
Triggers: []*Trigger{
|
||||
&Trigger{
|
||||
Events: []string{"ev"},
|
||||
Resources: []string{"res"},
|
||||
},
|
||||
},
|
||||
},
|
||||
&Script{
|
||||
Triggers: []*Trigger{
|
||||
&Trigger{
|
||||
Events: []string{"foo"},
|
||||
Resources: []string{"bar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
cScripts: ScriptSet{
|
||||
&Script{
|
||||
Triggers: []*Trigger{
|
||||
&Trigger{
|
||||
Events: []string{"ev"},
|
||||
Resources: []string{"res"},
|
||||
},
|
||||
},
|
||||
},
|
||||
&Script{
|
||||
Triggers: []*Trigger{
|
||||
&Trigger{
|
||||
Events: []string{"foo"},
|
||||
Resources: []string{"bar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
filter = ManualScriptFilter{
|
||||
ResourceTypes: []string{"res"},
|
||||
EventTypes: []string{"ev"},
|
||||
ExcludeServerScripts: false,
|
||||
ExcludeClientScripts: false,
|
||||
}
|
||||
|
||||
o, _, err = svc.FindOnManual(filter)
|
||||
|
||||
a = assert.New(t)
|
||||
)
|
||||
|
||||
a.NoError(err)
|
||||
a.Len(o, 2)
|
||||
}
|
||||
@ -1,6 +1,12 @@
|
||||
package corredor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/eventbus"
|
||||
"github.com/cortezaproject/corteza-server/pkg/slice"
|
||||
)
|
||||
|
||||
@ -20,6 +26,88 @@ func popOnManualEventType(trigger *Trigger) (found bool) {
|
||||
return
|
||||
}
|
||||
|
||||
// pluckManualTriggers removes all manual triggers from the list of script's triggers
|
||||
//
|
||||
// and returns a hash map with resources from these manual triggers
|
||||
func pluckManualTriggers(script *ServerScript) map[string]bool {
|
||||
var (
|
||||
hash = make(map[string]bool)
|
||||
)
|
||||
|
||||
for i := range script.Triggers {
|
||||
// We're modifying trigger in the loop,
|
||||
// so let's make a copy we can play with
|
||||
trigger := *script.Triggers[i]
|
||||
|
||||
if popOnManualEventType(&trigger) {
|
||||
for _, res := range trigger.Resources {
|
||||
hash[res] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return hash
|
||||
}
|
||||
|
||||
// converts trigger's constraint to eventbus' constraint options
|
||||
func makeTriggerOpts(t *Trigger) (oo []eventbus.TriggerRegOp, err error) {
|
||||
if len(t.Events) == 0 {
|
||||
return nil, fmt.Errorf("can not generate trigger without at least one events")
|
||||
}
|
||||
if len(t.Resources) == 0 {
|
||||
return nil, fmt.Errorf("can not generate trigger without at least one resource")
|
||||
}
|
||||
|
||||
oo = append(oo, eventbus.On(t.Events...))
|
||||
oo = append(oo, eventbus.For(t.Resources...))
|
||||
|
||||
for i := range t.Constraints {
|
||||
oo = append(oo, eventbus.Constraint(
|
||||
t.Constraints[i].Name,
|
||||
t.Constraints[i].Op,
|
||||
t.Constraints[i].Value...,
|
||||
))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// makes event-handler callback
|
||||
func makeEventHandler(svc *service, script string, runAs string) eventbus.Handler {
|
||||
return func(ctx context.Context, ev eventbus.Event) error {
|
||||
// Is this compatible event?
|
||||
|
||||
if ce, ok := ev.(Event); ok {
|
||||
if len(runAs) > 0 {
|
||||
jwt, err := svc.jwtMaker(runAs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx = auth.SetJwtToContext(ctx, jwt)
|
||||
}
|
||||
|
||||
return svc.exec(ctx, script, ce)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// encode adds entry (with json encoded value) to hash map
|
||||
// used to prepare data for transmission
|
||||
func encodeArguments(args map[string]string, key string, val interface{}) (err error) {
|
||||
var tmp []byte
|
||||
|
||||
if tmp, err = json.Marshal(val); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
args[key] = string(tmp)
|
||||
return
|
||||
}
|
||||
|
||||
// Creates a filter fn for script filtering
|
||||
func makeScriptFilter(f ManualScriptFilter) func(s *Script) (b bool, err error) {
|
||||
return func(s *Script) (b bool, err error) {
|
||||
b = true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user