Send error if remote echo takes unexpectedly long

This commit is contained in:
Tulir Asokan 2024-05-24 12:42:28 +03:00
parent 074e239ed4
commit dfcc23e3a7
2 changed files with 30 additions and 9 deletions

View file

@ -46,11 +46,10 @@ var (
errMediaDecryptFailed = errors.New("failed to decrypt media") errMediaDecryptFailed = errors.New("failed to decrypt media")
errMediaConvertFailed = errors.New("failed to convert media") errMediaConvertFailed = errors.New("failed to convert media")
errMediaReuploadFailed = errors.New("failed to upload media to google") errMediaReuploadFailed = errors.New("failed to upload media to google")
errEchoTimeout = errors.New("remote echo timeout")
errIncorrectUser = errors.New("incorrect user") errIncorrectUser = errors.New("incorrect user")
errNotLoggedIn = errors.New("not logged in") errNotLoggedIn = errors.New("not logged in")
errMessageTakingLong = errors.New("bridging the message is taking longer than usual")
) )
type OutgoingStatusError gmproto.MessageStatusType type OutgoingStatusError gmproto.MessageStatusType
@ -98,6 +97,8 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, status ev
return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, err.Error() return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, err.Error()
case errors.Is(err, context.DeadlineExceeded): case errors.Is(err, context.DeadlineExceeded):
return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "handling the message took too long and was cancelled" return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "handling the message took too long and was cancelled"
case errors.Is(err, errEchoTimeout):
return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "phone has not confirmed message delivery"
case errors.Is(err, errTargetNotFound): case errors.Is(err, errTargetNotFound):
return event.MessageStatusGenericError, event.MessageStatusFail, true, false, "" return event.MessageStatusGenericError, event.MessageStatusFail, true, false, ""
case errors.As(err, &ose): case errors.As(err, &ose):
@ -116,8 +117,8 @@ func (portal *Portal) sendErrorMessage(ctx context.Context, evt *event.Event, er
certainty = "was not" certainty = "was not"
} }
msg := fmt.Sprintf("\u26a0 Your %s %s bridged: %v", msgType, certainty, err) msg := fmt.Sprintf("\u26a0 Your %s %s bridged: %v", msgType, certainty, err)
if errors.Is(err, errMessageTakingLong) { if errors.Is(err, errEchoTimeout) {
msg = fmt.Sprintf("\u26a0 Bridging your %s is taking longer than usual", msgType) msg = fmt.Sprintf("\u26a0 Your phone has not echoed the message, it may have been lost")
} }
content := &event.MessageEventContent{ content := &event.MessageEventContent{
MsgType: event.MsgNotice, MsgType: event.MsgNotice,

View file

@ -40,6 +40,7 @@ import (
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/status"
"maunium.net/go/mautrix/crypto/attachment" "maunium.net/go/mautrix/crypto/attachment"
"maunium.net/go/mautrix/event" "maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
@ -252,8 +253,9 @@ type PortalMatrixMessage struct {
type outgoingMessage struct { type outgoingMessage struct {
*event.Event *event.Event
Saved bool Acked bool
Checkpointed bool Errored bool
Timeouted bool
} }
type Portal struct { type Portal struct {
@ -376,12 +378,14 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
} }
func (portal *Portal) handleMessageLoop() { func (portal *Portal) handleMessageLoop() {
outgoingTicker := time.NewTicker(1 * time.Minute)
for { for {
portal.handleOneMessageLoopItem() outgoingTicker.Stop()
portal.handleOneMessageLoopItem(outgoingTicker.C)
} }
} }
func (portal *Portal) handleOneMessageLoopItem() { func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logEvt := portal.zlog.WithLevel(zerolog.FatalLevel). logEvt := portal.zlog.WithLevel(zerolog.FatalLevel).
@ -400,7 +404,18 @@ func (portal *Portal) handleOneMessageLoopItem() {
portal.handleMessageLoopItem(msg) portal.handleMessageLoopItem(msg)
case msg := <-portal.matrixMessages: case msg := <-portal.matrixMessages:
portal.handleMatrixMessageLoopItem(msg) portal.handleMatrixMessageLoopItem(msg)
case <-timeout:
} }
portal.outgoingMessagesLock.Lock()
for _, out := range portal.outgoingMessages {
if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute {
go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil)
go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "")
go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0)
out.Timeouted = true
}
}
portal.outgoingMessagesLock.Unlock()
} }
func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message { func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message {
@ -2045,8 +2060,9 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing
} }
txnID := util.GenerateTmpID() txnID := util.GenerateTmpID()
outgoingMsg := &outgoingMessage{Event: evt}
portal.outgoingMessagesLock.Lock() portal.outgoingMessagesLock.Lock()
portal.outgoingMessages[txnID] = &outgoingMessage{Event: evt} portal.outgoingMessages[txnID] = outgoingMsg
portal.outgoingMessagesLock.Unlock() portal.outgoingMessagesLock.Unlock()
if evt.Type == event.EventSticker { if evt.Type == event.EventSticker {
content.MsgType = event.MsgImage content.MsgType = event.MsgImage
@ -2067,10 +2083,14 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing
resp, err := sender.Client.SendMessage(req) resp, err := sender.Client.SendMessage(req)
timings.send = time.Since(start) timings.send = time.Since(start)
if err != nil { if err != nil {
outgoingMsg.Errored = true
go ms.sendMessageMetrics(ctx, sender, evt, err, "Error sending", true) go ms.sendMessageMetrics(ctx, sender, evt, err, "Error sending", true)
} else if resp.Status != gmproto.SendMessageResponse_SUCCESS { } else if resp.Status != gmproto.SendMessageResponse_SUCCESS {
outgoingMsg.Errored = true
outgoingMsg.Acked = true
go ms.sendMessageMetrics(ctx, sender, evt, fmt.Errorf("response status %d", resp.Status), "Error sending", true) go ms.sendMessageMetrics(ctx, sender, evt, fmt.Errorf("response status %d", resp.Status), "Error sending", true)
} else { } else {
outgoingMsg.Acked = true
go ms.sendMessageMetrics(ctx, sender, evt, nil, "", true) go ms.sendMessageMetrics(ctx, sender, evt, nil, "", true)
} }
} }