Refactor eventbus, scheduler, corredor for naming consistenct
This commit is contained in:
parent
2955e70a6e
commit
63f03a2111
@ -34,7 +34,7 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) {
|
||||
app.Log = log.Named(SERVICE)
|
||||
app.Opts = opts
|
||||
|
||||
scheduler.OnTick(
|
||||
scheduler.Service().OnTick(
|
||||
event.ComposeOnInterval(),
|
||||
event.ComposeOnTimestamp(),
|
||||
)
|
||||
|
||||
@ -53,7 +53,7 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) {
|
||||
|
||||
monitor.Setup(app.log, opts.Monitor)
|
||||
|
||||
scheduler.Setup(log, eventbus.Default(), 0)
|
||||
scheduler.Setup(log, eventbus.Service(), 0)
|
||||
|
||||
return
|
||||
}
|
||||
@ -79,8 +79,8 @@ func (app *App) Activate(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Run scheduler
|
||||
scheduler.Run(ctx)
|
||||
// Start scheduler
|
||||
scheduler.Service().Start(ctx)
|
||||
|
||||
// Load corredor scripts
|
||||
corredor.Service().Load(ctx)
|
||||
|
||||
@ -13,10 +13,12 @@ import (
|
||||
migrate "github.com/cortezaproject/corteza-server/messaging/db"
|
||||
"github.com/cortezaproject/corteza-server/messaging/rest"
|
||||
"github.com/cortezaproject/corteza-server/messaging/service"
|
||||
"github.com/cortezaproject/corteza-server/messaging/service/event"
|
||||
"github.com/cortezaproject/corteza-server/messaging/websocket"
|
||||
"github.com/cortezaproject/corteza-server/pkg/app"
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/corredor"
|
||||
"github.com/cortezaproject/corteza-server/pkg/scheduler"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -34,6 +36,11 @@ func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) {
|
||||
app.Log = log.Named(SERVICE)
|
||||
app.Opts = opts
|
||||
|
||||
scheduler.Service().OnTick(
|
||||
event.MessagingOnInterval(),
|
||||
event.MessagingOnTimestamp(),
|
||||
)
|
||||
|
||||
app.ws = websocket.New(&websocket.Config{
|
||||
Timeout: opts.Websocket.Timeout,
|
||||
PingTimeout: opts.Websocket.PingTimeout,
|
||||
|
||||
@ -73,16 +73,16 @@ const onManualEventType = "onManual"
|
||||
|
||||
var (
|
||||
// Global corredor service
|
||||
gService *service
|
||||
gCorredor *service
|
||||
)
|
||||
|
||||
func Service() *service {
|
||||
return gService
|
||||
return gCorredor
|
||||
}
|
||||
|
||||
// Start connects to Corredor & initialize service
|
||||
func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (err error) {
|
||||
if gService != nil {
|
||||
if gCorredor != nil {
|
||||
// Prevent multiple initializations
|
||||
return
|
||||
}
|
||||
@ -95,7 +95,7 @@ func Start(ctx context.Context, logger *zap.Logger, opt options.CorredorOpt) (er
|
||||
return
|
||||
}
|
||||
|
||||
gService = NewService(conn, logger, opt)
|
||||
gCorredor = NewService(conn, logger, opt)
|
||||
return
|
||||
}
|
||||
|
||||
@ -106,7 +106,7 @@ func NewService(conn *grpc.ClientConn, logger *zap.Logger, opt options.CorredorO
|
||||
log: logger.Named("corredor"),
|
||||
registered: make(map[string][]uintptr),
|
||||
manual: make(map[string]map[string]bool),
|
||||
eventbus: eventbus.Default(),
|
||||
eventbus: eventbus.Service(),
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ func init() {
|
||||
}
|
||||
|
||||
// Returns
|
||||
func Default() *eventbus {
|
||||
func Service() *eventbus {
|
||||
return gEventBus
|
||||
}
|
||||
|
||||
|
||||
@ -33,25 +33,22 @@ const (
|
||||
var (
|
||||
now = func() time.Time { return time.Now() }
|
||||
|
||||
gService *service
|
||||
// Global scheduler
|
||||
gScheduler *service
|
||||
)
|
||||
|
||||
// Setup configures global scheduling service
|
||||
func Setup(log *zap.Logger, d dispatcher, interval time.Duration) {
|
||||
if gService != nil {
|
||||
if gScheduler != nil {
|
||||
// shut it down
|
||||
gService.active <- false
|
||||
gScheduler.active <- false
|
||||
}
|
||||
|
||||
gService = NewService(log, d, interval)
|
||||
gScheduler = NewService(log, d, interval)
|
||||
}
|
||||
|
||||
func OnTick(events ...eventbus.Event) {
|
||||
gService.OnTick(events...)
|
||||
}
|
||||
|
||||
func Run(ctx context.Context) {
|
||||
gService.Run(ctx)
|
||||
func Service() *service {
|
||||
return gScheduler
|
||||
}
|
||||
|
||||
func NewService(log *zap.Logger, d dispatcher, interval time.Duration) *service {
|
||||
@ -75,7 +72,7 @@ func (svc *service) OnTick(events ...eventbus.Event) {
|
||||
}
|
||||
|
||||
// Run starts event scheduler service
|
||||
func (svc service) Run(ctx context.Context) {
|
||||
func (svc service) Start(ctx context.Context) {
|
||||
if svc.active != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -13,13 +13,15 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/app"
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/corredor"
|
||||
"github.com/cortezaproject/corteza-server/pkg/scheduler"
|
||||
"github.com/cortezaproject/corteza-server/system/auth/external"
|
||||
"github.com/cortezaproject/corteza-server/system/commands"
|
||||
migrate "github.com/cortezaproject/corteza-server/system/db"
|
||||
grpc2 "github.com/cortezaproject/corteza-server/system/grpc"
|
||||
systemGRPC "github.com/cortezaproject/corteza-server/system/grpc"
|
||||
"github.com/cortezaproject/corteza-server/system/proto"
|
||||
"github.com/cortezaproject/corteza-server/system/rest"
|
||||
"github.com/cortezaproject/corteza-server/system/service"
|
||||
"github.com/cortezaproject/corteza-server/system/service/event"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -34,6 +36,12 @@ const SERVICE = "system"
|
||||
func (app *App) Setup(log *zap.Logger, opts *app.Options) (err error) {
|
||||
app.Log = log.Named(SERVICE)
|
||||
app.Opts = opts
|
||||
|
||||
scheduler.Service().OnTick(
|
||||
event.SystemOnInterval(),
|
||||
event.SystemOnTimestamp(),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -115,14 +123,14 @@ func (app *App) MountApiRoutes(r chi.Router) {
|
||||
}
|
||||
|
||||
func (app *App) RegisterGrpcServices(server *grpc.Server) {
|
||||
proto.RegisterUsersServer(server, grpc2.NewUserService(
|
||||
proto.RegisterUsersServer(server, systemGRPC.NewUserService(
|
||||
service.DefaultUser,
|
||||
service.DefaultAuth,
|
||||
auth.DefaultJwtHandler,
|
||||
service.DefaultAccessControl,
|
||||
))
|
||||
|
||||
proto.RegisterRolesServer(server, grpc2.NewRoleService(
|
||||
proto.RegisterRolesServer(server, systemGRPC.NewRoleService(
|
||||
service.DefaultRole,
|
||||
))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user