Add timeouts for portal event handlers
This commit is contained in:
parent
dad3dbc7e5
commit
473c9d59f9
1 changed files with 42 additions and 12 deletions
54
portal.go
54
portal.go
|
@ -307,7 +307,22 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
|
||||||
defer portal.forwardBackfillLock.Unlock()
|
defer portal.forwardBackfillLock.Unlock()
|
||||||
switch {
|
switch {
|
||||||
case msg.evt != nil:
|
case msg.evt != nil:
|
||||||
portal.handleMessage(msg.source, msg.evt, msg.raw)
|
doneChan := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(doneChan)
|
||||||
|
portal.handleMessage(msg.source, msg.evt, msg.raw)
|
||||||
|
}()
|
||||||
|
timer := time.NewTimer(1 * time.Minute)
|
||||||
|
select {
|
||||||
|
case <-doneChan:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
portal.zlog.Error().
|
||||||
|
Str("message_id", msg.evt.MessageID).
|
||||||
|
Msg("Google Messages event handling is taking over a minute, unblocking loop")
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message")
|
portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message")
|
||||||
}
|
}
|
||||||
|
@ -330,17 +345,32 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
|
||||||
portalQueue: time.Since(msg.receivedAt),
|
portalQueue: time.Since(msg.receivedAt),
|
||||||
totalReceive: time.Since(evtTS),
|
totalReceive: time.Since(evtTS),
|
||||||
}
|
}
|
||||||
switch msg.evt.Type {
|
doneChan := make(chan struct{})
|
||||||
case event.EventMessage, event.EventSticker:
|
go func() {
|
||||||
portal.HandleMatrixMessage(msg.user, msg.evt, timings)
|
defer close(doneChan)
|
||||||
case event.EventReaction:
|
switch msg.evt.Type {
|
||||||
portal.HandleMatrixReaction(msg.user, msg.evt)
|
case event.EventMessage, event.EventSticker:
|
||||||
case event.EventRedaction:
|
portal.HandleMatrixMessage(msg.user, msg.evt, timings)
|
||||||
portal.HandleMatrixRedaction(msg.user, msg.evt)
|
case event.EventReaction:
|
||||||
default:
|
portal.HandleMatrixReaction(msg.user, msg.evt)
|
||||||
portal.zlog.Warn().
|
case event.EventRedaction:
|
||||||
Str("event_type", msg.evt.Type.Type).
|
portal.HandleMatrixRedaction(msg.user, msg.evt)
|
||||||
Msg("Unsupported event type in portal message channel")
|
default:
|
||||||
|
portal.zlog.Warn().
|
||||||
|
Str("event_type", msg.evt.Type.Type).
|
||||||
|
Msg("Unsupported event type in portal message channel")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
timer := time.NewTimer(1 * time.Minute)
|
||||||
|
select {
|
||||||
|
case <-doneChan:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
portal.zlog.Error().
|
||||||
|
Stringer("event_id", msg.evt.ID).
|
||||||
|
Msg("Matrix event handling is taking over a minute, unblocking loop")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue