gmessages/libgm/rpc.go

199 lines
6.3 KiB
Go
Raw Normal View History

2023-06-30 11:05:33 +00:00
package libgm
2023-06-30 09:54:08 +00:00
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/json"
2023-06-30 09:54:08 +00:00
"errors"
2023-06-30 11:48:50 +00:00
"fmt"
2023-06-30 09:54:08 +00:00
"io"
"net/http"
2023-07-03 21:03:36 +00:00
"time"
2023-06-30 09:54:08 +00:00
"github.com/rs/zerolog"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/events"
"go.mau.fi/mautrix-gmessages/libgm/pblite"
"go.mau.fi/mautrix-gmessages/libgm/binary"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/util"
)
type RPC struct {
client *Client
http *http.Client
conn io.ReadCloser
stopping bool
rpcSessionId string
2023-07-02 14:19:00 +00:00
listenID int
skipCount int
2023-07-14 22:51:36 +00:00
recentUpdates [8][32]byte
recentUpdatesPtr int
2023-06-30 09:54:08 +00:00
}
func (r *RPC) ListenReceiveMessages(payload []byte) {
2023-07-02 14:19:00 +00:00
r.listenID++
listenID := r.listenID
2023-07-03 21:03:36 +00:00
errored := true
2023-07-02 14:19:00 +00:00
for r.listenID == listenID {
if r.client.authData.DevicePair != nil && r.client.authData.AuthenticatedAt.Add(20*time.Hour).Before(time.Now()) {
r.client.Logger.Debug().Msg("Refreshing auth token before starting new long-polling request")
err := r.client.refreshAuthToken()
if err != nil {
r.client.Logger.Err(err).Msg("Error refreshing auth token")
2023-07-15 12:02:03 +00:00
r.client.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)})
return
}
}
r.client.Logger.Debug().Msg("Starting new long-polling request")
req, err := http.NewRequest("POST", util.RECEIVE_MESSAGES, bytes.NewReader(payload))
if err != nil {
panic(fmt.Errorf("Error creating request: %v", err))
}
util.BuildRelayHeaders(req, "application/json+protobuf", "*/*")
resp, reqErr := r.http.Do(req)
//r.client.Logger.Info().Any("bodyLength", len(payload)).Any("url", util.RECEIVE_MESSAGES).Any("headers", resp.Request.Header).Msg("RPC Request Headers")
if reqErr != nil {
2023-07-03 21:03:36 +00:00
r.client.triggerEvent(&events.ListenTemporaryError{Error: reqErr})
errored = true
r.client.Logger.Err(err).Msg("Error making listen request, retrying in 5 seconds")
time.Sleep(5 * time.Second)
continue
}
2023-07-15 12:02:03 +00:00
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
r.client.Logger.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request")
2023-07-15 12:02:03 +00:00
r.client.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp}})
2023-07-03 21:03:36 +00:00
return
} else if resp.StatusCode >= 500 {
2023-07-15 12:02:03 +00:00
r.client.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp}})
2023-07-03 21:03:36 +00:00
errored = true
r.client.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds")
time.Sleep(5 * time.Second)
continue
}
if errored {
errored = false
r.client.triggerEvent(&events.ListenRecovered{})
}
2023-07-02 14:19:00 +00:00
r.client.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened")
r.conn = resp.Body
if r.client.authData.DevicePair != nil {
go func() {
err := r.client.NotifyDittoActivity()
if err != nil {
r.client.Logger.Err(err).Msg("Error notifying ditto activity")
}
}()
}
r.startReadingData(resp.Body)
r.conn = nil
2023-06-30 09:54:08 +00:00
}
}
/*
The start of a message always begins with byte 44 (",")
If the message is parsable (after , has been removed) as an array of interfaces:
func (r *RPC) tryUnmarshalJSON(jsonData []byte, msgArr *[]interface{}) error {
err := json.Unmarshal(jsonData, &msgArr)
return err
}
then the message is complete and it should continue to the HandleRPCMsg function and it should also reset the buffer so that the next message can be received properly.
if it's not parsable, it should just append the received data to the buf and attempt to parse it until it's parsable. Because that would indicate that the full msg has been received
*/
func (r *RPC) startReadingData(rc io.ReadCloser) {
r.stopping = false
2023-06-30 09:54:08 +00:00
defer rc.Close()
reader := bufio.NewReader(rc)
buf := make([]byte, 2621440)
var accumulatedData []byte
n, err := reader.Read(buf[:2])
if err != nil {
r.client.Logger.Err(err).Msg("Error reading opening bytes")
return
} else if n != 2 || string(buf[:2]) != "[[" {
r.client.Logger.Err(err).Msg("Opening is not [[")
return
}
var expectEOF bool
2023-06-30 09:54:08 +00:00
for {
n, err = reader.Read(buf)
2023-06-30 09:54:08 +00:00
if err != nil {
var logEvt *zerolog.Event
if (errors.Is(err, io.EOF) && expectEOF) || r.stopping {
logEvt = r.client.Logger.Debug()
} else {
logEvt = r.client.Logger.Warn()
2023-06-30 09:54:08 +00:00
}
logEvt.Err(err).Msg("Stopped reading data from server")
2023-06-30 09:54:08 +00:00
return
} else if expectEOF {
r.client.Logger.Warn().Msg("Didn't get EOF after stream end marker")
2023-06-30 09:54:08 +00:00
}
chunk := buf[:n]
if len(accumulatedData) == 0 {
if len(chunk) == 2 && string(chunk) == "]]" {
r.client.Logger.Debug().Msg("Got stream end marker")
expectEOF = true
continue
2023-06-30 09:54:08 +00:00
}
chunk = bytes.TrimPrefix(chunk, []byte{','})
}
accumulatedData = append(accumulatedData, chunk...)
if !json.Valid(accumulatedData) {
r.client.Logger.Debug().Str("data", string(chunk)).Msg("Invalid JSON")
2023-06-30 09:54:08 +00:00
continue
}
currentBlock := accumulatedData
accumulatedData = accumulatedData[:0]
msg := &binary.InternalMessage{}
err = pblite.Unmarshal(currentBlock, msg)
2023-06-30 09:54:08 +00:00
if err != nil {
r.client.Logger.Err(err).Msg("Error deserializing pblite message")
2023-06-30 09:54:08 +00:00
continue
}
switch {
case msg.GetData() != nil:
r.HandleRPCMsg(msg)
case msg.GetAck() != nil:
r.client.Logger.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message")
r.skipCount = int(msg.GetAck().GetCount())
case msg.GetStartRead() != nil:
r.client.Logger.Trace().Msg("Got startRead message")
case msg.GetHeartbeat() != nil:
r.client.Logger.Trace().Msg("Got heartbeat message")
default:
r.client.Logger.Warn().
Str("data", base64.StdEncoding.EncodeToString(currentBlock)).
Msg("Got unknown message")
}
}
2023-06-30 09:54:08 +00:00
}
func (r *RPC) CloseConnection() {
if r.conn != nil {
r.client.Logger.Debug().Msg("Closing connection manually")
r.stopping = true
2023-06-30 09:54:08 +00:00
r.conn.Close()
r.conn = nil
}
}
func (r *RPC) sendMessageRequest(url string, payload []byte) (*http.Response, error) {
req, err := http.NewRequest("POST", url, bytes.NewReader(payload))
if err != nil {
2023-07-09 20:32:19 +00:00
return nil, fmt.Errorf("error creating request: %w", err)
2023-06-30 09:54:08 +00:00
}
util.BuildRelayHeaders(req, "application/json+protobuf", "*/*")
resp, reqErr := r.client.http.Do(req)
//r.client.Logger.Info().Any("bodyLength", len(payload)).Any("url", url).Any("headers", resp.Request.Header).Msg("RPC Request Headers")
if reqErr != nil {
2023-07-09 20:32:19 +00:00
return nil, fmt.Errorf("error making request: %w", err)
2023-06-30 09:54:08 +00:00
}
return resp, reqErr
}