Added pipeline handler
This commit is contained in:
parent
9e498e9db6
commit
2388158141
77
pkg/apigw/pipeline.go
Normal file
77
pkg/apigw/pipeline.go
Normal file
@ -0,0 +1,77 @@
|
||||
package apigw
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sort"
|
||||
)
|
||||
|
||||
type (
|
||||
Execer interface {
|
||||
Exec(context.Context, *scp, map[string]interface{}) error
|
||||
}
|
||||
|
||||
Sorter interface {
|
||||
Weight() int
|
||||
}
|
||||
|
||||
ErrorHandler interface {
|
||||
Exec(context.Context, *scp, error)
|
||||
}
|
||||
|
||||
Payload struct {
|
||||
params map[string]interface{}
|
||||
worker Worker
|
||||
}
|
||||
|
||||
Worker interface {
|
||||
Execer
|
||||
Sorter
|
||||
}
|
||||
|
||||
workers []Payload
|
||||
|
||||
pl struct {
|
||||
w workers
|
||||
err ErrorHandler
|
||||
}
|
||||
|
||||
scp struct {
|
||||
req *http.Request
|
||||
writer http.ResponseWriter
|
||||
}
|
||||
)
|
||||
|
||||
// Exec takes care of error handling and main
|
||||
// functionality that takes place in worker
|
||||
func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) {
|
||||
for _, w := range pp.w {
|
||||
err = w.worker.Exec(ctx, scope, w.params)
|
||||
|
||||
if err != nil {
|
||||
// call the error handler
|
||||
pp.err.Exec(ctx, scope, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Add registers a new worker with parameters
|
||||
// fethed from store
|
||||
func (pp *pl) Add(ff Worker, p map[string]interface{}) {
|
||||
pp.w = append(pp.w, Payload{worker: ff, params: p})
|
||||
sort.Sort(pp.w)
|
||||
}
|
||||
|
||||
// add error handler
|
||||
func (pp *pl) ErrorHandler(ff ErrorHandler) {
|
||||
pp.err = ff
|
||||
}
|
||||
|
||||
func (a workers) Len() int { return len(a) }
|
||||
func (a workers) Less(i, j int) bool {
|
||||
return a[i].worker.Weight() < a[j].worker.Weight()
|
||||
}
|
||||
func (a workers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
Loading…
x
Reference in New Issue
Block a user