From 2388158141d8a68593fe863534fcdafbe09ce5f8 Mon Sep 17 00:00:00 2001 From: Peter Grlica Date: Tue, 6 Jul 2021 17:10:45 +0200 Subject: [PATCH] Added pipeline handler --- pkg/apigw/pipeline.go | 77 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 pkg/apigw/pipeline.go diff --git a/pkg/apigw/pipeline.go b/pkg/apigw/pipeline.go new file mode 100644 index 000000000..0f28ed2af --- /dev/null +++ b/pkg/apigw/pipeline.go @@ -0,0 +1,77 @@ +package apigw + +import ( + "context" + "net/http" + "sort" +) + +type ( + Execer interface { + Exec(context.Context, *scp, map[string]interface{}) error + } + + Sorter interface { + Weight() int + } + + ErrorHandler interface { + Exec(context.Context, *scp, error) + } + + Payload struct { + params map[string]interface{} + worker Worker + } + + Worker interface { + Execer + Sorter + } + + workers []Payload + + pl struct { + w workers + err ErrorHandler + } + + scp struct { + req *http.Request + writer http.ResponseWriter + } +) + +// Exec takes care of error handling and main +// functionality that takes place in worker +func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) { + for _, w := range pp.w { + err = w.worker.Exec(ctx, scope, w.params) + + if err != nil { + // call the error handler + pp.err.Exec(ctx, scope, err) + return + } + } + + return +} + +// Add registers a new worker with parameters +// fethed from store +func (pp *pl) Add(ff Worker, p map[string]interface{}) { + pp.w = append(pp.w, Payload{worker: ff, params: p}) + sort.Sort(pp.w) +} + +// add error handler +func (pp *pl) ErrorHandler(ff ErrorHandler) { + pp.err = ff +} + +func (a workers) Len() int { return len(a) } +func (a workers) Less(i, j int) bool { + return a[i].worker.Weight() < a[j].worker.Weight() +} +func (a workers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }