diff --git a/compose/app.go b/compose/app.go index 326960f70..95695af3c 100644 --- a/compose/app.go +++ b/compose/app.go @@ -34,7 +34,7 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) { app.Log = log.Named(SERVICE) app.Opts = opts - scheduler.OnTick( + scheduler.Service().OnTick( event.ComposeOnInterval(), event.ComposeOnTimestamp(), ) diff --git a/corteza/corteza.go b/corteza/corteza.go index 8334e7247..d8602dc46 100644 --- a/corteza/corteza.go +++ b/corteza/corteza.go @@ -53,7 +53,7 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) { monitor.Setup(app.log, opts.Monitor) - scheduler.Setup(log, eventbus.Default(), 0) + scheduler.Setup(log, eventbus.Service(), 0) return } @@ -79,8 +79,8 @@ func (app *App) Activate(ctx context.Context) (err error) { return err } - // Run scheduler - scheduler.Run(ctx) + // Start scheduler + scheduler.Service().Start(ctx) // Load corredor scripts corredor.Service().Load(ctx) diff --git a/messaging/app.go b/messaging/app.go index 3452674b7..c585eb8c6 100644 --- a/messaging/app.go +++ b/messaging/app.go @@ -13,10 +13,12 @@ import ( migrate "github.com/cortezaproject/corteza-server/messaging/db" "github.com/cortezaproject/corteza-server/messaging/rest" "github.com/cortezaproject/corteza-server/messaging/service" + "github.com/cortezaproject/corteza-server/messaging/service/event" "github.com/cortezaproject/corteza-server/messaging/websocket" "github.com/cortezaproject/corteza-server/pkg/app" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/corredor" + "github.com/cortezaproject/corteza-server/pkg/scheduler" ) type ( @@ -34,6 +36,11 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) { app.Log = log.Named(SERVICE) app.Opts = opts + scheduler.Service().OnTick( + event.MessagingOnInterval(), + event.MessagingOnTimestamp(), + ) + app.ws = websocket.New(&websocket.Config{ Timeout: opts.Websocket.Timeout, PingTimeout: opts.Websocket.PingTimeout, diff --git a/pkg/corredor/service.go b/pkg/corredor/service.go index 258d0c421..f1d380989 100644 --- a/pkg/corredor/service.go +++ b/pkg/corredor/service.go @@ -73,16 +73,16 @@ const onManualEventType = "onManual" var ( // Global corredor service - gService *service + gCorredor *service ) func Service() *service { - return gService + return gCorredor } // Start connects to Corredor & initialize service func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (err error) { - if gService != nil { + if gCorredor != nil { // Prevent multiple initializations return } @@ -95,7 +95,7 @@ func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (er return } - gService = NewService(conn, logger, opt) + gCorredor = NewService(conn, logger, opt) return } @@ -106,7 +106,7 @@ func NewService(conn *grpc.ClientConn, logger *zap.Logger, opt options.CorredorO log: logger.Named("corredor"), registered: make(map[string][]uintptr), manual: make(map[string]map[string]bool), - eventbus: eventbus.Default(), + eventbus: eventbus.Service(), opt: opt, } } diff --git a/pkg/eventbus/eventbus.go b/pkg/eventbus/eventbus.go index 301178a0a..181c319e6 100644 --- a/pkg/eventbus/eventbus.go +++ b/pkg/eventbus/eventbus.go @@ -38,7 +38,7 @@ func init() { } // Returns -func Default() *eventbus { +func Service() *eventbus { return gEventBus } diff --git a/pkg/scheduler/service.go b/pkg/scheduler/service.go index 69f339aad..d9d956e7e 100644 --- a/pkg/scheduler/service.go +++ b/pkg/scheduler/service.go @@ -33,25 +33,22 @@ const ( var ( now = func() time.Time { return time.Now() } - gService *service + // Global scheduler + gScheduler *service ) // Setup configures global scheduling service func Setup(log *zap.Logger, d dispatcher, interval time.Duration) { - if gService != nil { + if gScheduler != nil { // shut it down - gService.active <- false + gScheduler.active <- false } - gService = NewService(log, d, interval) + gScheduler = NewService(log, d, interval) } -func OnTick(events ...eventbus.Event) { - gService.OnTick(events...) -} - -func Run(ctx context.Context) { - gService.Run(ctx) +func Service() *service { + return gScheduler } func NewService(log *zap.Logger, d dispatcher, interval time.Duration) *service { @@ -75,7 +72,7 @@ func (svc *service) OnTick(events ...eventbus.Event) { } // Run starts event scheduler service -func (svc service) Run(ctx context.Context) { +func (svc service) Start(ctx context.Context) { if svc.active != nil { return } diff --git a/system/app.go b/system/app.go index 93e2dd997..75a9bda6c 100644 --- a/system/app.go +++ b/system/app.go @@ -13,13 +13,15 @@ import ( "github.com/cortezaproject/corteza-server/pkg/app" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/corredor" + "github.com/cortezaproject/corteza-server/pkg/scheduler" "github.com/cortezaproject/corteza-server/system/auth/external" "github.com/cortezaproject/corteza-server/system/commands" migrate "github.com/cortezaproject/corteza-server/system/db" - grpc2 "github.com/cortezaproject/corteza-server/system/grpc" + systemGRPC "github.com/cortezaproject/corteza-server/system/grpc" "github.com/cortezaproject/corteza-server/system/proto" "github.com/cortezaproject/corteza-server/system/rest" "github.com/cortezaproject/corteza-server/system/service" + "github.com/cortezaproject/corteza-server/system/service/event" ) type ( @@ -34,6 +36,12 @@ const SERVICE = "system" func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) { app.Log = log.Named(SERVICE) app.Opts = opts + + scheduler.Service().OnTick( + event.SystemOnInterval(), + event.SystemOnTimestamp(), + ) + return } @@ -115,14 +123,14 @@ func (app *App) MountApiRoutes(r chi.Router) { } func (app *App) RegisterGrpcServices(server *grpc.Server) { - proto.RegisterUsersServer(server, grpc2.NewUserService( + proto.RegisterUsersServer(server, systemGRPC.NewUserService( service.DefaultUser, service.DefaultAuth, auth.DefaultJwtHandler, service.DefaultAccessControl, )) - proto.RegisterRolesServer(server, grpc2.NewRoleService( + proto.RegisterRolesServer(server, systemGRPC.NewRoleService( service.DefaultRole, )) }