3
0

Ported messaging provision to envoy

This commit is contained in:
Denis Arh
2020-11-30 07:51:57 +01:00
parent 83ad91416d
commit d89450ae39
15 changed files with 338 additions and 444 deletions

View File

@@ -1,42 +0,0 @@
package commands
import (
"io"
"os"
"github.com/spf13/cobra"
"github.com/cortezaproject/corteza-server/messaging/importer"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/cli"
)
func Importer() *cobra.Command {
cmd := &cobra.Command{
Use: "import",
Short: "Import",
Run: func(cmd *cobra.Command, args []string) {
var (
ctx = auth.SetSuperUserContext(cli.Context())
ff []io.Reader
err error
)
ctx = auth.SetSuperUserContext(ctx)
if len(args) > 0 {
ff = make([]io.Reader, len(args))
for a, arg := range args {
ff[a], err = os.Open(arg)
cli.HandleError(err)
}
cli.HandleError(importer.Import(ctx, ff...))
} else {
cli.HandleError(importer.Import(ctx, os.Stdin))
}
},
}
return cmd
}

View File

@@ -1,136 +0,0 @@
package importer
import (
"context"
"errors"
"fmt"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/deinterfacer"
"github.com/cortezaproject/corteza-server/pkg/importer"
)
type (
Channel struct {
set types.ChannelSet
dirty map[uint64]bool
permissions importer.PermissionImporter
}
channelKeeper interface {
Update(*types.Channel) (*types.Channel, error)
Create(*types.Channel) (*types.Channel, error)
}
)
func NewChannelImport(permissions importer.PermissionImporter, set types.ChannelSet) *Channel {
if set == nil {
set = types.ChannelSet{}
}
out := &Channel{
set: set,
dirty: make(map[uint64]bool),
permissions: permissions,
}
return out
}
func (cImp *Channel) CastSet(set interface{}) error {
var name string
return deinterfacer.Each(set, func(index int, _ string, def interface{}) error {
if index > -1 {
// Channels defined as collection
deinterfacer.KVsetString(&name, "name", def)
}
return cImp.Cast(name, def)
})
}
func (cImp *Channel) Cast(name string, def interface{}) (err error) {
var channel *types.Channel
// if !importer.IsValidHandle(handle) {
// return errors.New("invalid channel handle")
// }
//
// handle = importer.NormalizeHandle(handle)
if channel, err = cImp.Get(name); err != nil {
return err
} else if channel == nil {
channel = &types.Channel{
Name: name,
}
cImp.set = append(cImp.set, channel)
} else if channel.ID == 0 {
return fmt.Errorf("channel name %q already defined in this import session", channel.Name)
} else {
cImp.dirty[channel.ID] = true
}
if name, ok := def.(string); ok && name != "" {
channel.Name = name
return nil
}
return deinterfacer.Each(def, func(_ int, key string, val interface{}) (err error) {
switch key {
case "name":
// already handled
case "type":
channel.Type = types.ChannelType(deinterfacer.ToString(val))
if !channel.Type.IsValid() {
return fmt.Errorf("invalid channel type %q for channel %q", channel.Type, channel.Name)
}
case "topic":
channel.Topic = deinterfacer.ToString(val)
case "allow", "deny":
return cImp.permissions.CastSet(types.ChannelRBACResource.String()+channel.Name, key, val)
default:
return fmt.Errorf("unexpected key %q for channel %q", key, channel.Name)
}
return err
})
}
func (cImp *Channel) Get(name string) (*types.Channel, error) {
// name = importer.NormalizeHandle(name)
//
// if !importer.IsValidHandle(name) {
// return nil, errors.New("invalid channel name")
// }
return cImp.set.FindByName(name), nil
}
func (cImp *Channel) Store(ctx context.Context, k channelKeeper) error {
return cImp.set.Walk(func(channel *types.Channel) (err error) {
var handle = channel.Name
if channel.ID == 0 {
channel, err = k.Create(channel)
} else if cImp.dirty[channel.ID] {
channel, err = k.Update(channel)
}
for errors.Unwrap(err) != nil {
err = errors.Unwrap(err)
}
if err != nil {
return
}
cImp.permissions.UpdateResources(types.ChannelRBACResource.String(), handle, channel.ID)
cImp.permissions.UpdateRoles(channel.Name, channel.ID)
return
})
}

