Check existing messages when forward backfilling
This commit is contained in:
parent
211f000b28
commit
84aa0c6f4f
2 changed files with 28 additions and 24 deletions
22
backfill.go
22
backfill.go
|
@ -141,20 +141,17 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim
|
||||||
maxTS := portal.lastMessageTS
|
maxTS := portal.lastMessageTS
|
||||||
for i := len(resp.Messages) - 1; i >= 0; i-- {
|
for i := len(resp.Messages) - 1; i >= 0; i-- {
|
||||||
evt := resp.Messages[i]
|
evt := resp.Messages[i]
|
||||||
// TODO this should check the database too
|
isTooOld := !time.UnixMicro(evt.Timestamp).After(after)
|
||||||
if dbMsg := portal.isOutgoingMessage(evt); dbMsg != nil {
|
if portal.handleExistingMessage(ctx, user, evt, isTooOld) || isTooOld {
|
||||||
log.Debug().Str("event_id", dbMsg.MXID.String()).Msg("Got echo for outgoing message in backfill batch")
|
|
||||||
portal.handleExistingMessageUpdate(ctx, user, dbMsg, evt)
|
|
||||||
continue
|
|
||||||
} else if !time.UnixMicro(evt.Timestamp).After(after) {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c := portal.convertGoogleMessage(ctx, user, evt, batchSending)
|
c := portal.convertGoogleMessage(ctx, user, evt, batchSending)
|
||||||
if c != nil {
|
if c == nil {
|
||||||
converted = append(converted, c)
|
continue
|
||||||
if c.Timestamp.After(maxTS) {
|
}
|
||||||
maxTS = c.Timestamp
|
converted = append(converted, c)
|
||||||
}
|
if c.Timestamp.After(maxTS) {
|
||||||
|
maxTS = c.Timestamp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(converted) == 0 {
|
if len(converted) == 0 {
|
||||||
|
@ -261,9 +258,6 @@ func (portal *Portal) backfillSendLegacy(ctx context.Context, converted []*Conve
|
||||||
var lastEventID id.EventID
|
var lastEventID id.EventID
|
||||||
eventIDs := make(map[string]id.EventID)
|
eventIDs := make(map[string]id.EventID)
|
||||||
for _, msg := range converted {
|
for _, msg := range converted {
|
||||||
if len(msg.Parts) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
msgEventIDs := portal.sendMessageParts(ctx, msg, eventIDs)
|
msgEventIDs := portal.sendMessageParts(ctx, msg, eventIDs)
|
||||||
if len(msgEventIDs) > 0 {
|
if len(msgEventIDs) > 0 {
|
||||||
eventIDs[msg.ID] = msgEventIDs[0]
|
eventIDs[msg.ID] = msgEventIDs[0]
|
||||||
|
|
30
portal.go
30
portal.go
|
@ -490,6 +490,25 @@ func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *U
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (portal *Portal) handleExistingMessage(ctx context.Context, source *User, evt *gmproto.Message, outgoingOnly bool) bool {
|
||||||
|
log := zerolog.Ctx(ctx)
|
||||||
|
if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil {
|
||||||
|
log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message")
|
||||||
|
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt)
|
||||||
|
return true
|
||||||
|
} else if outgoingOnly {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
existingMsg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, evt.MessageID)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("Failed to check if message is duplicate")
|
||||||
|
} else if existingMsg != nil {
|
||||||
|
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) {
|
func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) {
|
||||||
if len(portal.MXID) == 0 {
|
if len(portal.MXID) == 0 {
|
||||||
portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty")
|
portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty")
|
||||||
|
@ -506,16 +525,7 @@ func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) {
|
||||||
Str("action", "handle google message").
|
Str("action", "handle google message").
|
||||||
Logger()
|
Logger()
|
||||||
ctx := log.WithContext(context.TODO())
|
ctx := log.WithContext(context.TODO())
|
||||||
if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil {
|
if portal.handleExistingMessage(ctx, source, evt, false) {
|
||||||
log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message")
|
|
||||||
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
existingMsg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, evt.MessageID)
|
|
||||||
if err != nil {
|
|
||||||
log.Err(err).Msg("Failed to check if message is duplicate")
|
|
||||||
} else if existingMsg != nil {
|
|
||||||
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if evt.GetMessageStatus().GetStatus() == gmproto.MessageStatusType_MESSAGE_DELETED {
|
if evt.GetMessageStatus().GetStatus() == gmproto.MessageStatusType_MESSAGE_DELETED {
|
||||||
|
|
Loading…
Reference in a new issue