gmessages/libgm/longpoll.go

390 lines
11 KiB
Go
Raw Normal View History

2023-06-30 11:05:33 +00:00
package libgm
2023-06-30 09:54:08 +00:00
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/json"
2023-06-30 09:54:08 +00:00
"errors"
2023-06-30 11:48:50 +00:00
"fmt"
2023-06-30 09:54:08 +00:00
"io"
2023-08-09 12:27:47 +00:00
"net/http"
2024-02-29 12:20:56 +00:00
"sync"
"sync/atomic"
2023-07-03 21:03:36 +00:00
"time"
2023-06-30 09:54:08 +00:00
2023-07-16 11:36:13 +00:00
"github.com/google/uuid"
"github.com/rs/zerolog"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/events"
"go.mau.fi/mautrix-gmessages/libgm/pblite"
2023-07-17 13:51:31 +00:00
"go.mau.fi/mautrix-gmessages/libgm/gmproto"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/util"
)
2024-02-29 12:20:56 +00:00
const defaultPingTimeout = 1 * time.Minute
const shortPingTimeout = 10 * time.Second
const minPingInterval = 30 * time.Second
const maxRepingTickerTime = 64 * time.Minute
var pingIDCounter atomic.Uint64
// Goals of the ditto pinger:
// - By default, send pings to the phone every 15 minutes when the long polling connection restarts
// - If an outgoing request doesn't respond quickly, send a ping immediately
// - If a ping caused by a request timeout doesn't respond quickly, send PhoneNotResponding
// (the user is probably actively trying to use the bridge)
// - If the first ping doesn't respond, send PhoneNotResponding
// (to avoid the bridge being stuck in the CONNECTING state)
// - If a ping doesn't respond, send new pings on increasing intervals
// (starting from 1 minute up to 1 hour) until it responds
// - If a normal ping doesn't respond, send PhoneNotResponding after 3 failed pings
// (so after ~8 minutes in total, not faster to avoid unnecessarily spamming the user)
// - If a request timeout happens during backoff pings, send PhoneNotResponding immediately
// - If a ping responds and PhoneNotResponding was sent, send PhoneRespondingAgain
type dittoPinger struct {
client *Client
firstPingDone bool
pingHandlingLock sync.RWMutex
oldestPingTime time.Time
lastPingTime time.Time
pingFails int
notRespondingSent bool
stop <-chan struct{}
ping <-chan struct{}
log *zerolog.Logger
}
func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock()
logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur)
if dp.notRespondingSent {
logEvt.Msg("Ditto ping successful (phone is back online)")
dp.client.triggerEvent(&events.PhoneRespondingAgain{})
} else if dp.pingFails > 0 {
logEvt.Msg("Ditto ping successful (stopped failing)")
// TODO separate event?
dp.client.triggerEvent(&events.PhoneRespondingAgain{})
} else {
logEvt.Msg("Ditto ping successful")
}
2024-02-29 12:20:56 +00:00
dp.oldestPingTime = time.Time{}
dp.notRespondingSent = false
dp.pingFails = 0
dp.firstPingDone = true
}
func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock()
dp.log.Warn().Uint64("ping_id", pingID).Msg("Ditto ping is taking long, phone may be offline")
if (!dp.firstPingDone || sendNotResponding) && !dp.notRespondingSent {
dp.client.triggerEvent(&events.PhoneNotResponding{})
dp.notRespondingSent = true
}
}
func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) {
var timerChan <-chan time.Time
var timer *time.Timer
if timeout > 0 {
timer = time.NewTimer(timeout)
timerChan = timer.C
}
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
if timer != nil && !timer.Stop() {
<-timer.C
}
2024-02-29 12:20:56 +00:00
case <-timerChan:
dp.OnTimeout(pingID, timeout == shortPingTimeout || timeoutCount > 3)
repingTickerTime := 1 * time.Minute
var repingTicker *time.Ticker
var repingTickerChan <-chan time.Time
if timeoutCount == 0 {
repingTicker = time.NewTicker(repingTickerTime)
repingTickerChan = repingTicker.C
}
2024-02-29 12:20:56 +00:00
for {
timeoutCount++
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
return
case <-repingTickerChan:
if repingTickerTime < maxRepingTickerTime {
repingTickerTime *= 2
repingTicker.Reset(repingTickerTime)
}
subPingID := pingIDCounter.Add(1)
dp.log.Debug().
Uint64("parent_ping_id", pingID).
Uint64("ping_id", subPingID).
Str("next_reping", repingTickerTime.String()).
Msg("Sending new ping")
dp.Ping(subPingID, defaultPingTimeout, timeoutCount)
case <-dp.client.pingShortCircuit:
dp.pingHandlingLock.Lock()
dp.log.Debug().Uint64("ping_id", pingID).
Msg("Ditto ping wait short-circuited during ping backoff, sending PhoneNotResponding immediately")
if !dp.notRespondingSent {
dp.client.triggerEvent(&events.PhoneNotResponding{})
dp.notRespondingSent = true
}
dp.pingHandlingLock.Unlock()
case <-dp.stop:
return
}
}
case <-dp.stop:
if timer != nil && !timer.Stop() {
<-timer.C
}
}
2024-02-29 12:20:56 +00:00
}
func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) {
dp.pingHandlingLock.Lock()
if time.Since(dp.lastPingTime) < minPingInterval {
dp.log.Debug().
Uint64("ping_id", pingID).
Time("last_ping_time", dp.lastPingTime).
Msg("Skipping ping since last one was too recently")
dp.pingHandlingLock.Unlock()
return
}
now := time.Now()
dp.lastPingTime = now
if dp.oldestPingTime.IsZero() {
dp.oldestPingTime = now
}
pingChan, err := dp.client.NotifyDittoActivity()
if err != nil {
dp.log.Err(err).Uint64("ping_id", pingID).Msg("Error sending ping")
dp.pingFails++
dp.client.triggerEvent(&events.PingFailed{
Error: fmt.Errorf("failed to notify ditto activity: %w", err),
ErrorCount: dp.pingFails,
})
dp.pingHandlingLock.Unlock()
return
}
dp.pingHandlingLock.Unlock()
if timeoutCount == 0 {
dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
} else {
go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
}
}
func (dp *dittoPinger) Loop() {
for {
select {
2024-02-29 12:20:56 +00:00
case <-dp.client.pingShortCircuit:
pingID := pingIDCounter.Add(1)
dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited")
dp.Ping(pingID, shortPingTimeout, 0)
case <-dp.ping:
pingID := pingIDCounter.Add(1)
dp.log.Debug().Uint64("ping_id", pingID).Msg("Doing normal ditto ping")
dp.Ping(pingID, defaultPingTimeout, 0)
case <-dp.stop:
return
}
}
}
2023-09-04 11:24:45 +00:00
func tryReadBody(resp io.ReadCloser) []byte {
data, _ := io.ReadAll(resp)
_ = resp.Close()
return data
}
func (c *Client) doLongPoll(loggedIn bool) {
2023-07-19 11:12:23 +00:00
c.listenID++
listenID := c.listenID
2023-07-16 11:36:13 +00:00
listenReqID := uuid.NewString()
log := c.Logger.With().Int("listen_id", listenID).Logger()
defer func() {
log.Debug().Msg("Long polling stopped")
}()
log.Debug().Str("listen_uuid", listenReqID).Msg("Long polling starting")
dittoPing := make(chan struct{}, 1)
stopDittoPinger := make(chan struct{})
defer close(stopDittoPinger)
2024-02-29 12:20:56 +00:00
go (&dittoPinger{
ping: dittoPing,
stop: stopDittoPinger,
log: &log,
client: c,
}).Loop()
2023-08-09 12:27:47 +00:00
errorCount := 1
2023-07-19 11:12:23 +00:00
for c.listenID == listenID {
err := c.refreshAuthToken()
2023-07-16 11:36:13 +00:00
if err != nil {
log.Err(err).Msg("Error refreshing auth token")
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)})
}
2023-07-16 11:36:13 +00:00
return
}
log.Debug().Msg("Starting new long-polling request")
payload := &gmproto.ReceiveMessagesRequest{
2023-07-17 13:51:31 +00:00
Auth: &gmproto.AuthMessage{
2023-07-16 11:36:13 +00:00
RequestID: listenReqID,
2023-07-19 11:12:23 +00:00
TachyonAuthToken: c.AuthData.TachyonAuthToken,
2024-02-23 12:53:19 +00:00
Network: c.AuthData.AuthNetwork(),
2023-07-16 12:55:30 +00:00
ConfigVersion: util.ConfigMessage,
2023-07-16 11:36:13 +00:00
},
2023-07-17 13:51:31 +00:00
Unknown: &gmproto.ReceiveMessagesRequest_UnknownEmptyObject2{
Unknown: &gmproto.ReceiveMessagesRequest_UnknownEmptyObject1{},
2023-07-16 11:36:13 +00:00
},
}
url := util.ReceiveMessagesURL
if c.AuthData.Cookies != nil {
url = util.ReceiveMessagesURLGoogle
payload.Auth.Network = util.GoogleNetwork
}
resp, err := c.makeProtobufHTTPRequest(url, payload, ContentTypePBLite)
if err != nil {
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenTemporaryError{Error: err})
}
2023-08-09 12:27:47 +00:00
errorCount++
sleepSeconds := (errorCount + 1) * 5
log.Err(err).Int("sleep_seconds", sleepSeconds).Msg("Error making listen request, retrying in a while")
time.Sleep(time.Duration(sleepSeconds) * time.Second)
2023-07-03 21:03:36 +00:00
continue
}
2023-08-09 12:27:47 +00:00
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
2023-09-04 11:24:45 +00:00
body := tryReadBody(resp.Body)
log.Error().
Int("status_code", resp.StatusCode).
Bytes("resp_body", body).
Msg("Error making listen request")
if loggedIn {
2023-09-04 11:24:45 +00:00
c.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp, Body: body}})
}
2023-07-03 21:03:36 +00:00
return
2023-08-09 12:27:47 +00:00
} else if resp.StatusCode >= 400 {
if loggedIn {
2023-09-04 11:24:45 +00:00
c.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp, Body: tryReadBody(resp.Body)}})
} else {
_ = resp.Body.Close()
}
2023-08-09 12:27:47 +00:00
errorCount++
sleepSeconds := (errorCount + 1) * 5
log.Debug().
Int("statusCode", resp.StatusCode).
Int("sleep_seconds", sleepSeconds).
Msg("Error in long polling, retrying in a while")
time.Sleep(time.Duration(sleepSeconds) * time.Second)
2023-07-03 21:03:36 +00:00
continue
}
2023-08-09 12:27:47 +00:00
if errorCount > 0 {
errorCount = 0
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenRecovered{})
}
}
log.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened")
2023-07-19 11:12:23 +00:00
c.longPollingConn = resp.Body
if loggedIn {
select {
case dittoPing <- struct{}{}:
default:
log.Debug().Msg("Ditto pinger is still waiting for previous ping, skipping new ping")
}
}
c.readLongPoll(&log, resp.Body)
2023-07-19 11:12:23 +00:00
c.longPollingConn = nil
2023-06-30 09:54:08 +00:00
}
}
func (c *Client) readLongPoll(log *zerolog.Logger, rc io.ReadCloser) {
2023-06-30 09:54:08 +00:00
defer rc.Close()
2023-07-19 11:12:23 +00:00
c.disconnecting = false
2023-06-30 09:54:08 +00:00
reader := bufio.NewReader(rc)
buf := make([]byte, 2621440)
var accumulatedData []byte
n, err := reader.Read(buf[:2])
if err != nil {
log.Err(err).Msg("Error reading opening bytes")
return
} else if n != 2 || string(buf[:2]) != "[[" {
log.Err(err).Msg("Opening is not [[")
return
}
var expectEOF bool
2023-06-30 09:54:08 +00:00
for {
n, err = reader.Read(buf)
2023-06-30 09:54:08 +00:00
if err != nil {
var logEvt *zerolog.Event
2023-07-19 11:12:23 +00:00
if (errors.Is(err, io.EOF) && expectEOF) || c.disconnecting {
logEvt = log.Debug()
} else {
logEvt = log.Warn()
2023-06-30 09:54:08 +00:00
}
logEvt.Err(err).Msg("Stopped reading data from server")
2023-06-30 09:54:08 +00:00
return
} else if expectEOF {
log.Warn().Msg("Didn't get EOF after stream end marker")
2023-06-30 09:54:08 +00:00
}
chunk := buf[:n]
if len(accumulatedData) == 0 {
if len(chunk) == 2 && string(chunk) == "]]" {
log.Debug().Msg("Got stream end marker")
expectEOF = true
continue
2023-06-30 09:54:08 +00:00
}
chunk = bytes.TrimPrefix(chunk, []byte{','})
}
accumulatedData = append(accumulatedData, chunk...)
if !json.Valid(accumulatedData) {
log.Trace().Msg("Invalid JSON, reading next chunk")
2023-06-30 09:54:08 +00:00
continue
}
currentBlock := accumulatedData
accumulatedData = accumulatedData[:0]
msg := &gmproto.LongPollingPayload{}
err = pblite.Unmarshal(currentBlock, msg)
2023-06-30 09:54:08 +00:00
if err != nil {
log.Err(err).Msg("Error deserializing pblite message")
2023-06-30 09:54:08 +00:00
continue
}
switch {
case msg.GetData() != nil:
2023-07-19 11:12:23 +00:00
c.HandleRPCMsg(msg.GetData())
case msg.GetAck() != nil:
log.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message")
2023-07-19 11:12:23 +00:00
c.skipCount = int(msg.GetAck().GetCount())
case msg.GetStartRead() != nil:
log.Trace().Msg("Got startRead message")
case msg.GetHeartbeat() != nil:
log.Trace().Msg("Got heartbeat message")
default:
log.Warn().
Str("data", base64.StdEncoding.EncodeToString(currentBlock)).
Msg("Got unknown message")
}
}
2023-06-30 09:54:08 +00:00
}
2023-07-19 11:12:23 +00:00
func (c *Client) closeLongPolling() {
if conn := c.longPollingConn; conn != nil {
c.Logger.Debug().Int("current_listen_id", c.listenID).Msg("Closing long polling connection manually")
2023-07-19 11:12:23 +00:00
c.listenID++
c.disconnecting = true
_ = conn.Close()
c.longPollingConn = nil
2023-06-30 09:54:08 +00:00
}
}