gmessages/messagetracking.go

321 lines
10 KiB
Go
Raw Normal View History

// mautrix-gmessages - A Matrix-Google Messages puppeting bridge.
// Copyright (C) 2023 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package main
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
2023-07-19 21:01:05 +00:00
"github.com/rs/zerolog"
"go.mau.fi/util/jsontime"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/bridge/status"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"go.mau.fi/mautrix-gmessages/database"
2023-07-17 13:51:31 +00:00
"go.mau.fi/mautrix-gmessages/libgm/gmproto"
)
// Sentinel errors produced while bridging Matrix events to Google Messages.
// errorToStatusReason matches several of them with errors.Is to decide how a
// failure is reported back to the Matrix user.
var (
	errMNoticeDisabled             = errors.New("bridging m.notice messages is disabled")
	errUnexpectedParsedContentType = errors.New("unexpected parsed content type")
	errUnknownMsgType              = errors.New("unknown msgtype")
	errMediaUnsupportedType        = errors.New("unsupported media type")
	errTargetNotFound              = errors.New("target event not found")
	errMissingMediaURL             = errors.New("missing media URL")
	errMediaDownloadFailed         = errors.New("failed to download media")
	errMediaDecryptFailed          = errors.New("failed to decrypt media")
	errMediaReuploadFailed         = errors.New("failed to upload media to google")
	// errMessageTakingLong is special-cased by sendErrorMessage, which emits a
	// "taking longer than usual" notice instead of a failure notice.
	errMessageTakingLong = errors.New("bridging the message is taking longer than usual")
)
2023-07-17 13:51:31 +00:00
type OutgoingStatusError gmproto.MessageStatusType
func (ose OutgoingStatusError) Error() string {
2023-08-08 15:57:23 +00:00
return strings.TrimPrefix(gmproto.MessageStatusType(ose).String(), "OUTGOING_")
}
func (ose OutgoingStatusError) HumanError() string {
2023-08-08 15:57:23 +00:00
switch gmproto.MessageStatusType(ose) {
case gmproto.MessageStatusType_OUTGOING_FAILED_TOO_LARGE:
return "too large"
case gmproto.MessageStatusType_OUTGOING_FAILED_RECIPIENT_LOST_RCS:
return "recipient lost RCS support"
case gmproto.MessageStatusType_OUTGOING_FAILED_RECIPIENT_LOST_ENCRYPTION:
return "recipient lost encryption support"
case gmproto.MessageStatusType_OUTGOING_FAILED_RECIPIENT_DID_NOT_DECRYPT,
gmproto.MessageStatusType_OUTGOING_FAILED_RECIPIENT_DID_NOT_DECRYPT_NO_MORE_RETRY:
return "recipient failed to decrypt message"
}
return ""
}
// Is reports whether other is an OutgoingStatusError carrying the same status
// value, which lets errors.Is match a specific outgoing status.
func (ose OutgoingStatusError) Is(other error) bool {
	candidate, isOSE := other.(OutgoingStatusError)
	return isOSE && candidate == ose
}
func errorToStatusReason(err error) (reason event.MessageStatusReason, status event.MessageStatus, isCertain, sendNotice bool, humanMessage string) {
var ose OutgoingStatusError
switch {
case errors.Is(err, errUnexpectedParsedContentType),
errors.Is(err, errUnknownMsgType):
return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, ""
case errors.Is(err, errMNoticeDisabled):
return event.MessageStatusUnsupported, event.MessageStatusFail, true, false, ""
case errors.Is(err, errMediaUnsupportedType):
return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, err.Error()
case errors.Is(err, context.DeadlineExceeded):
return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "handling the message took too long and was cancelled"
2023-07-14 22:06:49 +00:00
case errors.Is(err, errTargetNotFound):
return event.MessageStatusGenericError, event.MessageStatusFail, true, false, ""
case errors.As(err, &ose):
return event.MessageStatusNetworkError, event.MessageStatusFail, true, true, ose.HumanError()
default:
return event.MessageStatusGenericError, event.MessageStatusRetriable, false, true, ""
}
}
func (portal *Portal) sendErrorMessage(evt *event.Event, err error, msgType string, confirmed bool, editID id.EventID) id.EventID {
if !portal.bridge.Config.Bridge.MessageErrorNotices {
return ""
}
certainty := "may not have been"
if confirmed {
certainty = "was not"
}
msg := fmt.Sprintf("\u26a0 Your %s %s bridged: %v", msgType, certainty, err)
if errors.Is(err, errMessageTakingLong) {
msg = fmt.Sprintf("\u26a0 Bridging your %s is taking longer than usual", msgType)
}
content := &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: msg,
}
if editID != "" {
content.SetEdit(editID)
} else {
content.SetReply(evt)
}
resp, err := portal.sendMainIntentMessage(content)
if err != nil {
2023-07-19 21:01:05 +00:00
portal.zlog.Warn().Err(err).Str("event_id", evt.ID.String()).Msg("Failed to send bridging error message")
return ""
}
return resp.EventID
}
// sendCheckpoint reports a remote-step message checkpoint for dbMsg. A non-nil
// err yields a permanent failure with the error text as info; otherwise the
// status is "delivered" when delivered is true and "success" when it is not.
// The checkpoint is submitted from a new goroutine.
func (portal *Portal) sendCheckpoint(dbMsg *database.Message, err error, delivered bool) {
	checkpoint := status.MessageCheckpoint{
		EventID:    dbMsg.MXID,
		RoomID:     dbMsg.RoomID,
		Step:       status.MsgStepRemote,
		Timestamp:  jsontime.UnixMilliNow(),
		ReportedBy: status.MsgReportedByBridge,
	}
	switch {
	case err != nil:
		checkpoint.Status = status.MsgStatusPermFailure
		checkpoint.Info = err.Error()
	case delivered:
		checkpoint.Status = status.MsgStatusDelivered
	default:
		checkpoint.Status = status.MsgStatusSuccess
	}
	go portal.bridge.SendRawMessageCheckpoint(&checkpoint)
}
func (portal *Portal) sendStatusEvent(evtID, lastRetry id.EventID, err error, deliveredTo *[]id.UserID) {
if !portal.bridge.Config.Bridge.MessageStatusEvents {
return
}
if lastRetry == evtID {
lastRetry = ""
}
intent := portal.bridge.Bot
if !portal.Encrypted {
// Bridge bot isn't present in unencrypted DMs
intent = portal.MainIntent()
}
content := event.BeeperMessageStatusEventContent{
Network: portal.getBridgeInfoStateKey(),
RelatesTo: event.RelatesTo{
Type: event.RelReference,
EventID: evtID,
},
LastRetry: lastRetry,
DeliveredToUsers: deliveredTo,
}
if err == nil {
content.Status = event.MessageStatusSuccess
} else {
content.Reason, content.Status, _, _, content.Message = errorToStatusReason(err)
content.Error = err.Error()
}
_, err = intent.SendMessageEvent(portal.MXID, event.BeeperMessageStatus, &content)
if err != nil {
2023-07-19 21:01:05 +00:00
portal.zlog.Warn().Err(err).Str("event_id", evtID.String()).Msg("Failed to send message status event")
}
}
func (portal *Portal) sendDeliveryReceipt(eventID id.EventID) {
if portal.bridge.Config.Bridge.DeliveryReceipts {
err := portal.bridge.Bot.SendReceipt(portal.MXID, eventID, event.ReceiptTypeRead, nil)
if err != nil {
2023-07-19 21:01:05 +00:00
portal.zlog.Warn().Err(err).Str("event_id", eventID.String()).Msg("Failed to send delivery receipt")
}
}
}
2023-09-01 10:49:27 +00:00
func (portal *Portal) sendMessageMetrics(user *User, evt *event.Event, err error, part string, ms *metricSender) {
var msgType string
switch evt.Type {
case event.EventMessage:
msgType = "message"
case event.EventReaction:
msgType = "reaction"
case event.EventRedaction:
msgType = "redaction"
default:
msgType = "unknown event"
}
origEvtID := evt.ID
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
origEvtID = retryMeta.OriginalEventID
}
if err != nil {
2023-07-19 21:01:05 +00:00
logEvt := portal.zlog.Error()
if part == "Ignoring" {
2023-07-19 21:01:05 +00:00
logEvt = portal.zlog.Debug()
}
2023-07-19 21:01:05 +00:00
logEvt.Err(err).
Str("event_id", evt.ID.String()).
Str("part", part).
Str("event_sender", evt.Sender.String()).
Str("event_type", evt.Type.Type).
Msg("Failed to handle Matrix event")
reason, statusCode, isCertain, sendNotice, _ := errorToStatusReason(err)
checkpointStatus := status.ReasonToCheckpointStatus(reason, statusCode)
2023-09-01 10:49:27 +00:00
checkpointErr := err
// This is very hacky and should be removed once we find what the error statuses mean
if strings.HasPrefix(err.Error(), "response status ") {
checkpointErr = fmt.Errorf("%w (default:%t,rcs:%t,sims:%d)", err, user.Settings.IsDefaultSMSApp, user.Settings.RCSEnabled, user.SIMCount())
}
portal.bridge.SendMessageCheckpoint(evt, status.MsgStepRemote, checkpointErr, checkpointStatus, ms.getRetryNum())
if sendNotice {
ms.setNoticeID(portal.sendErrorMessage(evt, err, msgType, isCertain, ms.getNoticeID()))
}
portal.sendStatusEvent(origEvtID, evt.ID, err, nil)
} else {
2023-09-01 10:42:52 +00:00
portal.zlog.Debug().
2023-07-19 21:01:05 +00:00
Str("event_id", evt.ID.String()).
2023-09-01 10:42:52 +00:00
Str("event_type", evt.Type.Type).
Msg("Handled Matrix event")
portal.sendDeliveryReceipt(evt.ID)
2023-07-14 22:06:49 +00:00
if msgType != "message" {
portal.bridge.SendMessageSuccessCheckpoint(evt, status.MsgStepRemote, ms.getRetryNum())
portal.sendStatusEvent(origEvtID, evt.ID, nil, nil)
2023-07-14 22:06:49 +00:00
}
if prevNotice := ms.popNoticeID(); prevNotice != "" {
_, _ = portal.MainIntent().RedactEvent(portal.MXID, prevNotice, mautrix.ReqRedact{
Reason: "error resolved",
})
}
}
if ms != nil {
2023-07-19 21:01:05 +00:00
portal.zlog.Debug().EmbedObject(ms.timings).Str("event_id", evt.ID.String()).Msg("Timings for Matrix event")
}
}
// messageTimings holds the durations of the individual stages of handling a
// Matrix event. MarshalZerologObject writes them to the log.
type messageTimings struct {
	// initReceive is logged under the "receive" key.
	initReceive time.Duration
	// decrypt is logged under the "decrypt" key.
	decrypt time.Duration
	// portalQueue is logged under the "queue" key.
	portalQueue time.Duration
	// totalReceive is logged under the "total_hs_to_portal" key.
	totalReceive time.Duration

	// convert is logged under the "convert" key.
	convert time.Duration
	// send is logged under the "send" key.
	send time.Duration
}
2023-07-19 21:01:05 +00:00
func (mt *messageTimings) MarshalZerologObject(evt *zerolog.Event) {
evt.Dur("receive", mt.initReceive).
Dur("decrypt", mt.decrypt).
Dur("queue", mt.portalQueue).
Dur("total_hs_to_portal", mt.totalReceive).
Dur("convert", mt.convert).
Dur("send", mt.send)
}
// metricSender tracks the reporting state for one Matrix event across
// possibly several sendMessageMetrics calls.
type metricSender struct {
	// portal is the portal whose sendMessageMetrics does the actual reporting.
	portal *Portal
	// previousNotice is the event ID of the error notice recorded by
	// setNoticeID, or empty when there is none.
	previousNotice id.EventID
	// lock is held for the duration of each sendMessageMetrics call.
	lock sync.Mutex
	// completed holds the completed flag of the most recent report; once it is
	// set, later non-final reports are skipped.
	completed bool
	// retryNum is incremented after every report and is passed to checkpoints.
	retryNum int
	// timings is embedded in the debug log entry written after each report.
	timings *messageTimings
}
// getRetryNum returns the current retry counter, or 0 for a nil receiver.
func (ms *metricSender) getRetryNum() int {
	if ms == nil {
		return 0
	}
	return ms.retryNum
}
// getNoticeID returns the recorded error notice ID without clearing it.
// A nil receiver yields an empty ID.
func (ms *metricSender) getNoticeID() id.EventID {
	if ms != nil {
		return ms.previousNotice
	}
	return ""
}
// popNoticeID returns the recorded error notice ID and clears it.
// A nil receiver yields an empty ID.
func (ms *metricSender) popNoticeID() id.EventID {
	var noticeID id.EventID
	if ms != nil {
		noticeID, ms.previousNotice = ms.previousNotice, ""
	}
	return noticeID
}
// setNoticeID records evtID as the error notice ID, but only when none is
// recorded yet. It is a no-op on a nil receiver.
func (ms *metricSender) setNoticeID(evtID id.EventID) {
	if ms == nil || ms.previousNotice != "" {
		return
	}
	ms.previousNotice = evtID
}
2023-09-01 10:49:27 +00:00
func (ms *metricSender) sendMessageMetrics(user *User, evt *event.Event, err error, part string, completed bool) {
ms.lock.Lock()
defer ms.lock.Unlock()
if !completed && ms.completed {
return
}
2023-09-01 10:49:27 +00:00
ms.portal.sendMessageMetrics(user, evt, err, part, ms)
ms.retryNum++
ms.completed = completed
}