diff --git a/pkg/corredor/conn_test.go b/pkg/corredor/conn_test.go new file mode 100644 index 000000000..e8328b02d --- /dev/null +++ b/pkg/corredor/conn_test.go @@ -0,0 +1,94 @@ +package corredor + +import ( + "context" + "fmt" + "math/rand" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + + "github.com/cortezaproject/corteza-server/pkg/app/options" + "github.com/cortezaproject/corteza-server/pkg/logger" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func TestNewConnectionWithDisabled(t *testing.T) { + c, err := NewConnection(nil, options.CorredorOpt{Enabled: false}, nil) + assert.Nil(t, c) + assert.Nil(t, err) +} + +func TestNewConnection(t *testing.T) { + var ( + ctx = context.Background() + + dbgLog = logger.MakeDebugLogger() + + a = assert.New(t) + wg = &sync.WaitGroup{} + + lstnr = openListener(t) + grpcServer = grpc.NewServer() + + opt = options.CorredorOpt{ + Enabled: true, + Log: true, + MaxBackoffDelay: 1, + Addr: lstnr.Addr().String(), + } + ) + + a.NotNil(lstnr) + defer lstnr.Close() + wg.Add(1) + go func() { + defer wg.Done() + a.NoError(grpcServer.Serve(lstnr)) + }() + + grpcClientConn, err := NewConnection(ctx, opt, dbgLog) + a.NoError(err) + + // Go and + NewService(grpcClientConn, nil, dbgLog, opt) + + grpcClientConn.WaitForStateChange(ctx, connectivity.Ready) + grpcServer.GracefulStop() + lstnr.Close() + + t.Log("waiting for connection to close") + wg.Wait() +} + +func openListener(t *testing.T) (ln net.Listener) { + var ( + tries = 1000 + minPort = 50000 + maxPort = 63000 + port int + err error + ) + + for tries > 0 { + port = minPort + rand.Intn(maxPort-minPort) + t.Logf("trying to find available port for gRPC connection: %d", port) + ln, err = net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + if err == nil { + return ln + } + + t.Errorf("error: %s", err) + tries-- + } + + return nil +} diff --git a/pkg/corredor/filter_test.go b/pkg/corredor/filter_test.go new file mode 100644 index 000000000..1ad61e8ad --- /dev/null +++ b/pkg/corredor/filter_test.go @@ -0,0 +1,17 @@ +package corredor + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestManualScriptFilterResourcePrefixing(t *testing.T) { + f := &ManualScriptFilter{ + ResourceTypes: []string{"system", "system:one", "two"}, + } + + f.PrefixResource("system") + + assert.New(t).Equal(f.ResourceTypes, []string{"system", "system:one", "system:two"}) +} diff --git a/pkg/corredor/service.go b/pkg/corredor/service.go index c06a99c9d..221f4d7cf 100644 --- a/pkg/corredor/service.go +++ b/pkg/corredor/service.go @@ -2,8 +2,6 @@ package corredor import ( "context" - "encoding/json" - "fmt" "github.com/pkg/errors" "go.uber.org/zap" @@ -28,7 +26,7 @@ type ( registered map[string][]uintptr // list of all registered onManual triggers & scripts - // map[resource][script-name]bool + // map[script-name][resource]bool manual map[string]map[string]bool // Combined list of client and server scripts @@ -45,15 +43,7 @@ type ( } Event interface { - // ResourceType from resource that fired the event - ResourceType() string - - // Event that was fired - EventType() string - - // Match tests if given constraints match - // event's internal values - Match(name string, op string, values ...string) bool + eventbus.Event // Encode (event) to arguments passed to // event handlers ([automation ]script runner) @@ -97,18 +87,18 @@ func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (er return } - gCorredor = NewService(conn, logger, opt) + gCorredor = NewService(conn, eventbus.Service(), logger, opt) return } -func NewService(conn *grpc.ClientConn, logger *zap.Logger, opt options.CorredorOpt) *service { +func NewService(conn *grpc.ClientConn, er eventRegistrator, logger *zap.Logger, opt options.CorredorOpt) *service { return &service{ ssClient: NewServerScriptsClient(conn), csClient: NewClientScriptsClient(conn), log: logger.Named("corredor"), registered: make(map[string][]uintptr), manual: make(map[string]map[string]bool), - eventbus: eventbus.Service(), + eventbus: er, opt: opt, } } @@ -156,11 +146,11 @@ func (svc service) ExecOnManual(ctx context.Context, script string, event Event) return errors.Errorf("triggered event type is not onManual (%q)", evt) } - if _, ok := svc.manual[res]; !ok { - return errors.Errorf("unregistered onManual resource %q", res) + if _, ok := svc.manual[script]; !ok { + return errors.Errorf("unregistered onManual script %q", script) } - if _, ok := svc.manual[res][script]; !ok { + if _, ok := svc.manual[script][res]; !ok { return errors.Errorf("unregistered onManual script %q for resource %q", script, res) } @@ -185,10 +175,25 @@ func (svc *service) loadServerScripts(ctx context.Context) { return } - svc.manual = make(map[string]map[string]bool) - svc.sScripts = make([]*Script, len(rsp.Scripts)) + svc.registerServerScripts(rsp.Scripts...) +} - for i, script := range rsp.Scripts { +// Registers Corredor scripts to eventbus and list of manual scripts +func (svc *service) registerServerScripts(ss ...*ServerScript) { + svc.sScripts = make([]*Script, len(ss)) + + // Remove all previously registered triggers + for _, ptrs := range svc.registered { + if len(ptrs) > 0 { + svc.eventbus.Unregister(ptrs...) + } + } + + // Reset indexes + svc.registered = make(map[string][]uintptr) + svc.manual = make(map[string]map[string]bool) + + for i, script := range ss { svc.sScripts[i] = &Script{ Name: script.Name, Label: script.Label, @@ -198,125 +203,59 @@ func (svc *service) loadServerScripts(ctx context.Context) { } if len(script.Errors) == 0 { - svc.registerTriggers(script) + svc.manual[script.Name] = pluckManualTriggers(script) + svc.registered[script.Name] = svc.registerTriggers(script) } - } -} -func (svc *service) loadClientScripts(ctx context.Context) { - var ( - err error - rsp *ClientScriptListResponse - ) - - svc.log.Debug("reloading client scripts") - rsp, err = svc.csClient.List(ctx, &ClientScriptListRequest{}, grpc.WaitForReady(true)) - if err != nil { - svc.log.Error("could not load corredor client scripts", zap.Error(err)) - return - } - - svc.cScripts = make([]*Script, len(rsp.Scripts)) - - for i, script := range rsp.Scripts { - svc.cScripts[i] = &Script{ - Name: script.Name, - Label: script.Label, - Description: script.Description, - Errors: script.Errors, - Triggers: script.Triggers, - Bundle: script.Bundle, - Type: script.Type, - } + svc.log.Debug( + "registered", + zap.String("script", script.Name), + zap.Int("manual", len(svc.manual[script.Name])), + zap.Int("triggers", len(svc.registered[script.Name])), + ) } } // Creates handler function for eventbus subsystem // -// If trigger has "onManual" -func (svc service) registerTriggers(script *ServerScript) { +// If trigger has "onManual" event type, it removes it and +// registers that script to the list of manual triggers +func (svc *service) registerTriggers(script *ServerScript) []uintptr { var ( - ops []eventbus.TriggerRegOp - handler eventbus.Handler - err error + ops []eventbus.TriggerRegOp + err error + ptrs = make([]uintptr, 0, len(script.Triggers)) - log = svc.log.With(zap.String("scriptName", script.Name)) + log = svc.log.With(zap.String("script", script.Name)) ) - if ptrs, has := svc.registered[script.Name]; has && len(ptrs) > 0 { - // Unregister previously registered triggers - svc.eventbus.Unregister(ptrs...) - log.Debug( - "triggers unregistered", - zap.Uintptrs("triggers", ptrs), - ) - } - - // Make room for new - svc.registered[script.Name] = make([]uintptr, 0) - for i := range script.Triggers { // We're modifying trigger in the loop, // so let's make a copy we can play with trigger := *script.Triggers[i] - if popOnManualEventType(&trigger) { - for _, res := range trigger.Resources { - if svc.manual[res] == nil { - svc.manual[res] = make(map[string]bool) - } - - svc.manual[res][script.Name] = true - } - - log.Debug("manual trigger registered", zap.Strings("resources", trigger.Resources)) - - if len(trigger.Events) == 0 { - // We've removed the last event - // - // break now to prevent code below to - // complain about missing event types - continue - } + if len(trigger.Events) == 0 { + // We've removed the last event + // + // break now to prevent code below to + // complain about missing event types + continue } - if ops, err = svc.makeTriggerRegOpts(&trigger); err != nil { + if ops, err = makeTriggerOpts(&trigger); err != nil { log.Warn( - "trigger could not be registered", + "could not make trigger options", zap.Error(err), ) continue } - var runAs = trigger.RunAs - - handler = func(ctx context.Context, ev eventbus.Event) error { - // Is this compatible event? - - if ce, ok := ev.(Event); ok { - if len(runAs) > 0 { - jwt, err := svc.jwtMaker(runAs) - if err != nil { - return err - } - - ctx = auth.SetJwtToContext(ctx, jwt) - } - - return svc.exec(ctx, script.Name, ce) - } - - return nil - } - - ptr := svc.eventbus.Register(handler, ops...) - - log.Debug( - "trigger registered", - zap.Uintptr("triggers", ptr), - ) + ptr := svc.eventbus.Register(makeEventHandler(svc, script.Name, trigger.RunAs), ops...) + ptrs = append(ptrs, ptr) } + + return ptrs } // Exec finds and runs specific script with given event @@ -438,35 +377,34 @@ func (svc service) exec(ctx context.Context, script string, event Event) (err er return } -func (svc service) makeTriggerRegOpts(t *Trigger) (oo []eventbus.TriggerRegOp, err error) { - if len(t.Events) == 0 { - return nil, fmt.Errorf("can not generate trigger without at least one events") - } - if len(t.Resources) == 0 { - return nil, fmt.Errorf("can not generate trigger without at least one resource") - } +func (svc *service) loadClientScripts(ctx context.Context) { + var ( + err error + rsp *ClientScriptListResponse + ) - oo = append(oo, eventbus.On(t.Events...)) - oo = append(oo, eventbus.For(t.Resources...)) - - for i := range t.Constraints { - oo = append(oo, eventbus.Constraint( - t.Constraints[i].Name, - t.Constraints[i].Op, - t.Constraints[i].Value..., - )) - } - - return -} - -func encodeArguments(args map[string]string, key string, val interface{}) (err error) { - var tmp []byte - - if tmp, err = json.Marshal(val); err != nil { + svc.log.Debug("reloading client scripts") + rsp, err = svc.csClient.List(ctx, &ClientScriptListRequest{}, grpc.WaitForReady(true)) + if err != nil { + svc.log.Error("could not load corredor client scripts", zap.Error(err)) return } - args[key] = string(tmp) - return + svc.registerClientScripts(rsp.Scripts...) +} + +func (svc *service) registerClientScripts(ss ...*ClientScript) { + svc.cScripts = make([]*Script, len(ss)) + + for i, script := range ss { + svc.cScripts[i] = &Script{ + Name: script.Name, + Label: script.Label, + Description: script.Description, + Errors: script.Errors, + Triggers: script.Triggers, + Bundle: script.Bundle, + Type: script.Type, + } + } } diff --git a/pkg/corredor/service_test.go b/pkg/corredor/service_test.go new file mode 100644 index 000000000..e2ca1ab31 --- /dev/null +++ b/pkg/corredor/service_test.go @@ -0,0 +1,63 @@ +package corredor + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFindOnManual(t *testing.T) { + var ( + svc = &service{ + sScripts: ScriptSet{ + &Script{ + Triggers: []*Trigger{ + &Trigger{ + Events: []string{"ev"}, + Resources: []string{"res"}, + }, + }, + }, + &Script{ + Triggers: []*Trigger{ + &Trigger{ + Events: []string{"foo"}, + Resources: []string{"bar"}, + }, + }, + }, + }, + cScripts: ScriptSet{ + &Script{ + Triggers: []*Trigger{ + &Trigger{ + Events: []string{"ev"}, + Resources: []string{"res"}, + }, + }, + }, + &Script{ + Triggers: []*Trigger{ + &Trigger{ + Events: []string{"foo"}, + Resources: []string{"bar"}, + }, + }, + }, + }, + } + filter = ManualScriptFilter{ + ResourceTypes: []string{"res"}, + EventTypes: []string{"ev"}, + ExcludeServerScripts: false, + ExcludeClientScripts: false, + } + + o, _, err = svc.FindOnManual(filter) + + a = assert.New(t) + ) + + a.NoError(err) + a.Len(o, 2) +} diff --git a/pkg/corredor/util.go b/pkg/corredor/util.go index 6f5c8b9d1..555405061 100644 --- a/pkg/corredor/util.go +++ b/pkg/corredor/util.go @@ -1,6 +1,12 @@ package corredor import ( + "context" + "encoding/json" + "fmt" + + "github.com/cortezaproject/corteza-server/pkg/auth" + "github.com/cortezaproject/corteza-server/pkg/eventbus" "github.com/cortezaproject/corteza-server/pkg/slice" ) @@ -20,6 +26,88 @@ func popOnManualEventType(trigger *Trigger) (found bool) { return } +// pluckManualTriggers removes all manual triggers from the list of script's triggers +// +// and returns a hash map with resources from these manual triggers +func pluckManualTriggers(script *ServerScript) map[string]bool { + var ( + hash = make(map[string]bool) + ) + + for i := range script.Triggers { + // We're modifying trigger in the loop, + // so let's make a copy we can play with + trigger := *script.Triggers[i] + + if popOnManualEventType(&trigger) { + for _, res := range trigger.Resources { + hash[res] = true + } + } + } + + return hash +} + +// converts trigger's constraint to eventbus' constraint options +func makeTriggerOpts(t *Trigger) (oo []eventbus.TriggerRegOp, err error) { + if len(t.Events) == 0 { + return nil, fmt.Errorf("can not generate trigger without at least one events") + } + if len(t.Resources) == 0 { + return nil, fmt.Errorf("can not generate trigger without at least one resource") + } + + oo = append(oo, eventbus.On(t.Events...)) + oo = append(oo, eventbus.For(t.Resources...)) + + for i := range t.Constraints { + oo = append(oo, eventbus.Constraint( + t.Constraints[i].Name, + t.Constraints[i].Op, + t.Constraints[i].Value..., + )) + } + + return +} + +// makes event-handler callback +func makeEventHandler(svc *service, script string, runAs string) eventbus.Handler { + return func(ctx context.Context, ev eventbus.Event) error { + // Is this compatible event? + + if ce, ok := ev.(Event); ok { + if len(runAs) > 0 { + jwt, err := svc.jwtMaker(runAs) + if err != nil { + return err + } + + ctx = auth.SetJwtToContext(ctx, jwt) + } + + return svc.exec(ctx, script, ce) + } + + return nil + } +} + +// encode adds entry (with json encoded value) to hash map +// used to prepare data for transmission +func encodeArguments(args map[string]string, key string, val interface{}) (err error) { + var tmp []byte + + if tmp, err = json.Marshal(val); err != nil { + return + } + + args[key] = string(tmp) + return +} + +// Creates a filter fn for script filtering func makeScriptFilter(f ManualScriptFilter) func(s *Script) (b bool, err error) { return func(s *Script) (b bool, err error) { b = true