Add basic forward backfill support

This commit is contained in:
Tulir Asokan 2023-07-12 01:57:07 +03:00
parent c802d2bced
commit 1f45d5bdec
7 changed files with 410 additions and 62 deletions

212
backfill.go Normal file
View file

@ -0,0 +1,212 @@
// mautrix-gmessages - A Matrix-Google Messages puppeting bridge.
// Copyright (C) 2023 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package main
import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"time"
"github.com/rs/zerolog"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"go.mau.fi/mautrix-gmessages/database"
)
func (portal *Portal) initialForwardBackfill(user *User) {
// This is only called from CreateMatrixRoom which locks forwardBackfillLock
defer portal.forwardBackfillLock.Unlock()
log := portal.zlog.With().
Str("action", "initial forward backfill").
Logger()
ctx := log.WithContext(context.TODO())
portal.forwardBackfill(ctx, user, time.Time{}, 50)
}
func (portal *Portal) missedForwardBackfill(user *User, lastMessageTS time.Time, lastMessageID string) {
portal.forwardBackfillLock.Lock()
defer portal.forwardBackfillLock.Unlock()
log := portal.zlog.With().
Str("action", "missed forward backfill").
Logger()
ctx := log.WithContext(context.TODO())
if portal.lastMessageTS.IsZero() {
lastMsg, err := portal.bridge.DB.Message.GetLastInChat(ctx, portal.Key)
if err != nil {
log.Err(err).Msg("Failed to get last message in chat")
return
} else if lastMsg == nil {
log.Debug().Msg("No messages in chat")
} else {
portal.lastMessageTS = lastMsg.Timestamp
}
}
if !lastMessageTS.After(portal.lastMessageTS) {
log.Trace().
Time("latest_message_ts", lastMessageTS).
Str("latest_message_id", lastMessageID).
Time("last_bridged_ts", portal.lastMessageTS).
Msg("Nothing to backfill")
return
}
log.Info().
Time("latest_message_ts", lastMessageTS).
Str("latest_message_id", lastMessageID).
Time("last_bridged_ts", portal.lastMessageTS).
Msg("Backfilling missed messages")
portal.forwardBackfill(ctx, user, portal.lastMessageTS, 100)
}
func (portal *Portal) deterministicEventID(messageID string, part int) id.EventID {
data := fmt.Sprintf("%s/gmessages/%s/%d", portal.MXID, messageID, part)
sum := sha256.Sum256([]byte(data))
return id.EventID(fmt.Sprintf("$%s:messages.google.com", base64.RawURLEncoding.EncodeToString(sum[:])))
}
func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after time.Time, limit int64) {
log := zerolog.Ctx(ctx)
resp, err := user.Client.Conversations.FetchMessages(portal.ID, limit, nil)
if err != nil {
portal.zlog.Error().Err(err).Msg("Failed to fetch messages")
return
}
log.Debug().
Int64("total_messages", resp.TotalMessages).
Int("message_count", len(resp.Messages)).
Msg("Got message chunk to backfill")
batchSending := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending)
converted := make([]*ConvertedMessage, 0, len(resp.Messages))
maxTS := portal.lastMessageTS
for i := len(resp.Messages) - 1; i >= 0; i-- {
evt := resp.Messages[i]
// TODO this should check the database too
if evtID := portal.isOutgoingMessage(evt); evtID != "" {
log.Debug().Str("event_id", evtID.String()).Msg("Got echo for outgoing message in backfill batch")
continue
} else if !time.UnixMicro(evt.Timestamp).After(after) {
continue
}
c := portal.convertGoogleMessage(ctx, user, evt, batchSending)
if c != nil {
converted = append(converted, c)
if c.Timestamp.After(maxTS) {
maxTS = c.Timestamp
}
}
}
log.Debug().
Int("converted_count", len(converted)).
Msg("Converted messages for backfill")
if batchSending {
portal.backfillSendBatch(ctx, converted)
} else {
portal.backfillSendLegacy(ctx, converted)
}
portal.lastMessageTS = maxTS
}
func (portal *Portal) backfillSendBatch(ctx context.Context, converted []*ConvertedMessage) {
log := zerolog.Ctx(ctx)
events := make([]*event.Event, 0, len(converted))
dbMessages := make([]*database.Message, 0, len(converted))
for _, msg := range converted {
dbm := portal.bridge.DB.Message.New()
dbm.Chat = portal.Key
dbm.ID = msg.ID
dbm.Sender = msg.SenderID
dbm.Timestamp = msg.Timestamp
for i, part := range msg.Parts {
content := event.Content{
Parsed: part.Content,
Raw: part.Extra,
}
eventType := event.EventMessage
var err error
eventType, err = portal.encrypt(msg.Intent, &content, eventType)
if err != nil {
log.Err(err).Str("message_id", msg.ID).Int("part", i).Msg("Failed to encrypt event")
continue
}
msg.Intent.AddDoublePuppetValue(&content)
evt := &event.Event{
Sender: msg.Intent.UserID,
Type: eventType,
Timestamp: msg.Timestamp.UnixMilli(),
ID: portal.deterministicEventID(msg.ID, i),
RoomID: portal.MXID,
Content: content,
}
events = append(events, evt)
if dbm.MXID == "" {
dbm.MXID = evt.ID
}
}
if dbm.MXID != "" {
dbMessages = append(dbMessages, dbm)
}
}
_, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &mautrix.ReqBeeperBatchSend{
Forward: true,
MarkReadBy: "",
Events: events,
})
if err != nil {
log.Err(err).Msg("Failed to send batch of messages")
return
}
err = portal.bridge.DB.Message.MassInsert(ctx, dbMessages)
if err != nil {
log.Err(err).Msg("Failed to insert messages to database")
}
}
func (portal *Portal) backfillSendLegacy(ctx context.Context, converted []*ConvertedMessage) {
log := zerolog.Ctx(ctx)
eventIDs := make(map[string]id.EventID)
for _, msg := range converted {
var eventID id.EventID
for i, part := range msg.Parts {
if msg.ReplyTo != "" && part.Content.RelatesTo == nil {
replyToEvent, ok := eventIDs[msg.ReplyTo]
if ok {
part.Content.RelatesTo = &event.RelatesTo{
InReplyTo: &event.InReplyTo{EventID: replyToEvent},
}
}
}
resp, err := portal.sendMessage(msg.Intent, event.EventMessage, part.Content, part.Extra, msg.Timestamp.UnixMilli())
if err != nil {
log.Err(err).Str("message_id", msg.ID).Int("part", i).Msg("Failed to send message")
} else if eventID == "" {
eventID = resp.EventID
eventIDs[msg.ID] = resp.EventID
}
}
if eventID != "" {
portal.markHandled(msg, eventID, false)
}
}
}

