diff --git a/backfill.go b/backfill.go new file mode 100644 index 0000000..1ea8d8a --- /dev/null +++ b/backfill.go @@ -0,0 +1,212 @@ +// mautrix-gmessages - A Matrix-Google Messages puppeting bridge. +// Copyright (C) 2023 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "fmt" + "time" + + "github.com/rs/zerolog" + + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" + + "go.mau.fi/mautrix-gmessages/database" +) + +func (portal *Portal) initialForwardBackfill(user *User) { + // This is only called from CreateMatrixRoom which locks forwardBackfillLock + defer portal.forwardBackfillLock.Unlock() + log := portal.zlog.With(). + Str("action", "initial forward backfill"). + Logger() + ctx := log.WithContext(context.TODO()) + + portal.forwardBackfill(ctx, user, time.Time{}, 50) +} + +func (portal *Portal) missedForwardBackfill(user *User, lastMessageTS time.Time, lastMessageID string) { + portal.forwardBackfillLock.Lock() + defer portal.forwardBackfillLock.Unlock() + log := portal.zlog.With(). + Str("action", "missed forward backfill"). + Logger() + ctx := log.WithContext(context.TODO()) + if portal.lastMessageTS.IsZero() { + lastMsg, err := portal.bridge.DB.Message.GetLastInChat(ctx, portal.Key) + if err != nil { + log.Err(err).Msg("Failed to get last message in chat") + return + } else if lastMsg == nil { + log.Debug().Msg("No messages in chat") + } else { + portal.lastMessageTS = lastMsg.Timestamp + } + } + if !lastMessageTS.After(portal.lastMessageTS) { + log.Trace(). + Time("latest_message_ts", lastMessageTS). + Str("latest_message_id", lastMessageID). + Time("last_bridged_ts", portal.lastMessageTS). + Msg("Nothing to backfill") + return + } + log.Info(). + Time("latest_message_ts", lastMessageTS). + Str("latest_message_id", lastMessageID). + Time("last_bridged_ts", portal.lastMessageTS). + Msg("Backfilling missed messages") + portal.forwardBackfill(ctx, user, portal.lastMessageTS, 100) +} + +func (portal *Portal) deterministicEventID(messageID string, part int) id.EventID { + data := fmt.Sprintf("%s/gmessages/%s/%d", portal.MXID, messageID, part) + sum := sha256.Sum256([]byte(data)) + return id.EventID(fmt.Sprintf("$%s:messages.google.com", base64.RawURLEncoding.EncodeToString(sum[:]))) +} + +func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after time.Time, limit int64) { + log := zerolog.Ctx(ctx) + resp, err := user.Client.Conversations.FetchMessages(portal.ID, limit, nil) + if err != nil { + portal.zlog.Error().Err(err).Msg("Failed to fetch messages") + return + } + log.Debug(). + Int64("total_messages", resp.TotalMessages). + Int("message_count", len(resp.Messages)). + Msg("Got message chunk to backfill") + + batchSending := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) + converted := make([]*ConvertedMessage, 0, len(resp.Messages)) + maxTS := portal.lastMessageTS + for i := len(resp.Messages) - 1; i >= 0; i-- { + evt := resp.Messages[i] + // TODO this should check the database too + if evtID := portal.isOutgoingMessage(evt); evtID != "" { + log.Debug().Str("event_id", evtID.String()).Msg("Got echo for outgoing message in backfill batch") + continue + } else if !time.UnixMicro(evt.Timestamp).After(after) { + continue + } + c := portal.convertGoogleMessage(ctx, user, evt, batchSending) + if c != nil { + converted = append(converted, c) + if c.Timestamp.After(maxTS) { + maxTS = c.Timestamp + } + } + } + log.Debug(). + Int("converted_count", len(converted)). + Msg("Converted messages for backfill") + + if batchSending { + portal.backfillSendBatch(ctx, converted) + } else { + portal.backfillSendLegacy(ctx, converted) + } + portal.lastMessageTS = maxTS +} + +func (portal *Portal) backfillSendBatch(ctx context.Context, converted []*ConvertedMessage) { + log := zerolog.Ctx(ctx) + events := make([]*event.Event, 0, len(converted)) + dbMessages := make([]*database.Message, 0, len(converted)) + for _, msg := range converted { + dbm := portal.bridge.DB.Message.New() + dbm.Chat = portal.Key + dbm.ID = msg.ID + dbm.Sender = msg.SenderID + dbm.Timestamp = msg.Timestamp + + for i, part := range msg.Parts { + content := event.Content{ + Parsed: part.Content, + Raw: part.Extra, + } + eventType := event.EventMessage + var err error + eventType, err = portal.encrypt(msg.Intent, &content, eventType) + if err != nil { + log.Err(err).Str("message_id", msg.ID).Int("part", i).Msg("Failed to encrypt event") + continue + } + msg.Intent.AddDoublePuppetValue(&content) + evt := &event.Event{ + Sender: msg.Intent.UserID, + Type: eventType, + Timestamp: msg.Timestamp.UnixMilli(), + ID: portal.deterministicEventID(msg.ID, i), + RoomID: portal.MXID, + Content: content, + } + events = append(events, evt) + if dbm.MXID == "" { + dbm.MXID = evt.ID + } + } + if dbm.MXID != "" { + dbMessages = append(dbMessages, dbm) + } + } + _, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &mautrix.ReqBeeperBatchSend{ + Forward: true, + MarkReadBy: "", + Events: events, + }) + if err != nil { + log.Err(err).Msg("Failed to send batch of messages") + return + } + err = portal.bridge.DB.Message.MassInsert(ctx, dbMessages) + if err != nil { + log.Err(err).Msg("Failed to insert messages to database") + } +} + +func (portal *Portal) backfillSendLegacy(ctx context.Context, converted []*ConvertedMessage) { + log := zerolog.Ctx(ctx) + eventIDs := make(map[string]id.EventID) + for _, msg := range converted { + var eventID id.EventID + for i, part := range msg.Parts { + if msg.ReplyTo != "" && part.Content.RelatesTo == nil { + replyToEvent, ok := eventIDs[msg.ReplyTo] + if ok { + part.Content.RelatesTo = &event.RelatesTo{ + InReplyTo: &event.InReplyTo{EventID: replyToEvent}, + } + } + } + resp, err := portal.sendMessage(msg.Intent, event.EventMessage, part.Content, part.Extra, msg.Timestamp.UnixMilli()) + if err != nil { + log.Err(err).Str("message_id", msg.ID).Int("part", i).Msg("Failed to send message") + } else if eventID == "" { + eventID = resp.EventID + eventIDs[msg.ID] = resp.EventID + } + } + if eventID != "" { + portal.markHandled(msg, eventID, false) + } + } +} diff --git a/database/message.go b/database/message.go index 4df37b2..b6a7ad1 100644 --- a/database/message.go +++ b/database/message.go @@ -20,10 +20,13 @@ import ( "context" "database/sql" "errors" + "fmt" + "strings" "time" log "maunium.net/go/maulogger/v2" + "go.mau.fi/mautrix-gmessages/libgm/binary" "maunium.net/go/mautrix/id" "maunium.net/go/mautrix/util/dbutil" ) @@ -44,11 +47,16 @@ func (mq *MessageQuery) getDB() *Database { const ( getMessageByIDQuery = ` - SELECT conv_id, conv_receiver, id, mxid, sender, timestamp FROM message + SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3 ` + getLastMessageInChatQuery = ` + SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message + WHERE conv_id=$1 AND conv_receiver=$2 + ORDER BY timestamp DESC LIMIT 1 + ` getMessageByMXIDQuery = ` - SELECT conv_id, conv_receiver, id, mxid, sender, timestamp FROM message + SELECT conv_id, conv_receiver, id, mxid, sender, timestamp, status FROM message WHERE mxid=$1 ` ) @@ -61,6 +69,14 @@ func (mq *MessageQuery) GetByMXID(ctx context.Context, mxid id.EventID) (*Messag return get[*Message](mq, ctx, getMessageByMXIDQuery, mxid) } +func (mq *MessageQuery) GetLastInChat(ctx context.Context, chat Key) (*Message, error) { + return get[*Message](mq, ctx, getLastMessageInChatQuery, chat.ID, chat.Receiver) +} + +type MessageStatus struct { + Type binary.MessageStatusType +} + type Message struct { db *Database log log.Logger @@ -70,11 +86,12 @@ type Message struct { MXID id.EventID Sender string Timestamp time.Time + Status MessageStatus } func (msg *Message) Scan(row dbutil.Scannable) (*Message, error) { var ts int64 - err := row.Scan(&msg.Chat.ID, &msg.Chat.Receiver, &msg.ID, &msg.MXID, &msg.Sender, &ts) + err := row.Scan(&msg.Chat.ID, &msg.Chat.Receiver, &msg.ID, &msg.MXID, &msg.Sender, &ts, dbutil.JSON{Data: &msg.Status}) if errors.Is(err, sql.ErrNoRows) { return nil, nil } else if err != nil { @@ -87,17 +104,48 @@ func (msg *Message) Scan(row dbutil.Scannable) (*Message, error) { } func (msg *Message) sqlVariables() []any { - return []any{msg.Chat.ID, msg.Chat.Receiver, msg.ID, msg.MXID, msg.Sender, msg.Timestamp.UnixMicro()} + return []any{msg.Chat.ID, msg.Chat.Receiver, msg.ID, msg.MXID, msg.Sender, msg.Timestamp.UnixMicro(), dbutil.JSON{Data: &msg.Status}} } func (msg *Message) Insert(ctx context.Context) error { _, err := msg.db.Conn(ctx).ExecContext(ctx, ` - INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp, status) + VALUES ($1, $2, $3, $4, $5, $6, $7) `, msg.sqlVariables()...) return err } +func (mq *MessageQuery) MassInsert(ctx context.Context, messages []*Message) error { + valueStringFormat := "($1, $2, $%d, $%d, $%d, $%d, $%d)" + if mq.db.Dialect == dbutil.SQLite { + valueStringFormat = strings.ReplaceAll(valueStringFormat, "$", "?") + } + placeholders := make([]string, len(messages)) + params := make([]any, 2+len(messages)*5) + params[0] = messages[0].Chat.ID + params[1] = messages[0].Chat.Receiver + for i, msg := range messages { + baseIndex := 2 + i*5 + params[baseIndex] = msg.ID + params[baseIndex+1] = msg.MXID + params[baseIndex+2] = msg.Sender + params[baseIndex+3] = msg.Timestamp.UnixMicro() + params[baseIndex+4] = dbutil.JSON{Data: &msg.Status} + placeholders[i] = fmt.Sprintf(valueStringFormat, baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5) + } + query := ` + INSERT INTO message (conv_id, conv_receiver, id, mxid, sender, timestamp, status) + VALUES + ` + strings.Join(placeholders, ",") + _, err := mq.db.Conn(ctx).ExecContext(ctx, query, params...) + return err +} + +func (msg *Message) UpdateStatus(ctx context.Context) error { + _, err := msg.db.Conn(ctx).ExecContext(ctx, "UPDATE message SET status=$1 WHERE conv_id=$2 AND conv_receiver=$3 AND id=$4", dbutil.JSON{Data: &msg.Status}, msg.Chat.ID, msg.Chat.Receiver, msg.ID) + return err +} + func (msg *Message) Delete(ctx context.Context) error { _, err := msg.db.Conn(ctx).ExecContext(ctx, "DELETE FROM message WHERE conv_id=$1 AND conv_receiver=$2 AND id=$3", msg.Chat.ID, msg.Chat.Receiver, msg.ID) return err diff --git a/database/upgrades/00-latest-revision.sql b/database/upgrades/00-latest-revision.sql index c16e1b7..105c9f9 100644 --- a/database/upgrades/00-latest-revision.sql +++ b/database/upgrades/00-latest-revision.sql @@ -27,9 +27,9 @@ CREATE TABLE puppet ( avatar_set BOOLEAN NOT NULL DEFAULT false, contact_info_set BOOLEAN NOT NULL DEFAULT false, - FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, - UNIQUE (phone, receiver), - PRIMARY KEY (id, receiver) + PRIMARY KEY (id, receiver), + CONSTRAINT puppet_user_fkey FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, + CONSTRAINT puppet_phone_unique UNIQUE (phone, receiver) ); CREATE TABLE portal ( @@ -46,9 +46,9 @@ CREATE TABLE portal ( encrypted BOOLEAN NOT NULL DEFAULT false, in_space BOOLEAN NOT NULL DEFAULT false, - FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, - FOREIGN KEY (other_user, receiver) REFERENCES puppet(id, receiver) ON DELETE CASCADE, - PRIMARY KEY (id, receiver) + PRIMARY KEY (id, receiver), + CONSTRAINT portal_user_fkey FOREIGN KEY (receiver) REFERENCES "user"(rowid) ON DELETE CASCADE, + CONSTRAINT portal_puppet_fkey FOREIGN KEY (other_user, receiver) REFERENCES puppet(id, receiver) ON DELETE CASCADE ); CREATE TABLE message ( @@ -58,7 +58,8 @@ CREATE TABLE message ( mxid TEXT NOT NULL UNIQUE, sender TEXT NOT NULL, timestamp BIGINT NOT NULL, + status jsonb NOT NULL, PRIMARY KEY (conv_id, conv_receiver, id), - FOREIGN KEY (conv_id, conv_receiver) REFERENCES portal(id, receiver) ON DELETE CASCADE + CONSTRAINT message_portal_fkey FOREIGN KEY (conv_id, conv_receiver) REFERENCES portal(id, receiver) ON DELETE CASCADE ); diff --git a/go.mod b/go.mod index 1c754b4..ca181ec 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,17 @@ module go.mau.fi/mautrix-gmessages go 1.20 require ( + github.com/gabriel-vasile/mimetype v1.4.2 github.com/mattn/go-sqlite3 v1.14.17 github.com/rs/zerolog v1.29.1 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e go.mau.fi/mautrix-gmessages/libgm v0.1.0 maunium.net/go/maulogger/v2 v2.4.1 - maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474 + maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce ) require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/go.sum b/go.sum index a31e338..c5b0178 100644 --- a/go.sum +++ b/go.sum @@ -81,5 +81,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8= maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho= -maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474 h1:Pzopg6NL7qaNdG2iUf8dVnu9+tVEa82b44KJKmpb/NY= -maunium.net/go/mautrix v0.15.4-0.20230628151140-e99578a15474/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE= +maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce h1:SYYPKkcJLI012g+p3jZ/Qnqm5VQd7kFZSSEHOtI4NZA= +maunium.net/go/mautrix v0.15.4-0.20230711231757-65db706cd3ce/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE= diff --git a/portal.go b/portal.go index 9829bb1..a4392bc 100644 --- a/portal.go +++ b/portal.go @@ -30,6 +30,7 @@ import ( "github.com/gabriel-vasile/mimetype" "github.com/rs/zerolog" "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge" @@ -233,7 +234,8 @@ type Portal struct { backfillLock sync.Mutex avatarLock sync.Mutex - latestEventBackfillLock sync.Mutex + forwardBackfillLock sync.Mutex + lastMessageTS time.Time recentlyHandled [recentlyHandledLength]string recentlyHandledLock sync.Mutex @@ -261,8 +263,8 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) { if len(portal.MXID) == 0 { return } - portal.latestEventBackfillLock.Lock() - defer portal.latestEventBackfillLock.Unlock() + portal.forwardBackfillLock.Lock() + defer portal.forwardBackfillLock.Unlock() switch { case msg.evt != nil: portal.handleMessage(msg.source, msg.evt) @@ -274,8 +276,8 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) { } func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) { - portal.latestEventBackfillLock.Lock() - defer portal.latestEventBackfillLock.Unlock() + portal.forwardBackfillLock.Lock() + defer portal.forwardBackfillLock.Unlock() evtTS := time.UnixMilli(msg.evt.Timestamp) timings := messageTimings{ initReceive: msg.evt.Mautrix.ReceivedAt.Sub(evtTS), @@ -312,13 +314,17 @@ func (portal *Portal) isOutgoingMessage(msg *binary.Message) id.EventID { out, ok := portal.outgoingMessages[msg.TmpID] if ok { if !out.Saved { - portal.markHandled(msg, map[string]id.EventID{"": out.ID}, true) + portal.markHandled(&ConvertedMessage{ + ID: msg.MessageID, + Timestamp: time.UnixMicro(msg.GetTimestamp()), + SenderID: msg.ParticipantID, + }, out.ID, true) out.Saved = true } switch msg.GetMessageStatus().GetStatus() { case binary.MessageStatusType_OUTGOING_DELIVERED, binary.MessageStatusType_OUTGOING_COMPLETE, binary.MessageStatusType_OUTGOING_DISPLAYED: delete(portal.outgoingMessages, msg.TmpID) - portal.sendStatusEvent(out.ID, "", nil) + go portal.sendStatusEvent(out.ID, "", nil) case binary.MessageStatusType_OUTGOING_FAILED_GENERIC, binary.MessageStatusType_OUTGOING_FAILED_EMERGENCY_NUMBER, binary.MessageStatusType_OUTGOING_CANCELED, @@ -329,7 +335,7 @@ func (portal *Portal) isOutgoingMessage(msg *binary.Message) id.EventID { binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_LOST_ENCRYPTION, binary.MessageStatusType_OUTGOING_FAILED_RECIPIENT_DID_NOT_DECRYPT_NO_MORE_RETRY: err := OutgoingStatusError(msg.GetMessageStatus().GetStatus()) - portal.sendStatusEvent(out.ID, "", err) + go portal.sendStatusEvent(out.ID, "", err) // TODO error notice } return out.ID @@ -351,6 +357,8 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) { portal.zlog.Warn().Msg("handleMessage called even though portal.MXID is empty") return } + eventTS := time.UnixMicro(evt.GetTimestamp()) + portal.lastMessageTS = eventTS log := portal.zlog.With(). Str("message_id", evt.MessageID). Str("participant_id", evt.ParticipantID). @@ -381,39 +389,81 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) { return } - var intent *appservice.IntentAPI + converted := portal.convertGoogleMessage(ctx, source, evt, false) + if converted == nil { + return + } + + eventIDs := make([]id.EventID, 0, len(converted.Parts)) + for _, part := range converted.Parts { + resp, err := portal.sendMessage(converted.Intent, event.EventMessage, part.Content, part.Extra, converted.Timestamp.UnixMilli()) + if err != nil { + log.Err(err).Msg("Failed to send message") + } else { + eventIDs = append(eventIDs, resp.EventID) + } + } + portal.markHandled(converted, eventIDs[0], true) + portal.sendDeliveryReceipt(eventIDs[len(eventIDs)-1]) + log.Debug().Interface("event_ids", eventIDs).Msg("Handled message") +} + +type ConvertedMessagePart struct { + Content *event.MessageEventContent + Extra map[string]any +} + +type ConvertedMessage struct { + ID string + SenderID string + + Intent *appservice.IntentAPI + Timestamp time.Time + ReplyTo string + Parts []ConvertedMessagePart +} + +func (portal *Portal) convertGoogleMessage(ctx context.Context, source *User, evt *binary.Message, backfill bool) *ConvertedMessage { + log := zerolog.Ctx(ctx) + + var cm ConvertedMessage + cm.SenderID = evt.ParticipantID + cm.ID = evt.MessageID + cm.Timestamp = time.UnixMicro(evt.Timestamp) + // TODO is there a fromMe flag? if evt.GetParticipantID() == portal.SelfUserID { - intent = source.DoublePuppetIntent - if intent == nil { + cm.Intent = source.DoublePuppetIntent + if cm.Intent == nil { log.Debug().Msg("Dropping message from self as double puppeting is not enabled") - return + return nil } } else { puppet := source.GetPuppetByID(evt.ParticipantID, "") if puppet == nil { - log.Debug().Msg("Dropping message from unknown participant") - return + log.Debug().Str("participant_id", evt.ParticipantID).Msg("Dropping message from unknown participant") + return nil } - intent = puppet.IntentFor(portal) + cm.Intent = puppet.IntentFor(portal) } var replyTo id.EventID if evt.GetReplyMessage() != nil { - replyToID := evt.GetReplyMessage().GetMessageID() - msg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, replyToID) + cm.ReplyTo = evt.GetReplyMessage().GetMessageID() + msg, err := portal.bridge.DB.Message.GetByID(ctx, portal.Key, cm.ReplyTo) if err != nil { - log.Err(err).Str("reply_to_id", replyToID).Msg("Failed to get reply target message") + log.Err(err).Str("reply_to_id", cm.ReplyTo).Msg("Failed to get reply target message") } else if msg == nil { - log.Warn().Str("reply_to_id", replyToID).Msg("Reply target message not found") + if backfill { + replyTo = portal.deterministicEventID(cm.ReplyTo, 0) + } else { + log.Warn().Str("reply_to_id", cm.ReplyTo).Msg("Reply target message not found") + } } else { replyTo = msg.MXID } } - eventIDs := make(map[string]id.EventID) - var lastEventID id.EventID - ts := time.UnixMicro(evt.Timestamp).UnixMilli() for _, part := range evt.MessageInfo { var content event.MessageEventContent switch data := part.GetData().(type) { @@ -423,7 +473,7 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) { Body: data.MessageContent.GetContent(), } case *binary.MessageInfo_MediaContent: - contentPtr, err := portal.convertGoogleMedia(source, intent, data.MediaContent) + contentPtr, err := portal.convertGoogleMedia(source, cm.Intent, data.MediaContent) if err != nil { log.Err(err).Msg("Failed to copy attachment") content = event.MessageEventContent{ @@ -437,17 +487,44 @@ func (portal *Portal) handleMessage(source *User, evt *binary.Message) { if replyTo != "" { content.RelatesTo = &event.RelatesTo{InReplyTo: &event.InReplyTo{EventID: replyTo}} } - resp, err := portal.sendMessage(intent, event.EventMessage, &content, nil, ts) - if err != nil { - log.Err(err).Msg("Failed to send message") - } else { - eventIDs[part.GetActionMessageID()] = resp.EventID - lastEventID = resp.EventID - } + cm.Parts = append(cm.Parts, ConvertedMessagePart{ + Content: &content, + }) } - portal.markHandled(evt, eventIDs, true) - portal.sendDeliveryReceipt(lastEventID) - log.Debug().Interface("event_ids", eventIDs).Msg("Handled message") + if portal.bridge.Config.Bridge.CaptionInMessage { + cm.MergeCaption() + } + return &cm +} + +func (msg *ConvertedMessage) MergeCaption() { + if len(msg.Parts) != 2 { + return + } + + var textPart, filePart ConvertedMessagePart + if msg.Parts[0].Content.MsgType == event.MsgText { + textPart = msg.Parts[0] + filePart = msg.Parts[1] + } else { + textPart = msg.Parts[1] + filePart = msg.Parts[0] + } + + if textPart.Content.MsgType != event.MsgText { + return + } + switch filePart.Content.MsgType { + case event.MsgImage, event.MsgVideo, event.MsgAudio, event.MsgFile: + default: + return + } + + filePart.Content.FileName = filePart.Content.Body + filePart.Content.Body = textPart.Content.Body + filePart.Content.Format = textPart.Content.Format + filePart.Content.FormattedBody = textPart.Content.FormattedBody + msg.Parts = []ConvertedMessagePart{filePart} } var mediaFormatToMime = map[binary.MediaFormats]string{ @@ -530,18 +607,16 @@ func (portal *Portal) isRecentlyHandled(id string) bool { return false } -func (portal *Portal) markHandled(info *binary.Message, mxids map[string]id.EventID, recent bool) *database.Message { +func (portal *Portal) markHandled(cm *ConvertedMessage, eventID id.EventID, recent bool) *database.Message { msg := portal.bridge.DB.Message.New() msg.Chat = portal.Key - msg.ID = info.MessageID - for _, evtID := range mxids { - msg.MXID = evtID - } - msg.Timestamp = time.UnixMicro(info.Timestamp) - msg.Sender = info.ParticipantID + msg.ID = cm.ID + msg.MXID = eventID + msg.Timestamp = cm.Timestamp + msg.Sender = cm.SenderID err := msg.Insert(context.TODO()) if err != nil { - portal.zlog.Err(err).Str("message_id", info.MessageID).Msg("Failed to insert message to database") + portal.zlog.Err(err).Str("message_id", cm.ID).Msg("Failed to insert message to database") } if recent { @@ -549,7 +624,7 @@ func (portal *Portal) markHandled(info *binary.Message, mxids map[string]id.Even index := portal.recentlyHandledIndex portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength portal.recentlyHandledLock.Unlock() - portal.recentlyHandled[index] = info.MessageID + portal.recentlyHandled[index] = cm.ID } return msg } @@ -840,6 +915,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, conv *binary.Conversation) er portal.zlog.Info().Str("room_id", resp.RoomID.String()).Msg("Matrix room created") portal.InSpace = false portal.NameSet = len(req.Name) > 0 + portal.forwardBackfillLock.Lock() portal.MXID = resp.RoomID portal.bridge.portalsLock.Lock() portal.bridge.portalsByMXID[portal.MXID] = portal @@ -874,7 +950,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, conv *binary.Conversation) er portal.ensureUserInvited(user) } user.syncChatDoublePuppetDetails(portal, conv, true) - + go portal.initialForwardBackfill(user) go portal.addToPersonalSpace(user, true) return nil } diff --git a/user.go b/user.go index 5bbe115..3e054c8 100644 --- a/user.go +++ b/user.go @@ -564,14 +564,25 @@ func (user *User) HandleEvent(event interface{}) { } func (user *User) syncConversation(v *binary.Conversation) { + updateType := v.GetStatus() portal := user.GetPortalByID(v.GetConversationID()) if portal.MXID != "" { - portal.UpdateMetadata(user, v) - } else { + switch updateType { + case binary.ConvUpdateTypes_DELETED: + user.zlog.Info().Str("conversation_id", portal.ID).Msg("Got delete event, cleaning up portal") + portal.Delete() + portal.Cleanup(false) + default: + portal.UpdateMetadata(user, v) + portal.missedForwardBackfill(user, time.UnixMicro(v.LastMessageTimestamp), v.LatestMessageID) + } + } else if updateType == binary.ConvUpdateTypes_UNARCHIVED || updateType == binary.ConvUpdateTypes_ARCHIVED { err := portal.CreateMatrixRoom(user, v) if err != nil { user.zlog.Err(err).Msg("Error creating Matrix room from conversation event") } + } else { + user.zlog.Debug().Str("update_type", updateType.String()).Msg("Not creating portal for conversation") } }