From 84aa0c6f4fa0134a5f45c5d9b1843a8c16079f2e Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 10 Aug 2023 11:35:20 +0300 Subject: [PATCH] Check existing messages when forward backfilling --- backfill.go | 22 ++++++++-------------- portal.go | 30 ++++++++++++++++++++---------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/backfill.go b/backfill.go index b4b9127..01bd3b2 100644 --- a/backfill.go +++ b/backfill.go @@ -141,20 +141,17 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim maxTS := portal.lastMessageTS for i := len(resp.Messages) - 1; i >= 0; i-- { evt := resp.Messages[i] - // TODO this should check the database too - if dbMsg := portal.isOutgoingMessage(evt); dbMsg != nil { - log.Debug().Str("event_id", dbMsg.MXID.String()).Msg("Got echo for outgoing message in backfill batch") - portal.handleExistingMessageUpdate(ctx, user, dbMsg, evt) - continue - } else if !time.UnixMicro(evt.Timestamp).After(after) { + isTooOld := !time.UnixMicro(evt.Timestamp).After(after) + if portal.handleExistingMessage(ctx, user, evt, isTooOld) || isTooOld { continue } c := portal.convertGoogleMessage(ctx, user, evt, batchSending) - if c != nil { - converted = append(converted, c) - if c.Timestamp.After(maxTS) { - maxTS = c.Timestamp - } + if c == nil { + continue + } + converted = append(converted, c) + if c.Timestamp.After(maxTS) { + maxTS = c.Timestamp } } if len(converted) == 0 { @@ -261,9 +258,6 @@ func (portal *Portal) backfillSendLegacy(ctx context.Context, converted []*Conve var lastEventID id.EventID eventIDs := make(map[string]id.EventID) for _, msg := range converted { - if len(msg.Parts) == 0 { - continue - } msgEventIDs := portal.sendMessageParts(ctx, msg, eventIDs) if len(msgEventIDs) > 0 { eventIDs[msg.ID] = msgEventIDs[0] diff --git a/portal.go b/portal.go index a54ebc5..f312943 100644 --- a/portal.go +++ b/portal.go @@ -490,6 +490,25 @@ func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *U } } +func (portal *Portal) handleExistingMessage(ctx context.Context, source *User, evt *gmproto.Message, outgoingOnly bool) bool { + log := zerolog.Ctx(ctx) + if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil { + log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message") + portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) + return true + } else if outgoingOnly { + return false + } + existingMsg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, evt.MessageID) + if err != nil { + log.Err(err).Msg("Failed to check if message is duplicate") + } else if existingMsg != nil { + portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) + return true + } + return false +} + func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) { if len(portal.MXID) == 0 { portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty") @@ -506,16 +525,7 @@ func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) { Str("action", "handle google message"). Logger() ctx := log.WithContext(context.TODO()) - if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil { - log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message") - portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) - return - } - existingMsg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, evt.MessageID) - if err != nil { - log.Err(err).Msg("Failed to check if message is duplicate") - } else if existingMsg != nil { - portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) + if portal.handleExistingMessage(ctx, source, evt, false) { return } if evt.GetMessageStatus().GetStatus() == gmproto.MessageStatusType_MESSAGE_DELETED {