Fix ping loop getting stuck if the first ping doesn't respond

This commit is contained in:
Tulir Asokan 2024-05-21 12:56:20 +03:00
parent 09ac469771
commit 074e239ed4

View file

@ -59,7 +59,27 @@ type dittoPinger struct {
log *zerolog.Logger log *zerolog.Logger
} }
func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) { type resetter struct {
C chan struct{}
d atomic.Bool
}
func newResetter() *resetter {
return &resetter{
C: make(chan struct{}),
}
}
func (r *resetter) Done() {
if r.d.CompareAndSwap(false, true) {
go func() {
time.Sleep(5 * time.Second)
close(r.C)
}()
}
}
func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration, reset *resetter) {
dp.pingHandlingLock.Lock() dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock() defer dp.pingHandlingLock.Unlock()
logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur) logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur)
@ -77,6 +97,7 @@ func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
dp.notRespondingSent = false dp.notRespondingSent = false
dp.pingFails = 0 dp.pingFails = 0
dp.firstPingDone = true dp.firstPingDone = true
reset.Done()
} }
func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) { func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
@ -89,7 +110,7 @@ func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
} }
} }
func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) { func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage, reset *resetter) {
var timerChan <-chan time.Time var timerChan <-chan time.Time
var timer *time.Timer var timer *time.Timer
if timeout > 0 { if timeout > 0 {
@ -98,7 +119,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
} }
select { select {
case <-pingChan: case <-pingChan:
dp.OnRespond(pingID, time.Since(start)) dp.OnRespond(pingID, time.Since(start), reset)
if timer != nil && !timer.Stop() { if timer != nil && !timer.Stop() {
<-timer.C <-timer.C
} }
@ -115,7 +136,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
timeoutCount++ timeoutCount++
select { select {
case <-pingChan: case <-pingChan:
dp.OnRespond(pingID, time.Since(start)) dp.OnRespond(pingID, time.Since(start), reset)
return return
case <-repingTickerChan: case <-repingTickerChan:
if repingTickerTime < maxRepingTickerTime { if repingTickerTime < maxRepingTickerTime {
@ -128,7 +149,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
Uint64("ping_id", subPingID). Uint64("ping_id", subPingID).
Str("next_reping", repingTickerTime.String()). Str("next_reping", repingTickerTime.String()).
Msg("Sending new ping") Msg("Sending new ping")
dp.Ping(subPingID, defaultPingTimeout, timeoutCount) dp.Ping(subPingID, defaultPingTimeout, timeoutCount, reset)
case <-dp.client.pingShortCircuit: case <-dp.client.pingShortCircuit:
dp.pingHandlingLock.Lock() dp.pingHandlingLock.Lock()
dp.log.Debug().Uint64("ping_id", pingID). dp.log.Debug().Uint64("ping_id", pingID).
@ -140,8 +161,20 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
dp.pingHandlingLock.Unlock() dp.pingHandlingLock.Unlock()
case <-dp.stop: case <-dp.stop:
return return
case <-reset.C:
dp.log.Debug().
Uint64("ping_id", pingID).
Msg("Another ping was successful, giving up on this one")
return
} }
} }
case <-reset.C:
dp.log.Debug().
Uint64("ping_id", pingID).
Msg("Another ping was successful, giving up on this one")
if timer != nil && !timer.Stop() {
<-timer.C
}
case <-dp.stop: case <-dp.stop:
if timer != nil && !timer.Stop() { if timer != nil && !timer.Stop() {
<-timer.C <-timer.C
@ -149,7 +182,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
} }
} }
func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) { func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int, reset *resetter) {
dp.pingHandlingLock.Lock() dp.pingHandlingLock.Lock()
if time.Since(dp.lastPingTime) < minPingInterval { if time.Since(dp.lastPingTime) < minPingInterval {
dp.log.Debug(). dp.log.Debug().
@ -177,9 +210,9 @@ func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount i
} }
dp.pingHandlingLock.Unlock() dp.pingHandlingLock.Unlock()
if timeoutCount == 0 { if timeoutCount == 0 {
dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan) dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
} else { } else {
go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan) go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
} }
} }
@ -193,12 +226,12 @@ func (dp *dittoPinger) Loop() {
pingID := pingIDCounter.Add(1) pingID := pingIDCounter.Add(1)
dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited") dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited")
pingStart = time.Now() pingStart = time.Now()
dp.Ping(pingID, shortPingTimeout, 0) dp.Ping(pingID, shortPingTimeout, 0, newResetter())
case <-dp.ping: case <-dp.ping:
pingID := pingIDCounter.Add(1) pingID := pingIDCounter.Add(1)
dp.log.Trace().Uint64("ping_id", pingID).Msg("Doing normal ditto ping") dp.log.Trace().Uint64("ping_id", pingID).Msg("Doing normal ditto ping")
pingStart = time.Now() pingStart = time.Now()
dp.Ping(pingID, defaultPingTimeout, 0) dp.Ping(pingID, defaultPingTimeout, 0, newResetter())
case <-dp.stop: case <-dp.stop:
return return
} }