Refactor pinger

This commit is contained in:
Tulir Asokan 2024-02-29 14:20:56 +02:00
parent 69aacf25b5
commit 6d7bfc1ba8

View file

@ -9,6 +9,8 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync"
"sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -21,64 +23,177 @@ import (
"go.mau.fi/mautrix-gmessages/libgm/util" "go.mau.fi/mautrix-gmessages/libgm/util"
) )
const phoneNotRespondingTimeout = 30 * time.Second const defaultPingTimeout = 1 * time.Minute
const shortPingTimeout = 10 * time.Second
const minPingInterval = 30 * time.Second
const maxRepingTickerTime = 64 * time.Minute
func (c *Client) doDittoPinger(log *zerolog.Logger, dittoPing <-chan struct{}, stopPinger <-chan struct{}) { var pingIDCounter atomic.Uint64
notResponding := false
pingFails := 0 // Goals of the ditto pinger:
exit := false // - By default, send pings to the phone every 15 minutes when the long polling connection restarts
onRespond := func() { // - If an outgoing request doesn't respond quickly, send a ping immediately
if notResponding { // - If a ping caused by a request timeout doesn't respond quickly, send PhoneNotResponding
log.Debug().Msg("Ditto ping succeeded, phone is back online") // (the user is probably actively trying to use the bridge)
c.triggerEvent(&events.PhoneRespondingAgain{}) // - If the first ping doesn't respond, send PhoneNotResponding
notResponding = false // (to avoid the bridge being stuck in the CONNECTING state)
pingFails = 0 // - If a ping doesn't respond, send new pings on increasing intervals
} else if pingFails > 0 { // (starting from 1 minute up to 1 hour) until it responds
// TODO separate event? // - If a normal ping doesn't respond, send PhoneNotResponding after 3 failed pings
c.triggerEvent(&events.PhoneRespondingAgain{}) // (so after ~8 minutes in total, not faster to avoid unnecessarily spamming the user)
pingFails = 0 // - If a request timeout happens during backoff pings, send PhoneNotResponding immediately
// - If a ping responds and PhoneNotResponding was sent, send PhoneRespondingAgain
type dittoPinger struct {
client *Client
firstPingDone bool
pingHandlingLock sync.RWMutex
oldestPingTime time.Time
lastPingTime time.Time
pingFails int
notRespondingSent bool
stop <-chan struct{}
ping <-chan struct{}
log *zerolog.Logger
}
func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock()
logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur)
if dp.notRespondingSent {
logEvt.Msg("Ditto ping successful (phone is back online)")
dp.client.triggerEvent(&events.PhoneRespondingAgain{})
} else if dp.pingFails > 0 {
logEvt.Msg("Ditto ping successful (stopped failing)")
// TODO separate event?
dp.client.triggerEvent(&events.PhoneRespondingAgain{})
} else {
logEvt.Msg("Ditto ping successful")
}
dp.oldestPingTime = time.Time{}
dp.notRespondingSent = false
dp.pingFails = 0
dp.firstPingDone = true
}
func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock()
dp.log.Warn().Uint64("ping_id", pingID).Msg("Ditto ping is taking long, phone may be offline")
if (!dp.firstPingDone || sendNotResponding) && !dp.notRespondingSent {
dp.client.triggerEvent(&events.PhoneNotResponding{})
dp.notRespondingSent = true
}
}
func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) {
var timerChan <-chan time.Time
var timer *time.Timer
if timeout > 0 {
timer = time.NewTimer(timeout)
timerChan = timer.C
}
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
if timer != nil && !timer.Stop() {
<-timer.C
}
case <-timerChan:
dp.OnTimeout(pingID, timeout == shortPingTimeout || timeoutCount > 3)
repingTickerTime := 1 * time.Minute
var repingTicker *time.Ticker
var repingTickerChan <-chan time.Time
if timeoutCount == 0 {
repingTicker = time.NewTicker(repingTickerTime)
repingTickerChan = repingTicker.C
}
for {
timeoutCount++
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
return
case <-repingTickerChan:
if repingTickerTime < maxRepingTickerTime {
repingTickerTime *= 2
repingTicker.Reset(repingTickerTime)
}
subPingID := pingIDCounter.Add(1)
dp.log.Debug().
Uint64("parent_ping_id", pingID).
Uint64("ping_id", subPingID).
Str("next_reping", repingTickerTime.String()).
Msg("Sending new ping")
dp.Ping(subPingID, defaultPingTimeout, timeoutCount)
case <-dp.client.pingShortCircuit:
dp.pingHandlingLock.Lock()
dp.log.Debug().Uint64("ping_id", pingID).
Msg("Ditto ping wait short-circuited during ping backoff, sending PhoneNotResponding immediately")
if !dp.notRespondingSent {
dp.client.triggerEvent(&events.PhoneNotResponding{})
dp.notRespondingSent = true
}
dp.pingHandlingLock.Unlock()
case <-dp.stop:
return
}
}
case <-dp.stop:
if timer != nil && !timer.Stop() {
<-timer.C
} }
} }
doPing := func() { }
pingChan, err := c.NotifyDittoActivity()
if err != nil { func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) {
log.Err(err).Msg("Error notifying ditto activity") dp.pingHandlingLock.Lock()
pingFails++ if time.Since(dp.lastPingTime) < minPingInterval {
c.triggerEvent(&events.PingFailed{ dp.log.Debug().
Error: fmt.Errorf("failed to notify ditto activity: %w", err), Uint64("ping_id", pingID).
ErrorCount: pingFails, Time("last_ping_time", dp.lastPingTime).
}) Msg("Skipping ping since last one was too recently")
return dp.pingHandlingLock.Unlock()
} return
select {
case <-pingChan:
onRespond()
return
case <-time.After(phoneNotRespondingTimeout):
log.Warn().Msg("Ditto ping is taking long, phone may be offline")
c.triggerEvent(&events.PhoneNotResponding{})
notResponding = true
case <-stopPinger:
exit = true
return
}
select {
case <-pingChan:
onRespond()
case <-stopPinger:
exit = true
return
}
} }
for !exit { now := time.Now()
dp.lastPingTime = now
if dp.oldestPingTime.IsZero() {
dp.oldestPingTime = now
}
pingChan, err := dp.client.NotifyDittoActivity()
if err != nil {
dp.log.Err(err).Uint64("ping_id", pingID).Msg("Error sending ping")
dp.pingFails++
dp.client.triggerEvent(&events.PingFailed{
Error: fmt.Errorf("failed to notify ditto activity: %w", err),
ErrorCount: dp.pingFails,
})
dp.pingHandlingLock.Unlock()
return
}
dp.pingHandlingLock.Unlock()
if timeoutCount == 0 {
dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
} else {
go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
}
}
func (dp *dittoPinger) Loop() {
for {
select { select {
case <-c.pingShortCircuit: case <-dp.client.pingShortCircuit:
log.Debug().Msg("Ditto ping wait short-circuited") pingID := pingIDCounter.Add(1)
doPing() dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited")
case <-dittoPing: dp.Ping(pingID, shortPingTimeout, 0)
log.Trace().Msg("Doing normal ditto ping") case <-dp.ping:
doPing() pingID := pingIDCounter.Add(1)
case <-stopPinger: dp.log.Debug().Uint64("ping_id", pingID).Msg("Doing normal ditto ping")
dp.Ping(pingID, defaultPingTimeout, 0)
case <-dp.stop:
return return
} }
} }
@ -103,9 +218,13 @@ func (c *Client) doLongPoll(loggedIn bool) {
dittoPing := make(chan struct{}, 1) dittoPing := make(chan struct{}, 1)
stopDittoPinger := make(chan struct{}) stopDittoPinger := make(chan struct{})
defer close(stopDittoPinger) defer close(stopDittoPinger)
go c.doDittoPinger(&log, dittoPing, stopDittoPinger) go (&dittoPinger{
ping: dittoPing,
stop: stopDittoPinger,
log: &log,
client: c,
}).Loop()
errorCount := 1 errorCount := 1
for c.listenID == listenID { for c.listenID == listenID {