diff --git a/internal/payload/outgoing.go b/internal/payload/outgoing.go index 94d601cbf..22c5551a1 100644 --- a/internal/payload/outgoing.go +++ b/internal/payload/outgoing.go @@ -8,9 +8,9 @@ import ( func Message(msg *sam.Message) *outgoing.Message { return &outgoing.Message{ - Message: msg.Message, ID: Uint64toa(msg.ID), ChannelID: Uint64toa(msg.ChannelID), + Message: msg.Message, Type: string(msg.Type), ReplyTo: Uint64toa(msg.ReplyTo), @@ -19,6 +19,7 @@ func Message(msg *sam.Message) *outgoing.Message { CreatedAt: msg.CreatedAt, UpdatedAt: msg.UpdatedAt, + DeletedAt: msg.DeletedAt, } } @@ -39,6 +40,11 @@ func Channel(ch *sam.Channel) *outgoing.Channel { Topic: ch.Topic, Type: string(ch.Type), Members: Uint64stoa(ch.Members), + + CreatedAt: ch.CreatedAt, + UpdatedAt: ch.UpdatedAt, + ArchivedAt: ch.ArchivedAt, + DeletedAt: ch.DeletedAt, } } diff --git a/internal/payload/outgoing/channel.go b/internal/payload/outgoing/channel.go index 4a0c644af..c840fc632 100644 --- a/internal/payload/outgoing/channel.go +++ b/internal/payload/outgoing/channel.go @@ -2,6 +2,7 @@ package outgoing import ( "encoding/json" + "time" ) type ( @@ -37,6 +38,11 @@ type ( Type string `json:"type"` LastMessageID string `json:"lastMessageID"` Members []string `json:"members,omitempty"` + + CreatedAt time.Time `json:"createdAt"` + UpdatedAt *time.Time `json:"updatedAt,omitempty"` + ArchivedAt *time.Time `json:"archivedAt,omitempty"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` } ChannelSet []*Channel @@ -50,10 +56,6 @@ func (p *ChannelPart) EncodeMessage() ([]byte, error) { return json.Marshal(Payload{ChannelPart: p}) } -func (p *ChannelDeleted) EncodeMessage() ([]byte, error) { - return json.Marshal(Payload{ChannelDeleted: p}) -} - func (p *Channel) EncodeMessage() ([]byte, error) { return json.Marshal(Payload{Channel: p}) } diff --git a/internal/payload/outgoing/message.go b/internal/payload/outgoing/message.go index 71c5dc6fd..99ed72a01 100644 --- a/internal/payload/outgoing/message.go +++ b/internal/payload/outgoing/message.go @@ -18,20 +18,10 @@ type ( CreatedAt time.Time `json:"createdAt"` UpdatedAt *time.Time `json:"updatedAt,omitempty"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` } MessageSet []*Message - - MessageUpdate struct { - ID string `json:"ID"` - Message string `json:"message"` - - UpdatedAt time.Time `json:"updatedAt,omitempty"` - } - - MessageDelete struct { - ID string `json:"ID"` - } ) func (p *Message) EncodeMessage() ([]byte, error) { @@ -41,11 +31,3 @@ func (p *Message) EncodeMessage() ([]byte, error) { func (p *MessageSet) EncodeMessage() ([]byte, error) { return json.Marshal(Payload{MessageSet: p}) } - -func (p *MessageUpdate) EncodeMessage() ([]byte, error) { - return json.Marshal(Payload{MessageUpdate: p}) -} - -func (p *MessageDelete) EncodeMessage() ([]byte, error) { - return json.Marshal(Payload{MessageDelete: p}) -} diff --git a/internal/payload/outgoing/payload.go b/internal/payload/outgoing/payload.go index 8598bd2ab..7776c5d2c 100644 --- a/internal/payload/outgoing/payload.go +++ b/internal/payload/outgoing/payload.go @@ -7,10 +7,8 @@ type ( *Connected `json:"clientConnected,omitempty"` *Disconnected `json:"clientDisconnected,omitempty"` - *Message `json:"message,omitempty"` - *MessageDelete `json:"messageDeleted,omitempty"` - *MessageUpdate `json:"messageUpdated,omitempty"` - *MessageSet `json:"messages,omitempty"` + *Message `json:"message,omitempty"` + *MessageSet `json:"messages,omitempty"` *ChannelJoin `json:"channelJoin,omitempty"` *ChannelPart `json:"channelPart,omitempty"` diff --git a/sam/service/attachment.go b/sam/service/attachment.go index 79210bb36..4a1eb5e12 100644 --- a/sam/service/attachment.go +++ b/sam/service/attachment.go @@ -2,11 +2,9 @@ package service import ( "context" - "fmt" "io" "log" "net/http" - "net/url" "path" "strings" @@ -24,7 +22,9 @@ type ( attachment repository.AttachmentRepository message repository.MessageRepository - store store.Store + + store store.Store + evl EventService } AttachmentService interface { @@ -32,31 +32,29 @@ type ( FindByID(id uint64) (*types.Attachment, error) Create(channelId uint64, name string, size int64, fh io.ReadSeeker) (*types.Attachment, error) - LoadFromMessages(mm types.MessageSet) (err error) OpenOriginal(att *types.Attachment) (io.ReadSeeker, error) OpenPreview(att *types.Attachment) (io.ReadSeeker, error) } ) -const ( - attachmentURL = "/attachment/%d/%s" - attachmentPreviewURL = "/attachment/%d/%s/preview" -) - func Attachment(store store.Store) AttachmentService { return (&attachment{ store: store, + evl: DefaultEvent, }).With(context.Background()) } func (svc *attachment) With(ctx context.Context) AttachmentService { db := repository.DB(ctx) return &attachment{ - db: db, - ctx: ctx, + db: db, + ctx: ctx, + attachment: repository.Attachment(ctx, db), message: repository.Message(ctx, db), - store: svc.store, + + store: svc.store, + evl: svc.evl.With(ctx), } } @@ -72,33 +70,6 @@ func (svc *attachment) OpenPreview(att *types.Attachment) (io.ReadSeeker, error) return svc.store.Open(att.PreviewUrl) } -func (svc *attachment) LoadFromMessages(mm types.MessageSet) (err error) { - var ids []uint64 - mm.Walk(func(m *types.Message) error { - if m.Type == types.MessageTypeAttachment || m.Type == types.MessageTypeInlineImage { - ids = append(ids, m.ID) - } - return nil - }) - - if set, err := svc.attachment.FindAttachmentByMessageID(ids...); err != nil { - return err - } else { - return set.Walk(func(a *types.MessageAttachment) error { - if a.MessageID > 0 { - if m := mm.FindById(a.MessageID); m != nil { - m.Attachment = &a.Attachment - - m.Attachment.Url = svc.url(&a.Attachment) - m.Attachment.PreviewUrl = svc.previewUrl(&a.Attachment) - } - } - - return nil - }) - } -} - func (svc *attachment) Create(channelId uint64, name string, size int64, fh io.ReadSeeker) (att *types.Attachment, err error) { var currentUserID uint64 = repository.Identity(svc.ctx) @@ -164,20 +135,12 @@ func (svc *attachment) Create(channelId uint64, name string, size int64, fh io.R log.Printf("File %s (id: %d) attached to message (id: %d)", att.Name, att.ID, msg.ID) - return + msg.Attachment = att + msg.Attachment.GenerateURLs() + return svc.evl.Message(msg) }) } -// Generates URL to a location -func (svc *attachment) url(att *types.Attachment) string { - return fmt.Sprintf(attachmentURL, att.ID, url.PathEscape(att.Name)) -} - -// Generates URL to a location -func (svc *attachment) previewUrl(att *types.Attachment) string { - return fmt.Sprintf(attachmentPreviewURL, att.ID, url.PathEscape(att.Name)) -} - func (svc *attachment) extractMeta(att *types.Attachment, file io.ReadSeeker) (err error) { if _, err = file.Seek(0, 0); err != nil { return err diff --git a/sam/service/channel.go b/sam/service/channel.go index 37575e443..670069958 100644 --- a/sam/service/channel.go +++ b/sam/service/channel.go @@ -17,6 +17,8 @@ type ( db *factory.DB ctx context.Context + evl EventService + channel repository.ChannelRepository message repository.MessageRepository } @@ -42,14 +44,19 @@ type ( ) func Channel() ChannelService { - return (&channel{}).With(context.Background()) + return (&channel{ + evl: DefaultEvent, + }).With(context.Background()) } func (svc *channel) With(ctx context.Context) ChannelService { db := repository.DB(ctx) return &channel{ - db: db, - ctx: ctx, + db: db, + ctx: ctx, + + evl: svc.evl.With(ctx), + channel: repository.Channel(ctx, db), message: repository.Message(ctx, db), } @@ -185,16 +192,15 @@ func (svc *channel) Create(in *types.Channel) (out *types.Channel, err error) { "", "")) + _ = msg if err != nil { // Message creation failed return } - // @todo send channel creation to the event-loop - // @todo send msg to the event-loop - _ = msg + svc.evl.Message(msg) - return nil + return svc.evl.Channel(out) }) } @@ -261,15 +267,14 @@ func (svc *channel) Update(in *types.Channel) (out *types.Channel, err error) { return } - // @todo send channel creation to the event-loop - // Create the first message, doing this directly with repository to circumvent // message service constraints for _, msg := range msgs { if msg, err = svc.message.CreateMessage(msg); err != nil { - // @todo send new msg to the event-loop return err } + + svc.evl.Message(msg) } if err != nil { @@ -277,7 +282,7 @@ func (svc *channel) Update(in *types.Channel) (out *types.Channel, err error) { return } - return nil + return svc.evl.Channel(out) }) } @@ -297,9 +302,17 @@ func (svc *channel) Delete(id uint64) error { return errors.New("Channel already deleted") } - _, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d deleted this channel", userID)) + msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d deleted this channel", userID)) + if err != nil { + return + } + svc.evl.Message(msg) - return svc.channel.DeleteChannelByID(id) + if err = svc.channel.DeleteChannelByID(id); err != nil { + return + } + + return svc.evl.Channel(ch) }) } @@ -319,9 +332,19 @@ func (svc *channel) Recover(id uint64) error { return errors.New("Channel not deleted") } - _, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d recovered this channel", userID)) + msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d recovered this channel", userID)) + if err != nil { + return + } + svc.evl.Message(msg) + + err = svc.channel.UnarchiveChannelByID(id) + if err != nil { + return + } + + return svc.evl.Channel(ch) - return svc.channel.DeleteChannelByID(id) }) } @@ -341,9 +364,18 @@ func (svc *channel) Archive(id uint64) error { return errors.New("Channel already archived") } - _, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d archived this channel", userID)) + msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d archived this channel", userID)) + if err != nil { + return + } + svc.evl.Message(msg) - return svc.channel.ArchiveChannelByID(id) + err = svc.channel.ArchiveChannelByID(id) + if err != nil { + return + } + + return svc.evl.Channel(ch) }) } @@ -363,9 +395,18 @@ func (svc *channel) Unarchive(id uint64) error { return errors.New("Channel not archived") } - _, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d unarchived this channel", userID)) + msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d unarchived this channel", userID)) + if err != nil { + return + } + svc.evl.Message(msg) - return svc.channel.ArchiveChannelByID(id) + err = svc.channel.ArchiveChannelByID(id) + if err != nil { + return + } + + return svc.evl.Channel(ch) }) } @@ -374,6 +415,7 @@ func (svc *channel) makeSystemMessage(ch *types.Channel, format string, a ...int return &types.Message{ ChannelID: ch.ID, Message: fmt.Sprintf(format, a...), + Type: types.MessageTypeChannelEvent, } } diff --git a/sam/service/events.go b/sam/service/events.go new file mode 100644 index 000000000..c3486fdd9 --- /dev/null +++ b/sam/service/events.go @@ -0,0 +1,60 @@ +package service + +import ( + "context" + + "github.com/crusttech/crust/internal/payload" + "github.com/crusttech/crust/internal/payload/outgoing" + "github.com/crusttech/crust/sam/repository" + "github.com/crusttech/crust/sam/types" +) + +type ( + event struct { + ctx context.Context + + events repository.EventsRepository + } + + EventService interface { + With(ctx context.Context) EventService + Message(m *types.Message) error + Channel(m *types.Channel) error + } +) + +func Event() EventService { + return (&event{events: repository.Events()}).With(context.Background()) +} + +func (svc *event) With(ctx context.Context) EventService { + return &event{ + ctx: ctx, + events: svc.events, + } +} + +func (svc *event) Message(m *types.Message) error { + return svc.push(payload.Message(m), m.ChannelID) +} + +func (svc *event) Channel(m *types.Channel) error { + return svc.push(payload.Channel(m), m.ID) +} + +func (svc *event) push(m outgoing.MessageEncoder, sub uint64) error { + var enc, err = m.EncodeMessage() + if err != nil { + return err + } + + println("PUSHING EVENT !!", string(enc)) + + item := &types.EventQueueItem{Payload: enc} + + if sub > 0 { + item.Subscriber = payload.Uint64toa(sub) + } + + return svc.events.Push(svc.ctx, item) +} diff --git a/sam/service/message.go b/sam/service/message.go index 9497467e9..d9916a16e 100644 --- a/sam/service/message.go +++ b/sam/service/message.go @@ -16,12 +16,13 @@ type ( db *factory.DB ctx context.Context - channel repository.ChannelRepository - message repository.MessageRepository - reaction repository.ReactionRepository + attachment repository.AttachmentRepository + channel repository.ChannelRepository + message repository.MessageRepository + reaction repository.ReactionRepository usr authService.UserService - att AttachmentService + evl EventService } MessageService interface { @@ -48,22 +49,24 @@ type ( ) func Message() MessageService { - return (&message{ + return &message{ usr: authService.DefaultUser, - att: DefaultAttachment, - }).With(context.Background()) + evl: DefaultEvent, + } } func (svc *message) With(ctx context.Context) MessageService { db := repository.DB(ctx) return &message{ - db: db, - ctx: ctx, - att: svc.att.With(ctx), - usr: svc.usr.With(ctx), - channel: repository.Channel(ctx, db), - message: repository.Message(ctx, db), - reaction: repository.Reaction(ctx, db), + db: db, + ctx: ctx, + usr: svc.usr.With(ctx), + evl: svc.evl.With(ctx), + + channel: repository.Channel(ctx, db), + attachment: repository.Attachment(ctx, db), + message: repository.Message(ctx, db), + reaction: repository.Reaction(ctx, db), } } @@ -85,7 +88,32 @@ func (svc *message) Find(filter *types.MessageFilter) (mm types.MessageSet, err return }) - return mm, svc.att.LoadFromMessages(mm) + return mm, svc.loadAttachments(mm) +} + +func (svc *message) loadAttachments(mm types.MessageSet) (err error) { + var ids []uint64 + mm.Walk(func(m *types.Message) error { + if m.Type == types.MessageTypeAttachment || m.Type == types.MessageTypeInlineImage { + ids = append(ids, m.ID) + } + return nil + }) + + if set, err := svc.attachment.FindAttachmentByMessageID(ids...); err != nil { + return err + } else { + return set.Walk(func(a *types.MessageAttachment) error { + if a.MessageID > 0 { + if m := mm.FindById(a.MessageID); m != nil { + m.Attachment = &a.Attachment + m.Attachment.GenerateURLs() + } + } + + return nil + }) + } } func (svc *message) Direct(recipientID uint64, in *types.Message) (out *types.Message, err error) { @@ -135,7 +163,11 @@ func (svc *message) Direct(recipientID uint64, in *types.Message) (out *types.Me // @todo send new msg to the event-loop out, err = svc.message.CreateMessage(in) - return + if err != nil { + return + } + + return svc.evl.Message(out) }) } @@ -148,10 +180,12 @@ func (svc *message) Create(mod *types.Message) (*types.Message, error) { mod.UserID = currentUserID message, err := svc.message.CreateMessage(mod) - if err == nil { - PubSub().Event(svc.ctx, "new message added") + + if err != nil { + return nil, err } - return message, err + + return message, svc.evl.Message(message) } func (svc *message) Update(mod *types.Message) (*types.Message, error) { @@ -165,7 +199,13 @@ func (svc *message) Update(mod *types.Message) (*types.Message, error) { // @todo verify ownership - return svc.message.UpdateMessage(mod) + message, err := svc.message.UpdateMessage(mod) + + if err == nil { + return nil, err + } + + return message, svc.evl.Message(message) } func (svc *message) Delete(id uint64) error { @@ -176,10 +216,15 @@ func (svc *message) Delete(id uint64) error { _ = currentUserID // @todo load current message - // @todo verify ownership - return svc.message.DeleteMessageByID(id) + err := svc.message.DeleteMessageByID(id) + + //if err == nil { + // err = svc.evl.Message(message) + //} + + return err } func (svc *message) React(messageID uint64, reaction string) error { diff --git a/sam/service/service.go b/sam/service/service.go index cbd25190b..90b773c60 100644 --- a/sam/service/service.go +++ b/sam/service/service.go @@ -15,6 +15,7 @@ var ( DefaultOrganisation OrganisationService DefaultPubSub *pubSub DefaultTeam TeamService + DefaultEvent EventService ) func Init() { @@ -24,9 +25,10 @@ func Init() { log.Fatalf("Failed to initialize stor: %v", err) } + DefaultEvent = Event() DefaultAttachment = Attachment(fs) - DefaultChannel = Channel() DefaultMessage = Message() + DefaultChannel = Channel() DefaultOrganisation = Organisation() DefaultPubSub = PubSub() DefaultTeam = Team() diff --git a/sam/types/attachment.go b/sam/types/attachment.go index 78831a932..0c1c8bc59 100644 --- a/sam/types/attachment.go +++ b/sam/types/attachment.go @@ -1,9 +1,16 @@ package types import ( + "fmt" + "net/url" "time" ) +const ( + attachmentURL = "/attachment/%d/%s" + attachmentPreviewURL = "/attachment/%d/%s/preview" +) + type ( Attachment struct { ID uint64 `db:"id" json:"ID,omitempty"` @@ -35,3 +42,8 @@ func (aa MessageAttachmentSet) Walk(w func(*MessageAttachment) error) (err error return } + +func (a *Attachment) GenerateURLs() { + a.Url = fmt.Sprintf(attachmentURL, a.ID, url.PathEscape(a.Name)) + a.PreviewUrl = fmt.Sprintf(attachmentPreviewURL, a.ID, url.PathEscape(a.Name)) +} diff --git a/sam/websocket/session_incoming_channel.go b/sam/websocket/session_incoming_channel.go index e9c11d7fa..322e0facf 100644 --- a/sam/websocket/session_incoming_channel.go +++ b/sam/websocket/session_incoming_channel.go @@ -80,7 +80,7 @@ func (s *Session) channelCreate(ctx context.Context, p *incoming.ChannelCreate) // Explicitly subscribe to newly created channel s.subs.Add(payload.Uint64toa(ch.ID)) - // @todo this should go over all user's sessons and subscribe there as well + // @todo this should go over all user's sessions and subscribe there as well // @todo load channel member count @@ -95,15 +95,7 @@ func (s *Session) channelCreate(ctx context.Context, p *incoming.ChannelCreate) } func (s *Session) channelDelete(ctx context.Context, p *incoming.ChannelDelete) (err error) { - err = s.svc.ch.With(ctx).Delete(payload.ParseUInt64(p.ChannelID)) - if err != nil { - return err - } - - return s.sendToAllSubscribers(&outgoing.ChannelDeleted{ - ID: p.ChannelID, - UserID: payload.Uint64toa(auth.GetIdentityFromContext(ctx).Identity()), - }, p.ChannelID) + return s.svc.ch.With(ctx).Delete(payload.ParseUInt64(p.ChannelID)) } func (s *Session) channelUpdate(ctx context.Context, p *incoming.ChannelUpdate) error { diff --git a/sam/websocket/session_incoming_message.go b/sam/websocket/session_incoming_message.go index e0cc8f754..879edb532 100644 --- a/sam/websocket/session_incoming_message.go +++ b/sam/websocket/session_incoming_message.go @@ -5,57 +5,29 @@ import ( "github.com/crusttech/crust/internal/payload" "github.com/crusttech/crust/internal/payload/incoming" - "github.com/crusttech/crust/internal/payload/outgoing" "github.com/crusttech/crust/sam/types" ) func (s *Session) messageCreate(ctx context.Context, p *incoming.MessageCreate) error { - var ( - msg = &types.Message{ - ChannelID: payload.ParseUInt64(p.ChannelID), - Message: p.Message, - } - ) + _, err := s.svc.msg.With(ctx).Create(&types.Message{ + ChannelID: payload.ParseUInt64(p.ChannelID), + Message: p.Message, + }) - msg, err := s.svc.msg.With(ctx).Create(msg) - if err != nil { - return err - } - - return s.sendToAllSubscribers(payload.Message(msg), p.ChannelID) + return err } func (s *Session) messageUpdate(ctx context.Context, p *incoming.MessageUpdate) error { - var ( - msg = &types.Message{ - ID: payload.ParseUInt64(p.ID), - Message: p.Message, - } - ) - msg, err := s.svc.msg.With(ctx).Update(msg) - if err != nil { - return err - } + _, err := s.svc.msg.With(ctx).Update(&types.Message{ + ID: payload.ParseUInt64(p.ID), + Message: p.Message, + }) - omsg := &outgoing.MessageUpdate{ - ID: p.ID, - Message: msg.Message, - UpdatedAt: *msg.UpdatedAt, - } - - return s.sendToAllSubscribers(omsg, p.ID) + return err } func (s *Session) messageDelete(ctx context.Context, p *incoming.MessageDelete) error { - var ( - id = payload.ParseUInt64(p.ID) - ) - - if err := s.svc.msg.With(ctx).Delete(id); err != nil { - return err - } - - return s.sendToAllSubscribers(&outgoing.MessageDelete{ID: p.ID}, p.ChannelID) + return s.svc.msg.With(ctx).Delete(payload.ParseUInt64(p.ID)) } func (s *Session) messageHistory(ctx context.Context, p *incoming.Messages) error { diff --git a/sam/websocket/session_outgoing.go b/sam/websocket/session_outgoing.go index e7a1c7235..566ab5a17 100644 --- a/sam/websocket/session_outgoing.go +++ b/sam/websocket/session_outgoing.go @@ -28,9 +28,7 @@ func (s *Session) sendToAll(p MessageEncoder) error { return repository.Events().Push(s.ctx, &types.EventQueueItem{Payload: pb}) } -// @todo: this isn't going to be correct - a user may have open multiple clients, -// that will connect to different edge SAM servers. It should also go -// through a repository.Events().Push (EventQueueItem) path. +// Sends message only on this session, no need to enqueue item func (s *Session) sendReply(p MessageEncoder) error { pb, err := p.EncodeMessage() if err != nil {