3
0

Added as and internal corteza formatter for federation, tests

This commit is contained in:
Peter Grlica
2021-03-01 12:01:53 +01:00
parent 57ffca16a6
commit 9450a3ad66
14 changed files with 1088 additions and 54 deletions

View File

@@ -329,7 +329,7 @@ endpoints:
path: "/nodes/{nodeID}/modules/exposed"
authentication: []
apis:
- name: readExposedAll
- name: readExposedInternal
method: GET
title: List all exposed modules changes
path: "/"
@@ -357,6 +357,34 @@ endpoints:
- type: string
name: sort
title: Sort items
- name: readExposedSocial
method: GET
title: List all exposed modules changes in activity streams format
path: "/social"
parameters:
path:
- type: uint64
name: nodeID
required: true
title: Node ID
get:
- type: uint64
name: lastSync
required: false
title: Last sync timestamp
- type: string
name: query
required: false
title: Search query
- type: uint
name: limit
title: Limit
- type: string
name: pageCursor
title: Page cursor
- type: string
name: sort
title: Sort items
- title: Sync data
description: Sync data
@@ -392,7 +420,7 @@ endpoints:
- type: string
name: sort
title: Sort items
- name: readExposed
- name: readExposedInternal
method: GET
title: List all records per module
path: "/{moduleID}/records/"
@@ -427,6 +455,41 @@ endpoints:
name: sort
required: false
title: Sort items
- name: readExposedSocial
method: GET
title: List all records per module in activitystreams format
path: "/{moduleID}/records/social/"
parameters:
path:
- type: uint64
name: nodeID
required: true
title: Node ID
- type: uint64
name: moduleID
required: true
title: Module ID
get:
- type: uint64
name: lastSync
required: false
title: Last sync timestamp
- type: string
name: query
required: false
title: Search query
- type: uint
name: limit
required: false
title: Limit
- type: string
name: pageCursor
required: false
title: Page cursor
- type: string
name: sort
required: false
title: Sort items
- title: Permissions
entrypoint: permissions

View File

