From 074e239ed40c5db8ff54dcd93b243428898a3a9d Mon Sep 17 00:00:00 2001
From: Tulir Asokan
Date: Tue, 21 May 2024 12:56:20 +0300
Subject: [PATCH] Fix ping loop getting stuck if the first ping doesn't respond

---
 libgm/longpoll.go | 53 ++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 43 insertions(+), 10 deletions(-)

diff --git a/libgm/longpoll.go b/libgm/longpoll.go
index 4d062ac..503d0c7 100644
--- a/libgm/longpoll.go
+++ b/libgm/longpoll.go
@@ -59,7 +59,27 @@ type dittoPinger struct {
 	log *zerolog.Logger
 }
 
-func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
+type resetter struct {
+	C chan struct{}
+	d atomic.Bool
+}
+
+func newResetter() *resetter {
+	return &resetter{
+		C: make(chan struct{}),
+	}
+}
+
+func (r *resetter) Done() {
+	if r.d.CompareAndSwap(false, true) {
+		go func() {
+			time.Sleep(5 * time.Second)
+			close(r.C)
+		}()
+	}
+}
+
+func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration, reset *resetter) {
 	dp.pingHandlingLock.Lock()
 	defer dp.pingHandlingLock.Unlock()
 	logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur)
@@ -77,6 +97,7 @@ func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
 	dp.notRespondingSent = false
 	dp.pingFails = 0
 	dp.firstPingDone = true
+	reset.Done()
 }
 
 func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
@@ -89,7 +110,7 @@ func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
 	}
 }
 
-func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) {
+func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage, reset *resetter) {
 	var timerChan <-chan time.Time
 	var timer *time.Timer
 	if timeout > 0 {
@@ -98,7 +119,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
 	}
 	select {
 	case <-pingChan:
-		dp.OnRespond(pingID, time.Since(start))
+		dp.OnRespond(pingID, time.Since(start), reset)
 		if timer != nil && !timer.Stop() {
 			<-timer.C
 		}
@@ -115,7 +136,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
 			timeoutCount++
 			select {
 			case <-pingChan:
-				dp.OnRespond(pingID, time.Since(start))
+				dp.OnRespond(pingID, time.Since(start), reset)
 				return
 			case <-repingTickerChan:
 				if repingTickerTime < maxRepingTickerTime {
@@ -128,7 +149,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
 					Uint64("ping_id", subPingID).
 					Str("next_reping", repingTickerTime.String()).
 					Msg("Sending new ping")
-				dp.Ping(subPingID, defaultPingTimeout, timeoutCount)
+				dp.Ping(subPingID, defaultPingTimeout, timeoutCount, reset)
 			case <-dp.client.pingShortCircuit:
 				dp.pingHandlingLock.Lock()
 				dp.log.Debug().Uint64("ping_id", pingID).
@@ -140,8 +161,20 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
 				dp.pingHandlingLock.Unlock()
 			case <-dp.stop:
 				return
+			case <-reset.C:
+				dp.log.Debug().
+					Uint64("ping_id", pingID).
+					Msg("Another ping was successful, giving up on this one")
+				return
 			}
 		}
+	case <-reset.C:
+		dp.log.Debug().
+			Uint64("ping_id", pingID).
+			Msg("Another ping was successful, giving up on this one")
+		if timer != nil && !timer.Stop() {
+			<-timer.C
+		}
 	case <-dp.stop:
 		if timer != nil && !timer.Stop() {
 			<-timer.C
@@ -149,7 +182,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
 	}
 }
 
-func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) {
+func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int, reset *resetter) {
 	dp.pingHandlingLock.Lock()
 	if time.Since(dp.lastPingTime) < minPingInterval {
 		dp.log.Debug().
@@ -177,9 +210,9 @@ func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount i
 	}
 	dp.pingHandlingLock.Unlock()
 	if timeoutCount == 0 {
-		dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
+		dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
 	} else {
-		go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
+		go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
 	}
 }
 
@@ -193,12 +226,12 @@ func (dp *dittoPinger) Loop() {
 			pingID := pingIDCounter.Add(1)
 			dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited")
 			pingStart = time.Now()
-			dp.Ping(pingID, shortPingTimeout, 0)
+			dp.Ping(pingID, shortPingTimeout, 0, newResetter())
 		case <-dp.ping:
 			pingID := pingIDCounter.Add(1)
 			dp.log.Trace().Uint64("ping_id", pingID).Msg("Doing normal ditto ping")
 			pingStart = time.Now()
-			dp.Ping(pingID, defaultPingTimeout, 0)
+			dp.Ping(pingID, defaultPingTimeout, 0, newResetter())
 		case <-dp.stop:
 			return
 		}