3
0

Switch all *after event from dispatch to waitfor

It removes a problem where (http) requests ends before *after script are
executed with canceled context
This commit is contained in:
Denis Arh
2020-06-27 21:21:54 +02:00
parent 3626bdba0d
commit c89b4713e9
10 changed files with 39 additions and 42 deletions

View File

@@ -242,7 +242,7 @@ func (svc module) Create(new *types.Module) (m *types.Module, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.ModuleAfterCreate(m, nil, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.ModuleAfterCreate(m, nil, ns))
return nil
})
@@ -313,7 +313,7 @@ func (svc module) Update(upd *types.Module) (m *types.Module, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.ModuleAfterUpdate(upd, m, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.ModuleAfterUpdate(upd, m, ns))
return nil
})
@@ -358,7 +358,7 @@ func (svc module) DeleteByID(namespaceID, moduleID uint64) (err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.ModuleAfterDelete(nil, m, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.ModuleAfterDelete(nil, m, ns))
return err
})

View File

@@ -202,7 +202,7 @@ func (svc namespace) Create(new *types.Namespace) (ns *types.Namespace, err erro
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.NamespaceAfterCreate(ns, nil))
_ = svc.eventbus.WaitFor(svc.ctx, event.NamespaceAfterCreate(ns, nil))
return
})
@@ -255,7 +255,7 @@ func (svc namespace) Update(upd *types.Namespace) (ns *types.Namespace, err erro
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.NamespaceAfterUpdate(upd, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.NamespaceAfterUpdate(upd, ns))
return
})
@@ -295,7 +295,7 @@ func (svc namespace) DeleteByID(namespaceID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.NamespaceAfterDelete(nil, del))
_ = svc.eventbus.WaitFor(svc.ctx, event.NamespaceAfterDelete(nil, del))
return
})

View File

@@ -293,7 +293,7 @@ func (svc page) Create(new *types.Page) (p *types.Page, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.PageAfterCreate(new, nil, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.PageAfterCreate(new, nil, ns))
return err
})
@@ -356,7 +356,7 @@ func (svc page) Update(upd *types.Page) (p *types.Page, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.PageAfterUpdate(upd, p, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.PageAfterUpdate(upd, p, ns))
return err
})
@@ -402,7 +402,7 @@ func (svc page) DeleteByID(namespaceID, pageID uint64) (err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.PageAfterDelete(nil, del, ns))
_ = svc.eventbus.WaitFor(svc.ctx, event.PageAfterDelete(nil, del, ns))
return err
})

View File

@@ -7,15 +7,14 @@ import (
"strconv"
"time"
"github.com/cortezaproject/corteza-server/compose/service/values"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/compose/decoder"
"github.com/cortezaproject/corteza-server/compose/repository"
"github.com/cortezaproject/corteza-server/compose/service/event"
"github.com/cortezaproject/corteza-server/compose/service/values"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
)
@@ -619,10 +618,8 @@ func (svc record) create(new *types.Record) (rec *types.Record, err error) {
rec = new
if svc.optEmitEvents {
defer func() {
new.Values = svc.formatter.Run(m, new.Values)
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterCreateImmutable(new, nil, m, ns, nil))
}()
new.Values = svc.formatter.Run(m, new.Values)
_ = svc.eventbus.WaitFor(svc.ctx, event.RecordAfterCreateImmutable(new, nil, m, ns, nil))
}
return
@@ -730,11 +727,9 @@ func (svc record) update(upd *types.Record) (rec *types.Record, err error) {
rec = upd
if svc.optEmitEvents {
defer func() {
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterUpdateImmutable(upd, old, m, ns, nil))
}()
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
_ = svc.eventbus.WaitFor(svc.ctx, event.RecordAfterUpdateImmutable(upd, old, m, ns, nil))
}
return
}
@@ -912,7 +907,7 @@ func (svc record) delete(namespaceID, moduleID, recordID uint64) (del *types.Rec
}
if svc.optEmitEvents {
defer svc.eventbus.Dispatch(svc.ctx, event.RecordAfterDeleteImmutable(nil, del, m, ns, nil))
_ = svc.eventbus.WaitFor(svc.ctx, event.RecordAfterDeleteImmutable(nil, del, m, ns, nil))
}
return del, nil

View File