View File

@@ -1,28 +0,0 @@
package importer
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/cortezaproject/corteza-server/messaging/types"
)
func TestChannelImport_CastSet(t *testing.T) {
impFixTester(t, "channels", func(t *testing.T, ri *Channel) {
req := require.New(t)
req.NotNil(ri.set)
req.Len(ri.set, 3)
req.NotNil(ri.set.FindByName("General"))
req.Equal("Talk about anything", ri.set.FindByName("General").Topic)
req.Equal(types.ChannelTypePublic, ri.set.FindByName("General").Type)
req.NotNil(ri.set.FindByName("Random"))
req.Equal("", ri.set.FindByName("Random").Topic)
req.Equal(types.ChannelTypePublic, ri.set.FindByName("Random").Type)
req.NotNil(ri.set.FindByName("Secret"))
req.Equal(types.ChannelTypePrivate, ri.set.FindByName("Secret").Type)
})
}

View File

@@ -1,63 +0,0 @@
package importer
import (
"context"
"github.com/cortezaproject/corteza-server/messaging/service"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rbac"
"github.com/cortezaproject/corteza-server/pkg/settings"
sysService "github.com/cortezaproject/corteza-server/system/service"
sysTypes "github.com/cortezaproject/corteza-server/system/types"
"gopkg.in/yaml.v2"
"io"
)
// Import performs standard import procedure with default services
func Import(ctx context.Context, ff ...io.Reader) (err error) {
var (
cc types.ChannelSet
aux interface{}
)
cc, _, err = service.DefaultChannel.With(ctx).Find(types.ChannelFilter{})
if err != nil {
return err
}
var (
p = rbac.NewImporter(service.DefaultAccessControl.Whitelist())
imp = NewImporter(
p,
settings.NewImporter(),
NewChannelImport(p, cc),
)
// At the moment, we can not load roles from system service
// so we'll just use static set of known roles
//
// Roles are use for resolving access control
roles = sysTypes.RoleSet{
&sysTypes.Role{ID: rbac.EveryoneRoleID, Handle: "everyone"},
&sysTypes.Role{ID: rbac.AdminsRoleID, Handle: "admins"},
}
)
for _, f := range ff {
if err = yaml.NewDecoder(f).Decode(&aux); err != nil {
return
}
err = imp.Cast(aux)
if err != nil {
return
}
}
// Store all imported
return imp.Store(
ctx,
service.DefaultChannel.With(ctx),
service.DefaultAccessControl,
sysService.DefaultSettings,
roles,
)
}

View File

@@ -1,95 +0,0 @@
package importer
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/pkg/settings"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/deinterfacer"
"github.com/cortezaproject/corteza-server/pkg/importer"
"github.com/cortezaproject/corteza-server/pkg/rbac"
sysTypes "github.com/cortezaproject/corteza-server/system/types"
)
type (
Importer struct {
channels *Channel
permissions importer.PermissionImporter
settings importer.SettingImporter
}
channelFinder interface {
Find(context.Context) (types.ChannelSet, error)
}
)
func NewImporter(p importer.PermissionImporter, s importer.SettingImporter, ci *Channel) *Importer {
return &Importer{
channels: ci,
permissions: p,
settings: s,
}
}
func (imp *Importer) Cast(in interface{}) (err error) {
return deinterfacer.Each(in, func(index int, key string, val interface{}) (err error) {
switch key {
case "channels":
if imp.channels != nil {
return imp.channels.CastSet(val)
}
case "channel":
if imp.channels != nil {
return imp.channels.CastSet([]interface{}{val})
}
case "settings":
if imp.settings != nil {
return imp.settings.CastSet(val)
}
case "allow", "deny":
if imp.permissions != nil {
return imp.permissions.CastResourcesSet(key, val)
}
default:
err = fmt.Errorf("unexpected key %q", key)
}
return err
})
}
func (imp *Importer) Store(ctx context.Context, rk channelKeeper, pk rbac.ImportKeeper, sk settings.ImportKeeper, roles sysTypes.RoleSet) (err error) {
if imp.channels != nil {
err = imp.channels.Store(ctx, rk)
if err != nil {
return
}
}
if imp.permissions != nil {
// Make sure we properly replace channel handles with IDs
roles.Walk(func(r *sysTypes.Role) error {
imp.permissions.UpdateRoles(r.Handle, r.ID)
return nil
})
err = imp.permissions.Store(ctx, pk)
if err != nil {
return fmt.Errorf("could not provision permissions: %w", err)
}
}
if imp.settings != nil {
err = imp.settings.Store(ctx, sk)
if err != nil {
return fmt.Errorf("could not provision settings: %w", err)
}
}
return nil
}