@@ -20,13 +20,15 @@ type (
// Internal API interface
SyncDataAPI interface {
ReadExposedAll(context.Context, *request.SyncDataReadExposedAll) (interface{}, error)
ReadExposed(context.Context, *request.SyncDataReadExposed) (interface{}, error)
ReadExposedInternal(context.Context, *request.SyncDataReadExposedInternal) (interface{}, error)
ReadExposedSocial(context.Context, *request.SyncDataReadExposedSocial) (interface{}, error)
}
// HTTP API interface
SyncData struct {
ReadExposedAll func(http.ResponseWriter, *http.Request)
ReadExposed func(http.ResponseWriter, *http.Request)
ReadExposedAll func(http.ResponseWriter, *http.Request)
ReadExposedInternal func(http.ResponseWriter, *http.Request)
ReadExposedSocial func(http.ResponseWriter, *http.Request)
}
)
@@ -48,15 +50,31 @@ func NewSyncData(h SyncDataAPI) *SyncData {
api.Send(w, r, value)
},
ReadExposed: func(w http.ResponseWriter, r *http.Request) {
ReadExposedInternal: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSyncDataReadExposed()
params := request.NewSyncDataReadExposedInternal()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.ReadExposed(r.Context(), params)
value, err := h.ReadExposedInternal(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
}
api.Send(w, r, value)
},
ReadExposedSocial: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSyncDataReadExposedSocial()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.ReadExposedSocial(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
@@ -71,6 +89,7 @@ func (h SyncData) MountRoutes(r chi.Router, middlewares ...func(http.Handler) ht
r.Group(func(r chi.Router) {
r.Use(middlewares...)
r.Get("/nodes/{nodeID}/modules/exposed/records/", h.ReadExposedAll)
r.Get("/nodes/{nodeID}/modules/{moduleID}/records/", h.ReadExposed)
r.Get("/nodes/{nodeID}/modules/{moduleID}/records/", h.ReadExposedInternal)
r.Get("/nodes/{nodeID}/modules/{moduleID}/records/social/", h.ReadExposedSocial)
})
}

View File

@@ -19,26 +19,44 @@ import (
type (
// Internal API interface
SyncStructureAPI interface {
ReadExposedAll(context.Context, *request.SyncStructureReadExposedAll) (interface{}, error)
ReadExposedInternal(context.Context, *request.SyncStructureReadExposedInternal) (interface{}, error)
ReadExposedSocial(context.Context, *request.SyncStructureReadExposedSocial) (interface{}, error)
}
// HTTP API interface
SyncStructure struct {
ReadExposedAll func(http.ResponseWriter, *http.Request)
ReadExposedInternal func(http.ResponseWriter, *http.Request)
ReadExposedSocial func(http.ResponseWriter, *http.Request)
}
)
func NewSyncStructure(h SyncStructureAPI) *SyncStructure {
return &SyncStructure{
ReadExposedAll: func(w http.ResponseWriter, r *http.Request) {
ReadExposedInternal: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSyncStructureReadExposedAll()
params := request.NewSyncStructureReadExposedInternal()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.ReadExposedAll(r.Context(), params)
value, err := h.ReadExposedInternal(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
}
api.Send(w, r, value)
},
ReadExposedSocial: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSyncStructureReadExposedSocial()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.ReadExposedSocial(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
@@ -52,6 +70,7 @@ func NewSyncStructure(h SyncStructureAPI) *SyncStructure {
func (h SyncStructure) MountRoutes(r chi.Router, middlewares ...func(http.Handler) http.Handler) {
r.Group(func(r chi.Router) {
r.Use(middlewares...)
r.Get("/nodes/{nodeID}/modules/exposed/", h.ReadExposedAll)
r.Get("/nodes/{nodeID}/modules/exposed/", h.ReadExposedInternal)
r.Get("/nodes/{nodeID}/modules/exposed/social", h.ReadExposedSocial)
})
}

View File

@@ -61,7 +61,44 @@ type (
Sort string
}
SyncDataReadExposed struct {
SyncDataReadExposedInternal struct {
// NodeID PATH parameter
//
// Node ID
NodeID uint64 `json:",string"`
// ModuleID PATH parameter
//
// Module ID
ModuleID uint64 `json:",string"`
// LastSync GET parameter
//
// Last sync timestamp
LastSync uint64 `json:",string"`
// Query GET parameter
//
// Search query
Query string
// Limit GET parameter
//
// Limit
Limit uint
// PageCursor GET parameter
//
// Page cursor
PageCursor string
// Sort GET parameter
//
// Sort items
Sort string
}
SyncDataReadExposedSocial struct {
// NodeID PATH parameter
//
// Node ID
@@ -210,13 +247,13 @@ func (r *SyncDataReadExposedAll) Fill(req *http.Request) (err error) {
return err
}
// NewSyncDataReadExposed request
func NewSyncDataReadExposed() *SyncDataReadExposed {
return &SyncDataReadExposed{}
// NewSyncDataReadExposedInternal request
func NewSyncDataReadExposedInternal() *SyncDataReadExposedInternal {
return &SyncDataReadExposedInternal{}
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) Auditable() map[string]interface{} {
func (r SyncDataReadExposedInternal) Auditable() map[string]interface{} {
return map[string]interface{}{
"nodeID": r.NodeID,
"moduleID": r.ModuleID,
@@ -229,42 +266,165 @@ func (r SyncDataReadExposed) Auditable() map[string]interface{} {
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetNodeID() uint64 {
func (r SyncDataReadExposedInternal) GetNodeID() uint64 {
return r.NodeID
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetModuleID() uint64 {
func (r SyncDataReadExposedInternal) GetModuleID() uint64 {
return r.ModuleID
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetLastSync() uint64 {
func (r SyncDataReadExposedInternal) GetLastSync() uint64 {
return r.LastSync
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetQuery() string {
func (r SyncDataReadExposedInternal) GetQuery() string {
return r.Query
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetLimit() uint {
func (r SyncDataReadExposedInternal) GetLimit() uint {
return r.Limit
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetPageCursor() string {
func (r SyncDataReadExposedInternal) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposed) GetSort() string {
func (r SyncDataReadExposedInternal) GetSort() string {
return r.Sort
}
// Fill processes request and fills internal variables
func (r *SyncDataReadExposed) Fill(req *http.Request) (err error) {
func (r *SyncDataReadExposedInternal) Fill(req *http.Request) (err error) {
if strings.ToLower(req.Header.Get("content-type")) == "application/json" {
err = json.NewDecoder(req.Body).Decode(r)
switch {
case err == io.EOF:
err = nil
case err != nil:
return fmt.Errorf("error parsing http request body: %w", err)
}
}
{
// GET params
tmp := req.URL.Query()
if val, ok := tmp["lastSync"]; ok && len(val) > 0 {
r.LastSync, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
if val, ok := tmp["query"]; ok && len(val) > 0 {
r.Query, err = val[0], nil
if err != nil {
return err
}
}
if val, ok := tmp["limit"]; ok && len(val) > 0 {
r.Limit, err = payload.ParseUint(val[0]), nil
if err != nil {
return err
}
}
if val, ok := tmp["pageCursor"]; ok && len(val) > 0 {
r.PageCursor, err = val[0], nil
if err != nil {
return err
}
}
if val, ok := tmp["sort"]; ok && len(val) > 0 {
r.Sort, err = val[0], nil
if err != nil {
return err
}
}
}
{
var val string
// path params
val = chi.URLParam(req, "nodeID")
r.NodeID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
val = chi.URLParam(req, "moduleID")
r.ModuleID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
}
// NewSyncDataReadExposedSocial request
func NewSyncDataReadExposedSocial() *SyncDataReadExposedSocial {
return &SyncDataReadExposedSocial{}
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) Auditable() map[string]interface{} {
return map[string]interface{}{
"nodeID": r.NodeID,
"moduleID": r.ModuleID,
"lastSync": r.LastSync,
"query": r.Query,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"sort": r.Sort,
}
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetNodeID() uint64 {
return r.NodeID
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetModuleID() uint64 {
return r.ModuleID
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetLastSync() uint64 {
return r.LastSync
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetQuery() string {
return r.Query
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetLimit() uint {
return r.Limit
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r SyncDataReadExposedSocial) GetSort() string {
return r.Sort
}
// Fill processes request and fills internal variables
func (r *SyncDataReadExposedSocial) Fill(req *http.Request) (err error) {
if strings.ToLower(req.Header.Get("content-type")) == "application/json" {
err = json.NewDecoder(req.Body).Decode(r)

View File

@@ -29,7 +29,39 @@ var (
type (
// Internal API interface
SyncStructureReadExposedAll struct {
SyncStructureReadExposedInternal struct {
// NodeID PATH parameter
//
// Node ID
NodeID uint64 `json:",string"`
// LastSync GET parameter
//
// Last sync timestamp
LastSync uint64 `json:",string"`
// Query GET parameter
//
// Search query
Query string
// Limit GET parameter
//
// Limit
Limit uint
// PageCursor GET parameter
//
// Page cursor
PageCursor string
// Sort GET parameter
//
// Sort items
Sort string
}
SyncStructureReadExposedSocial struct {
// NodeID PATH parameter
//
// Node ID
@@ -62,13 +94,13 @@ type (
}
)
// NewSyncStructureReadExposedAll request
func NewSyncStructureReadExposedAll() *SyncStructureReadExposedAll {
return &SyncStructureReadExposedAll{}
// NewSyncStructureReadExposedInternal request
func NewSyncStructureReadExposedInternal() *SyncStructureReadExposedInternal {
return &SyncStructureReadExposedInternal{}
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) Auditable() map[string]interface{} {
func (r SyncStructureReadExposedInternal) Auditable() map[string]interface{} {
return map[string]interface{}{
"nodeID": r.NodeID,
"lastSync": r.LastSync,
@@ -80,37 +112,148 @@ func (r SyncStructureReadExposedAll) Auditable() map[string]interface{} {
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetNodeID() uint64 {
func (r SyncStructureReadExposedInternal) GetNodeID() uint64 {
return r.NodeID
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetLastSync() uint64 {
func (r SyncStructureReadExposedInternal) GetLastSync() uint64 {
return r.LastSync
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetQuery() string {
func (r SyncStructureReadExposedInternal) GetQuery() string {
return r.Query
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetLimit() uint {
func (r SyncStructureReadExposedInternal) GetLimit() uint {
return r.Limit
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetPageCursor() string {
func (r SyncStructureReadExposedInternal) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedAll) GetSort() string {
func (r SyncStructureReadExposedInternal) GetSort() string {
return r.Sort
}
// Fill processes request and fills internal variables
func (r *SyncStructureReadExposedAll) Fill(req *http.Request) (err error) {
func (r *SyncStructureReadExposedInternal) Fill(req *http.Request) (err error) {
if strings.ToLower(req.Header.Get("content-type")) == "application/json" {
err = json.NewDecoder(req.Body).Decode(r)
switch {
case err == io.EOF:
err = nil
case err != nil:
return fmt.Errorf("error parsing http request body: %w", err)
}
}
{
// GET params
tmp := req.URL.Query()
if val, ok := tmp["lastSync"]; ok && len(val) > 0 {
r.LastSync, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
if val, ok := tmp["query"]; ok && len(val) > 0 {
r.Query, err = val[0], nil
if err != nil {
return err
}
}
if val, ok := tmp["limit"]; ok && len(val) > 0 {
r.Limit, err = payload.ParseUint(val[0]), nil
if err != nil {
return err
}
}
if val, ok := tmp["pageCursor"]; ok && len(val) > 0 {
r.PageCursor, err = val[0], nil
if err != nil {
return err
}
}
if val, ok := tmp["sort"]; ok && len(val) > 0 {
r.Sort, err = val[0], nil
if err != nil {
return err
}
}
}
{
var val string
// path params
val = chi.URLParam(req, "nodeID")
r.NodeID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
}
// NewSyncStructureReadExposedSocial request
func NewSyncStructureReadExposedSocial() *SyncStructureReadExposedSocial {
return &SyncStructureReadExposedSocial{}
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) Auditable() map[string]interface{} {
return map[string]interface{}{
"nodeID": r.NodeID,
"lastSync": r.LastSync,
"query": r.Query,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"sort": r.Sort,
}
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetNodeID() uint64 {
return r.NodeID
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetLastSync() uint64 {
return r.LastSync
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetQuery() string {
return r.Query
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetLimit() uint {
return r.Limit
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r SyncStructureReadExposedSocial) GetSort() string {
return r.Sort
}
// Fill processes request and fills internal variables
func (r *SyncStructureReadExposedSocial) Fill(req *http.Request) (err error) {
if strings.ToLower(req.Header.Get("content-type")) == "application/json" {
err = json.NewDecoder(req.Body).Decode(r)

View File

@@ -3,6 +3,7 @@ package rest
import (
"context"
"fmt"
"net/http"
"strings"
"time"
@@ -11,6 +12,8 @@ import (
"github.com/cortezaproject/corteza-server/federation/rest/request"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/errors"
"github.com/cortezaproject/corteza-server/pkg/federation"
"github.com/cortezaproject/corteza-server/pkg/filter"
ss "github.com/cortezaproject/corteza-server/system/service"
st "github.com/cortezaproject/corteza-server/system/types"
@@ -107,14 +110,74 @@ func (ctrl SyncData) ReadExposedAll(ctx context.Context, r *request.SyncDataRead
return listResponse{&responseSet}, nil
}
func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExposed) (interface{}, error) {
// ReadExposedInternal fetches all the data - records (with paging)
// for an exposed module in an internal format
func (ctrl SyncData) ReadExposedInternal(ctx context.Context, r *request.SyncDataReadExposedInternal) (interface{}, error) {
return func(w http.ResponseWriter, req *http.Request) {
payload, err := ctrl.readExposed(ctx, r)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
fEncoder := federation.NewEncoder(w, service.DefaultOptions)
err = fEncoder.Encode(payload, federation.CortezaInternalData)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
return
}, nil
}
// ReadExposedInternal fetches all the data - records (with paging)
// for an exposed module in activity streams format
func (ctrl SyncData) ReadExposedSocial(ctx context.Context, r *request.SyncDataReadExposedSocial) (interface{}, error) {
return func(w http.ResponseWriter, req *http.Request) {
rr := request.SyncDataReadExposedInternal{
NodeID: r.NodeID,
ModuleID: r.ModuleID,
LastSync: r.LastSync,
Query: r.Query,
Limit: r.Limit,
PageCursor: r.PageCursor,
Sort: r.Sort,
}
payload, err := ctrl.readExposed(ctx, &rr)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
fEncoder := federation.NewEncoder(w, service.DefaultOptions)
err = fEncoder.Encode(payload, federation.ActivityStreamsData)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
return
}, nil
}
// readExposed fetches all the data - records (with paging) for an exposed module in an internal format
func (ctrl SyncData) readExposed(ctx context.Context, r *request.SyncDataReadExposedInternal) (interface{}, error) {
var (
err error
em *types.ExposedModule
users st.UserSet
node *types.Node
)
if _, err = service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
if node, err = service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
return nil, err
}
@@ -172,9 +235,11 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp
return nil, err
}
return listRecordResponse{
Set: &list,
Filter: &f,
return federation.ListDataPayload{
NodeID: node.ID,
ModuleID: em.ID,
Filter: &f,
Set: &list,
}, nil
}

View File

@@ -2,27 +2,97 @@ package rest
import (
"context"
"net/http"
"github.com/cortezaproject/corteza-server/federation/rest/request"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/errors"
"github.com/cortezaproject/corteza-server/pkg/federation"
"github.com/cortezaproject/corteza-server/pkg/filter"
)
type (
SyncStructure struct{}
listModuleResponse struct {
Filter *types.ExposedModuleFilter `json:"filter"`
Set *types.ExposedModuleSet `json:"set"`
}
)
func (SyncStructure) New() *SyncStructure {
return &SyncStructure{}
}
func (ctrl SyncStructure) ReadExposedAll(ctx context.Context, r *request.SyncStructureReadExposedAll) (interface{}, error) {
// ReadExposedInternal gets the exposed module info and serves
// the internal Corteza format of the structure
func (ctrl SyncStructure) ReadExposedInternal(ctx context.Context, r *request.SyncStructureReadExposedInternal) (interface{}, error) {
return func(w http.ResponseWriter, req *http.Request) {
var (
err error
ef federation.EncodingFormat = federation.CortezaInternalStructure
)
w.Header().Add("Content-Type", "application/json")
fEncoder := federation.NewEncoder(w, service.DefaultOptions)
payload, err := ctrl.readExposedAll(ctx, r)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
err = fEncoder.Encode(*payload, ef)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
return
}, nil
}
// ReadExposedSocial gets the exposed module info and serves
// the activity streams format of the structure
func (ctrl SyncStructure) ReadExposedSocial(ctx context.Context, r *request.SyncStructureReadExposedSocial) (interface{}, error) {
return func(w http.ResponseWriter, req *http.Request) {
var (
err error
ef federation.EncodingFormat = federation.ActivityStreamsStructure
)
w.Header().Add("Content-Type", "application/json")
fEncoder := federation.NewEncoder(w, service.DefaultOptions)
rr := request.SyncStructureReadExposedInternal{
NodeID: r.NodeID,
LastSync: r.LastSync,
Query: r.Query,
Limit: r.Limit,
PageCursor: r.PageCursor,
Sort: r.Sort,
}
payload, err := ctrl.readExposedAll(ctx, &rr)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
err = fEncoder.Encode(*payload, ef)
if err != nil {
errors.ServeHTTP(w, req, err, false)
return
}
return
}, nil
}
// readExposedAll fetches the exposed modules for the specific node
func (ctrl SyncStructure) readExposedAll(ctx context.Context, r *request.SyncStructureReadExposedInternal) (*federation.ListStructurePayload, error) {
var (
err error
node *types.Node
@@ -51,8 +121,9 @@ func (ctrl SyncStructure) ReadExposedAll(ctx context.Context, r *request.SyncStr
return nil, err
}
return listModuleResponse{
Set: &list,
return &federation.ListStructurePayload{
NodeID: node.ID,
Filter: &f,
Set: &list,
}, nil
}

View File

@@ -2,9 +2,10 @@ package service
import (
"context"
"github.com/cortezaproject/corteza-server/pkg/logger"
"time"
"github.com/cortezaproject/corteza-server/pkg/logger"
cs "github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"

14
pkg/federation/adapter.go Normal file
View File

@@ -0,0 +1,14 @@
package federation
import (
"io"
"github.com/cortezaproject/corteza-server/pkg/options"
)
type (
EncoderAdapter interface {
BuildStructure(io.Writer, options.FederationOpt, interface{}) (interface{}, error)
BuildData(io.Writer, options.FederationOpt, interface{}) (interface{}, error)
}
)

27
pkg/federation/corteza.go Normal file
View File

@@ -0,0 +1,27 @@
package federation
import (
"io"
"github.com/cortezaproject/corteza-server/pkg/options"
)
type (
EncoderAdapterCortezaInternal struct{}
)
// Build a default Corteza response
func (a EncoderAdapterCortezaInternal) BuildStructure(w io.Writer, o options.FederationOpt, p interface{}) (interface{}, error) {
return listModuleResponseCortezaInternal{
Filter: p.(ListStructurePayload).Filter,
Set: p.(ListStructurePayload).Set,
}, nil
}
// Build a default Corteza response
func (a EncoderAdapterCortezaInternal) BuildData(w io.Writer, o options.FederationOpt, p interface{}) (interface{}, error) {
return listRecordResponseCortezaInternal{
Filter: p.(ListDataPayload).Filter,
Set: p.(ListDataPayload).Set,
}, nil
}

63
pkg/federation/encoder.go Normal file
View File

@@ -0,0 +1,63 @@
package federation
import (
"encoding/json"
"io"
"github.com/cortezaproject/corteza-server/pkg/options"
)
const (
ActivityStreamsStructure EncodingFormat = 0
CortezaInternalStructure EncodingFormat = 1
ActivityStreamsData EncodingFormat = 2
CortezaInternalData EncodingFormat = 3
)
type (
EncodingFormat int
Encoder struct {
w io.Writer
o options.FederationOpt
}
)
func NewEncoder(w io.Writer, o options.FederationOpt) *Encoder {
return &Encoder{w: w, o: o}
}
// Encode the specific format per payload
func (e Encoder) Encode(payload interface{}, t EncodingFormat) error {
var (
ea EncoderAdapter
enc = json.NewEncoder(e.w)
resp interface{}
err error
)
switch t {
case ActivityStreamsStructure:
ea = &EncoderAdapterActivityStreams{}
resp, err = ea.BuildStructure(e.w, e.o, payload)
break
case CortezaInternalStructure:
ea = &EncoderAdapterCortezaInternal{}
resp, err = ea.BuildStructure(e.w, e.o, payload)
break
case ActivityStreamsData:
ea = &EncoderAdapterActivityStreams{}
resp, err = ea.BuildData(e.w, e.o, payload)
break
case CortezaInternalData:
ea = &EncoderAdapterCortezaInternal{}
resp, err = ea.BuildData(e.w, e.o, payload)
break
}
if err != nil {
return err
}
return enc.Encode(resp)
}

View File

@@ -0,0 +1,155 @@
package federation
import (
"strings"
"testing"
"time"
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/stretchr/testify/require"
)
func TestEncoder_encodeStructure(t *testing.T) {
var (
opts = options.FederationOpt{
Enabled: true,
Host: "example.ltd",
}
payload = ListStructurePayload{
Filter: &types.ExposedModuleFilter{
Paging: filter.Paging{
Limit: 42,
},
},
Set: &types.ExposedModuleSet{
&types.ExposedModule{
ID: 123,
ComposeModuleID: 456,
ComposeNamespaceID: 789,
NodeID: 1111,
Handle: "test_module",
Name: "Test Module",
CreatedBy: 2222,
CreatedAt: time.Date(2020, 10, 10, 10, 10, 10, 10, time.UTC),
Fields: types.ModuleFieldSet{
&types.ModuleField{Kind: "String", Name: "test_module_first_field", Label: "Test Module First Field", IsMulti: false},
&types.ModuleField{Kind: "String", Name: "test_module_second_field", Label: "Test Module Second Field", IsMulti: false},
},
},
},
}
tcc = []struct {
name string
format EncodingFormat
expect string
}{
{
"Encode structure in Activity Streams format",
ActivityStreamsStructure,
`{"@context":"https://www.w3.org/ns/activitystreams","itemsPerPage":42,"items":[{"@context":"https://www.w3.org/ns/activitystreams","type":"Module","summary":"Structure for module Test Module on node 1111","name":"Test Module","handle":"test_module","url":"https://example.ltd/nodes/1111/modules/123","node":"1111","federationModule":"123","composeModule":"456","composeNamespace":"789","createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222","attributedTo":[{"@context":"https://www.w3.org/ns/activitystreams","id":"https://example.ltd/system/users/2222","type":"User"}],"fields":[{"kind":"String","name":"test_module_first_field","label":"Test Module First Field","isMulti":false},{"kind":"String","name":"test_module_second_field","label":"Test Module Second Field","isMulti":false}]}]}`,
},
{
"Encode structure in internal Corteza format",
CortezaInternalStructure,
`{"filter":{"nodeID":"0","composeModuleID":"0","composeNamespaceID":"0","lastSync":0,"handle":"","name":"","query":"","limit":42},"set":[{"moduleID":"123","nodeID":"1111","composeModuleID":"456","composeNamespaceID":"789","handle":"test_module","name":"Test Module","fields":[{"kind":"String","name":"test_module_first_field","label":"Test Module First Field","isMulti":false},{"kind":"String","name":"test_module_second_field","label":"Test Module Second Field","isMulti":false}],"createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222"}]}`,
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
writer = strings.Builder{}
)
encoder := NewEncoder(&writer, opts)
err := encoder.Encode(payload, tc.format)
req.NoError(err)
req.Equal(tc.expect, strings.TrimSuffix(writer.String(), "\n"))
})
}
}
func TestEncoder_encodeData(t *testing.T) {
var (
opts = options.FederationOpt{
Enabled: true,
Host: "example.ltd",
}
payload = ListDataPayload{
Filter: &ct.RecordFilter{
ModuleID: 456,
NamespaceID: 789,
Paging: filter.Paging{
Limit: 42,
},
},
Set: &ct.RecordSet{
&ct.Record{
ID: 123,
ModuleID: 456,
NamespaceID: 789,
CreatedBy: 2222,
CreatedAt: time.Date(2020, 10, 10, 10, 10, 10, 10, time.UTC),
Values: ct.RecordValueSet{
&ct.RecordValue{RecordID: 1231111, Name: "First Record First Value", Value: "First Record First Value"},
&ct.RecordValue{RecordID: 1231112, Name: "First Record Second Value", Value: "First Record Second Value"},
},
},
&ct.Record{
ID: 124,
ModuleID: 456,
NamespaceID: 789,
CreatedBy: 2222,
CreatedAt: time.Date(2020, 10, 10, 10, 10, 10, 10, time.UTC),
Values: ct.RecordValueSet{
&ct.RecordValue{RecordID: 1231111, Name: "Second Record First Value", Value: "Second Record First Value"},
&ct.RecordValue{RecordID: 1231112, Name: "Second Record Second Value", Value: "Second Record Second Value"},
},
},
},
NodeID: 1111,
ModuleID: 3333,
}
tcc = []struct {
name string
format EncodingFormat
expect string
}{
{
"Encode structure in Activity Streams format",
ActivityStreamsData,
`{"@context":"https://www.w3.org/ns/activitystreams","itemsPerPage":42,"items":[{"@context":"https://www.w3.org/ns/activitystreams","type":"Record","summary":"Data for module 456 on node 1111","url":"https://example.ltd/nodes/1111/modules/3333/records/social/","node":"1111","federationModule":"456","composeModule":"456","composeNamespace":"789","createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222","attributedTo":[{"@context":"https://www.w3.org/ns/activitystreams","id":"https://example.ltd/system/users/2222","type":"User"}],"values":[{"name":"First Record First Value","value":"First Record First Value"},{"name":"First Record Second Value","value":"First Record Second Value"}]},{"@context":"https://www.w3.org/ns/activitystreams","type":"Record","summary":"Data for module 456 on node 1111","url":"https://example.ltd/nodes/1111/modules/3333/records/social/","node":"1111","federationModule":"456","composeModule":"456","composeNamespace":"789","createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222","attributedTo":[{"@context":"https://www.w3.org/ns/activitystreams","id":"https://example.ltd/system/users/2222","type":"User"}],"values":[{"name":"Second Record First Value","value":"Second Record First Value"},{"name":"Second Record Second Value","value":"Second Record Second Value"}]}]}`,
},
{
"Encode structure in internal Corteza format",
CortezaInternalData,
`{"filter":{"moduleID":"456","namespaceID":"789","query":"","deleted":0,"limit":42},"set":[{"recordID":"123","moduleID":"456","values":[{"name":"First Record First Value","value":"First Record First Value"},{"name":"First Record Second Value","value":"First Record Second Value"}],"namespaceID":"789","ownedBy":"0","createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222"},{"recordID":"124","moduleID":"456","values":[{"name":"Second Record First Value","value":"Second Record First Value"},{"name":"Second Record Second Value","value":"Second Record Second Value"}],"namespaceID":"789","ownedBy":"0","createdAt":"2020-10-10T10:10:10.00000001Z","createdBy":"2222"}]}`,
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
writer = strings.Builder{}
)
encoder := NewEncoder(&writer, opts)
err := encoder.Encode(payload, tc.format)
req.NoError(err)
req.Equal(tc.expect, strings.TrimSuffix(writer.String(), "\n"))
})
}
}

79
pkg/federation/formats.go Normal file
View File

@@ -0,0 +1,79 @@
package federation
import (
"time"
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
)
type (
listResponsePagingActivityStreams struct {
Type string `json:"type"`
Name string `json:"name"`
Href string `json:"href"`
}
listResponseItemActivityStreams struct {
Context string `json:"@context"`
Type string `json:"type"`
Summary string `json:"summary"`
Name string `json:"name,omitempty"`
Handle string `json:"handle,omitempty"`
Url string `json:"url"`
Node uint64 `json:"node,string"`
FederationModule uint64 `json:"federationModule,string"`
ComposeModule uint64 `json:"composeModule,string"`
ComposeNamespace uint64 `json:"composeNamespace,string"`
CreatedAt time.Time `json:"createdAt,omitempty"`
CreatedBy uint64 `json:"createdBy,string" `
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
UpdatedBy uint64 `json:"updatedBy,string,omitempty" `
DeletedAt *time.Time `json:"deletedAt,omitempty"`
DeletedBy uint64 `json:"deletedBy,string,omitempty" `
Attribution []listResponseItemAttribution `json:"attributedTo"`
Fields types.ModuleFieldSet `json:"fields,omitempty"`
Values ct.RecordValueSet `json:"values,omitempty"`
}
listResponseItemAttribution struct {
Context string `json:"@context"`
Id string `json:"id"`
Type string `json:"type"`
}
listModuleResponseActivityStreams struct {
Context string `json:"@context"`
ItemsPerPage uint `json:"itemsPerPage"`
Next *listResponsePagingActivityStreams `json:"next,omitempty"`
Prev *listResponsePagingActivityStreams `json:"prev,omitempty"`
Items interface{} `json:"items"`
}
listModuleResponseCortezaInternal struct {
Filter *types.ExposedModuleFilter `json:"filter"`
Set *types.ExposedModuleSet `json:"set"`
}
listRecordResponseCortezaInternal struct {
Filter *ct.RecordFilter `json:"filter"`
Set *ct.RecordSet `json:"set"`
}
ListStructurePayload struct {
NodeID uint64
Filter *types.ExposedModuleFilter `json:"filter"`
Set *types.ExposedModuleSet `json:"set"`
}
ListDataPayload struct {
NodeID uint64
ModuleID uint64
Filter *ct.RecordFilter `json:"filter"`
Set *ct.RecordSet `json:"set"`
}
)

155
pkg/federation/social.go Normal file
View File

@@ -0,0 +1,155 @@
package federation
import (
"fmt"
"io"
"github.com/cortezaproject/corteza-server/pkg/options"
)
type (
EncoderAdapterActivityStreams struct{}
)
// Build an activity streams format from default internal Corteza
// payload, including the author, activitystreams metadata and paging
// custom metadata
func (a EncoderAdapterActivityStreams) BuildStructure(w io.Writer, o options.FederationOpt, p interface{}) (interface{}, error) {
var (
next, prev *listResponsePagingActivityStreams
items []listResponseItemActivityStreams
)
payload := p.(ListStructurePayload)
if payload.Filter.Paging.NextPage != nil {
next = &listResponsePagingActivityStreams{
Type: "Link",
Name: "Next page",
Href: fmt.Sprintf("https://%s/nodes/%d/modules/exposed/?pageCursor=%s", o.Host, payload.NodeID, payload.Filter.Paging.NextPage.Encode()),
}
}
if payload.Filter.Paging.PrevPage != nil {
prev = &listResponsePagingActivityStreams{
Type: "Link",
Name: "Previous page",
Href: fmt.Sprintf("https://%s/nodes/%d/modules/exposed/?pageCursor=%s", o.Host, payload.NodeID, payload.Filter.Paging.PrevPage.Encode()),
}
}
// loop through items and format them
for _, v := range *payload.Set {
item := listResponseItemActivityStreams{
Context: "https://www.w3.org/ns/activitystreams",
Type: "Module",
Summary: fmt.Sprintf("Structure for module %s on node %d", v.Name, v.NodeID),
Url: fmt.Sprintf("https://%s/nodes/%d/modules/%d", o.Host, v.NodeID, v.ID),
Name: v.Name,
Handle: v.Handle,
Node: v.NodeID,
FederationModule: v.ID,
ComposeModule: v.ComposeModuleID,
ComposeNamespace: v.ComposeNamespaceID,
Attribution: []listResponseItemAttribution{
{
Context: "https://www.w3.org/ns/activitystreams",
Type: "User",
Id: fmt.Sprintf("https://%s/system/users/%d", o.Host, v.CreatedBy),
},
},
CreatedAt: v.CreatedAt,
CreatedBy: v.CreatedBy,
UpdatedAt: v.UpdatedAt,
UpdatedBy: v.UpdatedBy,
DeletedAt: v.DeletedAt,
DeletedBy: v.DeletedBy,
Fields: v.Fields,
}
items = append(items, item)
}
return listModuleResponseActivityStreams{
Context: "https://www.w3.org/ns/activitystreams",
ItemsPerPage: payload.Filter.Limit,
Items: items,
Next: next,
Prev: prev,
}, nil
}
// Build an activity streams format from default internal Corteza
// payload, including the author, activitystreams metadata and paging
// custom metadata
func (a EncoderAdapterActivityStreams) BuildData(w io.Writer, o options.FederationOpt, p interface{}) (interface{}, error) {
var (
next, prev *listResponsePagingActivityStreams
items []listResponseItemActivityStreams
)
payload := p.(ListDataPayload)
if payload.Filter.Paging.NextPage != nil {
next = &listResponsePagingActivityStreams{
Type: "Link",
Name: "Next page",
Href: fmt.Sprintf("https://%s/nodes/%d/modules/%d/records/social/?pageCursor=%s", o.Host, payload.NodeID, payload.ModuleID, payload.Filter.Paging.NextPage.Encode()),
}
}
if payload.Filter.Paging.PrevPage != nil {
prev = &listResponsePagingActivityStreams{
Type: "Link",
Name: "Previous page",
Href: fmt.Sprintf("https://%s/nodes/%d/modules/%d/records/social/?pageCursor=%s", o.Host, payload.NodeID, payload.ModuleID, payload.Filter.Paging.PrevPage.Encode()),
}
}
// loop through items and format them
for _, v := range *payload.Set {
item := listResponseItemActivityStreams{
Context: "https://www.w3.org/ns/activitystreams",
Type: "Record",
Summary: fmt.Sprintf("Data for module %d on node %d", v.ModuleID, payload.NodeID),
Url: fmt.Sprintf("https://%s/nodes/%d/modules/%d/records/social/", o.Host, payload.NodeID, payload.ModuleID),
Node: payload.NodeID,
FederationModule: v.ModuleID,
ComposeModule: v.ModuleID,
ComposeNamespace: v.NamespaceID,
Attribution: []listResponseItemAttribution{
{
Context: "https://www.w3.org/ns/activitystreams",
Type: "User",
Id: fmt.Sprintf("https://%s/system/users/%d", o.Host, v.CreatedBy),
},
},
CreatedAt: v.CreatedAt,
CreatedBy: v.CreatedBy,
UpdatedAt: v.UpdatedAt,
UpdatedBy: v.UpdatedBy,
DeletedAt: v.DeletedAt,
DeletedBy: v.DeletedBy,
Values: v.Values,
}
items = append(items, item)
}
return listModuleResponseActivityStreams{
Context: "https://www.w3.org/ns/activitystreams",
ItemsPerPage: payload.Filter.Limit,
Items: items,
Next: next,
Prev: prev,
}, nil
}