From 2660e3fa91c9f7c2c855df7b3424108923f981eb Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 4 Jun 2024 20:55:12 +0300 Subject: [PATCH] Move remote echo timeout checker to separate loop --- portal.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/portal.go b/portal.go index 95f8571..0dc0258 100644 --- a/portal.go +++ b/portal.go @@ -234,6 +234,7 @@ func (br *GMBridge) NewPortal(dbPortal *database.Portal) *Portal { } portal.updateLogger() go portal.handleMessageLoop() + go portal.outgoingMessageTimeoutLoop() return portal } @@ -379,14 +380,28 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) { } func (portal *Portal) handleMessageLoop() { - outgoingTicker := time.NewTicker(1 * time.Minute) for { - outgoingTicker.Stop() - portal.handleOneMessageLoopItem(outgoingTicker.C) + portal.handleOneMessageLoopItem() } } -func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) { +func (portal *Portal) outgoingMessageTimeoutLoop() { + ticker := time.NewTicker(1 * time.Minute) + for range ticker.C { + portal.outgoingMessagesLock.Lock() + for _, out := range portal.outgoingMessages { + if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute { + go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil) + go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "") + go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0) + out.Timeouted = true + } + } + portal.outgoingMessagesLock.Unlock() + } +} + +func (portal *Portal) handleOneMessageLoopItem() { defer func() { if err := recover(); err != nil { logEvt := portal.zlog.WithLevel(zerolog.FatalLevel). @@ -405,18 +420,7 @@ func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) { portal.handleMessageLoopItem(msg) case msg := <-portal.matrixMessages: portal.handleMatrixMessageLoopItem(msg) - case <-timeout: } - portal.outgoingMessagesLock.Lock() - for _, out := range portal.outgoingMessages { - if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute { - go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil) - go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "") - go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0) - out.Timeouted = true - } - } - portal.outgoingMessagesLock.Unlock() } func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message {