View File

@@ -1,68 +0,0 @@
package importer
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"github.com/cortezaproject/corteza-server/pkg/rbac"
"github.com/cortezaproject/corteza-server/pkg/settings"
"github.com/cortezaproject/corteza-server/system/service"
)
var (
pi *rbac.Importer
si *settings.Importer
imp *Importer
)
func TestMain(m *testing.M) {
resetMocks()
os.Exit(m.Run())
}
func resetMocks() {
// whitelist = nil, anything can be added
pi = rbac.NewImporter(service.AccessControl(nil).Whitelist())
si = settings.NewImporter()
imp = NewImporter(
pi,
si,
NewChannelImport(pi, nil),
)
}
func impFixTester(t *testing.T, name string, tester interface{}) {
t.Run(name, func(t *testing.T) {
// We're not calling reset mocks BEFORE calling tester()
// because we want to have an option to set it up as we want
defer resetMocks()
var aux interface{}
req := require.New(t)
f, err := os.Open(fmt.Sprintf("testdata/%s.yaml", name))
req.NoError(err)
req.NoError(yaml.NewDecoder(f).Decode(&aux))
req.NotNil(aux)
if reqError, ok := tester.(error); ok {
req.EqualError(imp.Cast(aux), reqError.Error())
return
} else {
req.NoError(imp.Cast(aux))
}
switch tester := tester.(type) {
case func(*testing.T, *Channel):
tester(t, imp.channels)
case func(*testing.T, *Importer):
tester(t, imp)
default:
panic("unsupported tester function signature")
}
})
}

View File

@@ -0,0 +1,28 @@
package resource
import (
"github.com/cortezaproject/corteza-server/messaging/types"
)
type (
MessagingChannel struct {
*base
Res *types.Channel
}
)
func NewMessagingChannel(res *types.Channel) *MessagingChannel {
r := &MessagingChannel{
base: &base{},
}
r.SetResourceType(MESSAGING_CHANNEL_RESOURCE_TYPE)
r.Res = res
r.AddIdentifier(identifiers(res.ID)...)
return r
}
func (r *MessagingChannel) SysID() uint64 {
return r.Res.ID
}

View File

@@ -2,6 +2,7 @@ package resource
import (
ct "github.com/cortezaproject/corteza-server/compose/types"
mt "github.com/cortezaproject/corteza-server/messaging/types"
st "github.com/cortezaproject/corteza-server/system/types"
)
@@ -44,6 +45,7 @@ var (
SETTINGS_RESOURCE_TYPE = "system:setting:"
USER_RESOURCE_TYPE = st.UserRBACResource.String()
DATA_SOURCE_RESOURCE_TYPE = "data:raw:"
MESSAGING_CHANNEL_RESOURCE_TYPE = mt.ChannelRBACResource.String()
)
func MakeIdentifiers(ss ...string) Identifiers {

View File

@@ -119,6 +119,10 @@ func (se *storeEncoder) Prepare(ctx context.Context, ee ...*envoy.ResourceState)
case *resource.RbacRule:
err = f(NewRbacRuleState(res, se.cfg), e)
// Messaging resources
case *resource.MessagingChannel:
err = f(NewMessagingChannelState(res, se.cfg), e)
default:
err = ErrUnknownResource
}

View File

