From 41895b5dade8d28195af4dbca1b3cd7c44a2b2bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Thu, 29 Jul 2021 12:02:53 +0200 Subject: [PATCH] Improve request pre-processing Make the query planner run at run-time, allowing us to tailor based on the requested definitions. This allows us to request data from any step of the model, not just the root steps. --- pkg/report/model.go | 65 ++++++---------------- pkg/report/model_graph.go | 112 ++++++++++++++++---------------------- pkg/report/step_load.go | 54 +++++++++++++++++- system/service/report.go | 2 +- 4 files changed, 116 insertions(+), 117 deletions(-) diff --git a/pkg/report/model.go b/pkg/report/model.go index 39d2ca6fb..230bf5dd9 100644 --- a/pkg/report/model.go +++ b/pkg/report/model.go @@ -10,9 +10,9 @@ import ( type ( model struct { - ran bool - steps []step - datasources DatasourceSet + ran bool + steps []step + nodes map[string]*modelGraphNode } // M is the model interface that should be used when trying to model the datasource @@ -20,7 +20,7 @@ type ( Add(...step) M Run(context.Context) error Load(context.Context, ...*FrameDefinition) ([]*Frame, error) - Describe(source string) (FrameDescriptionSet, error) + Describe(ctx context.Context, source string) (FrameDescriptionSet, error) } stepSet []step @@ -48,26 +48,15 @@ type ( // Additional steps may not be added after the `M.Run(context.Context)` was called func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*StepDefinition) (M, error) { steps := make([]step, 0, len(dd)) - ss := make(DatasourceSet, 0, len(steps)*2) err := func() error { for _, d := range dd { switch { case d.Load != nil: if sources == nil { - return errors.New("no datasources defined") + return errors.New("no datasource providers defined") } - - s, ok := sources[d.Load.Source] - if !ok { - return fmt.Errorf("unresolved datasource: %s", d.Load.Source) - } - ds, err := s.Datasource(ctx, d.Load) - if err != nil { - return err - } - - ss = 
append(ss, ds) + steps = append(steps, &stepLoad{def: d.Load, dsp: sources[d.Load.Source]}) case d.Join != nil: steps = append(steps, &stepJoin{def: d.Join}) @@ -89,8 +78,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St } return &model{ - steps: steps, - datasources: ss, + steps: steps, }, nil } @@ -113,7 +101,7 @@ func (m *model) Run(ctx context.Context) (err error) { return errors.New("model already ran") } - if len(m.steps)+len(m.datasources) == 0 { + if len(m.steps) == 0 { return errors.New("no model steps defined") } @@ -130,28 +118,8 @@ func (m *model) Run(ctx context.Context) (err error) { return fmt.Errorf("%s: failed to validate the model: %w", errPfx, err) } - // construct the step graph - // - // If there are no steps, there is nothing to reduce - if len(m.steps) == 0 { - return nil - } - err = func() (err error) { - gg, err := m.buildStepGraph(m.steps, m.datasources) - if err != nil { - return err - } - - m.datasources = nil - for _, n := range gg { - aux, err := m.reduceGraph(ctx, n) - if err != nil { - return err - } - m.datasources = append(m.datasources, aux) - } - return nil - }() + // construct a model graph for future optimizations + m.nodes, err = m.buildStepGraph(m.steps) if err != nil { return fmt.Errorf("%s: %w", errPfx, err) } @@ -162,7 +130,7 @@ func (m *model) Run(ctx context.Context) (err error) { // Describe returns the descriptions for the requested model datasources // // The Run method must be called before the description can be provided. 
-func (m *model) Describe(source string) (out FrameDescriptionSet, err error) { +func (m *model) Describe(ctx context.Context, source string) (out FrameDescriptionSet, err error) { var ds Datasource err = func() error { @@ -170,8 +138,8 @@ func (m *model) Describe(source string) (out FrameDescriptionSet, err error) { return fmt.Errorf("model was not yet ran") } - ds := m.datasources.Find(source) - if ds == nil { + ds, err = m.datasource(ctx, &FrameDefinition{Source: source}) + if err != nil { return fmt.Errorf("model does not contain the datasource: %s", source) } @@ -207,10 +175,9 @@ func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) (ff []*Frame, } def = dd[0] - - ds = m.datasources.Find(def.Source) - if ds == nil { - return fmt.Errorf("unresolved datasource: %s", def.Source) + ds, err = m.datasource(ctx, def) + if err != nil { + return err } return nil diff --git a/pkg/report/model_graph.go b/pkg/report/model_graph.go index 1ee83366b..1cf3a7d9f 100644 --- a/pkg/report/model_graph.go +++ b/pkg/report/model_graph.go @@ -1,22 +1,21 @@ package report -import "context" +import ( + "context" + "fmt" +) type ( modelGraphNode struct { step step - ds Datasource - - pp []*modelGraphNode - cc []*modelGraphNode + pp []*modelGraphNode + cc []*modelGraphNode } ) // Internally, the model uses a stepGraph to resolve the dependencies between the steps // allowing us to perform some preprocessing, such as size reduction and shape validation -// -// @todo most of this might need to be done at runtime, not buildtime -func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, error) { +func (m *model) buildStepGraph(ss stepSet) (map[string]*modelGraphNode, error) { mp := make(map[string]*modelGraphNode) for _, s := range ss { @@ -30,7 +29,7 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, } mp[s.Name()] = n } else { - n.step = s + return nil, fmt.Errorf("step name not unique: %s", s.Name()) } // make sure the 
child step is in there @@ -38,11 +37,9 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, c, ok := mp[src] if !ok { c = &modelGraphNode{ - // will be added later + // step is added later when we get to it step: nil, pp: []*modelGraphNode{n}, - - ds: dd.Find(src), } mp[src] = c } @@ -50,81 +47,64 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, } } - // return all of the root nodes - out := make([]*modelGraphNode, 0, len(ss)) - for _, n := range mp { - if len(n.pp) == 0 { - out = append(out, n) - } - } - - return out, nil + return mp, nil } -func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) { +// datasource attempts to return the smallest possible sub-tree for the given request +// +// Flow outline: +// * Find the start node that corresponds to the provided definition +// * Recursively traverse down the branches +// * When returning, see if the given node can be merged with its datasource +// +// We try to offload as much work as possible to the datasource to reduce the size of the output frames and avoid +// additional processing. +func (m *model) datasource(ctx context.Context, def *FrameDefinition) (ds Datasource, err error) { + start, ok := m.nodes[def.Source] + if !ok { + return nil, fmt.Errorf("unresolved source: %s", def.Source) + } + + return m.reduceBranch(ctx, start) +} + +func (m *model) reduceBranch(ctx context.Context, n *modelGraphNode) (out Datasource, err error) { + // leaf node, nothing else to do + if len(n.cc) == 0 { + return n.step.Run(ctx) + } + + // traverse down the branches auxO := make([]Datasource, len(n.cc)) - if len(n.cc) > 0 { - for i, c := range n.cc { - out, err = m.reduceGraph(ctx, c) - if err != nil { - return nil, err - } - auxO[i] = out - } - } - - bail := func() (out Datasource, err error) { - if n.step == nil { - if n.ds != nil { - return n.ds, nil - } - - return out, nil - } - - aux, err := n.step.Run(ctx, auxO...) 
+ for i, c := range n.cc { + auxO[i], err = m.reduceBranch(ctx, c) if err != nil { - return nil, err + return } - - return aux, nil } - if n.step == nil { - return bail() - } - - // check if this one can reduce the existing datasources - // - // for now, only "simple branches are supported" - var o Datasource + // for now only the join step expects multiple inputs; it can't be reduced if len(auxO) > 1 { - return bail() - } else if len(auxO) > 0 { - // use the only available output - o = auxO[0] - } else { - // use own datasource (in case of leaves nodes) - o = n.ds + return n.step.Run(ctx, auxO...) } + // try to reduce the group step + o := auxO[0] if n.step.Def().Group != nil { gds, ok := o.(GroupableDatasource) if !ok { - return bail() + return n.step.Run(ctx, auxO...) } ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name()) if err != nil { return nil, err } else if !ok { - return bail() + return n.step.Run(ctx, auxO...) } - out = gds - // we've covered this step with the child step; ignore it - return out, nil + return gds, nil } - return bail() + return n.step.Run(ctx, auxO...) } diff --git a/pkg/report/step_load.go b/pkg/report/step_load.go index 423789202..c99302ff1 100644 --- a/pkg/report/step_load.go +++ b/pkg/report/step_load.go @@ -1,8 +1,14 @@ package report +import ( + "context" + "errors" + "fmt" +) + type ( stepLoad struct { - ds Datasource + dsp DatasourceProvider def *LoadStepDefinition } @@ -19,3 +25,49 @@ type ( Rows *RowDefinition `json:"rows,omitempty"` } ) + +func (j *stepLoad) Run(ctx context.Context, _ ...Datasource) (Datasource, error) { + return j.dsp.Datasource(ctx, j.Def().Load) +} + +func (j *stepLoad) Validate() error { + pfx := "invalid load step: " + + // base things... 
+ switch { + case j.def.Name == "": + return errors.New(pfx + "dimension name not defined") + + case j.def.Source == "": + return errors.New(pfx + "datasource not defined") + case j.def.Definition == nil: + return errors.New(pfx + "source definition not provided") + } + + // provider + switch { + case j.dsp == nil: + return errors.New(pfx + "datasource provider not defined") + } + + // columns... + for i, g := range j.def.Columns { + if g.Name == "" { + return fmt.Errorf("%scolumn key alias missing for column: %d", pfx, i) + } + } + + return nil +} + +func (d *stepLoad) Name() string { + return d.def.Name +} + +func (d *stepLoad) Source() []string { + return nil +} + +func (d *stepLoad) Def() *StepDefinition { + return &StepDefinition{Load: d.def} +} diff --git a/system/service/report.go b/system/service/report.go index 6489f1c7a..4b5fb9c81 100644 --- a/system/service/report.go +++ b/system/service/report.go @@ -313,7 +313,7 @@ func (svc *report) DescribeFresh(ctx context.Context, src types.ReportDataSource var auxOut rep.FrameDescriptionSet for _, s := range sources { - auxOut, err = model.Describe(s) + auxOut, err = model.Describe(ctx, s) if err != nil { return err }