@@ -749,7 +749,7 @@ func (svc service) exec(ctx context.Context, script string, runAs string, args S
ctx, cancel := context.WithTimeout(
// We need a new, independent context here
// to be sure this is executed safely & fully
// without any outside interfeance (cancellation, timeouts)
// without any outside interference (cancellation, timeouts)
context.Background(),
svc.opt.DefaultExecTimeout,
)

View File

@@ -21,6 +21,7 @@ type (
}
eventbus struct {
// waitgroup for dispatch
wg *sync.WaitGroup
// Read & write locking
@@ -60,10 +61,11 @@ func New() *eventbus {
// WaitFor is synchronous event dispatcher
//
// It waits for all handlers and fails on first error
// It waits for each handler and fails on first error
func (b *eventbus) WaitFor(ctx context.Context, ev Event) (err error) {
b.l.RLock()
defer b.l.RUnlock()
for _, t := range b.find(ev) {
err = func(ctx context.Context, t *handler) error {
b.wg.Add(1)

View File

@@ -145,7 +145,7 @@ func (svc *application) Create(new *types.Application) (app *types.Application,
aaProps.setApplication(app)
defer svc.eventbus.Dispatch(svc.ctx, event.ApplicationAfterCreate(new, nil))
_ = svc.eventbus.WaitFor(svc.ctx, event.ApplicationAfterCreate(new, nil))
return nil
})
@@ -185,7 +185,7 @@ func (svc *application) Update(upd *types.Application) (app *types.Application,
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.ApplicationAfterUpdate(upd, app))
_ = svc.eventbus.WaitFor(svc.ctx, event.ApplicationAfterUpdate(upd, app))
return nil
})
@@ -219,7 +219,7 @@ func (svc *application) Delete(ID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.ApplicationAfterDelete(nil, app))
_ = svc.eventbus.WaitFor(svc.ctx, event.ApplicationAfterDelete(nil, app))
return nil
})
@@ -255,7 +255,7 @@ func (svc *application) Undelete(ID uint64) (err error) {
}
// @todo add event
// defer svc.eventbus.Dispatch(svc.ctx, event.ApplicationAfterUndelete(nil, app))
// _ = svc.eventbus.WaitFor(svc.ctx, event.ApplicationAfterUndelete(nil, app))
return nil
})

View File

@@ -210,7 +210,7 @@ func (svc auth) External(profile goth.User) (u *types.User, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterLogin(u, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterLogin(u, authProvider))
return svc.recordAction(svc.ctx, aam, AuthActionUpdateCredentials, nil)
} else {
// Scenario: linked to an invalid user
@@ -273,7 +273,7 @@ func (svc auth) External(profile goth.User) (u *types.User, err error) {
aam.setUser(nil)
svc.ctx = internalAuth.SetIdentityToContext(svc.ctx, u)
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterSignup(u, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterSignup(u, authProvider))
svc.recordAction(svc.ctx, aam, AuthActionExternalSignup, nil)
@@ -292,7 +292,7 @@ func (svc auth) External(profile goth.User) (u *types.User, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterLogin(u, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterLogin(u, authProvider))
// If user
if !u.Valid() {
@@ -395,7 +395,7 @@ func (svc auth) InternalSignUp(input *types.User, password string) (u *types.Use
}
}
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterLogin(eUser, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterLogin(eUser, authProvider))
u = eUser
return nil
@@ -444,7 +444,7 @@ func (svc auth) InternalSignUp(input *types.User, password string) (u *types.Use
}
aam.setUser(u)
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterSignup(u, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterSignup(u, authProvider))
if err = svc.autoPromote(u); err != nil {
return err
@@ -548,7 +548,7 @@ func (svc auth) InternalLogin(email string, password string) (u *types.User, err
return AuthErrFailedUnconfirmedEmail()
}
defer svc.eventbus.Dispatch(svc.ctx, event.AuthAfterLogin(u, authProvider))
_ = svc.eventbus.WaitFor(svc.ctx, event.AuthAfterLogin(u, authProvider))
return nil
})

View File

@@ -220,7 +220,7 @@ func (svc role) Create(new *types.Role) (r *types.Role, err error) {
raProps.setRole(r)
defer svc.eventbus.Dispatch(svc.ctx, event.RoleAfterCreate(new, r))
_ = svc.eventbus.WaitFor(svc.ctx, event.RoleAfterCreate(new, r))
return
})
@@ -268,7 +268,7 @@ func (svc role) Update(upd *types.Role) (r *types.Role, err error) {
return err
}
defer svc.eventbus.Dispatch(svc.ctx, event.RoleAfterUpdate(upd, r))
_ = svc.eventbus.WaitFor(svc.ctx, event.RoleAfterUpdate(upd, r))
return nil
})
@@ -323,7 +323,7 @@ func (svc role) Delete(roleID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.RoleAfterDelete(nil, r))
_ = svc.eventbus.WaitFor(svc.ctx, event.RoleAfterDelete(nil, r))
return
})
@@ -542,7 +542,7 @@ func (svc role) MemberAdd(roleID, memberID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.RoleMemberAfterAdd(m, r))
_ = svc.eventbus.WaitFor(svc.ctx, event.RoleMemberAfterAdd(m, r))
return nil
})
@@ -589,7 +589,7 @@ func (svc role) MemberRemove(roleID, memberID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.RoleMemberAfterRemove(m, r))
_ = svc.eventbus.WaitFor(svc.ctx, event.RoleMemberAfterRemove(m, r))
return nil
})

View File

@@ -339,7 +339,7 @@ func (svc user) Create(new *types.User) (u *types.User, err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.UserAfterCreate(new, u))
_ = svc.eventbus.WaitFor(svc.ctx, event.UserAfterCreate(new, u))
return
})
@@ -400,7 +400,7 @@ func (svc user) Update(upd *types.User) (u *types.User, err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.UserAfterUpdate(upd, u))
_ = svc.eventbus.WaitFor(svc.ctx, event.UserAfterUpdate(upd, u))
return
})
@@ -496,7 +496,7 @@ func (svc user) Delete(userID uint64) (err error) {
return
}
defer svc.eventbus.Dispatch(svc.ctx, event.UserAfterDelete(nil, u))
_ = svc.eventbus.WaitFor(svc.ctx, event.UserAfterDelete(nil, u))
return nil
})