@@ -0,0 +1,170 @@
package store
import (
"context"
"github.com/cortezaproject/corteza-server/messaging/types"
"time"
"github.com/cortezaproject/corteza-server/pkg/envoy"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
"github.com/cortezaproject/corteza-server/store"
)
type (
messagingChannelState struct {
cfg *EncoderConfig
res *resource.MessagingChannel
ch *types.Channel
}
)
func NewMessagingChannelState(res *resource.MessagingChannel, cfg *EncoderConfig) resourceState {
return &messagingChannelState{
cfg: cfg,
res: res,
}
}
func (n *messagingChannelState) Prepare(ctx context.Context, s store.Storer, rs *envoy.ResourceState) (err error) {
// Initial values
if n.res.Res.CreatedAt.IsZero() {
n.res.Res.CreatedAt = time.Now()
}
// Get the existing channel
n.ch, err = findMessagingChannelS(ctx, s, makeGenericFilter(n.res.Identifiers()))
if err != nil {
return err
}
if n.ch != nil {
n.res.Res.ID = n.ch.ID
}
return nil
}
// Encode encodes the given messagingChannel
func (n *messagingChannelState) Encode(ctx context.Context, s store.Storer, state *envoy.ResourceState) (err error) {
res := n.res.Res
exists := n.ch != nil && n.ch.ID > 0
// Determine the ID
if res.ID <= 0 && exists {
res.ID = n.ch.ID
}
if res.ID <= 0 {
res.ID = NextID()
}
// This is not possible, but let's do it anyway
if state.Conflicting {
return nil
}
// Create fresh messagingChannel
if !exists {
return store.CreateMessagingChannel(ctx, s, res)
}
// Update existing messagingChannel
switch n.cfg.OnExisting {
case Skip:
return nil
case MergeLeft:
res = mergeMessagingChannels(n.ch, res)
case MergeRight:
res = mergeMessagingChannels(res, n.ch)
}
err = store.UpdateMessagingChannel(ctx, s, res)
if err != nil {
return err
}
n.res.Res = res
return nil
}
// mergeMessagingChannels merges b into a, prioritising a
func mergeMessagingChannels(a, b *types.Channel) *types.Channel {
c := *a
if c.Name == "" {
c.Name = b.Name
}
return &c
}
// findMessagingChannelRS looks for the ch in the resources & the store
//
// Provided resources are prioritized.
func findMessagingChannelRS(ctx context.Context, s store.Storer, rr resource.InterfaceSet, ii resource.Identifiers) (ap *types.Channel, err error) {
ap = findMessagingChannelR(rr, ii)
if ap != nil {
return ap, nil
}
return findMessagingChannelS(ctx, s, makeGenericFilter(ii))
}
// findMessagingChannelS looks for the ch in the store
func findMessagingChannelS(ctx context.Context, s store.Storer, gf genericFilter) (ap *types.Channel, err error) {
if gf.id > 0 {
ap, err = store.LookupMessagingChannelByID(ctx, s, gf.id)
if err != nil && err != store.ErrNotFound {
return nil, err
}
if ap != nil {
return
}
}
q := gf.handle
if q == "" {
q = gf.name
}
if q != "" {
var aa types.ChannelSet
aa, _, err = store.SearchMessagingChannels(ctx, s, types.ChannelFilter{Query: q})
if err != nil && err != store.ErrNotFound {
return nil, err
}
if len(aa) > 0 {
ap = aa[0]
return
}
}
return nil, nil
}
// findMessagingChannelR looks for the ch in the resource set
func findMessagingChannelR(rr resource.InterfaceSet, ii resource.Identifiers) (ap *types.Channel) {
var chRes *resource.MessagingChannel
rr.Walk(func(r resource.Interface) error {
ar, ok := r.(*resource.MessagingChannel)
if !ok {
return nil
}
if ar.Identifiers().HasAny(ii) {
chRes = ar
}
return nil
})
// Found it
if chRes != nil {
return chRes.Res
}
return nil
}

View File

