diff --git a/compose/encoder/encoder.go b/compose/encoder/encoder.go deleted file mode 100644 index 6aeee8e16..000000000 --- a/compose/encoder/encoder.go +++ /dev/null @@ -1,122 +0,0 @@ -package encoder - -import ( - syst "github.com/cortezaproject/corteza-server/system/types" -) - -type ( - multiple uint - - field struct { - name string - encodeAllMulti bool - } - - FlatWriter interface { - Write([]string) error - Flush() - } - - StructuredEncoder interface { - Encode(interface{}) error - } - - userFinder func(ID uint64) (*syst.User, error) - - flatWriter struct { - w FlatWriter - ff []field - u userFinder - tz string - } - - structuredEncoder struct { - w StructuredEncoder - ff []field - u userFinder - tz string - } -) - -func Field(name string) field { - return field{name: name} -} - -func MakeFields(nn ...string) []field { - ff := make([]field, len(nn)) - for i := range nn { - ff[i] = field{name: nn[i]} - } - - return ff -} - -func preprocessHeader(hh []field, tz string) []field { - nhh := make([]field, 0) - - // We need to prepare additional header fields for exporting - if tz != "" && tz != "UTC" { - for _, f := range hh { - switch f.name { - case "createdAt", - "updatedAt", - "deletedAt": - - nhh = append(nhh, f, field{name: f.name + "_date"}, field{name: f.name + "_time"}) - break - default: - nhh = append(nhh, f) - break - } - } - } else { - return hh - } - return nhh -} - -func MultiValueField(name string) field { - return field{name: name, encodeAllMulti: true} -} - -func NewFlatWriter(w FlatWriter, header bool, u userFinder, tz string, ff ...field) *flatWriter { - f := &flatWriter{ - w: w, - ff: preprocessHeader(ff, tz), - u: u, - tz: tz, - } - - if header { - f.writeHeader() - } - - return f -} - -func (enc flatWriter) Flush() { - enc.w.Flush() -} - -func (enc flatWriter) writeHeader() { - ss := make([]string, len(enc.ff)) - for i := range enc.ff { - ss[i] = enc.ff[i].name - } - - _ = enc.w.Write(ss) -} - -func NewStructuredEncoder(w StructuredEncoder, u userFinder, tz string, ff ...field) *structuredEncoder { - return &structuredEncoder{ - w: w, - // No need for additional timezone headers, since the output is structured - ff: ff, - u: u, - tz: tz, - } -} - -func (enc structuredEncoder) Flush() { - // noop -} diff --git a/compose/encoder/encoder_xlsx.go b/compose/encoder/encoder_xlsx.go deleted file mode 100644 index fc4fe48c0..000000000 --- a/compose/encoder/encoder_xlsx.go +++ /dev/null @@ -1,55 +0,0 @@ -package encoder - -import ( - "io" - - "github.com/360EntSecGroup-Skylar/excelize/v2" -) - -type ( - excelizeEncoder struct { - row int - f *excelize.File - w io.Writer - ff []field - u userFinder - tz string - } -) - -func NewExcelizeEncoder(w io.Writer, header bool, u userFinder, tz string, ff ...field) *excelizeEncoder { - enc := &excelizeEncoder{ - f: excelize.NewFile(), - w: w, - ff: preprocessHeader(ff, tz), - u: u, - tz: tz, - } - - if header { - enc.writeHeader() - } - - return enc -} - -func (enc *excelizeEncoder) Flush() { - _ = enc.f.Write(enc.w) -} - -// Returns current row + column to alphanumeric cell name -func (enc excelizeEncoder) pos(col int) string { - cn, _ := excelize.CoordinatesToCellName(col, enc.row) - return cn -} - -func (enc excelizeEncoder) sheet() string { - return "Sheet1" -} - -func (enc *excelizeEncoder) writeHeader() { - enc.row++ - for p := range enc.ff { - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p+1), enc.ff[p].name) - } -} diff --git a/compose/encoder/record.go b/compose/encoder/record.go deleted file mode 100644 index 3d629ce1d..000000000 --- a/compose/encoder/record.go +++ /dev/null @@ -1,316 +0,0 @@ -package encoder - -import ( - "fmt" - "strconv" - "time" - - "github.com/cortezaproject/corteza-server/compose/types" -) - -type ( - parsedTime struct { - field string - value string - } -) - -// Time formatter -// -// Takes ptr to time.Time so we can conver both cases (value + ptr) -// The function also generates additional fields with included timezone. -func fmtTime(field string, tp *time.Time, tz string) (pt []*parsedTime, err error) { - if tp == nil { - return pt, nil - } - - pt = append(pt, &parsedTime{ - field: field, - value: tp.UTC().Format(time.RFC3339), - }) - if tz == "" || tz == "UTC" { - return - } - - loc, err := time.LoadLocation(tz) - if err != nil { - return pt, err - } - tt := tp.In(loc) - pt = append(pt, - &parsedTime{ - field: field + "_date", - value: tt.Format("2006-01-02"), - }, &parsedTime{ - field: field + "_time", - value: tt.Format("15:04:05"), - }) - return -} - -func fmtUint64(u uint64) string { - return strconv.FormatUint(u, 10) -} - -func fmtSysUser(u uint64, finder userFinder) (string, error) { - if u <= 0 || finder == nil { - return fmtUint64(u), nil - } - su, err := finder(u) - if err != nil { - return "", err - } - - switch true { - case su.Name != "": - return su.Name, nil - case su.Handle != "": - return su.Handle, nil - case su.Email != "": - return su.Email, nil - case su.Username != "": - return su.Username, nil - } - - return fmt.Sprintf("%d", su.ID), nil -} - -func (enc flatWriter) Record(r *types.Record) (err error) { - var out = make([]string, len(enc.ff)) - - procTime := func(d []string, pts []*parsedTime, base int) { - for i, p := range pts { - d[base+i] = p.value - } - } - - for f, field := range enc.ff { - switch field.name { - case "recordID", "ID": - out[f] = fmtUint64(r.ID) - case "moduleID": - out[f] = fmtUint64(r.ModuleID) - case "namespaceID": - out[f] = fmtUint64(r.NamespaceID) - case "ownedBy": - out[f], err = fmtSysUser(r.OwnedBy, enc.u) - if err != nil { - return err - } - case "createdBy": - out[f], err = fmtSysUser(r.CreatedBy, enc.u) - if err != nil { - return err - } - case "createdAt": - tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt, f) - case "updatedBy": - out[f], err = fmtSysUser(r.UpdatedBy, enc.u) - if err != nil { - return err - } - case "updatedAt": - tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt, f) - case "deletedBy": - out[f], err = fmtSysUser(r.DeletedBy, enc.u) - if err != nil { - return err - } - case "deletedAt": - tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt, f) - default: - vv := r.Values.FilterByName(field.name) - // @todo support for field.encodeAllMulti - if len(vv) > 0 { - out[f] = vv[0].Value - } - } - } - - defer enc.w.Flush() - - return enc.w.Write(out) -} - -func (enc structuredEncoder) Record(r *types.Record) (err error) { - var ( - // Exporter can choose fields so we need this buffer - // to hold just what we need - out = make(map[string]interface{}) - vv types.RecordValueSet - c int - ) - - procTime := func(d map[string]interface{}, pts []*parsedTime) { - for _, p := range pts { - d[p.field] = p.value - } - } - - for _, f := range enc.ff { - switch f.name { - case "recordID", "ID": - // encode all numbers as string (to prevent overflow of uint64 values) - out[f.name] = fmtUint64(r.ID) - case "moduleID": - // encode all numbers as string (to prevent overflow of uint64 values) - out[f.name] = fmtUint64(r.ModuleID) - case "namespaceID": - // encode all numbers as string (to prevent overflow of uint64 values) - out[f.name] = fmtUint64(r.NamespaceID) - case "ownedBy": - out[f.name], err = fmtSysUser(r.OwnedBy, enc.u) - if err != nil { - return err - } - case "createdBy": - out[f.name], err = fmtSysUser(r.CreatedBy, enc.u) - if err != nil { - return err - } - case "createdAt": - tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt) - case "updatedBy": - out[f.name], err = fmtSysUser(r.UpdatedBy, enc.u) - if err != nil { - return err - } - case "updatedAt": - if r.UpdatedAt == nil { - out[f.name] = nil - } else { - tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt) - } - - case "deletedBy": - out[f.name], err = fmtSysUser(r.DeletedBy, enc.u) - if err != nil { - return err - } - case "deletedAt": - if r.DeletedAt == nil { - out[f.name] = nil - } else { - tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz) - if err != nil { - return err - } - procTime(out, tt) - } - - default: - vv = r.Values.FilterByName(f.name) - c = len(vv) - - if c == 0 { - break - } - - if c == 1 { - out[f.name] = vv[0].Value - } else { - multi := make([]string, c) - - for n := range vv { - multi[n] = vv[n].Value - } - - out[f.name] = multi - } - } - } - - return enc.w.Encode(out) -} - -func (enc *excelizeEncoder) Record(r *types.Record) (err error) { - enc.row++ - var u string - - procTime := func(pts []*parsedTime, base int) { - for i, p := range pts { - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(base+i), p.value) - } - } - - for p, f := range enc.ff { - p++ - switch f.name { - case "recordID", "ID": - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.ID)) - case "moduleID": - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.ModuleID)) - case "namespaceID": - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.NamespaceID)) - case "ownedBy": - u, err = fmtSysUser(r.OwnedBy, enc.u) - if err != nil { - return err - } - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u) - case "createdBy": - u, err = fmtSysUser(r.CreatedBy, enc.u) - if err != nil { - return err - } - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u) - case "createdAt": - tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz) - if err != nil { - return err - } - procTime(tt, p) - case "updatedBy": - u, err = fmtSysUser(r.UpdatedBy, enc.u) - if err != nil { - return err - } - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u) - case "updatedAt": - tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz) - if err != nil { - return err - } - procTime(tt, p) - case "deletedBy": - u, err = fmtSysUser(r.DeletedBy, enc.u) - if err != nil { - return err - } - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u) - case "deletedAt": - tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz) - if err != nil { - return err - } - procTime(tt, p) - default: - vv := r.Values.FilterByName(f.name) - if len(vv) > 0 { - _ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), vv[0].Value) - } - } - } - - return nil -} diff --git a/compose/encoder/record_test.go b/compose/encoder/record_test.go deleted file mode 100644 index 406a13e8d..000000000 --- a/compose/encoder/record_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package encoder - -import ( - "bytes" - "encoding/csv" - "encoding/json" - "testing" - "time" - - "github.com/cortezaproject/corteza-server/compose/types" - "github.com/stretchr/testify/require" -) - -func Test_RecordEncoding(t *testing.T) { - type args struct { - } - tests := []struct { - name string - ff []field - rr []*types.Record - - flatResult string - structResult string - }{ - { - name: "covering the basics", - ff: MakeFields("recordID", "ownedBy", "createdAt", "deletedAt", "some-foo-field", "foo", "fff"), - rr: []*types.Record{ - &types.Record{ - ID: 12345, - OwnedBy: 12345, - CreatedAt: time.Unix(1504976400, 0), - }, - &types.Record{ - ID: 54321, - OwnedBy: 12345, - CreatedAt: time.Unix(12345, 0), - Values: []*types.RecordValue{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "fff", - Value: "1", - }, - { - Name: "fff", - Value: "2", - }, - }, - }, - }, - - flatResult: `recordID,ownedBy,createdAt,deletedAt,some-foo-field,foo,fff` + "\n" + - `12345,12345,2017-09-09T17:00:00Z,,,,` + "\n" + - `54321,12345,1970-01-01T03:25:45Z,,,bar,1` + "\n", - - structResult: `{"createdAt":"2017-09-09T17:00:00Z","deletedAt":null,"ownedBy":"12345","recordID":"12345"}` + "\n" + - `{"createdAt":"1970-01-01T03:25:45Z","deletedAt":null,"fff":["1","2"],"foo":"bar","ownedBy":"12345","recordID":"54321"}` + "\n", - }, - } - - for _, tt := range tests { - t.Run(tt.name+" (csv)", func(t *testing.T) { - buf := bytes.NewBuffer([]byte{}) - csvWriter := csv.NewWriter(buf) - - fenc := NewFlatWriter(csvWriter, true, nil, "UTC", tt.ff...) - for _, r := range tt.rr { - if err := fenc.Record(r); err != nil { - t.Errorf("unexpected error = %v,", err) - } - } - - csvWriter.Flush() - require.True(t, - buf.String() == tt.flatResult, - "Unexpected result: \n%s\n%s", - buf.String(), - tt.flatResult) - }) - - t.Run(tt.name+" (json)", func(t *testing.T) { - buf := bytes.NewBuffer([]byte{}) - jsonEnc := json.NewEncoder(buf) - - senc := NewStructuredEncoder(jsonEnc, nil, "UTC", tt.ff...) - for _, r := range tt.rr { - if err := senc.Record(r); err != nil { - t.Errorf("unexpected error = %v,", err) - } - } - - require.True(t, - buf.String() == tt.structResult, - "Unexpected result: \n%s\n%s", - buf.String(), - tt.structResult) - }) - } -} diff --git a/compose/rest/record.go b/compose/rest/record.go index cf84ce731..2e123a4db 100644 --- a/compose/rest/record.go +++ b/compose/rest/record.go @@ -2,24 +2,28 @@ package rest import ( "context" - "encoding/csv" "encoding/json" "fmt" + "io" "net/http" "strconv" "strings" + "time" - "github.com/cortezaproject/corteza-server/compose/encoder" "github.com/cortezaproject/corteza-server/compose/rest/request" "github.com/cortezaproject/corteza-server/compose/service" "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/api" "github.com/cortezaproject/corteza-server/pkg/corredor" + "github.com/cortezaproject/corteza-server/pkg/envoy" + "github.com/cortezaproject/corteza-server/pkg/envoy/csv" + ejson "github.com/cortezaproject/corteza-server/pkg/envoy/json" + "github.com/cortezaproject/corteza-server/pkg/envoy/resource" + estore "github.com/cortezaproject/corteza-server/pkg/envoy/store" "github.com/cortezaproject/corteza-server/pkg/filter" "github.com/cortezaproject/corteza-server/pkg/payload" "github.com/cortezaproject/corteza-server/store" systemService "github.com/cortezaproject/corteza-server/system/service" - systemTypes "github.com/cortezaproject/corteza-server/system/types" ) type ( @@ -316,8 +320,63 @@ func (ctrl *Record) ImportRun(ctx context.Context, r *request.RecordImportRun) ( ses.OnError = r.OnError // Errors are presented in the session - ctrl.record.Import(ctx, ses) - return ses, nil + err = func() (err error) { + if ses.Progress.StartedAt != nil { + return fmt.Errorf("unable to start import: import session already active") + } + + sa := time.Now() + ses.Progress.StartedAt = &sa + + // Prepare additional metadata + tpl := resource.NewComposeRecordTemplate( + strconv.FormatUint(ses.ModuleID, 10), + strconv.FormatUint(ses.NamespaceID, 10), + ses.Name, + resource.MapToMappingTplSet(ses.Fields), + ) + + // Shape the data + ses.Resources = append(ses.Resources, tpl) + rt := resource.ComposeRecordShaper() + ses.Resources, err = resource.Shape(ses.Resources, rt) + + // Build + cfg := &estore.EncoderConfig{ + // For now the identifier is ignored, so this will never occur + OnExisting: resource.Skip, + Defer: func() { + ses.Progress.Completed++ + }, + } + if ses.OnError == service.IMPORT_ON_ERROR_SKIP { + cfg.DeferNok = func(err error) error { + ses.Progress.Failed++ + ses.Progress.FailReason = err.Error() + + return nil + } + } + se := estore.NewStoreEncoder(service.DefaultStore, cfg) + bld := envoy.NewBuilder(se) + g, err := bld.Build(ctx, ses.Resources...) + if err != nil { + return err + } + + // Encode + err = envoy.Encode(ctx, g, se) + now := time.Now() + ses.Progress.FinishedAt = &now + if err != nil { + ses.Progress.FailReason = err.Error() + return err + } + + return + }() + + return ses, ctrl.record.RecordImport(ctx, err) } func (ctrl *Record) ImportProgress(ctx context.Context, r *request.RecordImportProgress) (interface{}, error) { @@ -331,31 +390,23 @@ func (ctrl *Record) ImportProgress(ctx context.Context, r *request.RecordImportP } func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interface{}, error) { - type ( - // ad-hoc interface for our encoder - Encoder interface { - service.Encoder - Flush() - } - ) - var ( err error - // Record encoder - recordEncoder Encoder - filename = fmt.Sprintf("; filename=%s.%s", r.Filename, r.Ext) - f = types.RecordFilter{ + rf = &types.RecordFilter{ + Query: r.Filter, NamespaceID: r.NamespaceID, ModuleID: r.ModuleID, - Query: r.Filter, } + f = estore.NewDecodeFilter(). + ComposeRecord(rf) contentType string ) - // Access control. + + // Access control if _, err = ctrl.module.FindByID(ctx, r.NamespaceID, r.ModuleID); err != nil { return nil, err } @@ -365,47 +416,37 @@ func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interf } return func(w http.ResponseWriter, req *http.Request) { - ff := encoder.MakeFields(r.Fields...) - - if len(ff) == 0 { + if len(r.Fields) == 0 { http.Error(w, "no record value fields provided", http.StatusBadRequest) } - // Custom user getter function for the underlying encoders. - // - // not the most optimal solution; we have no other means to do a proper preload of users - // @todo preload users - users := map[uint64]*systemTypes.User{} - - uf := func(ID uint64) (*systemTypes.User, error) { - var err error - - if _, exists := users[ID]; exists { - // nonexistent users are also cached! - return users[ID], nil - } - - // @todo this "communication" between system and compose - // services is ad-hoc solution - users[ID], err = ctrl.userFinder.FindByID(ctx, ID) - if err != nil { - return nil, err - } - return users[ID], nil + fx := make(map[string]bool) + for _, f := range r.Fields { + fx[f] = true } + sd := estore.Decoder() + nn, err := sd.Decode(ctx, service.DefaultStore, f) + if err != nil { + http.Error(w, fmt.Sprintf("failed to fetch records: %s", err.Error()), http.StatusBadRequest) + } + + var encoder envoy.PrepareEncodeStreammer + switch strings.ToLower(r.Ext) { case "json", "jsonl", "ldjson", "ndjson": contentType = "application/jsonl" - recordEncoder = encoder.NewStructuredEncoder(json.NewEncoder(w), uf, r.Timezone, ff...) + encoder = ejson.NewBulkRecordEncoder(&ejson.EncoderConfig{ + Fields: fx, + Timezone: r.Timezone, + }) case "csv": contentType = "text/csv" - recordEncoder = encoder.NewFlatWriter(csv.NewWriter(w), true, uf, r.Timezone, ff...) - - case "xlsx": - contentType = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - recordEncoder = encoder.NewExcelizeEncoder(w, true, uf, r.Timezone, ff...) + encoder = csv.NewBulkRecordEncoder(&csv.EncoderConfig{ + Fields: fx, + Timezone: r.Timezone, + }) default: http.Error(w, "unsupported format ("+r.Ext+")", http.StatusBadRequest) @@ -415,13 +456,28 @@ func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interf w.Header().Add("Content-Type", contentType) w.Header().Add("Content-Disposition", "attachment"+filename) - if err = ctrl.record.Export(ctx, f, recordEncoder); err != nil { + bld := envoy.NewBuilder(encoder) + g, err := bld.Build(ctx, nn...) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) - return } - recordEncoder.Flush() - }, nil + err = envoy.Encode(ctx, g, encoder) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + ss := encoder.Stream() + + // Find only the stream we are interested in + for _, s := range ss { + if s.Resource == resource.COMPOSE_RECORD_RESOURCE_TYPE { + io.Copy(w, s.Source) + } + } + + err = ctrl.record.RecordExport(ctx, *rf) + + }, err } func (ctrl Record) Exec(ctx context.Context, r *request.RecordExec) (interface{}, error) { diff --git a/compose/service/record.go b/compose/service/record.go index 1020cee7b..aabc393d0 100644 --- a/compose/service/record.go +++ b/compose/service/record.go @@ -13,9 +13,7 @@ import ( "github.com/cortezaproject/corteza-server/pkg/actionlog" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/corredor" - "github.com/cortezaproject/corteza-server/pkg/envoy" "github.com/cortezaproject/corteza-server/pkg/envoy/resource" - estore "github.com/cortezaproject/corteza-server/pkg/envoy/store" "github.com/cortezaproject/corteza-server/pkg/errors" "github.com/cortezaproject/corteza-server/pkg/eventbus" "github.com/cortezaproject/corteza-server/pkg/label" @@ -79,8 +77,8 @@ type ( Report(ctx context.Context, namespaceID, moduleID uint64, metrics, dimensions, filter string) (interface{}, error) Find(ctx context.Context, filter types.RecordFilter) (set types.RecordSet, f types.RecordFilter, err error) - Export(context.Context, types.RecordFilter, Encoder) error - Import(context.Context, *recordImportSession) error + RecordExport(context.Context, types.RecordFilter) error + RecordImport(context.Context, error) error Create(ctx context.Context, record *types.Record) (*types.Record, error) Update(ctx context.Context, record *types.Record) (*types.Record, error) @@ -97,10 +95,6 @@ type ( EventEmitting(enable bool) } - Encoder interface { - Record(*types.Record) error - } - recordImportSession struct { Name string `json:"-"` SessionID uint64 `json:"sessionID,string"` @@ -311,96 +305,13 @@ func (svc record) Find(ctx context.Context, filter types.RecordFilter) (set type return set, f, svc.recordAction(ctx, aProps, RecordActionSearch, err) } -func (svc record) Import(ctx context.Context, ses *recordImportSession) (err error) { - var ( - aProps = &recordActionProps{} - ) - - err = func() (err error) { - if ses.Progress.StartedAt != nil { - return fmt.Errorf("unable to start import: import session already active") - } - - sa := time.Now() - ses.Progress.StartedAt = &sa - - // Prepare additional metadata - tpl := resource.NewComposeRecordTemplate( - strconv.FormatUint(ses.ModuleID, 10), - strconv.FormatUint(ses.NamespaceID, 10), - ses.Name, - resource.MapToMappingTplSet(ses.Fields), - ) - - // Shape the data - ses.Resources = append(ses.Resources, tpl) - rt := resource.ComposeRecordShaper() - ses.Resources, err = resource.Shape(ses.Resources, rt) - - // Build - cfg := &estore.EncoderConfig{ - // For now the identifier is ignored, so this will never occur - OnExisting: resource.Skip, - Defer: func() { - ses.Progress.Completed++ - }, - } - if ses.OnError == IMPORT_ON_ERROR_SKIP { - cfg.DeferNok = func(err error) error { - ses.Progress.Failed++ - ses.Progress.FailReason = err.Error() - - return nil - } - } - se := estore.NewStoreEncoder(svc.store, cfg) - bld := envoy.NewBuilder(se) - g, err := bld.Build(ctx, ses.Resources...) - if err != nil { - return err - } - - // Encode - err = envoy.Encode(ctx, g, se) - ses.Progress.FinishedAt = now() - if err != nil { - ses.Progress.FailReason = err.Error() - ses.Progress.Failed++ - return err - } - - return - }() - - return svc.recordAction(ctx, aProps, RecordActionImport, err) +func (svc record) RecordImport(ctx context.Context, err error) error { + return svc.recordAction(ctx, &recordActionProps{}, RecordActionImport, err) } -// Export returns all records -// -// @todo better value handling -func (svc record) Export(ctx context.Context, f types.RecordFilter, enc Encoder) (err error) { - var ( - aProps = &recordActionProps{filter: &f} - - m *types.Module - set types.RecordSet - ) - - err = func() (err error) { - m, err = loadModule(ctx, svc.store, f.ModuleID) - if err != nil { - return err - } - - set, _, err = store.SearchComposeRecords(ctx, svc.store, m, f) - if err != nil { - return err - } - - return set.Walk(enc.Record) - }() - - return svc.recordAction(ctx, aProps, RecordActionExport, err) +// RecordExport records that the export has occurred +func (svc record) RecordExport(ctx context.Context, f types.RecordFilter) (err error) { + return svc.recordAction(ctx, &recordActionProps{filter: &f}, RecordActionExport, err) } // Bulk handles provided set of bulk record operations. @@ -541,7 +452,7 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec return nil, RecordErrNotAllowedToCreate() } - if err = svc.generalValueSetValidation(m, new.Values); err != nil { + if err = RecordValueSanitazion(m, new.Values); err != nil { return } @@ -550,12 +461,10 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec ) if svc.optEmitEvents { - // Handle input payload if rve = svc.procCreate(ctx, svc.store, invokerID, m, new); !rve.IsValid() { return nil, RecordErrValueInput().Wrap(rve) } - new.Values = svc.formatter.Run(m, new.Values) if err = svc.eventbus.WaitFor(ctx, event.RecordBeforeCreate(new, nil, m, ns, rve)); err != nil { return } else if !rve.IsValid() { @@ -563,8 +472,7 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec } } - // Assign defaults (only on missing values) - new.Values = svc.setDefaultValues(m, new.Values) + new.Values = RecordValueDefaults(m, new.Values) // Handle payload from automation scripts if rve = svc.procCreate(ctx, svc.store, invokerID, m, new); !rve.IsValid() { @@ -594,6 +502,166 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec return } +// RecordValueSanitazion does basic field and format validation +// +// Received values must fit the data model: on unknown fields +// or multi/single value mismatch we return an error +// +// Record value errors is intentionally NOT used here; if input fails here +// we can assume that form builder (or whatever it was that assembled the record values) +// was misconfigured and will most likely failed to properly parse the +// record value errors payload too +func RecordValueSanitazion(m *types.Module, vv types.RecordValueSet) (err error) { + var ( + aProps = &recordActionProps{} + numeric = regexp.MustCompile(`^[1-9](\d+)$`) + ) + + err = vv.Walk(func(v *types.RecordValue) error { + var field = m.Fields.FindByName(v.Name) + if field == nil { + return RecordErrFieldNotFound(aProps.setField(v.Name)) + } + + if field.IsRef() { + if v.Value == "" { + return nil + } + + if !numeric.MatchString(v.Value) { + return RecordErrInvalidReferenceFormat(aProps.setField(v.Name).setValue(v.Value)) + } + } + + return nil + }) + + if err != nil { + return + } + + // Make sure there are no multi values in a non-multi value fields + err = m.Fields.Walk(func(field *types.ModuleField) error { + if !field.Multi && len(vv.FilterByName(field.Name)) > 1 { + return RecordErrInvalidValueStructure(aProps.setField(field.Name)) + } + + return nil + }) + + if err != nil { + return + } + + return +} + +func RecordUpdateOwner(invokerID uint64, r, old *types.Record) *types.Record { + if old == nil { + if r.OwnedBy == 0 { + // If od owner is not set, make current user + // the owner of the record + r.OwnedBy = invokerID + } + } else { + if r.OwnedBy == 0 { + if old.OwnedBy > 0 { + // Owner not set/send in the payload + // + // Fallback to old owner (if set) + r.OwnedBy = old.OwnedBy + } else { + // If od owner is not set, make current user + // the owner of the record + r.OwnedBy = invokerID + } + } + } + + return r +} + +func RecordValueMerger(ctx context.Context, ac recordValueAccessController, m *types.Module, vv, old types.RecordValueSet) (types.RecordValueSet, *types.RecordValueErrorSet) { + if old != nil { + // Value merge process does not know anything about permissions so + // in case when new values are missing but do exist in the old set and their update/read is denied + // we need to copy them to ensure value merge process them correctly + for _, f := range m.Fields { + if len(vv.FilterByName(f.Name)) == 0 && !ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(f.Name)) { + // copy all fields from old to new + vv = append(vv, old.FilterByName(f.Name).GetClean()...) + } + } + + // Merge new (updated) values with old ones + // This way we get list of updated, stale and deleted values + // that we can selectively update in the repository + vv = old.Merge(vv) + } + + rve := &types.RecordValueErrorSet{} + _ = vv.Walk(func(v *types.RecordValue) error { + if v.IsUpdated() && !ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) { + rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}}) + } + + return nil + }) + + return vv, rve +} + +func RecordPreparer(ctx context.Context, s store.Storer, ss recordValuesSanitizer, vv recordValuesValidator, ff recordValuesFormatter, m *types.Module, new *types.Record) *types.RecordValueErrorSet { + // Before values are processed further and + // sent to automation scripts (if any) + // we need to make sure it does not get un-sanitized data + new.Values = ss.Run(m, new.Values) + + rve := &types.RecordValueErrorSet{} + values.Expression(ctx, m, new, nil, rve) + + if !rve.IsValid() { + return rve + } + + // Run validation of the updated records + rve = vv.Run(ctx, s, m, new) + if !rve.IsValid() { + return rve + } + + // Cleanup the values + new.Values = new.Values.GetClean() + + // Formatting + new.Values = ff.Run(m, new.Values) + + return nil +} + +func RecordValueDefaults(m *types.Module, vv types.RecordValueSet) (out types.RecordValueSet) { + out = vv + + for _, f := range m.Fields { + if f.DefaultValue == nil { + continue + } + + for i, dv := range f.DefaultValue { + // Default values on field are (might be) without field name and place + if !out.Has(f.Name, uint(i)) { + out = append(out, &types.RecordValue{ + Name: f.Name, + Value: dv.Value, + Place: uint(i), + }) + } + } + } + + return +} + // Raw update function that is responsible for value validation, event dispatching // and update. func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Record, err error) { @@ -628,7 +696,7 @@ func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Rec return nil, RecordErrStaleData() } - if err = svc.generalValueSetValidation(m, upd.Values); err != nil { + if err = RecordValueSanitazion(m, upd.Values); err != nil { return } @@ -642,17 +710,6 @@ func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Rec return nil, RecordErrValueInput().Wrap(rve) } - // Before we pass values to record-before-update handling events - // values needs do be cleaned up - // - // Value merge inside procUpdate sets delete flag we need - // when changes are applied but we do not want deleted values - // to be sent to handler - upd.Values = upd.Values.GetClean() - - // Before we pass values to automation scripts, they should be formatted - upd.Values = svc.formatter.Run(m, upd.Values) - // Scripts can (besides simple error value) return complex record value error set // that is passed back to the UI or any other API consumer // @@ -720,15 +777,9 @@ func (svc record) Create(ctx context.Context, new *types.Record) (rec *types.Rec // of the creation procedure and after results are back from the automation scripts // // Both these points introduce external data that need to be checked fully in the same manner -func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, new *types.Record) *types.RecordValueErrorSet { - // Mark all values as updated (new) +func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, new *types.Record) (rve *types.RecordValueErrorSet) { new.Values.SetUpdatedFlag(true) - // Before values are processed further and - // sent to automation scripts (if any) - // we need to make sure it does not get un-sanitized data - new.Values = svc.sanitizer.Run(m, new.Values) - // Reset values to new record // to make sure nobody slips in something we do not want new.ID = nextID() @@ -739,33 +790,13 @@ func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint new.DeletedAt = nil new.DeletedBy = 0 - if new.OwnedBy == 0 { - // If od owner is not set, make current user - // the owner of the record - new.OwnedBy = invokerID - } - - rve := &types.RecordValueErrorSet{} - _ = new.Values.Walk(func(v *types.RecordValue) error { - if v.IsUpdated() && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) { - rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}}) - } - - return nil - }) - + new = RecordUpdateOwner(invokerID, new, nil) + new.Values, rve = RecordValueMerger(ctx, svc.ac, m, new.Values, nil) if !rve.IsValid() { return rve } - - values.Expression(ctx, m, new, nil, rve) - - if !rve.IsValid() { - return rve - } - - // Run validation of the updated records - return svc.validator.Run(ctx, s, m, new) + rve = RecordPreparer(ctx, svc.store, svc.sanitizer, svc.validator, svc.formatter, m, new) + return rve } func (svc record) Update(ctx context.Context, upd *types.Record) (rec *types.Record, err error) { @@ -789,17 +820,10 @@ func (svc record) Update(ctx context.Context, upd *types.Record) (rec *types.Rec // of the update procedure and after results are back from the automation scripts // // Both these points introduce external data that need to be checked fully in the same manner -func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, upd *types.Record, old *types.Record) *types.RecordValueErrorSet { +func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, upd *types.Record, old *types.Record) (rve *types.RecordValueErrorSet) { // Mark all values as updated (new) upd.Values.SetUpdatedFlag(true) - // First sanitization - // - // Before values are merged with existing data and - // sent to automation scripts (if any) - // we need to make sure it does not get sanitized data - upd.Values = svc.sanitizer.Run(m, upd.Values) - // Copy values to updated record // to make sure nobody slips in something we do not want upd.CreatedAt = old.CreatedAt @@ -809,55 +833,13 @@ func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint upd.DeletedAt = old.DeletedAt upd.DeletedBy = old.DeletedBy - if upd.OwnedBy == 0 { - if old.OwnedBy > 0 { - // Owner not set/send in the payload - // - // Fallback to old owner (if set) - upd.OwnedBy = old.OwnedBy - } else { - // If od owner is not set, make current user - // the owner of the record - upd.OwnedBy = invokerID - } - } - - // Value merge process does not know anything about permissions so - // in case when new values are missing but do exist in the old set and their update/read is denied - // we need to copy them to ensure value merge process them correctly - for _, f := range m.Fields { - if len(upd.Values.FilterByName(f.Name)) == 0 && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(f.Name)) { - // copy all fields from old to new - upd.Values = append(upd.Values, old.Values.FilterByName(f.Name).GetClean()...) - } - } - - // Merge new (updated) values with old ones - // This way we get list of updated, stale and deleted values - // that we can selectively update in the repository - upd.Values = old.Values.Merge(upd.Values) - - rve := &types.RecordValueErrorSet{} - _ = upd.Values.Walk(func(v *types.RecordValue) error { - if v.IsUpdated() && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) { - rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}}) - } - - return nil - }) - + upd = RecordUpdateOwner(invokerID, upd, old) + upd.Values, rve = RecordValueMerger(ctx, svc.ac, m, upd.Values, old.Values) if !rve.IsValid() { return rve } - - values.Expression(ctx, m, upd, old, rve) - - if !rve.IsValid() { - return rve - } - - // Run validation of the updated records - return svc.validator.Run(ctx, s, m, upd) + rve = RecordPreparer(ctx, svc.store, svc.sanitizer, svc.validator, svc.formatter, m, upd) + return rve } func (svc record) recordInfoUpdate(ctx context.Context, r *types.Record) { @@ -1262,7 +1244,7 @@ func (svc record) Iterator(ctx context.Context, f types.RecordFilter, fn eventbu recordableAction = RecordActionIteratorClone // Assign defaults (only on missing values) - rec.Values = svc.setDefaultValues(m, rec.Values) + rec.Values = RecordValueDefaults(m, rec.Values) // Handle payload from automation scripts if rve := svc.procCreate(ctx, svc.store, invokerID, m, rec); !rve.IsValid() { @@ -1311,83 +1293,6 @@ func (svc record) Iterator(ctx context.Context, f types.RecordFilter, fn eventbu } -func (svc record) setDefaultValues(m *types.Module, vv types.RecordValueSet) (out types.RecordValueSet) { - out = vv - - for _, f := range m.Fields { - if f.DefaultValue == nil { - continue - } - - for i, dv := range f.DefaultValue { - // Default values on field are (might be) without field name and place - if !out.Has(f.Name, uint(i)) { - out = append(out, &types.RecordValue{ - Name: f.Name, - Value: dv.Value, - Place: uint(i), - }) - } - } - } - - return -} - -// Does basic field and format validation -// -// Received values must fit the data model: on unknown fields -// or multi/single value mismatch we return an error -// -// Record value errors is intentionally NOT used here; if input fails here -// we can assume that form builder (or whatever it was that assembled the record values) -// was misconfigured and will most likely failed to properly parse the -// record value errors payload too -func (svc record) generalValueSetValidation(m *types.Module, vv types.RecordValueSet) (err error) { - var ( - aProps = &recordActionProps{} - numeric = regexp.MustCompile(`^[1-9](\d+)$`) - ) - - err = vv.Walk(func(v *types.RecordValue) error { - var field = m.Fields.FindByName(v.Name) - if field == nil { - return RecordErrFieldNotFound(aProps.setField(v.Name)) - } - - if field.IsRef() { - if v.Value == "" { - return nil - } - - if !numeric.MatchString(v.Value) { - return RecordErrInvalidReferenceFormat(aProps.setField(v.Name).setValue(v.Value)) - } - } - - return nil - }) - - if err != nil { - return - } - - // Make sure there are no multi values in a non-multi value fields - err = m.Fields.Walk(func(field *types.ModuleField) error { - if !field.Multi && len(vv.FilterByName(field.Name)) > 1 { - return RecordErrInvalidValueStructure(aProps.setField(field.Name)) - } - - return nil - }) - - if err != nil { - return - } - - return -} - // checks record-value-read access permissions for all module fields and removes unreadable fields from all records func trimUnreadableRecordFields(ctx context.Context, ac recordValueAccessController, m *types.Module, rr ...*types.Record) { var ( diff --git a/tests/compose/record_test.go b/tests/compose/record_test.go index dc1fb86f2..c03c39d01 100644 --- a/tests/compose/record_test.go +++ b/tests/compose/record_test.go @@ -4,20 +4,21 @@ import ( "bytes" "context" "fmt" - "github.com/cortezaproject/corteza-server/compose/service" - "github.com/cortezaproject/corteza-server/compose/types" - "github.com/cortezaproject/corteza-server/pkg/id" - "github.com/cortezaproject/corteza-server/store" - "github.com/cortezaproject/corteza-server/tests/helpers" - "github.com/steinfletcher/apitest" - "github.com/steinfletcher/apitest-jsonpath" - "github.com/stretchr/testify/require" "io/ioutil" "mime/multipart" "net/http" "net/url" "testing" "time" + + "github.com/cortezaproject/corteza-server/compose/service" + "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/id" + "github.com/cortezaproject/corteza-server/store" + "github.com/cortezaproject/corteza-server/tests/helpers" + "github.com/steinfletcher/apitest" + jsonpath "github.com/steinfletcher/apitest-jsonpath" + "github.com/stretchr/testify/require" ) func (h helper) clearRecords() { @@ -315,8 +316,10 @@ func TestRecordExport(t *testing.T) { h.clearRecords() module := h.repoMakeRecordModuleWithFields("record export module") + expected := "id,name\n" for i := 0; i < 10; i++ { - h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("d%d", i), Place: uint(i)}) + r := h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("d%d", i), Place: uint(i)}) + expected += fmt.Sprintf("%d,d%d\n", r.ID, i) } // we'll not use standard asserts (AssertNoErrors) here, @@ -330,7 +333,7 @@ func TestRecordExport(t *testing.T) { b, err := ioutil.ReadAll(r.Response.Body) h.noError(err) - h.a.Equal("name\nd0\nd1\nd2\nd3\nd4\nd5\nd6\nd7\nd8\nd9\n", string(b)) + h.a.Equal(expected, string(b)) } func (h helper) apiInitRecordImport(api *apitest.APITest, url, f string, file []byte) *apitest.Response { @@ -408,6 +411,7 @@ func TestRecordImportInit_invalidFileFormat(t *testing.T) { func TestRecordImportRun(t *testing.T) { h := newHelper(t) h.clearRecords() + h.allow(types.ModuleRBACResource.AppendWildcard(), "record.create") module := h.repoMakeRecordModuleWithFields("record import run module") tests := []struct { @@ -449,6 +453,74 @@ func TestRecordImportRun_sessionNotFound(t *testing.T) { End() } +func TestRecordImportRunForbidden(t *testing.T) { + h := newHelper(t) + h.clearRecords() + h.deny(types.ModuleRBACResource.AppendWildcard(), "record.create") + + module := h.repoMakeRecordModuleWithFields("record import run module") + tests := []struct { + Name string + Content string + }{ + { + Name: "f1.csv", + Content: "fname,femail\nv1,v2\n", + }, + } + + for _, test := range tests { + t.Run(t.Name(), func(t *testing.T) { + url := fmt.Sprintf("/namespace/%d/module/%d/record/import", module.NamespaceID, module.ID) + rsp := &rImportSession{} + api := h.apiInit() + + r := h.apiInitRecordImport(api, url, test.Name, []byte(test.Content)).End() + r.JSON(rsp) + + h.apiRunRecordImport(api, fmt.Sprintf("%s/%s", url, rsp.Response.SessionID), `{"fields":{"fname":"name","femail":"email"},"onError":"fail"}`). + Assert(helpers.AssertErrorP("not allowed to create records for module")). + End() + }) + } +} + +func TestRecordImportRunForbidden_field(t *testing.T) { + h := newHelper(t) + h.clearRecords() + h.allow(types.ModuleRBACResource.AppendWildcard(), "record.create") + + module := h.repoMakeRecordModuleWithFields("record import run module") + + f := module.Fields.FindByName("name") + h.deny(types.ModuleFieldRBACResource.AppendID(f.ID), "record.value.update") + + tests := []struct { + Name string + Content string + }{ + { + Name: "f1.csv", + Content: "fname,femail\nv1,v2\n", + }, + } + + for _, test := range tests { + t.Run(t.Name(), func(t *testing.T) { + url := fmt.Sprintf("/namespace/%d/module/%d/record/import", module.NamespaceID, module.ID) + rsp := &rImportSession{} + api := h.apiInit() + + r := h.apiInitRecordImport(api, url, test.Name, []byte(test.Content)).End() + r.JSON(rsp) + + h.apiRunRecordImport(api, fmt.Sprintf("%s/%s", url, rsp.Response.SessionID), `{"fields":{"fname":"name","femail":"email"},"onError":"fail"}`). + Assert(helpers.AssertErrorP("1 issue(s) found")). + End() + }) + } +} + func TestRecordImportImportProgress(t *testing.T) { h := newHelper(t) h.clearRecords() diff --git a/tests/helpers/assert.go b/tests/helpers/assert.go index cc7129972..ae6fced76 100644 --- a/tests/helpers/assert.go +++ b/tests/helpers/assert.go @@ -151,3 +151,23 @@ func AssertBody(expected string) assertFn { return nil } } + +// AssertErrorP checks if the expected error is part of the error messsage +func AssertErrorP(expectedError string) assertFn { + return func(rsp *http.Response, _ *http.Request) (err error) { + tmp := StdErrorResponse{} + if err = DecodeBody(rsp, &tmp); err != nil { + return err + } + + if tmp.Error.Message == "" { + return errors.Errorf("No error, expecting error with: %v", expectedError) + } + + if !strings.Contains(tmp.Error.Message, expectedError) { + return errors.Errorf("Expecting error with %v, got: %v", expectedError, tmp.Error.Message) + } + + return nil + } +}