Delay missed backfill if messages are recent
This commit is contained in:
parent
c3be0a30fa
commit
439f5d1033
3 changed files with 32 additions and 7 deletions
32
backfill.go
32
backfill.go
|
@ -43,17 +43,41 @@ func (portal *Portal) initialForwardBackfill(user *User, markRead bool) {
|
||||||
portal.forwardBackfill(ctx, user, time.Time{}, portal.bridge.Config.Bridge.Backfill.InitialLimit, markRead)
|
portal.forwardBackfill(ctx, user, time.Time{}, portal.bridge.Config.Bridge.Backfill.InitialLimit, markRead)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const recentBackfillDelay = 15 * time.Second
|
||||||
|
|
||||||
|
type pendingBackfill struct {
|
||||||
|
cancel context.CancelFunc
|
||||||
|
lastMessageID string
|
||||||
|
lastMessageTS time.Time
|
||||||
|
}
|
||||||
|
|
||||||
func (portal *Portal) missedForwardBackfill(user *User, lastMessageTS time.Time, lastMessageID string, markRead bool) bool {
|
func (portal *Portal) missedForwardBackfill(user *User, lastMessageTS time.Time, lastMessageID string, markRead bool) bool {
|
||||||
if portal.bridge.Config.Bridge.Backfill.MissedLimit == 0 {
|
if portal.bridge.Config.Bridge.Backfill.MissedLimit == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
log := portal.zlog.With().
|
||||||
|
Str("action", "missed forward backfill").
|
||||||
|
Str("latest_message_id", lastMessageID).
|
||||||
|
Logger()
|
||||||
|
ctx := log.WithContext(context.TODO())
|
||||||
|
if time.Since(lastMessageTS) < 5*time.Minute {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
|
prev := portal.pendingRecentBackfill.Swap(&pendingBackfill{cancel: cancel, lastMessageID: lastMessageID, lastMessageTS: lastMessageTS})
|
||||||
|
if prev != nil {
|
||||||
|
prev.cancel()
|
||||||
|
}
|
||||||
|
log.Debug().Msg("Delaying missed forward backfill as latest message is new")
|
||||||
|
select {
|
||||||
|
case <-time.After(recentBackfillDelay):
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Debug().Msg("Backfill was cancelled by a newer backfill")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
portal.forwardBackfillLock.Lock()
|
portal.forwardBackfillLock.Lock()
|
||||||
defer portal.forwardBackfillLock.Unlock()
|
defer portal.forwardBackfillLock.Unlock()
|
||||||
log := portal.zlog.With().
|
|
||||||
Str("action", "missed forward backfill").
|
|
||||||
Logger()
|
|
||||||
ctx := log.WithContext(context.TODO())
|
|
||||||
if !lastMessageTS.IsZero() {
|
if !lastMessageTS.IsZero() {
|
||||||
if portal.lastMessageTS.IsZero() {
|
if portal.lastMessageTS.IsZero() {
|
||||||
lastMsg, err := portal.bridge.DB.Message.GetLastInChat(ctx, portal.Key)
|
lastMsg, err := portal.bridge.DB.Message.GetLastInChat(ctx, portal.Key)
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
_ "image/gif"
|
_ "image/gif"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gabriel-vasile/mimetype"
|
"github.com/gabriel-vasile/mimetype"
|
||||||
|
@ -239,6 +240,8 @@ type Portal struct {
|
||||||
forwardBackfillLock sync.Mutex
|
forwardBackfillLock sync.Mutex
|
||||||
lastMessageTS time.Time
|
lastMessageTS time.Time
|
||||||
|
|
||||||
|
pendingRecentBackfill atomic.Pointer[pendingBackfill]
|
||||||
|
|
||||||
recentlyHandled [recentlyHandledLength]string
|
recentlyHandled [recentlyHandledLength]string
|
||||||
recentlyHandledLock sync.Mutex
|
recentlyHandledLock sync.Mutex
|
||||||
recentlyHandledIndex uint8
|
recentlyHandledIndex uint8
|
||||||
|
@ -270,8 +273,6 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
|
||||||
switch {
|
switch {
|
||||||
case msg.evt != nil:
|
case msg.evt != nil:
|
||||||
portal.handleMessage(msg.source, msg.evt)
|
portal.handleMessage(msg.source, msg.evt)
|
||||||
//case msg.receipt != nil:
|
|
||||||
// portal.handleReceipt(msg.receipt, msg.source)
|
|
||||||
default:
|
default:
|
||||||
portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message")
|
portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message")
|
||||||
}
|
}
|
||||||
|
|
2
user.go
2
user.go
|
@ -831,8 +831,8 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) {
|
||||||
}
|
}
|
||||||
log.Debug().Msg("Syncing existing portal")
|
log.Debug().Msg("Syncing existing portal")
|
||||||
portal.UpdateMetadata(user, v)
|
portal.UpdateMetadata(user, v)
|
||||||
didBackfill := portal.missedForwardBackfill(user, time.UnixMicro(v.LastMessageTimestamp), v.LatestMessageID, !v.GetUnread())
|
|
||||||
user.syncChatDoublePuppetDetails(portal, v, false)
|
user.syncChatDoublePuppetDetails(portal, v, false)
|
||||||
|
didBackfill := portal.missedForwardBackfill(user, time.UnixMicro(v.LastMessageTimestamp), v.LatestMessageID, !v.GetUnread())
|
||||||
if !didBackfill && !v.GetUnread() && v.LatestMessageID != "" && user.DoublePuppetIntent != nil {
|
if !didBackfill && !v.GetUnread() && v.LatestMessageID != "" && user.DoublePuppetIntent != nil {
|
||||||
// TODO this would spam a lot of read receipts on startup
|
// TODO this would spam a lot of read receipts on startup
|
||||||
//user.markSelfReadFull(portal, v.LatestMessageID)
|
//user.markSelfReadFull(portal, v.LatestMessageID)
|
||||||
|
|
Loading…
Reference in a new issue