Fix attachment management for all supported types
* Add missing expr.Bytes expr. type * Fix attachment upload when []bytes used (missing type, improper size calculation) * Fix attachment upload when ReadSeeker used (missing size calculation) * Fix attachment upload when Reader used (missing logic, missing size calculation)
This commit is contained in:
@@ -114,6 +114,7 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, ws websock
|
||||
&expr.Reader{},
|
||||
&expr.Vars{},
|
||||
&expr.HttpRequest{},
|
||||
&expr.Bytes{},
|
||||
|
||||
&automation.EmailMessage{},
|
||||
)
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package automation
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/compose/types"
|
||||
@@ -90,12 +93,72 @@ func (h attachmentHandler) create(ctx context.Context, args *attachmentCreateArg
|
||||
|
||||
switch {
|
||||
case len(args.contentBytes) > 0:
|
||||
size = int64(len(args.contentString))
|
||||
size = int64(len(args.contentBytes))
|
||||
fh = bytes.NewReader(args.contentBytes)
|
||||
|
||||
case args.contentStream != nil:
|
||||
if rs, is := args.contentStream.(io.ReadSeeker); is {
|
||||
_, err = rs.Seek(0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
size, err = getReaderSize(rs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = rs.Seek(0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fh = rs
|
||||
} else {
|
||||
// In case we only got a reader...
|
||||
//
|
||||
// For future proofing, for handling larger attachment, we create a temp.
|
||||
// file which we then use as a reader
|
||||
|
||||
// Preparations
|
||||
tmpf, err := ioutil.TempFile("", "reader")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tmpf.Close()
|
||||
defer os.Remove(tmpf.Name())
|
||||
|
||||
// Writing content to file
|
||||
w := bufio.NewWriter(tmpf)
|
||||
r := args.contentStream
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
// read
|
||||
n, err := r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// on-the-fly size calculation
|
||||
size += int64(n)
|
||||
|
||||
// write
|
||||
if _, err := w.Write(buf[:n]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err = w.Flush(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = tmpf.Seek(0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fh = tmpf
|
||||
}
|
||||
|
||||
default:
|
||||
@@ -138,3 +201,21 @@ func lookupAttachment(ctx context.Context, svc attachmentService, args attachmen
|
||||
|
||||
return nil, fmt.Errorf("empty attachment lookup params")
|
||||
}
|
||||
|
||||
func getReaderSize(r io.Reader) (size int64, err error) {
|
||||
buf := make([]byte, 1024)
|
||||
var n int
|
||||
for {
|
||||
// read a chunk
|
||||
n, err = r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
size += int64(n)
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
68
tests/workflows/attachment_management_types_test.go
Normal file
68
tests/workflows/attachment_management_types_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package workflows
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/automation/types"
|
||||
cmpTypes "github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/expr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type (
|
||||
auxReader struct {
|
||||
read bool
|
||||
}
|
||||
)
|
||||
|
||||
func Test_attachment_management_types(t *testing.T) {
|
||||
var (
|
||||
ctx = bypassRBAC(context.Background())
|
||||
req = require.New(t)
|
||||
)
|
||||
|
||||
req.NoError(defStore.TruncateAttachments(ctx))
|
||||
|
||||
loadNewScenario(ctx, t)
|
||||
|
||||
var (
|
||||
aux = struct {
|
||||
AttachedString *cmpTypes.Attachment
|
||||
AttachedReader *cmpTypes.Attachment
|
||||
AttachedReadSeeker *cmpTypes.Attachment
|
||||
AttachedBytes *cmpTypes.Attachment
|
||||
}{}
|
||||
)
|
||||
|
||||
v := &expr.Vars{}
|
||||
v.AssignFieldValue("sourceString", expr.Must(expr.NewString("hello")))
|
||||
v.AssignFieldValue("sourceReader", expr.Must(expr.NewReader(&auxReader{})))
|
||||
v.AssignFieldValue("sourceReadSeeker", expr.Must(expr.NewReader(strings.NewReader("hello"))))
|
||||
v.AssignFieldValue("sourceBytes", expr.Must(expr.NewBytes([]byte{'h', 'e', 'l', 'l', 'o'})))
|
||||
|
||||
vars, _ := mustExecWorkflow(ctx, t, "attachments", types.WorkflowExecParams{
|
||||
Input: v,
|
||||
})
|
||||
req.NoError(vars.Decode(&aux))
|
||||
|
||||
req.NotNil(aux.AttachedString)
|
||||
req.NotNil(aux.AttachedReader)
|
||||
req.NotNil(aux.AttachedReadSeeker)
|
||||
req.NotNil(aux.AttachedBytes)
|
||||
}
|
||||
|
||||
func (ar *auxReader) Read(dst []byte) (int, error) {
|
||||
if ar.read {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
for i := range dst {
|
||||
dst[i] = byte('a')
|
||||
}
|
||||
|
||||
ar.read = true
|
||||
return len(dst), nil
|
||||
}
|
||||
9
tests/workflows/testdata/attachment_management_types/data_model.yaml
vendored
Normal file
9
tests/workflows/testdata/attachment_management_types/data_model.yaml
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
namespaces:
|
||||
ns1:
|
||||
name: Namespace#1
|
||||
|
||||
modules:
|
||||
mod1:
|
||||
name: Module#1
|
||||
fields:
|
||||
f1: { label: 'Field1', kind: 'File' }
|
||||
96
tests/workflows/testdata/attachment_management_types/workflow.yaml
vendored
Normal file
96
tests/workflows/testdata/attachment_management_types/workflow.yaml
vendored
Normal file
@@ -0,0 +1,96 @@
|
||||
workflows:
|
||||
attachments:
|
||||
enabled: true
|
||||
trace: true
|
||||
triggers:
|
||||
- enabled: true
|
||||
stepID: 11
|
||||
|
||||
steps:
|
||||
- stepID: 11
|
||||
kind: function
|
||||
ref: composeRecordsNew
|
||||
arguments:
|
||||
- { target: module, type: Handle, value: "mod1" }
|
||||
- { target: namespace, type: Handle, value: "ns1" }
|
||||
results:
|
||||
- { target: attachableString, expr: "record" }
|
||||
- stepID: 12
|
||||
kind: function
|
||||
ref: composeRecordsNew
|
||||
arguments:
|
||||
- { target: module, type: Handle, value: "mod1" }
|
||||
- { target: namespace, type: Handle, value: "ns1" }
|
||||
results:
|
||||
- { target: attachableReader, expr: "record" }
|
||||
- stepID: 13
|
||||
kind: function
|
||||
ref: composeRecordsNew
|
||||
arguments:
|
||||
- { target: module, type: Handle, value: "mod1" }
|
||||
- { target: namespace, type: Handle, value: "ns1" }
|
||||
results:
|
||||
- { target: attachableReadSeeker, expr: "record" }
|
||||
- stepID: 14
|
||||
kind: function
|
||||
ref: composeRecordsNew
|
||||
arguments:
|
||||
- { target: module, type: Handle, value: "mod1" }
|
||||
- { target: namespace, type: Handle, value: "ns1" }
|
||||
results:
|
||||
- { target: attachableBytes, expr: "record" }
|
||||
|
||||
|
||||
|
||||
|
||||
# Store attachment from given string
|
||||
- stepID: 20
|
||||
kind: function
|
||||
ref: attachmentCreate
|
||||
arguments:
|
||||
- { target: content, type: String, expr: "sourceString" }
|
||||
- { target: name, type: String, value: "att.txt" }
|
||||
- { target: resource, type: ComposeRecord, expr: "attachableString" }
|
||||
- { target: fieldName, type: String, value: "f1" }
|
||||
results:
|
||||
- { target: attachedString, expr: "attachment" }
|
||||
- stepID: 21
|
||||
kind: function
|
||||
ref: attachmentCreate
|
||||
arguments:
|
||||
- { target: content, type: Reader, expr: "sourceReader" }
|
||||
- { target: name, type: String, value: "att.txt" }
|
||||
- { target: resource, type: ComposeRecord, expr: "attachableReader" }
|
||||
- { target: fieldName, type: String, value: "f1" }
|
||||
results:
|
||||
- { target: attachedReader, expr: "attachment" }
|
||||
- stepID: 22
|
||||
kind: function
|
||||
ref: attachmentCreate
|
||||
arguments:
|
||||
- { target: content, type: Reader, expr: "sourceReadSeeker" }
|
||||
- { target: name, type: String, value: "att.txt" }
|
||||
- { target: resource, type: ComposeRecord, expr: "attachableReadSeeker" }
|
||||
- { target: fieldName, type: String, value: "f1" }
|
||||
results:
|
||||
- { target: attachedReadSeeker, expr: "attachment" }
|
||||
- stepID: 23
|
||||
kind: function
|
||||
ref: attachmentCreate
|
||||
arguments:
|
||||
- { target: content, type: Bytes, expr: "sourceBytes" }
|
||||
- { target: name, type: String, value: "att.txt" }
|
||||
- { target: resource, type: ComposeRecord, expr: "attachableBytes" }
|
||||
- { target: fieldName, type: String, value: "f1" }
|
||||
results:
|
||||
- { target: attachedBytes, expr: "attachment" }
|
||||
|
||||
paths:
|
||||
- { parentID: 11, childID: 12 }
|
||||
- { parentID: 12, childID: 13 }
|
||||
- { parentID: 13, childID: 14 }
|
||||
|
||||
- { parentID: 14, childID: 20 }
|
||||
- { parentID: 20, childID: 21 }
|
||||
- { parentID: 21, childID: 22 }
|
||||
- { parentID: 22, childID: 23 }
|
||||
Reference in New Issue
Block a user