Move remote echo timeout checker to separate loop
This commit is contained in:
parent
7cf98e493f
commit
2660e3fa91
1 changed files with 19 additions and 15 deletions
34
portal.go
34
portal.go
|
@ -234,6 +234,7 @@ func (br *GMBridge) NewPortal(dbPortal *database.Portal) *Portal {
|
||||||
}
|
}
|
||||||
portal.updateLogger()
|
portal.updateLogger()
|
||||||
go portal.handleMessageLoop()
|
go portal.handleMessageLoop()
|
||||||
|
go portal.outgoingMessageTimeoutLoop()
|
||||||
return portal
|
return portal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,14 +380,28 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (portal *Portal) handleMessageLoop() {
|
func (portal *Portal) handleMessageLoop() {
|
||||||
outgoingTicker := time.NewTicker(1 * time.Minute)
|
|
||||||
for {
|
for {
|
||||||
outgoingTicker.Stop()
|
portal.handleOneMessageLoopItem()
|
||||||
portal.handleOneMessageLoopItem(outgoingTicker.C)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) {
|
func (portal *Portal) outgoingMessageTimeoutLoop() {
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
for range ticker.C {
|
||||||
|
portal.outgoingMessagesLock.Lock()
|
||||||
|
for _, out := range portal.outgoingMessages {
|
||||||
|
if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute {
|
||||||
|
go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil)
|
||||||
|
go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "")
|
||||||
|
go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0)
|
||||||
|
out.Timeouted = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
portal.outgoingMessagesLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (portal *Portal) handleOneMessageLoopItem() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
logEvt := portal.zlog.WithLevel(zerolog.FatalLevel).
|
logEvt := portal.zlog.WithLevel(zerolog.FatalLevel).
|
||||||
|
@ -405,18 +420,7 @@ func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) {
|
||||||
portal.handleMessageLoopItem(msg)
|
portal.handleMessageLoopItem(msg)
|
||||||
case msg := <-portal.matrixMessages:
|
case msg := <-portal.matrixMessages:
|
||||||
portal.handleMatrixMessageLoopItem(msg)
|
portal.handleMatrixMessageLoopItem(msg)
|
||||||
case <-timeout:
|
|
||||||
}
|
}
|
||||||
portal.outgoingMessagesLock.Lock()
|
|
||||||
for _, out := range portal.outgoingMessages {
|
|
||||||
if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute {
|
|
||||||
go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil)
|
|
||||||
go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "")
|
|
||||||
go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0)
|
|
||||||
out.Timeouted = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
portal.outgoingMessagesLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message {
|
func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message {
|
||||||
|
|
Loading…
Reference in a new issue