Link services & event loop
This commit is contained in:
parent
bf8f6f4213
commit
5f7abc2187
@ -8,9 +8,9 @@ import (
|
||||
|
||||
func Message(msg *sam.Message) *outgoing.Message {
|
||||
return &outgoing.Message{
|
||||
Message: msg.Message,
|
||||
ID: Uint64toa(msg.ID),
|
||||
ChannelID: Uint64toa(msg.ChannelID),
|
||||
Message: msg.Message,
|
||||
Type: string(msg.Type),
|
||||
ReplyTo: Uint64toa(msg.ReplyTo),
|
||||
|
||||
@ -19,6 +19,7 @@ func Message(msg *sam.Message) *outgoing.Message {
|
||||
|
||||
CreatedAt: msg.CreatedAt,
|
||||
UpdatedAt: msg.UpdatedAt,
|
||||
DeletedAt: msg.DeletedAt,
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,6 +40,11 @@ func Channel(ch *sam.Channel) *outgoing.Channel {
|
||||
Topic: ch.Topic,
|
||||
Type: string(ch.Type),
|
||||
Members: Uint64stoa(ch.Members),
|
||||
|
||||
CreatedAt: ch.CreatedAt,
|
||||
UpdatedAt: ch.UpdatedAt,
|
||||
ArchivedAt: ch.ArchivedAt,
|
||||
DeletedAt: ch.DeletedAt,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package outgoing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -37,6 +38,11 @@ type (
|
||||
Type string `json:"type"`
|
||||
LastMessageID string `json:"lastMessageID"`
|
||||
Members []string `json:"members,omitempty"`
|
||||
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
|
||||
ArchivedAt *time.Time `json:"archivedAt,omitempty"`
|
||||
DeletedAt *time.Time `json:"deletedAt,omitempty"`
|
||||
}
|
||||
|
||||
ChannelSet []*Channel
|
||||
@ -50,10 +56,6 @@ func (p *ChannelPart) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{ChannelPart: p})
|
||||
}
|
||||
|
||||
func (p *ChannelDeleted) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{ChannelDeleted: p})
|
||||
}
|
||||
|
||||
func (p *Channel) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{Channel: p})
|
||||
}
|
||||
|
||||
@ -18,20 +18,10 @@ type (
|
||||
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
|
||||
DeletedAt *time.Time `json:"deletedAt,omitempty"`
|
||||
}
|
||||
|
||||
MessageSet []*Message
|
||||
|
||||
MessageUpdate struct {
|
||||
ID string `json:"ID"`
|
||||
Message string `json:"message"`
|
||||
|
||||
UpdatedAt time.Time `json:"updatedAt,omitempty"`
|
||||
}
|
||||
|
||||
MessageDelete struct {
|
||||
ID string `json:"ID"`
|
||||
}
|
||||
)
|
||||
|
||||
func (p *Message) EncodeMessage() ([]byte, error) {
|
||||
@ -41,11 +31,3 @@ func (p *Message) EncodeMessage() ([]byte, error) {
|
||||
func (p *MessageSet) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{MessageSet: p})
|
||||
}
|
||||
|
||||
func (p *MessageUpdate) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{MessageUpdate: p})
|
||||
}
|
||||
|
||||
func (p *MessageDelete) EncodeMessage() ([]byte, error) {
|
||||
return json.Marshal(Payload{MessageDelete: p})
|
||||
}
|
||||
|
||||
@ -7,10 +7,8 @@ type (
|
||||
*Connected `json:"clientConnected,omitempty"`
|
||||
*Disconnected `json:"clientDisconnected,omitempty"`
|
||||
|
||||
*Message `json:"message,omitempty"`
|
||||
*MessageDelete `json:"messageDeleted,omitempty"`
|
||||
*MessageUpdate `json:"messageUpdated,omitempty"`
|
||||
*MessageSet `json:"messages,omitempty"`
|
||||
*Message `json:"message,omitempty"`
|
||||
*MessageSet `json:"messages,omitempty"`
|
||||
|
||||
*ChannelJoin `json:"channelJoin,omitempty"`
|
||||
*ChannelPart `json:"channelPart,omitempty"`
|
||||
|
||||
@ -2,11 +2,9 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
@ -24,7 +22,9 @@ type (
|
||||
|
||||
attachment repository.AttachmentRepository
|
||||
message repository.MessageRepository
|
||||
store store.Store
|
||||
|
||||
store store.Store
|
||||
evl EventService
|
||||
}
|
||||
|
||||
AttachmentService interface {
|
||||
@ -32,31 +32,29 @@ type (
|
||||
|
||||
FindByID(id uint64) (*types.Attachment, error)
|
||||
Create(channelId uint64, name string, size int64, fh io.ReadSeeker) (*types.Attachment, error)
|
||||
LoadFromMessages(mm types.MessageSet) (err error)
|
||||
OpenOriginal(att *types.Attachment) (io.ReadSeeker, error)
|
||||
OpenPreview(att *types.Attachment) (io.ReadSeeker, error)
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
attachmentURL = "/attachment/%d/%s"
|
||||
attachmentPreviewURL = "/attachment/%d/%s/preview"
|
||||
)
|
||||
|
||||
func Attachment(store store.Store) AttachmentService {
|
||||
return (&attachment{
|
||||
store: store,
|
||||
evl: DefaultEvent,
|
||||
}).With(context.Background())
|
||||
}
|
||||
|
||||
func (svc *attachment) With(ctx context.Context) AttachmentService {
|
||||
db := repository.DB(ctx)
|
||||
return &attachment{
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
|
||||
attachment: repository.Attachment(ctx, db),
|
||||
message: repository.Message(ctx, db),
|
||||
store: svc.store,
|
||||
|
||||
store: svc.store,
|
||||
evl: svc.evl.With(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,33 +70,6 @@ func (svc *attachment) OpenPreview(att *types.Attachment) (io.ReadSeeker, error)
|
||||
return svc.store.Open(att.PreviewUrl)
|
||||
}
|
||||
|
||||
func (svc *attachment) LoadFromMessages(mm types.MessageSet) (err error) {
|
||||
var ids []uint64
|
||||
mm.Walk(func(m *types.Message) error {
|
||||
if m.Type == types.MessageTypeAttachment || m.Type == types.MessageTypeInlineImage {
|
||||
ids = append(ids, m.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if set, err := svc.attachment.FindAttachmentByMessageID(ids...); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return set.Walk(func(a *types.MessageAttachment) error {
|
||||
if a.MessageID > 0 {
|
||||
if m := mm.FindById(a.MessageID); m != nil {
|
||||
m.Attachment = &a.Attachment
|
||||
|
||||
m.Attachment.Url = svc.url(&a.Attachment)
|
||||
m.Attachment.PreviewUrl = svc.previewUrl(&a.Attachment)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *attachment) Create(channelId uint64, name string, size int64, fh io.ReadSeeker) (att *types.Attachment, err error) {
|
||||
var currentUserID uint64 = repository.Identity(svc.ctx)
|
||||
|
||||
@ -164,20 +135,12 @@ func (svc *attachment) Create(channelId uint64, name string, size int64, fh io.R
|
||||
|
||||
log.Printf("File %s (id: %d) attached to message (id: %d)", att.Name, att.ID, msg.ID)
|
||||
|
||||
return
|
||||
msg.Attachment = att
|
||||
msg.Attachment.GenerateURLs()
|
||||
return svc.evl.Message(msg)
|
||||
})
|
||||
}
|
||||
|
||||
// Generates URL to a location
|
||||
func (svc *attachment) url(att *types.Attachment) string {
|
||||
return fmt.Sprintf(attachmentURL, att.ID, url.PathEscape(att.Name))
|
||||
}
|
||||
|
||||
// Generates URL to a location
|
||||
func (svc *attachment) previewUrl(att *types.Attachment) string {
|
||||
return fmt.Sprintf(attachmentPreviewURL, att.ID, url.PathEscape(att.Name))
|
||||
}
|
||||
|
||||
func (svc *attachment) extractMeta(att *types.Attachment, file io.ReadSeeker) (err error) {
|
||||
if _, err = file.Seek(0, 0); err != nil {
|
||||
return err
|
||||
|
||||
@ -17,6 +17,8 @@ type (
|
||||
db *factory.DB
|
||||
ctx context.Context
|
||||
|
||||
evl EventService
|
||||
|
||||
channel repository.ChannelRepository
|
||||
message repository.MessageRepository
|
||||
}
|
||||
@ -42,14 +44,19 @@ type (
|
||||
)
|
||||
|
||||
func Channel() ChannelService {
|
||||
return (&channel{}).With(context.Background())
|
||||
return (&channel{
|
||||
evl: DefaultEvent,
|
||||
}).With(context.Background())
|
||||
}
|
||||
|
||||
func (svc *channel) With(ctx context.Context) ChannelService {
|
||||
db := repository.DB(ctx)
|
||||
return &channel{
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
|
||||
evl: svc.evl.With(ctx),
|
||||
|
||||
channel: repository.Channel(ctx, db),
|
||||
message: repository.Message(ctx, db),
|
||||
}
|
||||
@ -185,16 +192,15 @@ func (svc *channel) Create(in *types.Channel) (out *types.Channel, err error) {
|
||||
"<PRIVATE-OR-PUBLIC>",
|
||||
"<TOPIC>"))
|
||||
|
||||
_ = msg
|
||||
if err != nil {
|
||||
// Message creation failed
|
||||
return
|
||||
}
|
||||
|
||||
// @todo send channel creation to the event-loop
|
||||
// @todo send msg to the event-loop
|
||||
_ = msg
|
||||
svc.evl.Message(msg)
|
||||
|
||||
return nil
|
||||
return svc.evl.Channel(out)
|
||||
})
|
||||
}
|
||||
|
||||
@ -261,15 +267,14 @@ func (svc *channel) Update(in *types.Channel) (out *types.Channel, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// @todo send channel creation to the event-loop
|
||||
|
||||
// Create the first message, doing this directly with repository to circumvent
|
||||
// message service constraints
|
||||
for _, msg := range msgs {
|
||||
if msg, err = svc.message.CreateMessage(msg); err != nil {
|
||||
// @todo send new msg to the event-loop
|
||||
return err
|
||||
}
|
||||
|
||||
svc.evl.Message(msg)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@ -277,7 +282,7 @@ func (svc *channel) Update(in *types.Channel) (out *types.Channel, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
return svc.evl.Channel(out)
|
||||
})
|
||||
}
|
||||
|
||||
@ -297,9 +302,17 @@ func (svc *channel) Delete(id uint64) error {
|
||||
return errors.New("Channel already deleted")
|
||||
}
|
||||
|
||||
_, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d deleted this channel", userID))
|
||||
msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d deleted this channel", userID))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
svc.evl.Message(msg)
|
||||
|
||||
return svc.channel.DeleteChannelByID(id)
|
||||
if err = svc.channel.DeleteChannelByID(id); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return svc.evl.Channel(ch)
|
||||
})
|
||||
}
|
||||
|
||||
@ -319,9 +332,19 @@ func (svc *channel) Recover(id uint64) error {
|
||||
return errors.New("Channel not deleted")
|
||||
}
|
||||
|
||||
_, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d recovered this channel", userID))
|
||||
msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d recovered this channel", userID))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
svc.evl.Message(msg)
|
||||
|
||||
err = svc.channel.UnarchiveChannelByID(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return svc.evl.Channel(ch)
|
||||
|
||||
return svc.channel.DeleteChannelByID(id)
|
||||
})
|
||||
}
|
||||
|
||||
@ -341,9 +364,18 @@ func (svc *channel) Archive(id uint64) error {
|
||||
return errors.New("Channel already archived")
|
||||
}
|
||||
|
||||
_, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d archived this channel", userID))
|
||||
msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d archived this channel", userID))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
svc.evl.Message(msg)
|
||||
|
||||
return svc.channel.ArchiveChannelByID(id)
|
||||
err = svc.channel.ArchiveChannelByID(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return svc.evl.Channel(ch)
|
||||
})
|
||||
}
|
||||
|
||||
@ -363,9 +395,18 @@ func (svc *channel) Unarchive(id uint64) error {
|
||||
return errors.New("Channel not archived")
|
||||
}
|
||||
|
||||
_, err = svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d unarchived this channel", userID))
|
||||
msg, err := svc.message.CreateMessage(svc.makeSystemMessage(ch, "@%d unarchived this channel", userID))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
svc.evl.Message(msg)
|
||||
|
||||
return svc.channel.ArchiveChannelByID(id)
|
||||
err = svc.channel.ArchiveChannelByID(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return svc.evl.Channel(ch)
|
||||
})
|
||||
|
||||
}
|
||||
@ -374,6 +415,7 @@ func (svc *channel) makeSystemMessage(ch *types.Channel, format string, a ...int
|
||||
return &types.Message{
|
||||
ChannelID: ch.ID,
|
||||
Message: fmt.Sprintf(format, a...),
|
||||
Type: types.MessageTypeChannelEvent,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
60
sam/service/events.go
Normal file
60
sam/service/events.go
Normal file
@ -0,0 +1,60 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/crusttech/crust/internal/payload"
|
||||
"github.com/crusttech/crust/internal/payload/outgoing"
|
||||
"github.com/crusttech/crust/sam/repository"
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
)
|
||||
|
||||
type (
|
||||
event struct {
|
||||
ctx context.Context
|
||||
|
||||
events repository.EventsRepository
|
||||
}
|
||||
|
||||
EventService interface {
|
||||
With(ctx context.Context) EventService
|
||||
Message(m *types.Message) error
|
||||
Channel(m *types.Channel) error
|
||||
}
|
||||
)
|
||||
|
||||
func Event() EventService {
|
||||
return (&event{events: repository.Events()}).With(context.Background())
|
||||
}
|
||||
|
||||
func (svc *event) With(ctx context.Context) EventService {
|
||||
return &event{
|
||||
ctx: ctx,
|
||||
events: svc.events,
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *event) Message(m *types.Message) error {
|
||||
return svc.push(payload.Message(m), m.ChannelID)
|
||||
}
|
||||
|
||||
func (svc *event) Channel(m *types.Channel) error {
|
||||
return svc.push(payload.Channel(m), m.ID)
|
||||
}
|
||||
|
||||
func (svc *event) push(m outgoing.MessageEncoder, sub uint64) error {
|
||||
var enc, err = m.EncodeMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
println("PUSHING EVENT !!", string(enc))
|
||||
|
||||
item := &types.EventQueueItem{Payload: enc}
|
||||
|
||||
if sub > 0 {
|
||||
item.Subscriber = payload.Uint64toa(sub)
|
||||
}
|
||||
|
||||
return svc.events.Push(svc.ctx, item)
|
||||
}
|
||||
@ -16,12 +16,13 @@ type (
|
||||
db *factory.DB
|
||||
ctx context.Context
|
||||
|
||||
channel repository.ChannelRepository
|
||||
message repository.MessageRepository
|
||||
reaction repository.ReactionRepository
|
||||
attachment repository.AttachmentRepository
|
||||
channel repository.ChannelRepository
|
||||
message repository.MessageRepository
|
||||
reaction repository.ReactionRepository
|
||||
|
||||
usr authService.UserService
|
||||
att AttachmentService
|
||||
evl EventService
|
||||
}
|
||||
|
||||
MessageService interface {
|
||||
@ -48,22 +49,24 @@ type (
|
||||
)
|
||||
|
||||
func Message() MessageService {
|
||||
return (&message{
|
||||
return &message{
|
||||
usr: authService.DefaultUser,
|
||||
att: DefaultAttachment,
|
||||
}).With(context.Background())
|
||||
evl: DefaultEvent,
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *message) With(ctx context.Context) MessageService {
|
||||
db := repository.DB(ctx)
|
||||
return &message{
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
att: svc.att.With(ctx),
|
||||
usr: svc.usr.With(ctx),
|
||||
channel: repository.Channel(ctx, db),
|
||||
message: repository.Message(ctx, db),
|
||||
reaction: repository.Reaction(ctx, db),
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
usr: svc.usr.With(ctx),
|
||||
evl: svc.evl.With(ctx),
|
||||
|
||||
channel: repository.Channel(ctx, db),
|
||||
attachment: repository.Attachment(ctx, db),
|
||||
message: repository.Message(ctx, db),
|
||||
reaction: repository.Reaction(ctx, db),
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,7 +88,32 @@ func (svc *message) Find(filter *types.MessageFilter) (mm types.MessageSet, err
|
||||
return
|
||||
})
|
||||
|
||||
return mm, svc.att.LoadFromMessages(mm)
|
||||
return mm, svc.loadAttachments(mm)
|
||||
}
|
||||
|
||||
func (svc *message) loadAttachments(mm types.MessageSet) (err error) {
|
||||
var ids []uint64
|
||||
mm.Walk(func(m *types.Message) error {
|
||||
if m.Type == types.MessageTypeAttachment || m.Type == types.MessageTypeInlineImage {
|
||||
ids = append(ids, m.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if set, err := svc.attachment.FindAttachmentByMessageID(ids...); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return set.Walk(func(a *types.MessageAttachment) error {
|
||||
if a.MessageID > 0 {
|
||||
if m := mm.FindById(a.MessageID); m != nil {
|
||||
m.Attachment = &a.Attachment
|
||||
m.Attachment.GenerateURLs()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *message) Direct(recipientID uint64, in *types.Message) (out *types.Message, err error) {
|
||||
@ -135,7 +163,11 @@ func (svc *message) Direct(recipientID uint64, in *types.Message) (out *types.Me
|
||||
|
||||
// @todo send new msg to the event-loop
|
||||
out, err = svc.message.CreateMessage(in)
|
||||
return
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return svc.evl.Message(out)
|
||||
})
|
||||
}
|
||||
|
||||
@ -148,10 +180,12 @@ func (svc *message) Create(mod *types.Message) (*types.Message, error) {
|
||||
mod.UserID = currentUserID
|
||||
|
||||
message, err := svc.message.CreateMessage(mod)
|
||||
if err == nil {
|
||||
PubSub().Event(svc.ctx, "new message added")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return message, err
|
||||
|
||||
return message, svc.evl.Message(message)
|
||||
}
|
||||
|
||||
func (svc *message) Update(mod *types.Message) (*types.Message, error) {
|
||||
@ -165,7 +199,13 @@ func (svc *message) Update(mod *types.Message) (*types.Message, error) {
|
||||
|
||||
// @todo verify ownership
|
||||
|
||||
return svc.message.UpdateMessage(mod)
|
||||
message, err := svc.message.UpdateMessage(mod)
|
||||
|
||||
if err == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return message, svc.evl.Message(message)
|
||||
}
|
||||
|
||||
func (svc *message) Delete(id uint64) error {
|
||||
@ -176,10 +216,15 @@ func (svc *message) Delete(id uint64) error {
|
||||
_ = currentUserID
|
||||
|
||||
// @todo load current message
|
||||
|
||||
// @todo verify ownership
|
||||
|
||||
return svc.message.DeleteMessageByID(id)
|
||||
err := svc.message.DeleteMessageByID(id)
|
||||
|
||||
//if err == nil {
|
||||
// err = svc.evl.Message(message)
|
||||
//}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *message) React(messageID uint64, reaction string) error {
|
||||
|
||||
@ -15,6 +15,7 @@ var (
|
||||
DefaultOrganisation OrganisationService
|
||||
DefaultPubSub *pubSub
|
||||
DefaultTeam TeamService
|
||||
DefaultEvent EventService
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@ -24,9 +25,10 @@ func Init() {
|
||||
log.Fatalf("Failed to initialize stor: %v", err)
|
||||
}
|
||||
|
||||
DefaultEvent = Event()
|
||||
DefaultAttachment = Attachment(fs)
|
||||
DefaultChannel = Channel()
|
||||
DefaultMessage = Message()
|
||||
DefaultChannel = Channel()
|
||||
DefaultOrganisation = Organisation()
|
||||
DefaultPubSub = PubSub()
|
||||
DefaultTeam = Team()
|
||||
|
||||
@ -1,9 +1,16 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
attachmentURL = "/attachment/%d/%s"
|
||||
attachmentPreviewURL = "/attachment/%d/%s/preview"
|
||||
)
|
||||
|
||||
type (
|
||||
Attachment struct {
|
||||
ID uint64 `db:"id" json:"ID,omitempty"`
|
||||
@ -35,3 +42,8 @@ func (aa MessageAttachmentSet) Walk(w func(*MessageAttachment) error) (err error
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (a *Attachment) GenerateURLs() {
|
||||
a.Url = fmt.Sprintf(attachmentURL, a.ID, url.PathEscape(a.Name))
|
||||
a.PreviewUrl = fmt.Sprintf(attachmentPreviewURL, a.ID, url.PathEscape(a.Name))
|
||||
}
|
||||
|
||||
@ -80,7 +80,7 @@ func (s *Session) channelCreate(ctx context.Context, p *incoming.ChannelCreate)
|
||||
// Explicitly subscribe to newly created channel
|
||||
s.subs.Add(payload.Uint64toa(ch.ID))
|
||||
|
||||
// @todo this should go over all user's sessons and subscribe there as well
|
||||
// @todo this should go over all user's sessions and subscribe there as well
|
||||
|
||||
// @todo load channel member count
|
||||
|
||||
@ -95,15 +95,7 @@ func (s *Session) channelCreate(ctx context.Context, p *incoming.ChannelCreate)
|
||||
}
|
||||
|
||||
func (s *Session) channelDelete(ctx context.Context, p *incoming.ChannelDelete) (err error) {
|
||||
err = s.svc.ch.With(ctx).Delete(payload.ParseUInt64(p.ChannelID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.sendToAllSubscribers(&outgoing.ChannelDeleted{
|
||||
ID: p.ChannelID,
|
||||
UserID: payload.Uint64toa(auth.GetIdentityFromContext(ctx).Identity()),
|
||||
}, p.ChannelID)
|
||||
return s.svc.ch.With(ctx).Delete(payload.ParseUInt64(p.ChannelID))
|
||||
}
|
||||
|
||||
func (s *Session) channelUpdate(ctx context.Context, p *incoming.ChannelUpdate) error {
|
||||
|
||||
@ -5,57 +5,29 @@ import (
|
||||
|
||||
"github.com/crusttech/crust/internal/payload"
|
||||
"github.com/crusttech/crust/internal/payload/incoming"
|
||||
"github.com/crusttech/crust/internal/payload/outgoing"
|
||||
"github.com/crusttech/crust/sam/types"
|
||||
)
|
||||
|
||||
func (s *Session) messageCreate(ctx context.Context, p *incoming.MessageCreate) error {
|
||||
var (
|
||||
msg = &types.Message{
|
||||
ChannelID: payload.ParseUInt64(p.ChannelID),
|
||||
Message: p.Message,
|
||||
}
|
||||
)
|
||||
_, err := s.svc.msg.With(ctx).Create(&types.Message{
|
||||
ChannelID: payload.ParseUInt64(p.ChannelID),
|
||||
Message: p.Message,
|
||||
})
|
||||
|
||||
msg, err := s.svc.msg.With(ctx).Create(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.sendToAllSubscribers(payload.Message(msg), p.ChannelID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) messageUpdate(ctx context.Context, p *incoming.MessageUpdate) error {
|
||||
var (
|
||||
msg = &types.Message{
|
||||
ID: payload.ParseUInt64(p.ID),
|
||||
Message: p.Message,
|
||||
}
|
||||
)
|
||||
msg, err := s.svc.msg.With(ctx).Update(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := s.svc.msg.With(ctx).Update(&types.Message{
|
||||
ID: payload.ParseUInt64(p.ID),
|
||||
Message: p.Message,
|
||||
})
|
||||
|
||||
omsg := &outgoing.MessageUpdate{
|
||||
ID: p.ID,
|
||||
Message: msg.Message,
|
||||
UpdatedAt: *msg.UpdatedAt,
|
||||
}
|
||||
|
||||
return s.sendToAllSubscribers(omsg, p.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) messageDelete(ctx context.Context, p *incoming.MessageDelete) error {
|
||||
var (
|
||||
id = payload.ParseUInt64(p.ID)
|
||||
)
|
||||
|
||||
if err := s.svc.msg.With(ctx).Delete(id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.sendToAllSubscribers(&outgoing.MessageDelete{ID: p.ID}, p.ChannelID)
|
||||
return s.svc.msg.With(ctx).Delete(payload.ParseUInt64(p.ID))
|
||||
}
|
||||
|
||||
func (s *Session) messageHistory(ctx context.Context, p *incoming.Messages) error {
|
||||
|
||||
@ -28,9 +28,7 @@ func (s *Session) sendToAll(p MessageEncoder) error {
|
||||
return repository.Events().Push(s.ctx, &types.EventQueueItem{Payload: pb})
|
||||
}
|
||||
|
||||
// @todo: this isn't going to be correct - a user may have open multiple clients,
|
||||
// that will connect to different edge SAM servers. It should also go
|
||||
// through a repository.Events().Push (EventQueueItem) path.
|
||||
// Sends message only on this session, no need to enqueue item
|
||||
func (s *Session) sendReply(p MessageEncoder) error {
|
||||
pb, err := p.EncodeMessage()
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user