@@ -2,7 +2,6 @@ package yaml
import (
"context"
"github.com/cortezaproject/corteza-server/pkg/envoy"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
"gopkg.in/yaml.v3"
@@ -12,6 +11,7 @@ type (
// Document defines the supported yaml structure
Document struct {
compose *compose
messaging *messaging
roles roleSet
users userSet
applications applicationSet
@@ -25,6 +25,10 @@ func (doc *Document) UnmarshalYAML(n *yaml.Node) (err error) {
return
}
if err = n.Decode(&doc.messaging); err != nil {
return
}
if doc.rbac, err = decodeRbac(n); err != nil {
return
}
@@ -57,6 +61,9 @@ func (doc *Document) Decode(ctx context.Context) ([]resource.Interface, error) {
if doc.compose != nil {
mm = append(mm, doc.compose)
}
if doc.messaging != nil {
mm = append(mm, doc.messaging)
}
if doc.roles != nil {
mm = append(mm, doc.roles)
}

View File

@@ -0,0 +1,29 @@
package yaml
import (
"reflect"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
)
type (
messaging struct {
Channels messagingChannelSet `yaml:"channels"`
}
)
func (c messaging) MarshalEnvoy() ([]resource.Interface, error) {
nn := make([]resource.Interface, 0, 100)
rf := reflect.ValueOf(c)
for i := 0; i < rf.NumField(); i++ {
if mr, ok := rf.Field(i).Interface().(EnvoyMarshler); ok {
tmp, err := mr.MarshalEnvoy()
if err != nil {
return nil, err
}
nn = append(nn, tmp...)
}
}
return nn, nil
}

View File

@@ -0,0 +1,88 @@
package yaml
import (
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/envoy"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
"gopkg.in/yaml.v3"
)
type (
messagingChannel struct {
// when messagingChannel is at least partially defined
res *types.Channel `yaml:",inline"`
// module's RBAC rules
rbac rbacRuleSet
}
messagingChannelSet []*messagingChannel
)
// UnmarshalYAML resolves set of messagingChannel definitions, either sequence or map
//
// When resolving map, key is used as handle
// Also supporting { handle: name } definitions
//
func (wset *messagingChannelSet) UnmarshalYAML(n *yaml.Node) error {
return eachSeq(n, func(v *yaml.Node) (err error) {
var (
wrap = &messagingChannel{}
)
if v == nil || !isKind(v, yaml.MappingNode) {
return nodeErr(n, "malformed messagingChannel definition")
}
wrap.res = &types.Channel{}
if err = v.Decode(&wrap.res); err != nil {
return
}
*wset = append(*wset, wrap)
return
})
}
func (wset messagingChannelSet) MarshalEnvoy() ([]resource.Interface, error) {
nn := make([]resource.Interface, 0, len(wset))
for _, res := range wset {
if tmp, err := res.MarshalEnvoy(); err != nil {
return nil, err
} else {
nn = append(nn, tmp...)
}
}
return nn, nil
}
func (wrap *messagingChannel) UnmarshalYAML(n *yaml.Node) (err error) {
if !isKind(n, yaml.MappingNode) {
return nodeErr(n, "messagingChannel definition must be a map")
}
if wrap.res == nil {
wrap.res = &types.Channel{}
}
if err = n.Decode(&wrap.res); err != nil {
return
}
if wrap.rbac, err = decodeRbac(n); err != nil {
return
}
return nil
}
func (wrap messagingChannel) MarshalEnvoy() ([]resource.Interface, error) {
rs := resource.NewMessagingChannel(wrap.res)
return envoy.CollectNodes(
rs,
wrap.rbac.bindResource(rs),
)
}

View File

@@ -2,16 +2,15 @@ package messaging
import (
"context"
"github.com/cortezaproject/corteza-server/messaging/importer"
"github.com/cortezaproject/corteza-server/messaging/types"
impAux "github.com/cortezaproject/corteza-server/pkg/importer"
"github.com/cortezaproject/corteza-server/provision/util"
"github.com/cortezaproject/corteza-server/store"
"go.uber.org/zap"
)
// Provision only where there are no channels
// provision only where there are no channels
func hasChannels(ctx context.Context, s store.Storer) (bool, error) {
if set, _, err := store.SearchMessagingChannels(ctx, s, types.ChannelFilter{}); err != nil {
if set, _, err := store.SearchMessagingChannels(ctx, s, types.ChannelFilter{IncludeDeleted: true}); err != nil {
return false, err
} else {
return len(set) > 0, nil
@@ -19,19 +18,18 @@ func hasChannels(ctx context.Context, s store.Storer) (bool, error) {
}
func Provision(ctx context.Context, log *zap.Logger, s store.Storer) error {
log.Info("provisioning messaging")
if channelsExist, err := hasChannels(ctx, s); err != nil {
return err
} else if !channelsExist {
log.Info("provisioning messaging")
readers, err := impAux.ReadStatic(Asset)
if err != nil {
// Provision from YAML files
// - access control
// - channels
if err = util.EncodeStatik(ctx, s, Asset, "/"); err != nil {
return err
}
return importer.Import(ctx, readers...)
} else {
log.Info("messaging already provisioned")
}
log.Info("messaging provisioned")
return nil
}