From dfcc23e3a7776fed1347409854681be2712ff649 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 24 May 2024 12:42:28 +0300 Subject: [PATCH] Send error if remote echo takes unexpectedly long --- messagetracking.go | 9 +++++---- portal.go | 30 +++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/messagetracking.go b/messagetracking.go index 39fa92e..3614c5d 100644 --- a/messagetracking.go +++ b/messagetracking.go @@ -46,11 +46,10 @@ var ( errMediaDecryptFailed = errors.New("failed to decrypt media") errMediaConvertFailed = errors.New("failed to convert media") errMediaReuploadFailed = errors.New("failed to upload media to google") + errEchoTimeout = errors.New("remote echo timeout") errIncorrectUser = errors.New("incorrect user") errNotLoggedIn = errors.New("not logged in") - - errMessageTakingLong = errors.New("bridging the message is taking longer than usual") ) type OutgoingStatusError gmproto.MessageStatusType @@ -98,6 +97,8 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, status ev return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, err.Error() case errors.Is(err, context.DeadlineExceeded): return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "handling the message took too long and was cancelled" + case errors.Is(err, errEchoTimeout): + return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "phone has not confirmed message delivery" case errors.Is(err, errTargetNotFound): return event.MessageStatusGenericError, event.MessageStatusFail, true, false, "" case errors.As(err, &ose): @@ -116,8 +117,8 @@ func (portal *Portal) sendErrorMessage(ctx context.Context, evt *event.Event, er certainty = "was not" } msg := fmt.Sprintf("\u26a0 Your %s %s bridged: %v", msgType, certainty, err) - if errors.Is(err, errMessageTakingLong) { - msg = fmt.Sprintf("\u26a0 Bridging your %s is taking longer than usual", msgType) + if errors.Is(err, errEchoTimeout) { + msg = fmt.Sprintf("\u26a0 Your phone has not echoed the message, it may have been lost") } content := &event.MessageEventContent{ MsgType: event.MsgNotice, diff --git a/portal.go b/portal.go index ab48498..0fa6f3a 100644 --- a/portal.go +++ b/portal.go @@ -40,6 +40,7 @@ import ( "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge" + "maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/crypto/attachment" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" @@ -252,8 +253,9 @@ type PortalMatrixMessage struct { type outgoingMessage struct { *event.Event - Saved bool - Checkpointed bool + Acked bool + Errored bool + Timeouted bool } type Portal struct { @@ -376,12 +378,14 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) { } func (portal *Portal) handleMessageLoop() { + outgoingTicker := time.NewTicker(1 * time.Minute) for { - portal.handleOneMessageLoopItem() + outgoingTicker.Stop() + portal.handleOneMessageLoopItem(outgoingTicker.C) } } -func (portal *Portal) handleOneMessageLoopItem() { +func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) { defer func() { if err := recover(); err != nil { logEvt := portal.zlog.WithLevel(zerolog.FatalLevel). @@ -400,7 +404,18 @@ func (portal *Portal) handleOneMessageLoopItem() { portal.handleMessageLoopItem(msg) case msg := <-portal.matrixMessages: portal.handleMatrixMessageLoopItem(msg) + case <-timeout: } + portal.outgoingMessagesLock.Lock() + for _, out := range portal.outgoingMessages { + if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute { + go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil) + go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "") + go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0) + out.Timeouted = true + } + } + portal.outgoingMessagesLock.Unlock() } func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message { @@ -2045,8 +2060,9 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing } txnID := util.GenerateTmpID() + outgoingMsg := &outgoingMessage{Event: evt} portal.outgoingMessagesLock.Lock() - portal.outgoingMessages[txnID] = &outgoingMessage{Event: evt} + portal.outgoingMessages[txnID] = outgoingMsg portal.outgoingMessagesLock.Unlock() if evt.Type == event.EventSticker { content.MsgType = event.MsgImage @@ -2067,10 +2083,14 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing resp, err := sender.Client.SendMessage(req) timings.send = time.Since(start) if err != nil { + outgoingMsg.Errored = true go ms.sendMessageMetrics(ctx, sender, evt, err, "Error sending", true) } else if resp.Status != gmproto.SendMessageResponse_SUCCESS { + outgoingMsg.Errored = true + outgoingMsg.Acked = true go ms.sendMessageMetrics(ctx, sender, evt, fmt.Errorf("response status %d", resp.Status), "Error sending", true) } else { + outgoingMsg.Acked = true go ms.sendMessageMetrics(ctx, sender, evt, nil, "", true) } }