Don't send notifications from batch sending if last message is old

This commit is contained in:
Tulir Asokan 2023-08-15 11:05:21 +03:00
parent aa12c2e5d2
commit 18288b5ea5
5 changed files with 18 additions and 14 deletions

View file

@ -32,7 +32,7 @@ import (
"go.mau.fi/mautrix-gmessages/database" "go.mau.fi/mautrix-gmessages/database"
) )
func (portal *Portal) initialForwardBackfill(user *User, markRead bool) { func (portal *Portal) initialForwardBackfill(user *User, markRead, allowNotify bool) {
// This is only called from CreateMatrixRoom which locks forwardBackfillLock // This is only called from CreateMatrixRoom which locks forwardBackfillLock
defer portal.forwardBackfillLock.Unlock() defer portal.forwardBackfillLock.Unlock()
log := portal.zlog.With(). log := portal.zlog.With().
@ -40,7 +40,7 @@ func (portal *Portal) initialForwardBackfill(user *User, markRead bool) {
Logger() Logger()
ctx := log.WithContext(context.TODO()) ctx := log.WithContext(context.TODO())
portal.forwardBackfill(ctx, user, time.Time{}, portal.bridge.Config.Bridge.Backfill.InitialLimit, markRead) portal.forwardBackfill(ctx, user, time.Time{}, portal.bridge.Config.Bridge.Backfill.InitialLimit, markRead, allowNotify)
} }
const recentBackfillDelay = 15 * time.Second const recentBackfillDelay = 15 * time.Second
@ -111,7 +111,7 @@ func (portal *Portal) missedForwardBackfill(user *User, lastMessageTS time.Time,
Str("latest_message_id", lastMessageID). Str("latest_message_id", lastMessageID).
Time("last_bridged_ts", portal.lastMessageTS). Time("last_bridged_ts", portal.lastMessageTS).
Msg("Backfilling missed messages") Msg("Backfilling missed messages")
portal.forwardBackfill(ctx, user, portal.lastMessageTS, portal.bridge.Config.Bridge.Backfill.MissedLimit, markRead) portal.forwardBackfill(ctx, user, portal.lastMessageTS, portal.bridge.Config.Bridge.Backfill.MissedLimit, markRead, true)
} }
func (portal *Portal) deterministicEventID(messageID string, part int) id.EventID { func (portal *Portal) deterministicEventID(messageID string, part int) id.EventID {
@ -120,7 +120,7 @@ func (portal *Portal) deterministicEventID(messageID string, part int) id.EventI
return id.EventID(fmt.Sprintf("$%s:messages.google.com", base64.RawURLEncoding.EncodeToString(sum[:]))) return id.EventID(fmt.Sprintf("$%s:messages.google.com", base64.RawURLEncoding.EncodeToString(sum[:])))
} }
func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after time.Time, limit int, markRead bool) bool { func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after time.Time, limit int, markRead, allowNotify bool) bool {
if limit == 0 { if limit == 0 {
return false return false
} }
@ -138,7 +138,7 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim
batchSending := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) batchSending := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending)
converted := make([]*ConvertedMessage, 0, len(resp.Messages)) converted := make([]*ConvertedMessage, 0, len(resp.Messages))
maxTS := portal.lastMessageTS maxTS := time.Time{}
for i := len(resp.Messages) - 1; i >= 0; i-- { for i := len(resp.Messages) - 1; i >= 0; i-- {
evt := resp.Messages[i] evt := resp.Messages[i]
isTooOld := !time.UnixMicro(evt.Timestamp).After(after) isTooOld := !time.UnixMicro(evt.Timestamp).After(after)
@ -167,7 +167,8 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim
if markRead { if markRead {
markReadBy = user.MXID markReadBy = user.MXID
} }
portal.backfillSendBatch(ctx, converted, markReadBy) allowNotify = allowNotify && time.Since(maxTS) < 24*time.Hour
portal.backfillSendBatch(ctx, converted, markReadBy, allowNotify)
} else { } else {
lastEventID := portal.backfillSendLegacy(ctx, converted) lastEventID := portal.backfillSendLegacy(ctx, converted)
if markRead && user.DoublePuppetIntent != nil { if markRead && user.DoublePuppetIntent != nil {
@ -177,11 +178,13 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim
} }
} }
} }
if maxTS.After(portal.lastMessageTS) {
portal.lastMessageTS = maxTS portal.lastMessageTS = maxTS
}
return true return true
} }
func (portal *Portal) backfillSendBatch(ctx context.Context, converted []*ConvertedMessage, markReadBy id.UserID) { func (portal *Portal) backfillSendBatch(ctx context.Context, converted []*ConvertedMessage, markReadBy id.UserID, allowNotify bool) {
log := zerolog.Ctx(ctx) log := zerolog.Ctx(ctx)
events := make([]*event.Event, 0, len(converted)) events := make([]*event.Event, 0, len(converted))
dbMessages := make([]*database.Message, 0, len(converted)) dbMessages := make([]*database.Message, 0, len(converted))
@ -250,7 +253,7 @@ func (portal *Portal) backfillSendBatch(ctx context.Context, converted []*Conver
_, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &mautrix.ReqBeeperBatchSend{ _, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &mautrix.ReqBeeperBatchSend{
Forward: forward, Forward: forward,
MarkReadBy: markReadBy, MarkReadBy: markReadBy,
SendNotification: forward && markReadBy == "", SendNotification: forward && markReadBy == "" && allowNotify,
Events: events, Events: events,
}) })
if err != nil { if err != nil {

View file

@ -354,7 +354,7 @@ func fnPM(ce *WrappedCommandEvent) {
ce.Reply("Failed to start chat: no conversation in response") ce.Reply("Failed to start chat: no conversation in response")
} else if portal := ce.User.GetPortalByID(resp.Conversation.ConversationID); portal.MXID != "" { } else if portal := ce.User.GetPortalByID(resp.Conversation.ConversationID); portal.MXID != "" {
ce.Reply("Chat already exists at [%s](https://matrix.to/#/%s)", portal.MXID, portal.MXID) ce.Reply("Chat already exists at [%s](https://matrix.to/#/%s)", portal.MXID, portal.MXID)
} else if err = portal.CreateMatrixRoom(ce.User, resp.Conversation); err != nil { } else if err = portal.CreateMatrixRoom(ce.User, resp.Conversation, false); err != nil {
ce.ZLog.Err(err).Msg("Failed to create matrix room") ce.ZLog.Err(err).Msg("Failed to create matrix room")
ce.Reply("Failed to create portal room for conversation") ce.Reply("Failed to create portal room for conversation")
} else { } else {

View file

@ -1278,7 +1278,7 @@ func (portal *Portal) GetEncryptionEventContent() (evt *event.EncryptionEventCon
return return
} }
func (portal *Portal) CreateMatrixRoom(user *User, conv *gmproto.Conversation) error { func (portal *Portal) CreateMatrixRoom(user *User, conv *gmproto.Conversation, isFromSync bool) error {
portal.roomCreateLock.Lock() portal.roomCreateLock.Lock()
defer portal.roomCreateLock.Unlock() defer portal.roomCreateLock.Unlock()
if len(portal.MXID) > 0 { if len(portal.MXID) > 0 {
@ -1418,7 +1418,8 @@ func (portal *Portal) CreateMatrixRoom(user *User, conv *gmproto.Conversation) e
portal.ensureUserInvited(user) portal.ensureUserInvited(user)
} }
user.syncChatDoublePuppetDetails(portal, conv, true) user.syncChatDoublePuppetDetails(portal, conv, true)
go portal.initialForwardBackfill(user, !conv.GetUnread()) allowNotify := !isFromSync
go portal.initialForwardBackfill(user, !conv.GetUnread(), allowNotify)
go portal.addToPersonalSpace(user, true) go portal.addToPersonalSpace(user, true)
return nil return nil
} }

View file

@ -242,7 +242,7 @@ func (prov *ProvisioningAPI) StartChat(w http.ResponseWriter, r *http.Request) {
convCopy.LatestMessage = nil convCopy.LatestMessage = nil
prov.zlog.Debug().Any("conversation_data", convCopy).Msg("Got conversation data for start chat") prov.zlog.Debug().Any("conversation_data", convCopy).Msg("Got conversation data for start chat")
portal := user.GetPortalByID(resp.Conversation.ConversationID) portal := user.GetPortalByID(resp.Conversation.ConversationID)
err = portal.CreateMatrixRoom(user, resp.Conversation) err = portal.CreateMatrixRoom(user, resp.Conversation, false)
if err != nil { if err != nil {
prov.zlog.Err(err).Msg("Failed to create matrix room") prov.zlog.Err(err).Msg("Failed to create matrix room")
jsonResponse(w, http.StatusInternalServerError, Error{ jsonResponse(w, http.StatusInternalServerError, Error{

View file

@ -828,7 +828,7 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) {
return return
} }
log.Debug().Msg("Creating portal for conversation") log.Debug().Msg("Creating portal for conversation")
err := portal.CreateMatrixRoom(user, v) err := portal.CreateMatrixRoom(user, v, source == "sync")
if err != nil { if err != nil {
log.Err(err).Msg("Error creating Matrix room from conversation event") log.Err(err).Msg("Error creating Matrix room from conversation event")
} }