Chan & thread unread fixes
This commit is contained in:
parent
95758fb689
commit
bb447ee795
@ -472,7 +472,7 @@
|
||||
"title": "Manages read/unread messages in a channel or a thread",
|
||||
"parameters": {
|
||||
"path": [],
|
||||
"post": [
|
||||
"get": [
|
||||
{
|
||||
"type": "uint64",
|
||||
"name": "threadID",
|
||||
|
||||
@ -72,8 +72,7 @@
|
||||
"Title": "Manages read/unread messages in a channel or a thread",
|
||||
"Path": "/mark-as-read",
|
||||
"Parameters": {
|
||||
"path": [],
|
||||
"post": [
|
||||
"get": [
|
||||
{
|
||||
"name": "threadID",
|
||||
"required": false,
|
||||
@ -86,7 +85,8 @@
|
||||
"title": "ID of the last read message",
|
||||
"type": "uint64"
|
||||
}
|
||||
]
|
||||
],
|
||||
"path": []
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@ -363,9 +363,9 @@ A channel is a representation of a sequence of messages. It has meta data like c
|
||||
|
||||
| Parameter | Type | Method | Description | Default | Required? |
|
||||
| --------- | ---- | ------ | ----------- | ------- | --------- |
|
||||
| threadID | uint64 | GET | ID of thread (messageID) | N/A | NO |
|
||||
| lastReadMessageID | uint64 | GET | ID of the last read message | N/A | NO |
|
||||
| channelID | uint64 | PATH | Channel ID | N/A | YES |
|
||||
| threadID | uint64 | POST | ID of thread (messageID) | N/A | NO |
|
||||
| lastReadMessageID | uint64 | POST | ID of the last read message | N/A | NO |
|
||||
|
||||
## Edit existing message
|
||||
|
||||
|
||||
@ -40,6 +40,7 @@ func Message(ctx context.Context, msg *messagingTypes.Message) *outgoing.Message
|
||||
ReplyTo: msg.ReplyTo,
|
||||
Replies: msg.Replies,
|
||||
RepliesFrom: Uint64stoa(msg.RepliesFrom),
|
||||
Unread: Unread(msg.Unread),
|
||||
|
||||
Attachment: Attachment(msg.Attachment, currentUserID),
|
||||
Mentions: messageMentionSet(msg.Mentions),
|
||||
@ -198,6 +199,7 @@ func Unread(v *messagingTypes.Unread) *outgoing.Unread {
|
||||
return &outgoing.Unread{
|
||||
LastMessageID: v.LastMessageID,
|
||||
Count: v.Count,
|
||||
InThreadCount: v.InThreadCount,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -16,6 +16,7 @@ type (
|
||||
ReplyTo uint64 `json:"replyTo,omitempty,string"`
|
||||
Replies uint `json:"replies,omitempty"`
|
||||
RepliesFrom []string `json:"repliesFrom,omitempty"`
|
||||
Unread *Unread `json:"unread,omitempty"`
|
||||
|
||||
Attachment *Attachment `json:"att,omitempty"`
|
||||
Mentions MessageMentionSet `json:"mentions,omitempty"`
|
||||
|
||||
@ -5,5 +5,6 @@ type (
|
||||
// Channel to part (nil) for ALL channels
|
||||
LastMessageID uint64 `json:"lastMessageID,string,omitempty"`
|
||||
Count uint32 `json:"count"`
|
||||
InThreadCount uint32 `json:"tcount,omitempty"`
|
||||
}
|
||||
)
|
||||
|
||||
File diff suppressed because one or more lines are too long
41
messaging/db/schema/mysql/20190623080000.unreads.up.sql
Normal file
41
messaging/db/schema/mysql/20190623080000.unreads.up.sql
Normal file
@ -0,0 +1,41 @@
|
||||
UPDATE `messaging_unread` SET rel_reply_to = 0 WHERE rel_reply_to IS NULL;
|
||||
ALTER TABLE `messaging_unread` CHANGE COLUMN `rel_reply_to` `rel_reply_to` BIGINT UNSIGNED NOT NULL;
|
||||
ALTER TABLE `messaging_unread` DROP PRIMARY KEY, ADD PRIMARY KEY(`rel_channel`, `rel_reply_to`, `rel_user`);
|
||||
|
||||
-- Add entries for all (unexisting) unreads (channels & threads)
|
||||
INSERT IGNORE INTO messaging_unread
|
||||
(rel_channel, rel_reply_to, rel_user)
|
||||
SELECT DISTINCT cm.rel_channel, msg.id, cm.rel_user
|
||||
FROM messaging_channel_member AS cm
|
||||
INNER JOIN messaging_message AS msg ON (cm.rel_channel = msg.rel_channel AND replies > 0)
|
||||
WHERE NOT EXISTS (SELECT 1 FROM messaging_unread AS u WHERE u.rel_reply_to = msg.id AND u.rel_user = cm.rel_user)
|
||||
AND msg.rel_user > 0
|
||||
|
||||
UNION
|
||||
|
||||
SELECT DISTINCT cm.rel_channel, 0, cm.rel_user
|
||||
FROM messaging_channel_member AS cm
|
||||
WHERE NOT EXISTS (SELECT 1 FROM messaging_unread AS u WHERE u.rel_channel = cm.rel_channel AND u.rel_user = cm.rel_user)
|
||||
AND cm.rel_user > 0
|
||||
;
|
||||
|
||||
|
||||
-- Update counters for channel messages
|
||||
INSERT IGNORE INTO messaging_unread
|
||||
(rel_channel, rel_reply_to, rel_user, count, rel_last_message)
|
||||
SELECT u.rel_channel, 0, u.rel_user, COUNT(m.id), u.rel_last_message
|
||||
FROM messaging_unread AS u
|
||||
INNER JOIN messaging_message AS m ON (u.rel_channel = m.rel_channel AND m.id > u.rel_last_message)
|
||||
WHERE u.rel_reply_to = 0
|
||||
AND m.reply_to = 0
|
||||
GROUP BY u.rel_channel, u.rel_user;
|
||||
|
||||
-- Update counters for thread messages
|
||||
|
||||
INSERT IGNORE INTO messaging_unread
|
||||
(rel_channel, rel_reply_to, rel_user, count, rel_last_message)
|
||||
SELECT u.rel_channel, rpl.reply_to, u.rel_user, COUNT(rpl.id), u.rel_last_message
|
||||
FROM messaging_unread AS u
|
||||
INNER JOIN messaging_message AS rpl ON (u.rel_channel = rpl.rel_channel AND rpl.reply_to = u.rel_reply_to AND rpl.id > u.rel_last_message)
|
||||
WHERE rpl.replies > 0 AND u.rel_reply_to > 0
|
||||
GROUP BY u.rel_channel, rpl.reply_to, u.rel_user;
|
||||
@ -21,6 +21,7 @@ type (
|
||||
Find(filter *types.MessageFilter) (types.MessageSet, error)
|
||||
FindThreads(filter *types.MessageFilter) (types.MessageSet, error)
|
||||
CountFromMessageID(channelID, threadID, messageID uint64) (uint32, error)
|
||||
LastMessageID(channelID, threadID uint64) (uint64, error)
|
||||
PrefillThreadParticipants(mm types.MessageSet) error
|
||||
|
||||
Create(mod *types.Message) (*types.Message, error)
|
||||
@ -88,6 +89,13 @@ const (
|
||||
"AND COALESCE(type, '') NOT IN (?) " +
|
||||
"AND id > ? AND deleted_at IS NULL"
|
||||
|
||||
sqlLastMessageID = "SELECT COALESCE(MAX(id), 0) AS last " +
|
||||
"FROM messaging_message " +
|
||||
"WHERE rel_channel = ? " +
|
||||
"AND reply_to = ? " +
|
||||
"AND COALESCE(type, '') NOT IN (?) " +
|
||||
"AND deleted_at IS NULL"
|
||||
|
||||
sqlMessageRepliesIncCount = `UPDATE messaging_message SET replies = replies + 1 WHERE id = ? AND reply_to = 0`
|
||||
sqlMessageRepliesDecCount = `UPDATE messaging_message SET replies = replies - 1 WHERE id = ? AND reply_to = 0`
|
||||
|
||||
@ -250,12 +258,27 @@ func (r *message) CountFromMessageID(channelID, threadID, lastReadMessageID uint
|
||||
)
|
||||
}
|
||||
|
||||
func (r *message) LastMessageID(channelID, threadID uint64) (uint64, error) {
|
||||
rval := struct{ Last uint64 }{}
|
||||
return rval.Last, r.db().Get(&rval,
|
||||
sqlLastMessageID,
|
||||
channelID,
|
||||
threadID,
|
||||
types.MessageTypeChannelEvent,
|
||||
)
|
||||
}
|
||||
|
||||
func (r *message) PrefillThreadParticipants(mm types.MessageSet) error {
|
||||
var rval = []struct {
|
||||
ReplyTo uint64 `db:"reply_to"`
|
||||
UserID uint64 `db:"rel_user"`
|
||||
}{}
|
||||
|
||||
// Filter out only relevant messages -- ones with replies
|
||||
mm, _ = mm.Filter(func(m *types.Message) (b bool, e error) {
|
||||
return m.Replies > 0, nil
|
||||
})
|
||||
|
||||
if len(mm) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package repository
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/titpetric/factory"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/messaging/types"
|
||||
@ -14,6 +15,7 @@ type (
|
||||
With(ctx context.Context, db *factory.DB) UnreadRepository
|
||||
|
||||
Find(filter *types.UnreadFilter) (types.UnreadSet, error)
|
||||
Preset(channelID, threadID uint64, userIDs ...uint64) (err error)
|
||||
Record(userID, channelID, threadID, lastReadMessageID uint64, count uint32) error
|
||||
Inc(channelID, replyTo, userID uint64) error
|
||||
Dec(channelID, replyTo, userID uint64) error
|
||||
@ -30,8 +32,14 @@ type (
|
||||
const (
|
||||
// Fetching channel members of all channels a specific user has access to
|
||||
sqlUnreadSelect = `SELECT rel_channel, rel_reply_to, rel_user, count, rel_last_message
|
||||
FROM messaging_unread
|
||||
WHERE true `
|
||||
|
||||
// Fetching channel members of all channels a specific user has access to
|
||||
sqlThreadUnreadSelect = `SELECT rel_channel, sum(count) as count
|
||||
FROM messaging_unread
|
||||
WHERE true `
|
||||
WHERE rel_user = ? AND rel_reply_to > 0
|
||||
GROUP BY rel_channel`
|
||||
|
||||
sqlUnreadIncCount = `UPDATE messaging_unread
|
||||
SET count = count + 1
|
||||
@ -39,7 +47,14 @@ const (
|
||||
|
||||
sqlUnreadDecCount = `UPDATE messaging_unread
|
||||
SET count = count - 1
|
||||
WHERE rel_channel = ? AND rel_reply_to = ? AND rel_user <> ? AND count > 0`
|
||||
WHERE rel_channel = ? AND rel_reply_to = ? AND count > 0`
|
||||
|
||||
sqlUnreadPresetChannel = `INSERT IGNORE INTO messaging_unread (rel_channel, rel_reply_to, rel_user) VALUES (?, ?, ?)`
|
||||
sqlUnreadPresetThreads = `INSERT IGNORE INTO messaging_unread (rel_channel, rel_reply_to, rel_user)
|
||||
SELECT rel_channel, id, ?
|
||||
FROM messaging_message
|
||||
WHERE rel_channel = ?
|
||||
AND replies > 0`
|
||||
)
|
||||
|
||||
// Unread creates new instance of channel member repository
|
||||
@ -54,13 +69,9 @@ func (r *unread) With(ctx context.Context, db *factory.DB) UnreadRepository {
|
||||
}
|
||||
}
|
||||
|
||||
// FindMembers fetches membership info
|
||||
//
|
||||
// If channelID > 0 it returns members of a specific channel
|
||||
// If userID > 0 it returns members of all channels this user is member of
|
||||
func (r *unread) Find(filter *types.UnreadFilter) (types.UnreadSet, error) {
|
||||
// Find unread info
|
||||
func (r *unread) Find(filter *types.UnreadFilter) (uu types.UnreadSet, err error) {
|
||||
params := make([]interface{}, 0)
|
||||
vv := types.UnreadSet{}
|
||||
sql := sqlUnreadSelect
|
||||
|
||||
if filter != nil {
|
||||
@ -69,12 +80,76 @@ func (r *unread) Find(filter *types.UnreadFilter) (types.UnreadSet, error) {
|
||||
sql += ` AND rel_user = ?`
|
||||
params = append(params, filter.UserID)
|
||||
}
|
||||
|
||||
if len(filter.ThreadIDs) > 0 {
|
||||
sql += ` AND rel_reply_to IN (?)`
|
||||
params = append(params, filter.ThreadIDs)
|
||||
}
|
||||
}
|
||||
|
||||
return vv, r.db().Select(&vv, sql, params...)
|
||||
if sql, params, err = sqlx.In(sql, params...); err != nil {
|
||||
return nil, err
|
||||
} else if err = r.db().Select(&uu, sql, params...); err != nil {
|
||||
return nil, err
|
||||
} else if len(filter.ThreadIDs) == 0 && filter.UserID > 0 {
|
||||
// Check for unread thread messages
|
||||
|
||||
// We'll abuse Unread/UnreadSet
|
||||
tt := types.UnreadSet{}
|
||||
|
||||
err = r.db().Select(&tt, sqlThreadUnreadSelect, filter.UserID)
|
||||
|
||||
_ = tt.Walk(func(t *types.Unread) error {
|
||||
c := uu.FindByChannelId(t.ChannelID)
|
||||
if c != nil {
|
||||
c.InThreadCount = t.Count
|
||||
} else {
|
||||
// No un-reads in channel but we have them in threads (of that channel)
|
||||
// swap values and append
|
||||
t.InThreadCount, t.Count = t.Count, 0
|
||||
uu = append(uu, t)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return uu, nil
|
||||
}
|
||||
|
||||
// Records channel view
|
||||
// Preset channel unread records for all users (and threads in that channel)
|
||||
//
|
||||
// Whenever channel member is added or a new thread is created
|
||||
// we generate records
|
||||
func (r unread) Preset(channelID, threadID uint64, userIDs ...uint64) (err error) {
|
||||
if channelID == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, userID := range userIDs {
|
||||
if userID == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = r.db().Exec(sqlUnreadPresetChannel, channelID, threadID, userID)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if threadID == 0 {
|
||||
// Preset for all threads in the channel
|
||||
_, err = r.db().Exec(sqlUnreadPresetThreads, userID, channelID)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Record channel/thread view
|
||||
func (r *unread) Record(userID, channelID, threadID, lastReadMessageID uint64, count uint32) error {
|
||||
mod := &types.Unread{
|
||||
ChannelID: channelID,
|
||||
@ -87,15 +162,15 @@ func (r *unread) Record(userID, channelID, threadID, lastReadMessageID uint64, c
|
||||
return r.db().Replace("messaging_unread", mod)
|
||||
}
|
||||
|
||||
// Increments unread message count on a channel/thread for all but one user
|
||||
// Inc increments unread message count on a channel/thread for all but one user
|
||||
func (r *unread) Inc(channelID, threadID, userID uint64) error {
|
||||
_, err := r.db().Exec(sqlUnreadIncCount, channelID, threadID, userID)
|
||||
return err
|
||||
}
|
||||
|
||||
// Decrements unread message count on a channel/thread for all but one user
|
||||
// Dec decrements unread message count on a channel/thread for all but one user
|
||||
func (r *unread) Dec(channelID, threadID, userID uint64) error {
|
||||
_, err := r.db().Exec(sqlUnreadDecCount, channelID, threadID, userID)
|
||||
_, err := r.db().Exec(sqlUnreadDecCount, channelID, threadID)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -70,7 +70,6 @@ type (
|
||||
Unarchive(ID uint64) (*types.Channel, error)
|
||||
Delete(ID uint64) (*types.Channel, error)
|
||||
Undelete(ID uint64) (*types.Channel, error)
|
||||
RecordView(userID, channelID, lastMessageID uint64) error
|
||||
}
|
||||
)
|
||||
|
||||
@ -145,7 +144,7 @@ func (svc *channel) preloadExtras(cc types.ChannelSet) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = svc.preloadViews(cc); err != nil {
|
||||
if err = svc.preloadUnreads(cc); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -171,19 +170,17 @@ func (svc *channel) preloadMembers(cc types.ChannelSet) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *channel) preloadViews(cc types.ChannelSet) error {
|
||||
func (svc *channel) preloadUnreads(cc types.ChannelSet) error {
|
||||
var userID = auth.GetIdentityFromContext(svc.ctx).Identity()
|
||||
|
||||
if vv, err := svc.unread.Find(&types.UnreadFilter{UserID: userID}); err != nil {
|
||||
return err
|
||||
} else {
|
||||
cc.Walk(func(ch *types.Channel) error {
|
||||
return cc.Walk(func(ch *types.Channel) error {
|
||||
ch.Unread = vv.FindByChannelId(ch.ID)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindMembers loads all members (and full users) for a specific channel
|
||||
@ -266,7 +263,7 @@ func (svc *channel) Create(in *types.Channel) (out *types.Channel, err error) {
|
||||
m.ChannelID = out.ID
|
||||
|
||||
// Create member
|
||||
if m, err = svc.cmember.Create(m); err != nil {
|
||||
if m, err = svc.createMember(m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -646,7 +643,7 @@ func (svc *channel) InviteUser(channelID uint64, memberIDs ...uint64) (out types
|
||||
Type: types.ChannelMembershipTypeInvitee,
|
||||
}
|
||||
|
||||
if member, err = svc.cmember.Create(member); err != nil {
|
||||
if member, err = svc.createMember(member); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -724,7 +721,7 @@ func (svc *channel) AddMember(channelID uint64, memberIDs ...uint64) (out types.
|
||||
if exists {
|
||||
member, err = svc.cmember.Update(member)
|
||||
} else {
|
||||
member, err = svc.cmember.Create(member)
|
||||
member, err = svc.createMember(member)
|
||||
}
|
||||
|
||||
svc.event.Join(memberID, channelID)
|
||||
@ -745,6 +742,20 @@ func (svc *channel) AddMember(channelID uint64, memberIDs ...uint64) (out types.
|
||||
})
|
||||
}
|
||||
|
||||
// createMember orchestrates member creation
|
||||
func (svc channel) createMember(member *types.ChannelMember) (m *types.ChannelMember, err error) {
|
||||
if m, err = svc.cmember.Create(member); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Create zero-count unread record
|
||||
if err = svc.unread.Preset(m.ChannelID, 0, m.UserID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *channel) DeleteMember(channelID uint64, memberIDs ...uint64) (err error) {
|
||||
var (
|
||||
userID = repository.Identity(svc.ctx)
|
||||
@ -795,14 +806,6 @@ func (svc *channel) DeleteMember(channelID uint64, memberIDs ...uint64) (err err
|
||||
})
|
||||
}
|
||||
|
||||
// RecordView
|
||||
// @deprecated
|
||||
func (svc *channel) RecordView(userID, channelID, lastMessageID uint64) error {
|
||||
return svc.db.Transaction(func() (err error) {
|
||||
return svc.unread.Record(userID, channelID, 0, lastMessageID, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func (svc *channel) scheduleSystemMessage(ch *types.Channel, format string, a ...interface{}) {
|
||||
svc.sysmsgs = append(svc.sysmsgs, &types.Message{
|
||||
ChannelID: ch.ID,
|
||||
|
||||
@ -25,7 +25,7 @@ type (
|
||||
attachment repository.AttachmentRepository
|
||||
channel repository.ChannelRepository
|
||||
cmember repository.ChannelMemberRepository
|
||||
unreads repository.UnreadRepository
|
||||
unread repository.UnreadRepository
|
||||
message repository.MessageRepository
|
||||
mflag repository.MessageFlagRepository
|
||||
mentions repository.MentionRepository
|
||||
@ -56,7 +56,7 @@ type (
|
||||
React(messageID uint64, reaction string) error
|
||||
RemoveReaction(messageID uint64, reaction string) error
|
||||
|
||||
MarkAsRead(channelID, threadID, lastReadMessageID uint64) (uint32, error)
|
||||
MarkAsRead(channelID, threadID, lastReadMessageID uint64) (uint64, uint32, error)
|
||||
|
||||
Pin(messageID uint64) error
|
||||
RemovePin(messageID uint64) error
|
||||
@ -96,7 +96,7 @@ func (svc message) With(ctx context.Context) MessageService {
|
||||
attachment: repository.Attachment(ctx, db),
|
||||
channel: repository.Channel(ctx, db),
|
||||
cmember: repository.ChannelMember(ctx, db),
|
||||
unreads: repository.Unread(ctx, db),
|
||||
unread: repository.Unread(ctx, db),
|
||||
message: repository.Message(ctx, db),
|
||||
mflag: repository.MessageFlag(ctx, db),
|
||||
mentions: repository.Mention(ctx, db),
|
||||
@ -240,6 +240,22 @@ func (svc message) Create(in *types.Message) (message *types.Message, err error)
|
||||
|
||||
in.ChannelID = original.ChannelID
|
||||
|
||||
if original.Replies == 0 {
|
||||
// First reply,
|
||||
//
|
||||
// reset unreads for all members
|
||||
var mm types.ChannelMemberSet
|
||||
mm, err = svc.cmember.Find(&types.ChannelMemberFilter{ChannelID: original.ChannelID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = svc.unread.Preset(original.ChannelID, original.ID, mm.AllMemberIDs()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Increment counter, on struct and in repository.
|
||||
original.Replies++
|
||||
if err = svc.message.IncReplyCount(original.ID); err != nil {
|
||||
@ -271,7 +287,7 @@ func (svc message) Create(in *types.Message) (message *types.Message, err error)
|
||||
return
|
||||
}
|
||||
|
||||
if err = svc.unreads.Inc(message.ChannelID, message.ReplyTo, message.UserID); err != nil {
|
||||
if err = svc.unread.Inc(message.ChannelID, message.ReplyTo, message.UserID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -397,7 +413,7 @@ func (svc message) Delete(messageID uint64) error {
|
||||
return
|
||||
}
|
||||
|
||||
if err = svc.unreads.Dec(deletedMsg.ChannelID, deletedMsg.ReplyTo, deletedMsg.UserID); err != nil {
|
||||
if err = svc.unread.Dec(deletedMsg.ChannelID, deletedMsg.ReplyTo, deletedMsg.UserID); err != nil {
|
||||
return err
|
||||
} else {
|
||||
// Set deletedAt timestamp so that our clients can react properly...
|
||||
@ -412,9 +428,15 @@ func (svc message) Delete(messageID uint64) error {
|
||||
})
|
||||
}
|
||||
|
||||
// M
|
||||
func (svc message) MarkAsRead(channelID, threadID, lastReadMessageID uint64) (count uint32, err error) {
|
||||
var currentUserID uint64 = repository.Identity(svc.ctx)
|
||||
// MarkAsRead marks channel/thread as read
|
||||
//
|
||||
// If lastReadMessageID is set, it uses that message as last read message
|
||||
func (svc message) MarkAsRead(channelID, threadID, lastReadMessageID uint64) (uint64, uint32, error) {
|
||||
var (
|
||||
currentUserID uint64 = repository.Identity(svc.ctx)
|
||||
count uint32
|
||||
err error
|
||||
)
|
||||
|
||||
err = svc.db.Transaction(func() (err error) {
|
||||
var ch *types.Channel
|
||||
@ -439,24 +461,37 @@ func (svc message) MarkAsRead(channelID, threadID, lastReadMessageID uint64) (co
|
||||
}
|
||||
|
||||
if lastReadMessageID > 0 {
|
||||
// Validate thread
|
||||
// Validate messageID/threadID/channelID combo
|
||||
if lastMessage, err = svc.message.FindByID(lastReadMessageID); err != nil {
|
||||
return errors.Wrap(err, "unable to verify last message")
|
||||
} else if !lastMessage.IsValid() {
|
||||
return errors.New("invalid message")
|
||||
} else if lastMessage.ChannelID != channelID {
|
||||
return errors.New("last read message not in the same channel")
|
||||
} else if threadID > 0 && lastMessage.ReplyTo != threadID {
|
||||
return errors.New("last read message not in the same thread")
|
||||
}
|
||||
|
||||
count, err = svc.message.CountFromMessageID(channelID, threadID, lastReadMessageID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to count unread messages")
|
||||
}
|
||||
|
||||
} else {
|
||||
// use last message ID
|
||||
if lastReadMessageID, err = svc.message.LastMessageID(channelID, threadID); err != nil {
|
||||
return errors.Wrap(err, "unable to find last message")
|
||||
}
|
||||
|
||||
// no need to count
|
||||
count = 0
|
||||
}
|
||||
|
||||
count, err = svc.message.CountFromMessageID(channelID, threadID, lastReadMessageID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to count unread messages")
|
||||
}
|
||||
|
||||
err = svc.unreads.Record(currentUserID, channelID, threadID, lastReadMessageID, count)
|
||||
err = svc.unread.Record(currentUserID, channelID, threadID, lastReadMessageID, count)
|
||||
return errors.Wrap(err, "unable to record unread messages")
|
||||
})
|
||||
|
||||
return count, errors.Wrap(err, "unable to mark as read")
|
||||
return lastReadMessageID, count, errors.Wrap(err, "unable to mark as read")
|
||||
}
|
||||
|
||||
// React on a message with an emoji
|
||||
@ -577,6 +612,10 @@ func (svc message) preload(mm types.MessageSet) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
if err = svc.preloadUnreads(mm); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = svc.message.PrefillThreadParticipants(mm); err != nil {
|
||||
return
|
||||
}
|
||||
@ -646,6 +685,28 @@ func (svc message) preloadAttachments(mm types.MessageSet) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (svc message) preloadUnreads(mm types.MessageSet) error {
|
||||
var userID = auth.GetIdentityFromContext(svc.ctx).Identity()
|
||||
|
||||
// Filter out only relevant messages -- ones with replies
|
||||
mm, _ = mm.Filter(func(m *types.Message) (b bool, e error) {
|
||||
return m.Replies > 0, nil
|
||||
})
|
||||
|
||||
if len(mm) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if vv, err := svc.unread.Find(&types.UnreadFilter{UserID: userID, ThreadIDs: mm.IDs()}); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return mm.Walk(func(m *types.Message) error {
|
||||
m.Unread = vv.FindByThreadId(m.ID)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sends message to event loop
|
||||
func (svc message) sendEvent(mm ...*types.Message) (err error) {
|
||||
if err = svc.preload(mm); err != nil {
|
||||
|
||||
@ -63,7 +63,12 @@ func (ctrl *Message) Delete(ctx context.Context, r *request.MessageDelete) (inte
|
||||
}
|
||||
|
||||
func (ctrl *Message) MarkAsRead(ctx context.Context, r *request.MessageMarkAsRead) (interface{}, error) {
|
||||
return ctrl.svc.msg.With(ctx).MarkAsRead(r.ChannelID, r.ThreadID, r.LastReadMessageID)
|
||||
var messageID, count, err = ctrl.svc.msg.With(ctx).MarkAsRead(r.ChannelID, r.ThreadID, r.LastReadMessageID)
|
||||
|
||||
return outgoing.Unread{
|
||||
LastMessageID: messageID,
|
||||
Count: count,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (ctrl *Message) PinCreate(ctx context.Context, r *request.MessagePinCreate) (interface{}, error) {
|
||||
|
||||
@ -154,9 +154,9 @@ var _ RequestFiller = NewMessageExecuteCommand()
|
||||
|
||||
// Message markAsRead request parameters
|
||||
type MessageMarkAsRead struct {
|
||||
ChannelID uint64 `json:",string"`
|
||||
ThreadID uint64 `json:",string"`
|
||||
LastReadMessageID uint64 `json:",string"`
|
||||
ChannelID uint64 `json:",string"`
|
||||
}
|
||||
|
||||
func NewMessageMarkAsRead() *MessageMarkAsRead {
|
||||
@ -166,9 +166,9 @@ func NewMessageMarkAsRead() *MessageMarkAsRead {
|
||||
func (r MessageMarkAsRead) Auditable() map[string]interface{} {
|
||||
var out = map[string]interface{}{}
|
||||
|
||||
out["channelID"] = r.ChannelID
|
||||
out["threadID"] = r.ThreadID
|
||||
out["lastReadMessageID"] = r.LastReadMessageID
|
||||
out["channelID"] = r.ChannelID
|
||||
|
||||
return out
|
||||
}
|
||||
@ -200,13 +200,13 @@ func (r *MessageMarkAsRead) Fill(req *http.Request) (err error) {
|
||||
post[name] = string(param[0])
|
||||
}
|
||||
|
||||
r.ChannelID = parseUInt64(chi.URLParam(req, "channelID"))
|
||||
if val, ok := post["threadID"]; ok {
|
||||
if val, ok := get["threadID"]; ok {
|
||||
r.ThreadID = parseUInt64(val)
|
||||
}
|
||||
if val, ok := post["lastReadMessageID"]; ok {
|
||||
if val, ok := get["lastReadMessageID"]; ok {
|
||||
r.LastReadMessageID = parseUInt64(val)
|
||||
}
|
||||
r.ChannelID = parseUInt64(chi.URLParam(req, "channelID"))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -28,6 +28,8 @@ type (
|
||||
Attachment *Attachment `json:"attachment,omitempty"`
|
||||
Flags MessageFlagSet `json:"flags,omitempty"`
|
||||
|
||||
Unread *Unread `json:"-" db:"-"`
|
||||
|
||||
Mentions MentionSet
|
||||
RepliesFrom []uint64
|
||||
}
|
||||
|
||||
@ -164,3 +164,13 @@ func (set UnreadSet) FindByChannelId(channelID uint64) *Unread {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (set UnreadSet) FindByThreadId(threadID uint64) *Unread {
|
||||
for i := range set {
|
||||
if set[i].ReplyTo == threadID {
|
||||
return set[i]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -7,10 +7,12 @@ type (
|
||||
UserID uint64 `db:"rel_user"`
|
||||
LastMessageID uint64 `db:"rel_last_message"`
|
||||
|
||||
Count uint32 `db:"count"`
|
||||
Count uint32 `db:"count"`
|
||||
InThreadCount uint32 `db:"-"`
|
||||
}
|
||||
|
||||
UnreadFilter struct {
|
||||
UserID uint64
|
||||
UserID uint64
|
||||
ThreadIDs []uint64
|
||||
}
|
||||
)
|
||||
|
||||
@ -34,9 +34,6 @@ func (s *Session) dispatch(raw []byte) error {
|
||||
case p.ChannelUpdate != nil:
|
||||
return s.channelUpdate(ctx, p.ChannelUpdate)
|
||||
|
||||
// @deprecated
|
||||
case p.ChannelViewRecord != nil:
|
||||
return s.channelViewRecord(ctx, p.ChannelViewRecord)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -101,10 +101,3 @@ func (s *Session) channelUpdate(ctx context.Context, p *incoming.ChannelUpdate)
|
||||
_, err = s.svc.ch.With(ctx).Update(ch)
|
||||
return err
|
||||
}
|
||||
|
||||
// @deprecated
|
||||
func (s *Session) channelViewRecord(ctx context.Context, p *incoming.ChannelViewRecord) error {
|
||||
var userID = auth.GetIdentityFromContext(ctx).Identity()
|
||||
|
||||
return s.svc.ch.With(ctx).RecordView(userID, p.ChannelID, p.LastMessageID)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user