gmessages/libgm/longpoll.go

172 lines
4.7 KiB
Go
Raw Normal View History

2023-06-30 11:05:33 +00:00
package libgm
2023-06-30 09:54:08 +00:00
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/json"
2023-06-30 09:54:08 +00:00
"errors"
2023-06-30 11:48:50 +00:00
"fmt"
2023-06-30 09:54:08 +00:00
"io"
2023-07-03 21:03:36 +00:00
"time"
2023-06-30 09:54:08 +00:00
2023-07-16 11:36:13 +00:00
"github.com/google/uuid"
"github.com/rs/zerolog"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/events"
"go.mau.fi/mautrix-gmessages/libgm/pblite"
2023-07-17 13:51:31 +00:00
"go.mau.fi/mautrix-gmessages/libgm/gmproto"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/util"
)
func (c *Client) doLongPoll(loggedIn bool) {
2023-07-19 11:12:23 +00:00
c.listenID++
listenID := c.listenID
2023-07-03 21:03:36 +00:00
errored := true
2023-07-16 11:36:13 +00:00
listenReqID := uuid.NewString()
2023-07-19 11:12:23 +00:00
for c.listenID == listenID {
err := c.refreshAuthToken()
2023-07-16 11:36:13 +00:00
if err != nil {
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Error refreshing auth token")
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)})
}
2023-07-16 11:36:13 +00:00
return
}
2023-07-19 11:12:23 +00:00
c.Logger.Debug().Msg("Starting new long-polling request")
payload := &gmproto.ReceiveMessagesRequest{
2023-07-17 13:51:31 +00:00
Auth: &gmproto.AuthMessage{
2023-07-16 11:36:13 +00:00
RequestID: listenReqID,
2023-07-19 11:12:23 +00:00
TachyonAuthToken: c.AuthData.TachyonAuthToken,
2023-07-16 12:55:30 +00:00
ConfigVersion: util.ConfigMessage,
2023-07-16 11:36:13 +00:00
},
2023-07-17 13:51:31 +00:00
Unknown: &gmproto.ReceiveMessagesRequest_UnknownEmptyObject2{
Unknown: &gmproto.ReceiveMessagesRequest_UnknownEmptyObject1{},
2023-07-16 11:36:13 +00:00
},
}
2023-07-19 11:12:23 +00:00
resp, err := c.makeProtobufHTTPRequest(util.ReceiveMessagesURL, payload, ContentTypePBLite)
if err != nil {
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenTemporaryError{Error: err})
}
2023-07-03 21:03:36 +00:00
errored = true
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Error making listen request, retrying in 5 seconds")
2023-07-03 21:03:36 +00:00
time.Sleep(5 * time.Second)
continue
}
2023-07-15 12:02:03 +00:00
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
2023-07-19 11:12:23 +00:00
c.Logger.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request")
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp}})
}
2023-07-03 21:03:36 +00:00
return
} else if resp.StatusCode >= 500 {
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp}})
}
2023-07-03 21:03:36 +00:00
errored = true
2023-07-19 11:12:23 +00:00
c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds")
2023-07-03 21:03:36 +00:00
time.Sleep(5 * time.Second)
continue
}
if errored {
errored = false
if loggedIn {
2023-07-19 11:12:23 +00:00
c.triggerEvent(&events.ListenRecovered{})
}
}
2023-07-19 11:12:23 +00:00
c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened")
c.longPollingConn = resp.Body
if c.AuthData.Browser != nil {
go func() {
2023-07-19 11:12:23 +00:00
err := c.NotifyDittoActivity()
if err != nil {
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Error notifying ditto activity")
}
}()
}
c.readLongPoll(resp.Body)
2023-07-19 11:12:23 +00:00
c.longPollingConn = nil
2023-06-30 09:54:08 +00:00
}
}
func (c *Client) readLongPoll(rc io.ReadCloser) {
2023-06-30 09:54:08 +00:00
defer rc.Close()
2023-07-19 11:12:23 +00:00
c.disconnecting = false
2023-06-30 09:54:08 +00:00
reader := bufio.NewReader(rc)
buf := make([]byte, 2621440)
var accumulatedData []byte
n, err := reader.Read(buf[:2])
if err != nil {
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Error reading opening bytes")
return
} else if n != 2 || string(buf[:2]) != "[[" {
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Opening is not [[")
return
}
var expectEOF bool
2023-06-30 09:54:08 +00:00
for {
n, err = reader.Read(buf)
2023-06-30 09:54:08 +00:00
if err != nil {
var logEvt *zerolog.Event
2023-07-19 11:12:23 +00:00
if (errors.Is(err, io.EOF) && expectEOF) || c.disconnecting {
logEvt = c.Logger.Debug()
} else {
2023-07-19 11:12:23 +00:00
logEvt = c.Logger.Warn()
2023-06-30 09:54:08 +00:00
}
logEvt.Err(err).Msg("Stopped reading data from server")
2023-06-30 09:54:08 +00:00
return
} else if expectEOF {
2023-07-19 11:12:23 +00:00
c.Logger.Warn().Msg("Didn't get EOF after stream end marker")
2023-06-30 09:54:08 +00:00
}
chunk := buf[:n]
if len(accumulatedData) == 0 {
if len(chunk) == 2 && string(chunk) == "]]" {
2023-07-19 11:12:23 +00:00
c.Logger.Debug().Msg("Got stream end marker")
expectEOF = true
continue
2023-06-30 09:54:08 +00:00
}
chunk = bytes.TrimPrefix(chunk, []byte{','})
}
accumulatedData = append(accumulatedData, chunk...)
if !json.Valid(accumulatedData) {
2023-07-19 11:12:23 +00:00
c.Logger.Trace().Bytes("data", chunk).Msg("Invalid JSON, reading next chunk")
2023-06-30 09:54:08 +00:00
continue
}
currentBlock := accumulatedData
accumulatedData = accumulatedData[:0]
msg := &gmproto.LongPollingPayload{}
err = pblite.Unmarshal(currentBlock, msg)
2023-06-30 09:54:08 +00:00
if err != nil {
2023-07-19 11:12:23 +00:00
c.Logger.Err(err).Msg("Error deserializing pblite message")
2023-06-30 09:54:08 +00:00
continue
}
switch {
case msg.GetData() != nil:
2023-07-19 11:12:23 +00:00
c.HandleRPCMsg(msg.GetData())
case msg.GetAck() != nil:
2023-07-19 11:12:23 +00:00
c.Logger.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message")
c.skipCount = int(msg.GetAck().GetCount())
case msg.GetStartRead() != nil:
2023-07-19 11:12:23 +00:00
c.Logger.Trace().Msg("Got startRead message")
case msg.GetHeartbeat() != nil:
2023-07-19 11:12:23 +00:00
c.Logger.Trace().Msg("Got heartbeat message")
default:
2023-07-19 11:12:23 +00:00
c.Logger.Warn().
Str("data", base64.StdEncoding.EncodeToString(currentBlock)).
Msg("Got unknown message")
}
}
2023-06-30 09:54:08 +00:00
}
2023-07-19 11:12:23 +00:00
func (c *Client) closeLongPolling() {
if conn := c.longPollingConn; conn != nil {
c.Logger.Debug().Msg("Closing long polling connection manually")
c.listenID++
c.disconnecting = true
_ = conn.Close()
c.longPollingConn = nil
2023-06-30 09:54:08 +00:00
}
}