From 6d7bfc1ba8db57318d931f8cd74cccff66df0f0a Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 29 Feb 2024 14:20:56 +0200 Subject: [PATCH] Refactor pinger --- libgm/longpoll.go | 229 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 174 insertions(+), 55 deletions(-) diff --git a/libgm/longpoll.go b/libgm/longpoll.go index 47b05d4..e2facec 100644 --- a/libgm/longpoll.go +++ b/libgm/longpoll.go @@ -9,6 +9,8 @@ import ( "fmt" "io" "net/http" + "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -21,64 +23,177 @@ import ( "go.mau.fi/mautrix-gmessages/libgm/util" ) -const phoneNotRespondingTimeout = 30 * time.Second +const defaultPingTimeout = 1 * time.Minute +const shortPingTimeout = 10 * time.Second +const minPingInterval = 30 * time.Second +const maxRepingTickerTime = 64 * time.Minute -func (c *Client) doDittoPinger(log *zerolog.Logger, dittoPing <-chan struct{}, stopPinger <-chan struct{}) { - notResponding := false - pingFails := 0 - exit := false - onRespond := func() { - if notResponding { - log.Debug().Msg("Ditto ping succeeded, phone is back online") - c.triggerEvent(&events.PhoneRespondingAgain{}) - notResponding = false - pingFails = 0 - } else if pingFails > 0 { - // TODO separate event? - c.triggerEvent(&events.PhoneRespondingAgain{}) - pingFails = 0 +var pingIDCounter atomic.Uint64 + +// Goals of the ditto pinger: +// - By default, send pings to the phone every 15 minutes when the long polling connection restarts +// - If an outgoing request doesn't respond quickly, send a ping immediately +// - If a ping caused by a request timeout doesn't respond quickly, send PhoneNotResponding +// (the user is probably actively trying to use the bridge) +// - If the first ping doesn't respond, send PhoneNotResponding +// (to avoid the bridge being stuck in the CONNECTING state) +// - If a ping doesn't respond, send new pings on increasing intervals +// (starting from 1 minute up to 1 hour) until it responds +// - If a normal ping doesn't respond, send PhoneNotResponding after 3 failed pings +// (so after ~8 minutes in total, not faster to avoid unnecessarily spamming the user) +// - If a request timeout happens during backoff pings, send PhoneNotResponding immediately +// - If a ping responds and PhoneNotResponding was sent, send PhoneRespondingAgain +type dittoPinger struct { + client *Client + + firstPingDone bool + pingHandlingLock sync.RWMutex + oldestPingTime time.Time + lastPingTime time.Time + pingFails int + notRespondingSent bool + + stop <-chan struct{} + ping <-chan struct{} + log *zerolog.Logger +} + +func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) { + dp.pingHandlingLock.Lock() + defer dp.pingHandlingLock.Unlock() + logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur) + if dp.notRespondingSent { + logEvt.Msg("Ditto ping successful (phone is back online)") + dp.client.triggerEvent(&events.PhoneRespondingAgain{}) + } else if dp.pingFails > 0 { + logEvt.Msg("Ditto ping successful (stopped failing)") + // TODO separate event? + dp.client.triggerEvent(&events.PhoneRespondingAgain{}) + } else { + logEvt.Msg("Ditto ping successful") + } + dp.oldestPingTime = time.Time{} + dp.notRespondingSent = false + dp.pingFails = 0 + dp.firstPingDone = true +} + +func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) { + dp.pingHandlingLock.Lock() + defer dp.pingHandlingLock.Unlock() + dp.log.Warn().Uint64("ping_id", pingID).Msg("Ditto ping is taking long, phone may be offline") + if (!dp.firstPingDone || sendNotResponding) && !dp.notRespondingSent { + dp.client.triggerEvent(&events.PhoneNotResponding{}) + dp.notRespondingSent = true + } +} + +func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) { + var timerChan <-chan time.Time + var timer *time.Timer + if timeout > 0 { + timer = time.NewTimer(timeout) + timerChan = timer.C + } + select { + case <-pingChan: + dp.OnRespond(pingID, time.Since(start)) + if timer != nil && !timer.Stop() { + <-timer.C + } + case <-timerChan: + dp.OnTimeout(pingID, timeout == shortPingTimeout || timeoutCount > 3) + repingTickerTime := 1 * time.Minute + var repingTicker *time.Ticker + var repingTickerChan <-chan time.Time + if timeoutCount == 0 { + repingTicker = time.NewTicker(repingTickerTime) + repingTickerChan = repingTicker.C + } + for { + timeoutCount++ + select { + case <-pingChan: + dp.OnRespond(pingID, time.Since(start)) + return + case <-repingTickerChan: + if repingTickerTime < maxRepingTickerTime { + repingTickerTime *= 2 + repingTicker.Reset(repingTickerTime) + } + subPingID := pingIDCounter.Add(1) + dp.log.Debug(). + Uint64("parent_ping_id", pingID). + Uint64("ping_id", subPingID). + Str("next_reping", repingTickerTime.String()). + Msg("Sending new ping") + dp.Ping(subPingID, defaultPingTimeout, timeoutCount) + case <-dp.client.pingShortCircuit: + dp.pingHandlingLock.Lock() + dp.log.Debug().Uint64("ping_id", pingID). + Msg("Ditto ping wait short-circuited during ping backoff, sending PhoneNotResponding immediately") + if !dp.notRespondingSent { + dp.client.triggerEvent(&events.PhoneNotResponding{}) + dp.notRespondingSent = true + } + dp.pingHandlingLock.Unlock() + case <-dp.stop: + return + } + } + case <-dp.stop: + if timer != nil && !timer.Stop() { + <-timer.C } } - doPing := func() { - pingChan, err := c.NotifyDittoActivity() - if err != nil { - log.Err(err).Msg("Error notifying ditto activity") - pingFails++ - c.triggerEvent(&events.PingFailed{ - Error: fmt.Errorf("failed to notify ditto activity: %w", err), - ErrorCount: pingFails, - }) - return - } - select { - case <-pingChan: - onRespond() - return - case <-time.After(phoneNotRespondingTimeout): - log.Warn().Msg("Ditto ping is taking long, phone may be offline") - c.triggerEvent(&events.PhoneNotResponding{}) - notResponding = true - case <-stopPinger: - exit = true - return - } - select { - case <-pingChan: - onRespond() - case <-stopPinger: - exit = true - return - } +} + +func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) { + dp.pingHandlingLock.Lock() + if time.Since(dp.lastPingTime) < minPingInterval { + dp.log.Debug(). + Uint64("ping_id", pingID). + Time("last_ping_time", dp.lastPingTime). + Msg("Skipping ping since last one was too recently") + dp.pingHandlingLock.Unlock() + return } - for !exit { + now := time.Now() + dp.lastPingTime = now + if dp.oldestPingTime.IsZero() { + dp.oldestPingTime = now + } + pingChan, err := dp.client.NotifyDittoActivity() + if err != nil { + dp.log.Err(err).Uint64("ping_id", pingID).Msg("Error sending ping") + dp.pingFails++ + dp.client.triggerEvent(&events.PingFailed{ + Error: fmt.Errorf("failed to notify ditto activity: %w", err), + ErrorCount: dp.pingFails, + }) + dp.pingHandlingLock.Unlock() + return + } + dp.pingHandlingLock.Unlock() + if timeoutCount == 0 { + dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan) + } else { + go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan) + } +} + +func (dp *dittoPinger) Loop() { + for { select { - case <-c.pingShortCircuit: - log.Debug().Msg("Ditto ping wait short-circuited") - doPing() - case <-dittoPing: - log.Trace().Msg("Doing normal ditto ping") - doPing() - case <-stopPinger: + case <-dp.client.pingShortCircuit: + pingID := pingIDCounter.Add(1) + dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited") + dp.Ping(pingID, shortPingTimeout, 0) + case <-dp.ping: + pingID := pingIDCounter.Add(1) + dp.log.Debug().Uint64("ping_id", pingID).Msg("Doing normal ditto ping") + dp.Ping(pingID, defaultPingTimeout, 0) + case <-dp.stop: return } } @@ -103,9 +218,13 @@ func (c *Client) doLongPoll(loggedIn bool) { dittoPing := make(chan struct{}, 1) stopDittoPinger := make(chan struct{}) - defer close(stopDittoPinger) - go c.doDittoPinger(&log, dittoPing, stopDittoPinger) + go (&dittoPinger{ + ping: dittoPing, + stop: stopDittoPinger, + log: &log, + client: c, + }).Loop() errorCount := 1 for c.listenID == listenID {