3
0

Fix discovery feed routes for deleted resources

Also adds lock discovery service to prevent race condition in InitResourceActivityLog
This commit is contained in:
Vivek Patel
2022-03-14 16:45:31 +05:30
parent 248583b443
commit b6ff0f9210
8 changed files with 122 additions and 108 deletions

View File

@@ -24,7 +24,7 @@ endpoints:
title: Resource
path: "/system/users"
parameters:
path:
get:
- { name: userID, type: uint64, title: User ID }
- name: composeNamespaces
@@ -32,7 +32,7 @@ endpoints:
title: Resource
path: "/compose/namespaces"
parameters:
path:
get:
- { name: namespaceID, type: uint64, title: Namespace ID }
- name: composeModules
@@ -42,6 +42,7 @@ endpoints:
parameters:
path:
- { name: namespaceID, type: uint64, title: Namespace ID }
get:
- { name: moduleID, type: uint64, title: Module ID }
- name: composeRecords
@@ -52,6 +53,7 @@ endpoints:
path:
- { name: namespaceID, type: uint64, title: Namespace ID }
- { name: moduleID, type: uint64, title: Module ID }
get:
- { name: recordID, type: uint64, title: Record ID }
- path: "/feed"

View File

@@ -280,7 +280,7 @@ func (d composeResources) Records(ctx context.Context, namespaceID, moduleID uin
)
if recordID > 0 {
f.Query = fmt.Sprintf("ID=%d", recordID)
f.Query = fmt.Sprintf("recordID=%d", recordID)
}
if f.Paging, err = filter.NewPaging(limit, cur); err != nil {

View File

@@ -57,7 +57,7 @@ func (a resourceActivity) ResourceActivities(ctx context.Context, limit uint, cu
}
f.FromTimestamp = from
f.FromTimestamp = to
f.ToTimestamp = to
if f.Paging, err = filter.NewPaging(limit, cur); err != nil {
return err

View File

@@ -34,28 +34,23 @@ var (
type (
// Internal API interface
ResourcesSystemUsers struct {
// UserID PATH parameter
// Limit GET parameter
//
// Limit
Limit uint
// PageCursor GET parameter
//
// Page cursor
PageCursor string
// UserID GET parameter
//
// User ID
UserID uint64 `json:",string"`
// Limit GET parameter
//
// Limit
Limit uint
// PageCursor GET parameter
//
// Page cursor
PageCursor string
}
ResourcesComposeNamespaces struct {
// NamespaceID PATH parameter
//
// Namespace ID
NamespaceID uint64 `json:",string"`
// Limit GET parameter
//
// Limit
@@ -65,6 +60,11 @@ type (
//
// Page cursor
PageCursor string
// NamespaceID GET parameter
//
// Namespace ID
NamespaceID uint64 `json:",string"`
}
ResourcesComposeModules struct {
@@ -73,11 +73,6 @@ type (
// Namespace ID
NamespaceID uint64 `json:",string"`
// ModuleID PATH parameter
//
// Module ID
ModuleID uint64 `json:",string"`
// Limit GET parameter
//
// Limit
@@ -87,6 +82,11 @@ type (
//
// Page cursor
PageCursor string
// ModuleID GET parameter
//
// Module ID
ModuleID uint64 `json:",string"`
}
ResourcesComposeRecords struct {
@@ -100,11 +100,6 @@ type (
// Module ID
ModuleID uint64 `json:",string"`
// RecordID PATH parameter
//
// Record ID
RecordID uint64 `json:",string"`
// Limit GET parameter
//
// Limit
@@ -114,6 +109,11 @@ type (
//
// Page cursor
PageCursor string
// RecordID GET parameter
//
// Record ID
RecordID uint64 `json:",string"`
}
)
@@ -125,17 +125,12 @@ func NewResourcesSystemUsers() *ResourcesSystemUsers {
// Auditable returns all auditable/loggable parameters
func (r ResourcesSystemUsers) Auditable() map[string]interface{} {
return map[string]interface{}{
"userID": r.UserID,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"userID": r.UserID,
}
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesSystemUsers) GetUserID() uint64 {
return r.UserID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesSystemUsers) GetLimit() uint {
return r.Limit
@@ -146,6 +141,11 @@ func (r ResourcesSystemUsers) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesSystemUsers) GetUserID() uint64 {
return r.UserID
}
// Fill processes request and fills internal variables
func (r *ResourcesSystemUsers) Fill(req *http.Request) (err error) {
@@ -165,18 +165,12 @@ func (r *ResourcesSystemUsers) Fill(req *http.Request) (err error) {
return err
}
}
}
{
var val string
// path params
val = chi.URLParam(req, "userID")
r.UserID, err = payload.ParseUint64(val), nil
if err != nil {
return err
if val, ok := tmp["userID"]; ok && len(val) > 0 {
r.UserID, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
}
return err
@@ -190,17 +184,12 @@ func NewResourcesComposeNamespaces() *ResourcesComposeNamespaces {
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeNamespaces) Auditable() map[string]interface{} {
return map[string]interface{}{
"namespaceID": r.NamespaceID,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"namespaceID": r.NamespaceID,
}
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeNamespaces) GetNamespaceID() uint64 {
return r.NamespaceID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeNamespaces) GetLimit() uint {
return r.Limit
@@ -211,6 +200,11 @@ func (r ResourcesComposeNamespaces) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeNamespaces) GetNamespaceID() uint64 {
return r.NamespaceID
}
// Fill processes request and fills internal variables
func (r *ResourcesComposeNamespaces) Fill(req *http.Request) (err error) {
@@ -230,18 +224,12 @@ func (r *ResourcesComposeNamespaces) Fill(req *http.Request) (err error) {
return err
}
}
}
{
var val string
// path params
val = chi.URLParam(req, "namespaceID")
r.NamespaceID, err = payload.ParseUint64(val), nil
if err != nil {
return err
if val, ok := tmp["namespaceID"]; ok && len(val) > 0 {
r.NamespaceID, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
}
return err
@@ -256,9 +244,9 @@ func NewResourcesComposeModules() *ResourcesComposeModules {
func (r ResourcesComposeModules) Auditable() map[string]interface{} {
return map[string]interface{}{
"namespaceID": r.NamespaceID,
"moduleID": r.ModuleID,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"moduleID": r.ModuleID,
}
}
@@ -267,11 +255,6 @@ func (r ResourcesComposeModules) GetNamespaceID() uint64 {
return r.NamespaceID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeModules) GetModuleID() uint64 {
return r.ModuleID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeModules) GetLimit() uint {
return r.Limit
@@ -282,6 +265,11 @@ func (r ResourcesComposeModules) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeModules) GetModuleID() uint64 {
return r.ModuleID
}
// Fill processes request and fills internal variables
func (r *ResourcesComposeModules) Fill(req *http.Request) (err error) {
@@ -301,6 +289,12 @@ func (r *ResourcesComposeModules) Fill(req *http.Request) (err error) {
return err
}
}
if val, ok := tmp["moduleID"]; ok && len(val) > 0 {
r.ModuleID, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
}
{
@@ -313,12 +307,6 @@ func (r *ResourcesComposeModules) Fill(req *http.Request) (err error) {
return err
}
val = chi.URLParam(req, "moduleID")
r.ModuleID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
@@ -334,9 +322,9 @@ func (r ResourcesComposeRecords) Auditable() map[string]interface{} {
return map[string]interface{}{
"namespaceID": r.NamespaceID,
"moduleID": r.ModuleID,
"recordID": r.RecordID,
"limit": r.Limit,
"pageCursor": r.PageCursor,
"recordID": r.RecordID,
}
}
@@ -350,11 +338,6 @@ func (r ResourcesComposeRecords) GetModuleID() uint64 {
return r.ModuleID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeRecords) GetRecordID() uint64 {
return r.RecordID
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeRecords) GetLimit() uint {
return r.Limit
@@ -365,6 +348,11 @@ func (r ResourcesComposeRecords) GetPageCursor() string {
return r.PageCursor
}
// Auditable returns all auditable/loggable parameters
func (r ResourcesComposeRecords) GetRecordID() uint64 {
return r.RecordID
}
// Fill processes request and fills internal variables
func (r *ResourcesComposeRecords) Fill(req *http.Request) (err error) {
@@ -384,6 +372,12 @@ func (r *ResourcesComposeRecords) Fill(req *http.Request) (err error) {
return err
}
}
if val, ok := tmp["recordID"]; ok && len(val) > 0 {
r.RecordID, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
}
{
@@ -402,12 +396,6 @@ func (r *ResourcesComposeRecords) Fill(req *http.Request) (err error) {
return err
}
val = chi.URLParam(req, "recordID")
r.RecordID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err

View File

@@ -7,6 +7,7 @@ import (
"github.com/cortezaproject/corteza-server/pkg/eventbus"
"github.com/cortezaproject/corteza-server/pkg/id"
"github.com/cortezaproject/corteza-server/pkg/options"
"sync"
"time"
"go.uber.org/zap"
@@ -17,6 +18,7 @@ type (
// logger for repository errors
logger *zap.Logger
opt options.DiscoveryOpt
mux sync.RWMutex
// where the activity log records are kept
store resourceActivityLogStore
@@ -46,7 +48,7 @@ func Service(logger *zap.Logger, opt options.DiscoveryOpt, s resourceActivityLog
return
}
func (svc service) log(a *types.ResourceActivity) {
func (svc *service) log(a *types.ResourceActivity) {
zlf := []zap.Field{
zap.Uint8("recordID", uint8(a.ResourceID)),
zap.String("ResourceType", a.ResourceType),
@@ -68,13 +70,16 @@ func (svc service) log(a *types.ResourceActivity) {
Debug(fmt.Sprintf("%s of %s", a.ResourceAction, a.ResourceType))
}
func (svc service) InitResourceActivityLog(ctx context.Context, resourceType []string) (err error) {
func (svc *service) InitResourceActivityLog(ctx context.Context, resourceType []string) (err error) {
eventType := []string{
string(types.AfterCreate),
string(types.AfterUpdate),
string(types.AfterDelete),
}
svc.mux.RLock()
defer svc.mux.RUnlock()
svc.eventbus.Register(
func(_ context.Context, ev eventbus.Event) error {
var a *types.ResourceActivity
@@ -109,7 +114,7 @@ func (svc service) InitResourceActivityLog(ctx context.Context, resourceType []s
return
}
func (svc service) Record(ctx context.Context, a *types.ResourceActivity) {
func (svc *service) Record(ctx context.Context, a *types.ResourceActivity) {
if a == nil {
// nothing to record
return

View File

@@ -64,18 +64,22 @@ type (
userDecoder interface {
ResDecoder
User() *systemTypes.User
OldUser() *systemTypes.User
}
nsDecoder interface {
ResDecoder
Namespace() *composeTypes.Namespace
OldNamespace() *composeTypes.Namespace
}
mDecoder interface {
ResDecoder
Module() *composeTypes.Module
OldModule() *composeTypes.Module
}
recDecoder interface {
ResDecoder
Record() *composeTypes.Record
OldRecord() *composeTypes.Record
}
)
@@ -129,19 +133,33 @@ func CastToResourceActivity(dec ResDecoder) (a *ResourceActivity, err error) {
switch a.ResourceType {
case "system:user": // @todo system/service/service.go#134
if v, ok := dec.(userDecoder); ok {
if v.User() != nil {
setResourceID(v.User().ID)
user := v.User()
// fallback to OldUser for afterDelete event
if user == nil {
user = v.OldUser()
}
if user != nil {
setResourceID(user.ID)
}
}
case (composeTypes.Namespace{}).LabelResourceKind():
if v, ok := dec.(nsDecoder); ok {
if v.Namespace() != nil {
setResourceID(v.Namespace().ID)
ns := v.Namespace()
// fallback to OldNamespace for afterDelete event
if ns == nil {
ns = v.OldNamespace()
}
if ns != nil {
setResourceID(ns.ID)
}
}
case (composeTypes.Module{}).LabelResourceKind():
if v, ok := dec.(mDecoder); ok {
mod := v.Module()
// fallback to OldModule for afterDelete event
if mod == nil {
mod = v.OldModule()
}
if mod != nil {
setResourceID(mod.ID)
err = setMeta(mod.NamespaceID, 0)
@@ -153,6 +171,10 @@ func CastToResourceActivity(dec ResDecoder) (a *ResourceActivity, err error) {
case (composeTypes.Record{}).LabelResourceKind():
if v, ok := dec.(recDecoder); ok {
rec := v.Record()
// fallback to OldRecord for afterDelete event
if rec == nil {
rec = v.OldRecord()
}
if rec != nil {
setResourceID(rec.ID)
err = setMeta(rec.NamespaceID, rec.ModuleID)

View File

@@ -33,7 +33,10 @@ func (s Store) SearchResourceActivityLogs(ctx context.Context, f types.ResourceA
)
return set, f, func() error {
q = s.resourceActivityLogsSelectBuilder()
q, err = s.convertResourceActivityLogFilter(f)
if err != nil {
return err
}
// Paging enabled
// {search: {enablePaging:true}}
@@ -78,7 +81,7 @@ func (s Store) SearchResourceActivityLogs(ctx context.Context, f types.ResourceA
ctx,
q, f.Sort, f.PageCursor,
f.Limit,
f.Check,
nil,
func(cur *filter.PagingCursor) squirrel.Sqlizer {
return builders.CursorCondition(cur, nil)
},
@@ -262,16 +265,6 @@ func (s Store) QueryResourceActivityLogs(
for _, res = range tmp {
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, err
} else if !chk {
continue
}
}
set = append(set, res)
}
@@ -482,7 +475,7 @@ func (Store) resourceActivityLogColumns(aa ...string) []string {
}
}
// {true true false true true true}
// {true true false true true false}
// sortableResourceActivityLogColumns returns all ResourceActivityLog columns flagged as sortable
//

View File

@@ -20,9 +20,13 @@ lookups:
searches for corteza resource activity by ID
It returns corteza resource activity even if deleted
search:
enableFilterCheckFunction: false
rdbms:
alias: ral
table: resource_activity_log
customFilterConverter: true
mapFields:
Timestamp: { column: ts }
Meta: { column: meta }