Fix Join-Gateway step scope corruption
This commit is contained in:
parent
14026ec47e
commit
4faed45a98
@ -28,18 +28,28 @@ func NewGatewayPath(s Step, t pathTester) (gwp *GatewayPath, err error) {
|
|||||||
|
|
||||||
// joinGateway handles merging/joining of multiple paths into
|
// joinGateway handles merging/joining of multiple paths into
|
||||||
// a single path forward
|
// a single path forward
|
||||||
type joinGateway struct {
|
type (
|
||||||
StepIdentifier
|
joinGateway struct {
|
||||||
paths Steps
|
StepIdentifier
|
||||||
scopes map[Step]*expr.Vars
|
paths Steps
|
||||||
l sync.Mutex
|
scopes map[uint64]map[Step]*expr.Vars
|
||||||
}
|
l sync.Mutex
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// JoinGateway fn initializes join gateway with all paths that are expected to be partial
|
// JoinGateway fn initializes join gateway with all paths that are expected to be partial
|
||||||
func JoinGateway(ss ...Step) *joinGateway {
|
func JoinGateway(ss ...Step) *joinGateway {
|
||||||
return &joinGateway{
|
return &joinGateway{
|
||||||
paths: ss,
|
paths: ss,
|
||||||
scopes: make(map[Step]*expr.Vars),
|
|
||||||
|
// group scopes by session and step
|
||||||
|
// this prevents scope corruption when same workflow
|
||||||
|
// is executed multiple times
|
||||||
|
//
|
||||||
|
// might not be the best way where to keep the state of the join-gateway
|
||||||
|
// but it beats hidden variables in the scope or dedicated prop in the
|
||||||
|
// ExecRequest
|
||||||
|
scopes: make(map[uint64]map[Step]*expr.Vars),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,19 +67,26 @@ func (gw *joinGateway) Exec(_ context.Context, r *ExecRequest) (ExecResponse, er
|
|||||||
return nil, fmt.Errorf("unknown parent for join gateway")
|
return nil, fmt.Errorf("unknown parent for join gateway")
|
||||||
}
|
}
|
||||||
|
|
||||||
gw.scopes[r.Parent] = r.Scope
|
if len(gw.scopes[r.SessionID]) == 0 {
|
||||||
if len(gw.scopes) < len(gw.paths) {
|
gw.scopes[r.SessionID] = make(map[Step]*expr.Vars)
|
||||||
|
}
|
||||||
|
|
||||||
|
gw.scopes[r.SessionID][r.Parent] = r.Scope
|
||||||
|
if len(gw.scopes[r.SessionID]) < len(gw.paths) {
|
||||||
return &partial{}, nil
|
return &partial{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// All collected, merge scope parent all paths in the defined order
|
// All collected, merge scope parent all paths in the defined order
|
||||||
var merged *expr.Vars
|
var merged *expr.Vars
|
||||||
for _, p := range gw.paths {
|
for _, p := range gw.paths {
|
||||||
if gw.scopes[p] != nil {
|
if gw.scopes[r.SessionID][p] != nil {
|
||||||
merged = merged.MustMerge(gw.scopes[p])
|
merged = merged.MustMerge(gw.scopes[r.SessionID][p])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all inbound paths visited, cleanup scopes for the session
|
||||||
|
delete(gw.scopes, r.SessionID)
|
||||||
|
|
||||||
return merged, nil
|
return merged, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -30,17 +30,21 @@ func TestJoinGateway(t *testing.T) {
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p1})
|
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p1, SessionID: 1})
|
||||||
req.NoError(err)
|
req.NoError(err)
|
||||||
req.Equal(&partial{}, r)
|
req.Equal(&partial{}, r)
|
||||||
|
|
||||||
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p2})
|
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p2, SessionID: 1})
|
||||||
req.NoError(err)
|
req.NoError(err)
|
||||||
req.Equal(&partial{}, r)
|
req.Equal(&partial{}, r)
|
||||||
|
|
||||||
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p3})
|
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p3, SessionID: 1})
|
||||||
req.NoError(err)
|
req.NoError(err)
|
||||||
req.IsType(&expr.Vars{}, r)
|
req.IsType(&expr.Vars{}, r)
|
||||||
|
|
||||||
|
r, err = gw.Exec(context.TODO(), &ExecRequest{Parent: p1, SessionID: 2})
|
||||||
|
req.NoError(err)
|
||||||
|
req.Equal(&partial{}, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestForkGateway(t *testing.T) {
|
func TestForkGateway(t *testing.T) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user