Improve request pre-processing
Make the query planner run at run-time, allowing us to taylor based on the requested definitions. This allows us to request data from any step of the model, not just the root steps.
This commit is contained in:
parent
04a79596d1
commit
41895b5dad
@ -10,9 +10,9 @@ import (
|
||||
|
||||
type (
|
||||
model struct {
|
||||
ran bool
|
||||
steps []step
|
||||
datasources DatasourceSet
|
||||
ran bool
|
||||
steps []step
|
||||
nodes map[string]*modelGraphNode
|
||||
}
|
||||
|
||||
// M is the model interface that should be used when trying to model the datasource
|
||||
@ -20,7 +20,7 @@ type (
|
||||
Add(...step) M
|
||||
Run(context.Context) error
|
||||
Load(context.Context, ...*FrameDefinition) ([]*Frame, error)
|
||||
Describe(source string) (FrameDescriptionSet, error)
|
||||
Describe(ctx context.Context, source string) (FrameDescriptionSet, error)
|
||||
}
|
||||
|
||||
stepSet []step
|
||||
@ -48,26 +48,15 @@ type (
|
||||
// Additional steps may not be added after the `M.Run(context.Context)` was called
|
||||
func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*StepDefinition) (M, error) {
|
||||
steps := make([]step, 0, len(dd))
|
||||
ss := make(DatasourceSet, 0, len(steps)*2)
|
||||
|
||||
err := func() error {
|
||||
for _, d := range dd {
|
||||
switch {
|
||||
case d.Load != nil:
|
||||
if sources == nil {
|
||||
return errors.New("no datasources defined")
|
||||
return errors.New("no datasource providers defined")
|
||||
}
|
||||
|
||||
s, ok := sources[d.Load.Source]
|
||||
if !ok {
|
||||
return fmt.Errorf("unresolved datasource: %s", d.Load.Source)
|
||||
}
|
||||
ds, err := s.Datasource(ctx, d.Load)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ss = append(ss, ds)
|
||||
steps = append(steps, &stepLoad{def: d.Load, dsp: sources[d.Load.Source]})
|
||||
|
||||
case d.Join != nil:
|
||||
steps = append(steps, &stepJoin{def: d.Join})
|
||||
@ -89,8 +78,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St
|
||||
}
|
||||
|
||||
return &model{
|
||||
steps: steps,
|
||||
datasources: ss,
|
||||
steps: steps,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -113,7 +101,7 @@ func (m *model) Run(ctx context.Context) (err error) {
|
||||
return errors.New("model already ran")
|
||||
}
|
||||
|
||||
if len(m.steps)+len(m.datasources) == 0 {
|
||||
if len(m.steps) == 0 {
|
||||
return errors.New("no model steps defined")
|
||||
}
|
||||
|
||||
@ -130,28 +118,8 @@ func (m *model) Run(ctx context.Context) (err error) {
|
||||
return fmt.Errorf("%s: failed to validate the model: %w", errPfx, err)
|
||||
}
|
||||
|
||||
// construct the step graph
|
||||
//
|
||||
// If there are no steps, there is nothing to reduce
|
||||
if len(m.steps) == 0 {
|
||||
return nil
|
||||
}
|
||||
err = func() (err error) {
|
||||
gg, err := m.buildStepGraph(m.steps, m.datasources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.datasources = nil
|
||||
for _, n := range gg {
|
||||
aux, err := m.reduceGraph(ctx, n)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.datasources = append(m.datasources, aux)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
// construct a model graph for future optimizations
|
||||
m.nodes, err = m.buildStepGraph(m.steps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", errPfx, err)
|
||||
}
|
||||
@ -162,7 +130,7 @@ func (m *model) Run(ctx context.Context) (err error) {
|
||||
// Describe returns the descriptions for the requested model datasources
|
||||
//
|
||||
// The Run method must be called before the description can be provided.
|
||||
func (m *model) Describe(source string) (out FrameDescriptionSet, err error) {
|
||||
func (m *model) Describe(ctx context.Context, source string) (out FrameDescriptionSet, err error) {
|
||||
var ds Datasource
|
||||
|
||||
err = func() error {
|
||||
@ -170,8 +138,8 @@ func (m *model) Describe(source string) (out FrameDescriptionSet, err error) {
|
||||
return fmt.Errorf("model was not yet ran")
|
||||
}
|
||||
|
||||
ds := m.datasources.Find(source)
|
||||
if ds == nil {
|
||||
ds, err = m.datasource(ctx, &FrameDefinition{Source: source})
|
||||
if err != nil {
|
||||
return fmt.Errorf("model does not contain the datasource: %s", source)
|
||||
}
|
||||
|
||||
@ -207,10 +175,9 @@ func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) (ff []*Frame,
|
||||
}
|
||||
|
||||
def = dd[0]
|
||||
|
||||
ds = m.datasources.Find(def.Source)
|
||||
if ds == nil {
|
||||
return fmt.Errorf("unresolved datasource: %s", def.Source)
|
||||
ds, err = m.datasource(ctx, def)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -1,22 +1,21 @@
|
||||
package report
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type (
|
||||
modelGraphNode struct {
|
||||
step step
|
||||
ds Datasource
|
||||
|
||||
pp []*modelGraphNode
|
||||
cc []*modelGraphNode
|
||||
pp []*modelGraphNode
|
||||
cc []*modelGraphNode
|
||||
}
|
||||
)
|
||||
|
||||
// Internally, the model uses a stepGraph to resolve the dependencies between the steps
|
||||
// allowing us to perform some preprocessing, such as size reduction and shape validation
|
||||
//
|
||||
// @todo most of this might need to be done at runtime, not buildtime
|
||||
func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, error) {
|
||||
func (m *model) buildStepGraph(ss stepSet) (map[string]*modelGraphNode, error) {
|
||||
mp := make(map[string]*modelGraphNode)
|
||||
|
||||
for _, s := range ss {
|
||||
@ -30,7 +29,7 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode,
|
||||
}
|
||||
mp[s.Name()] = n
|
||||
} else {
|
||||
n.step = s
|
||||
return nil, fmt.Errorf("step name not unique: %s", s.Name())
|
||||
}
|
||||
|
||||
// make sure the child step is in there
|
||||
@ -38,11 +37,9 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode,
|
||||
c, ok := mp[src]
|
||||
if !ok {
|
||||
c = &modelGraphNode{
|
||||
// will be added later
|
||||
// step is added later when we get to it
|
||||
step: nil,
|
||||
pp: []*modelGraphNode{n},
|
||||
|
||||
ds: dd.Find(src),
|
||||
}
|
||||
mp[src] = c
|
||||
}
|
||||
@ -50,81 +47,64 @@ func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode,
|
||||
}
|
||||
}
|
||||
|
||||
// return all of the root nodes
|
||||
out := make([]*modelGraphNode, 0, len(ss))
|
||||
for _, n := range mp {
|
||||
if len(n.pp) == 0 {
|
||||
out = append(out, n)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) {
|
||||
// Datasource attempts to return the smallest possible sub-tree for the given request
|
||||
//
|
||||
// Flow outline:
|
||||
// * Find the start node that corresponds to the provided definition
|
||||
// * Recursively traverse down the branches
|
||||
// * When returning, see if the given node can be merged with it's datasource
|
||||
//
|
||||
// We try to offload as much work to the datasource to reduce the size of the output frames to reduce
|
||||
// additional processing.
|
||||
func (m *model) datasource(ctx context.Context, def *FrameDefinition) (ds Datasource, err error) {
|
||||
start, ok := m.nodes[def.Source]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unresolved source: %s", def.Source)
|
||||
}
|
||||
|
||||
return m.reduceBranch(ctx, start)
|
||||
}
|
||||
|
||||
func (m *model) reduceBranch(ctx context.Context, n *modelGraphNode) (out Datasource, err error) {
|
||||
// leaf node, nothing else to do
|
||||
if len(n.cc) == 0 {
|
||||
return n.step.Run(ctx)
|
||||
}
|
||||
|
||||
// traverse down the branches
|
||||
auxO := make([]Datasource, len(n.cc))
|
||||
if len(n.cc) > 0 {
|
||||
for i, c := range n.cc {
|
||||
out, err = m.reduceGraph(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
auxO[i] = out
|
||||
}
|
||||
}
|
||||
|
||||
bail := func() (out Datasource, err error) {
|
||||
if n.step == nil {
|
||||
if n.ds != nil {
|
||||
return n.ds, nil
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
aux, err := n.step.Run(ctx, auxO...)
|
||||
for i, c := range n.cc {
|
||||
auxO[i], err = m.reduceBranch(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
|
||||
return aux, nil
|
||||
}
|
||||
|
||||
if n.step == nil {
|
||||
return bail()
|
||||
}
|
||||
|
||||
// check if this one can reduce the existing datasources
|
||||
//
|
||||
// for now, only "simple branches are supported"
|
||||
var o Datasource
|
||||
// for now only the join step expects multiple inputs; it can't be reduced
|
||||
if len(auxO) > 1 {
|
||||
return bail()
|
||||
} else if len(auxO) > 0 {
|
||||
// use the only available output
|
||||
o = auxO[0]
|
||||
} else {
|
||||
// use own datasource (in case of leaves nodes)
|
||||
o = n.ds
|
||||
return n.step.Run(ctx, auxO...)
|
||||
}
|
||||
|
||||
// try to reduce the group step
|
||||
o := auxO[0]
|
||||
if n.step.Def().Group != nil {
|
||||
gds, ok := o.(GroupableDatasource)
|
||||
if !ok {
|
||||
return bail()
|
||||
return n.step.Run(ctx, auxO...)
|
||||
}
|
||||
|
||||
ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
return bail()
|
||||
return n.step.Run(ctx, auxO...)
|
||||
}
|
||||
|
||||
out = gds
|
||||
// we've covered this step with the child step; ignore it
|
||||
return out, nil
|
||||
return gds, nil
|
||||
}
|
||||
|
||||
return bail()
|
||||
return n.step.Run(ctx, auxO...)
|
||||
}
|
||||
|
||||
@ -1,8 +1,14 @@
|
||||
package report
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type (
|
||||
stepLoad struct {
|
||||
ds Datasource
|
||||
dsp DatasourceProvider
|
||||
def *LoadStepDefinition
|
||||
}
|
||||
|
||||
@ -19,3 +25,49 @@ type (
|
||||
Rows *RowDefinition `json:"rows,omitempty"`
|
||||
}
|
||||
)
|
||||
|
||||
func (j *stepLoad) Run(ctx context.Context, _ ...Datasource) (Datasource, error) {
|
||||
return j.dsp.Datasource(ctx, j.Def().Load)
|
||||
}
|
||||
|
||||
func (j *stepLoad) Validate() error {
|
||||
pfx := "invalid load step: "
|
||||
|
||||
// base things...
|
||||
switch {
|
||||
case j.def.Name == "":
|
||||
return errors.New(pfx + "dimension name not defined")
|
||||
|
||||
case j.def.Source == "":
|
||||
return errors.New(pfx + "datasource not defined")
|
||||
case j.def.Definition == nil:
|
||||
return errors.New(pfx + "source definition not provided")
|
||||
}
|
||||
|
||||
// provider
|
||||
switch {
|
||||
case j.dsp == nil:
|
||||
return errors.New(pfx + "datasource provider not defined")
|
||||
}
|
||||
|
||||
// columns...
|
||||
for i, g := range j.def.Columns {
|
||||
if g.Name == "" {
|
||||
return fmt.Errorf("%scolumn key alias missing for column: %d", pfx, i)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *stepLoad) Name() string {
|
||||
return d.def.Name
|
||||
}
|
||||
|
||||
func (d *stepLoad) Source() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *stepLoad) Def() *StepDefinition {
|
||||
return &StepDefinition{Load: d.def}
|
||||
}
|
||||
|
||||
@ -313,7 +313,7 @@ func (svc *report) DescribeFresh(ctx context.Context, src types.ReportDataSource
|
||||
|
||||
var auxOut rep.FrameDescriptionSet
|
||||
for _, s := range sources {
|
||||
auxOut, err = model.Describe(s)
|
||||
auxOut, err = model.Describe(ctx, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user