View file

@ -20,10 +20,13 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"strings"
"time" "time"
log "maunium.net/go/maulogger/v2" log "maunium.net/go/maulogger/v2"
"go.mau.fi/mautrix-gmessages/libgm/binary"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
"maunium.net/go/mautrix/util/dbutil" "maunium.net/go/mautrix/util/dbutil"
) )
@ -44,11 +47,16 @@ func (mq *MessageQuery) getDB() *Database {
const ( const (
getMessageByIDQuery = ` getMessageByIDQuery = `
SELECT conv_id, conv_receiver, id, mxid, sender, timestamp FROM message SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message
WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3 WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3
` `
getLastMessageInChatQuery = `
SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message
WHERE conv_id=$1 AND conv_receiver=$2
ORDER BY timestamp DESC LIMIT 1
`
getMessageByMXIDQuery = ` getMessageByMXIDQuery = `
SELECT conv_id, conv_receiver, id, mxid, sender, timestamp FROM message SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message
WHERE mxid=$1 WHERE mxid=$1
` `
) )
@ -61,6 +69,14 @@ func (mq *MessageQuery) GetByMXID(ctx context.Context, mxid id.EventID) (*Messag
return get[*Message](mq, ctx, getMessageByMXIDQuery, mxid) return get[*Message](mq, ctx, getMessageByMXIDQuery, mxid)
} }
func (mq *MessageQuery) GetLastInChat(ctx context.Context, chat Key) (*Message, error) {
return get[*Message](mq, ctx, getLastMessageInChatQuery, chat.ID, chat.Receiver)
}
type MessageStatus struct {
Type binary.MessageStatusType
}
type Message struct { type Message struct {
db *Database db *Database
log log.Logger log log.Logger
@ -70,11 +86,12 @@ type Message struct {
MXID id.EventID MXID id.EventID
Sender string Sender string
Timestamp time.Time Timestamp time.Time
Status MessageStatus
} }
func (msg *Message) Scan(row dbutil.Scannable) (*Message, error) { func (msg *Message) Scan(row dbutil.Scannable) (*Message, error) {
var ts int64 var ts int64
err := row.Scan(&msg.Chat.ID, &msg.Chat.Receiver, &msg.ID, &msg.MXID, &msg.Sender, &ts) err := row.Scan(&msg.Chat.ID, &msg.Chat.Receiver, &msg.ID, &msg.MXID, &msg.Sender, &ts, dbutil.JSON{Data: &msg.Status})
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {
@ -87,17 +104,48 @@ func (msg *Message) Scan(row dbutil.Scannable) (*Message, error) {
} }
func (msg *Message) sqlVariables() []any { func (msg *Message) sqlVariables() []any {
return []any{msg.Chat.ID, msg.Chat.Receiver, msg.ID, msg.MXID, msg.Sender, msg.Timestamp.UnixMicro()} return []any{msg.Chat.ID, msg.Chat.Receiver, msg.ID, msg.MXID, msg.Sender, msg.Timestamp.UnixMicro(), dbutil.JSON{Data: &msg.Status}}
} }
func (msg *Message) Insert(ctx context.Context) error { func (msg *Message) Insert(ctx context.Context) error {
_, err := msg.db.Conn(ctx).ExecContext(ctx, ` _, err := msg.db.Conn(ctx).ExecContext(ctx, `
INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp) INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp, status)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, msg.sqlVariables()...) `, msg.sqlVariables()...)
return err return err
} }
func (mq *MessageQuery) MassInsert(ctx context.Context, messages []*Message) error {
valueStringFormat := "($1, $2, $%d, $%d, $%d, $%d, $%d)"
if mq.db.Dialect == dbutil.SQLite {
valueStringFormat = strings.ReplaceAll(valueStringFormat, "$", "?")
}
placeholders := make([]string, len(messages))
params := make([]any, 2+len(messages)*5)
params[0] = messages[0].Chat.ID
params[1] = messages[0].Chat.Receiver
for i, msg := range messages {
baseIndex := 2 + i*5
params[baseIndex] = msg.ID
params[baseIndex+1] = msg.MXID
params[baseIndex+2] = msg.Sender
params[baseIndex+3] = msg.Timestamp.UnixMicro()
params[baseIndex+4] = dbutil.JSON{Data: &msg.Status}
placeholders[i] = fmt.Sprintf(valueStringFormat, baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5)
}
query := `
INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp, status)
VALUES
` + strings.Join(placeholders, ",")
_, err := mq.db.Conn(ctx).ExecContext(ctx, query, params...)
return err
}
func (msg *Message) UpdateStatus(ctx context.Context) error {
_, err := msg.db.Conn(ctx).ExecContext(ctx, "UPDATE message SET status=$1 WHERE conv_id=$2 AND conv_receiver=$3 AND id=$4", dbutil.JSON{Data: &msg.Status}, msg.Chat.ID, msg.Chat.Receiver, msg.ID)
return err
}
func (msg *Message) Delete(ctx context.Context) error { func (msg *Message) Delete(ctx context.Context) error {
_, err := msg.db.Conn(ctx).ExecContext(ctx, "DELETE FROM message WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3", msg.Chat.ID, msg.Chat.Receiver, msg.ID) _, err := msg.db.Conn(ctx).ExecContext(ctx, "DELETE FROM message WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3", msg.Chat.ID, msg.Chat.Receiver, msg.ID)
return err return err

View file

@ -27,9 +27,9 @@ CREATE TABLE puppet (
avatar_set BOOLEAN NOT NULL DEFAULT false, avatar_set BOOLEAN NOT NULL DEFAULT false,
contact_info_set BOOLEAN NOT NULL DEFAULT false, contact_info_set BOOLEAN NOT NULL DEFAULT false,
FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, PRIMARY KEY (id, receiver),
UNIQUE (phone, receiver), CONSTRAINT puppet_user_fkey FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE,
PRIMARY KEY (id, receiver) CONSTRAINT puppet_phone_unique UNIQUE (phone, receiver)
); );
CREATE TABLE portal ( CREATE TABLE portal (
@ -46,9 +46,9 @@ CREATE TABLE portal (
encrypted BOOLEAN NOT NULL DEFAULT false, encrypted BOOLEAN NOT NULL DEFAULT false,
in_space BOOLEAN NOT NULL DEFAULT false, in_space BOOLEAN NOT NULL DEFAULT false,
FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, PRIMARY KEY (id, receiver),
FOREIGN KEY (other_user, receiver) REFERENCES puppet(id, receiver) ON DELETE CASCADE, CONSTRAINT portal_user_fkey FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE,
PRIMARY KEY (id, receiver) CONSTRAINT portal_puppet_fkey FOREIGN KEY (other_user, receiver) REFERENCES puppet(id, receiver) ON DELETE CASCADE
); );
CREATE TABLE message ( CREATE TABLE message (
@ -58,7 +58,8 @@ CREATE TABLE message (
mxid TEXT NOT NULL UNIQUE, mxid TEXT NOT NULL UNIQUE,
sender TEXT NOT NULL, sender TEXT NOT NULL,
timestamp BIGINT NOT NULL, timestamp BIGINT NOT NULL,
status jsonb NOT NULL,
PRIMARY KEY (conv_id, conv_receiver, id), PRIMARY KEY (conv_id, conv_receiver, id),
FOREIGN KEY (conv_id, conv_receiver) REFERENCES portal(id, receiver) ON DELETE CASCADE CONSTRAINT message_portal_fkey FOREIGN KEY (conv_id, conv_receiver) REFERENCES portal(id, receiver) ON DELETE CASCADE
); );

4
go.mod
View file

@ -3,17 +3,17 @@ module go.mau.fi/mautrix-gmessages
go 1.20 go 1.20
require ( require (
github.com/gabriel-vasile/mimetype v1.4.2
github.com/mattn/go-sqlite3 v1.14.17 github.com/mattn/go-sqlite3 v1.14.17
github.com/rs/zerolog v1.29.1 github.com/rs/zerolog v1.29.1
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
go.mau.fi/mautrix-gmessages/libgm v0.1.0 go.mau.fi/mautrix-gmessages/libgm v0.1.0
maunium.net/go/maulogger/v2 v2.4.1 maunium.net/go/maulogger/v2 v2.4.1
maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474 maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce
) )
require ( require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/mux v1.8.0 // indirect

4
go.sum
View file

@ -81,5 +81,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8= maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8=
maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho= maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho=
maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474 h1:Pzopg6NL7qaNdG2iUf8dVnu9+tVEa82b44KJKmpb/NY= maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce h1:SYYPKkcJLI012g+p3jZ/Qnqm5VQd7kFZSSEHOtI4NZA=
maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE= maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE=

162
portal.go
View file

@ -30,6 +30,7 @@ import (
"github.com/gabriel-vasile/mimetype" "github.com/gabriel-vasile/mimetype"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"maunium.net/go/maulogger/v2" "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/bridge"
@ -233,7 +234,8 @@ type Portal struct {
backfillLock sync.Mutex backfillLock sync.Mutex
avatarLock sync.Mutex avatarLock sync.Mutex
latestEventBackfillLock sync.Mutex forwardBackfillLock sync.Mutex
lastMessageTS time.Time
recentlyHandled [recentlyHandledLength]string recentlyHandled [recentlyHandledLength]string
recentlyHandledLock sync.Mutex recentlyHandledLock sync.Mutex
@ -261,8 +263,8 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
if len(portal.MXID) == 0 { if len(portal.MXID) == 0 {
return return
} }
portal.latestEventBackfillLock.Lock() portal.forwardBackfillLock.Lock()
defer portal.latestEventBackfillLock.Unlock() defer portal.forwardBackfillLock.Unlock()
switch { switch {
case msg.evt != nil: case msg.evt != nil:
portal.handleMessage(msg.source, msg.evt) portal.handleMessage(msg.source, msg.evt)
@ -274,8 +276,8 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
} }
func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) { func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
portal.latestEventBackfillLock.Lock() portal.forwardBackfillLock.Lock()
defer portal.latestEventBackfillLock.Unlock() defer portal.forwardBackfillLock.Unlock()
evtTS := time.UnixMilli(msg.evt.Timestamp) evtTS := time.UnixMilli(msg.evt.Timestamp)
timings := messageTimings{ timings := messageTimings{
initReceive: msg.evt.Mautrix.ReceivedAt.Sub(evtTS), initReceive: msg.evt.Mautrix.ReceivedAt.Sub(evtTS),
@ -312,13 +314,17 @@ func (portal *Portal) isOutgoingMessage(msg *binary.Message) id.EventID {
out, ok := portal.outgoingMessages[msg.TmpID] out, ok := portal.outgoingMessages[msg.TmpID]
if ok { if ok {
if !out.Saved { if !out.Saved {
portal.markHandled(msg, map[string]id.EventID{"": out.ID}, true) portal.markHandled(&ConvertedMessage{
ID: msg.MessageID,
Timestamp: time.UnixMicro(msg.GetTimestamp()),
SenderID: msg.ParticipantID,
}, out.ID, true)
out.Saved = true out.Saved = true
} }
switch msg.GetMessageStatus().GetStatus() { switch msg.GetMessageStatus().GetStatus() {
case binary.MessageStatusType_OUTGOING_DELIVERED, binary.MessageStatusType_OUTGOING_COMPLETE, binary.MessageStatusType_OUTGOING_DISPLAYED: case binary.MessageStatusType_OUTGOING_DELIVERED, binary.MessageStatusType_OUTGOING_COMPLETE, binary.MessageStatusType_OUTGOING_DISPLAYED:
delete(portal.outgoingMessages, msg.TmpID) delete(portal.outgoingMessages, msg.TmpID)
portal.sendStatusEvent(out.ID, "", nil) go portal.sendStatusEvent(out.ID, "", nil)
case binary.MessageStatusType_OUTGOING_FAILED_GENERIC, case binary.MessageStatusType_OUTGOING_FAILED_GENERIC,
binary.MessageStatusType_OUTGOING_FAILED_EMERGENCY_NUMBER, binary.MessageStatusType_OUTGOING_FAILED_EMERGENCY_NUMBER,
binary.MessageStatusType_OUTGOING_CANCELED, binary.MessageStatusType_OUTGOING_CANCELED,
@ -329,7 +335,7 @@ func (portal *Portal) isOutgoingMessage(msg *binary.Message) id.EventID {
binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_LOST_ENCRYPTION, binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_LOST_ENCRYPTION,
binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_DID_NOT_DECRYPT_NO_MORE_RETRY: binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_DID_NOT_DECRYPT_NO_MORE_RETRY:
err := OutgoingStatusError(msg.GetMessageStatus().GetStatus()) err := OutgoingStatusError(msg.GetMessageStatus().GetStatus())
portal.sendStatusEvent(out.ID, "", err) go portal.sendStatusEvent(out.ID, "", err)
// TODO error notice // TODO error notice
} }
return out.ID return out.ID
@ -351,6 +357,8 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) {
portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty") portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty")
return return
} }
eventTS := time.UnixMicro(evt.GetTimestamp())
portal.lastMessageTS = eventTS
log := portal.zlog.With(). log := portal.zlog.With().
Str("message_id", evt.MessageID). Str("message_id", evt.MessageID).
Str("participant_id", evt.ParticipantID). Str("participant_id", evt.ParticipantID).
@ -381,39 +389,81 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) {
return return
} }
var intent *appservice.IntentAPI converted := portal.convertGoogleMessage(ctx, source, evt, false)
if converted == nil {
return
}
eventIDs := make([]id.EventID, 0, len(converted.Parts))
for _, part := range converted.Parts {
resp, err := portal.sendMessage(converted.Intent, event.EventMessage, part.Content, part.Extra, converted.Timestamp.UnixMilli())
if err != nil {
log.Err(err).Msg("Failed to send message")
} else {
eventIDs = append(eventIDs, resp.EventID)
}
}
portal.markHandled(converted, eventIDs[0], true)
portal.sendDeliveryReceipt(eventIDs[len(eventIDs)-1])
log.Debug().Interface("event_ids", eventIDs).Msg("Handled message")
}
type ConvertedMessagePart struct {
Content *event.MessageEventContent
Extra map[string]any
}
type ConvertedMessage struct {
ID string
SenderID string
Intent *appservice.IntentAPI
Timestamp time.Time
ReplyTo string
Parts []ConvertedMessagePart
}
func (portal *Portal) convertGoogleMessage(ctx context.Context, source *User, evt *binary.Message, backfill bool) *ConvertedMessage {
log := zerolog.Ctx(ctx)
var cm ConvertedMessage
cm.SenderID = evt.ParticipantID
cm.ID = evt.MessageID
cm.Timestamp = time.UnixMicro(evt.Timestamp)
// TODO is there a fromMe flag? // TODO is there a fromMe flag?
if evt.GetParticipantID() == portal.SelfUserID { if evt.GetParticipantID() == portal.SelfUserID {
intent = source.DoublePuppetIntent cm.Intent = source.DoublePuppetIntent
if intent == nil { if cm.Intent == nil {
log.Debug().Msg("Dropping message from self as double puppeting is not enabled") log.Debug().Msg("Dropping message from self as double puppeting is not enabled")
return return nil
} }
} else { } else {
puppet := source.GetPuppetByID(evt.ParticipantID, "") puppet := source.GetPuppetByID(evt.ParticipantID, "")
if puppet == nil { if puppet == nil {
log.Debug().Msg("Dropping message from unknown participant") log.Debug().Str("participant_id", evt.ParticipantID).Msg("Dropping message from unknown participant")
return return nil
} }
intent = puppet.IntentFor(portal) cm.Intent = puppet.IntentFor(portal)
} }
var replyTo id.EventID var replyTo id.EventID
if evt.GetReplyMessage() != nil { if evt.GetReplyMessage() != nil {
replyToID := evt.GetReplyMessage().GetMessageID() cm.ReplyTo = evt.GetReplyMessage().GetMessageID()
msg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, replyToID) msg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, cm.ReplyTo)
if err != nil { if err != nil {
log.Err(err).Str("reply_to_id", replyToID).Msg("Failed to get reply target message") log.Err(err).Str("reply_to_id", cm.ReplyTo).Msg("Failed to get reply target message")
} else if msg == nil { } else if msg == nil {
log.Warn().Str("reply_to_id", replyToID).Msg("Reply target message not found") if backfill {
replyTo = portal.deterministicEventID(cm.ReplyTo, 0)
} else {
log.Warn().Str("reply_to_id", cm.ReplyTo).Msg("Reply target message not found")
}
} else { } else {
replyTo = msg.MXID replyTo = msg.MXID
} }
} }
eventIDs := make(map[string]id.EventID)
var lastEventID id.EventID
ts := time.UnixMicro(evt.Timestamp).UnixMilli()
for _, part := range evt.MessageInfo { for _, part := range evt.MessageInfo {
var content event.MessageEventContent var content event.MessageEventContent
switch data := part.GetData().(type) { switch data := part.GetData().(type) {
@ -423,7 +473,7 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) {
Body: data.MessageContent.GetContent(), Body: data.MessageContent.GetContent(),
} }
case *binary.MessageInfo_MediaContent: case *binary.MessageInfo_MediaContent:
contentPtr, err := portal.convertGoogleMedia(source, intent, data.MediaContent) contentPtr, err := portal.convertGoogleMedia(source, cm.Intent, data.MediaContent)
if err != nil { if err != nil {
log.Err(err).Msg("Failed to copy attachment") log.Err(err).Msg("Failed to copy attachment")
content = event.MessageEventContent{ content = event.MessageEventContent{
@ -437,17 +487,44 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) {
if replyTo != "" { if replyTo != "" {
content.RelatesTo = &event.RelatesTo{InReplyTo: &event.InReplyTo{EventID: replyTo}} content.RelatesTo = &event.RelatesTo{InReplyTo: &event.InReplyTo{EventID: replyTo}}
} }
resp, err := portal.sendMessage(intent, event.EventMessage, &content, nil, ts) cm.Parts = append(cm.Parts, ConvertedMessagePart{
if err != nil { Content: &content,
log.Err(err).Msg("Failed to send message") })
} else {
eventIDs[part.GetActionMessageID()] = resp.EventID
lastEventID = resp.EventID
}
} }
portal.markHandled(evt, eventIDs, true) if portal.bridge.Config.Bridge.CaptionInMessage {
portal.sendDeliveryReceipt(lastEventID) cm.MergeCaption()
log.Debug().Interface("event_ids", eventIDs).Msg("Handled message") }
return &cm
}
func (msg *ConvertedMessage) MergeCaption() {
if len(msg.Parts) != 2 {
return
}
var textPart, filePart ConvertedMessagePart
if msg.Parts[0].Content.MsgType == event.MsgText {
textPart = msg.Parts[0]
filePart = msg.Parts[1]
} else {
textPart = msg.Parts[1]
filePart = msg.Parts[0]
}
if textPart.Content.MsgType != event.MsgText {
return
}
switch filePart.Content.MsgType {
case event.MsgImage, event.MsgVideo, event.MsgAudio, event.MsgFile:
default:
return
}
filePart.Content.FileName = filePart.Content.Body
filePart.Content.Body = textPart.Content.Body
filePart.Content.Format = textPart.Content.Format
filePart.Content.FormattedBody = textPart.Content.FormattedBody
msg.Parts = []ConvertedMessagePart{filePart}
} }
var mediaFormatToMime = map[binary.MediaFormats]string{ var mediaFormatToMime = map[binary.MediaFormats]string{
@ -530,18 +607,16 @@ func (portal *Portal) isRecentlyHandled(id string) bool {
return false return false
} }
func (portal *Portal) markHandled(info *binary.Message, mxids map[string]id.EventID, recent bool) *database.Message { func (portal *Portal) markHandled(cm *ConvertedMessage, eventID id.EventID, recent bool) *database.Message {
msg := portal.bridge.DB.Message.New() msg := portal.bridge.DB.Message.New()
msg.Chat = portal.Key msg.Chat = portal.Key
msg.ID = info.MessageID msg.ID = cm.ID
for _, evtID := range mxids { msg.MXID = eventID
msg.MXID = evtID msg.Timestamp = cm.Timestamp
} msg.Sender = cm.SenderID
msg.Timestamp = time.UnixMicro(info.Timestamp)
msg.Sender = info.ParticipantID
err := msg.Insert(context.TODO()) err := msg.Insert(context.TODO())
if err != nil { if err != nil {
portal.zlog.Err(err).Str("message_id", info.MessageID).Msg("Failed to insert message to database") portal.zlog.Err(err).Str("message_id", cm.ID).Msg("Failed to insert message to database")
} }
if recent { if recent {
@ -549,7 +624,7 @@ func (portal *Portal) markHandled(info *binary.Message, mxids map[string]id.Even
index := portal.recentlyHandledIndex index := portal.recentlyHandledIndex
portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength
portal.recentlyHandledLock.Unlock() portal.recentlyHandledLock.Unlock()
portal.recentlyHandled[index] = info.MessageID portal.recentlyHandled[index] = cm.ID
} }
return msg return msg
} }
@ -840,6 +915,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, conv *binary.Conversation) er
portal.zlog.Info().Str("room_id", resp.RoomID.String()).Msg("Matrix room created") portal.zlog.Info().Str("room_id", resp.RoomID.String()).Msg("Matrix room created")
portal.InSpace = false portal.InSpace = false
portal.NameSet = len(req.Name) > 0 portal.NameSet = len(req.Name) > 0
portal.forwardBackfillLock.Lock()
portal.MXID = resp.RoomID portal.MXID = resp.RoomID
portal.bridge.portalsLock.Lock() portal.bridge.portalsLock.Lock()
portal.bridge.portalsByMXID[portal.MXID] = portal portal.bridge.portalsByMXID[portal.MXID] = portal
@ -874,7 +950,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, conv *binary.Conversation) er
portal.ensureUserInvited(user) portal.ensureUserInvited(user)
} }
user.syncChatDoublePuppetDetails(portal, conv, true) user.syncChatDoublePuppetDetails(portal, conv, true)
go portal.initialForwardBackfill(user)
go portal.addToPersonalSpace(user, true) go portal.addToPersonalSpace(user, true)
return nil return nil
} }

15
user.go
View file

@ -564,14 +564,25 @@ func (user *User) HandleEvent(event interface{}) {
} }
func (user *User) syncConversation(v *binary.Conversation) { func (user *User) syncConversation(v *binary.Conversation) {
updateType := v.GetStatus()
portal := user.GetPortalByID(v.GetConversationID()) portal := user.GetPortalByID(v.GetConversationID())
if portal.MXID != "" { if portal.MXID != "" {
portal.UpdateMetadata(user, v) switch updateType {
} else { case binary.ConvUpdateTypes_DELETED:
user.zlog.Info().Str("conversation_id", portal.ID).Msg("Got delete event, cleaning up portal")
portal.Delete()
portal.Cleanup(false)
default:
portal.UpdateMetadata(user, v)
portal.missedForwardBackfill(user, time.UnixMicro(v.LastMessageTimestamp), v.LatestMessageID)
}
} else if updateType == binary.ConvUpdateTypes_UNARCHIVED || updateType == binary.ConvUpdateTypes_ARCHIVED {
err := portal.CreateMatrixRoom(user, v) err := portal.CreateMatrixRoom(user, v)
if err != nil { if err != nil {
user.zlog.Err(err).Msg("Error creating Matrix room from conversation event") user.zlog.Err(err).Msg("Error creating Matrix room from conversation event")
} }
} else {
user.zlog.Debug().Str("update_type", updateType.String()).Msg("Not creating portal for conversation")
} }
} }