Handle events synchronously and ignore duplicates

This commit is contained in:
Tulir Asokan 2023-07-11 01:20:50 +03:00
parent 4f4b8818e5
commit c802d2bced
9 changed files with 104 additions and 66 deletions

View file

@ -1,11 +0,0 @@
package libgm
import (
"go.mau.fi/mautrix-gmessages/libgm/pblite"
"go.mau.fi/mautrix-gmessages/libgm/binary"
)
func (c *Client) handleConversationEvent(res *pblite.Response, data *binary.Conversation) {
c.triggerEvent(data)
}

View file

@ -88,18 +88,6 @@ func (c *Cryptor) Decrypt(encryptedData []byte) ([]byte, error) {
return encryptedDataWithoutHMAC, nil return encryptedDataWithoutHMAC, nil
} }
func (c *Cryptor) DecryptAndDecodeData(encryptedData []byte, message proto.Message) error {
decryptedData, err := c.Decrypt(encryptedData)
if err != nil {
return err
}
err = binary.DecodeProtoMessage(decryptedData, message)
if err != nil {
return err
}
return nil
}
func (c *Cryptor) EncodeAndEncryptData(message proto.Message) ([]byte, error) { func (c *Cryptor) EncodeAndEncryptData(message proto.Message) ([]byte, error) {
encodedData, encodeErr := binary.EncodeProtoMessage(message) encodedData, encodeErr := binary.EncodeProtoMessage(message)
if encodeErr != nil { if encodeErr != nil {

View file

@ -1,6 +1,8 @@
package libgm package libgm
import ( import (
"crypto/sha256"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -9,7 +11,35 @@ import (
"go.mau.fi/mautrix-gmessages/libgm/binary" "go.mau.fi/mautrix-gmessages/libgm/binary"
) )
var skipCount int32 func (r *RPC) deduplicateHash(hash [32]byte) bool {
const recentUpdatesLen = len(r.recentUpdates)
for i := r.recentUpdatesPtr + recentUpdatesLen - 1; i >= r.recentUpdatesPtr; i-- {
if r.recentUpdates[i%recentUpdatesLen] == hash {
return true
}
}
r.recentUpdates[r.recentUpdatesPtr] = hash
r.recentUpdatesPtr = (r.recentUpdatesPtr + 1) % recentUpdatesLen
return false
}
func (r *RPC) deduplicateUpdate(response *pblite.Response) bool {
if response.Data.RawDecrypted != nil {
contentHash := sha256.Sum256(response.Data.RawDecrypted)
if r.deduplicateHash(contentHash) {
r.client.Logger.Trace().Hex("data_hash", contentHash[:]).Msg("Ignoring duplicate update")
return true
}
if r.client.Logger.Trace().Enabled() {
r.client.Logger.Trace().
Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())).
Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)).
Hex("data_hash", contentHash[:]).
Msg("Got event")
}
}
return false
}
func (r *RPC) HandleRPCMsg(msgArr []interface{}) { func (r *RPC) HandleRPCMsg(msgArr []interface{}) {
response, decodeErr := pblite.DecodeAndDecryptInternalMessage(msgArr, r.client.authData.Cryptor) response, decodeErr := pblite.DecodeAndDecryptInternalMessage(msgArr, r.client.authData.Cryptor)
@ -29,15 +59,30 @@ func (r *RPC) HandleRPCMsg(msgArr []interface{}) {
//r.client.Logger.Debug().Any("waiting", waitingForResponse).Msg("got request! waiting?") //r.client.Logger.Debug().Any("waiting", waitingForResponse).Msg("got request! waiting?")
r.client.sessionHandler.addResponseAck(response.ResponseId) r.client.sessionHandler.addResponseAck(response.ResponseId)
if waitingForResponse { if waitingForResponse {
if response.Data.Decrypted != nil && r.client.Logger.Trace().Enabled() {
r.client.Logger.Trace().
Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())).
Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)).
Msg("Got response")
}
r.client.sessionHandler.respondToRequestChannel(response) r.client.sessionHandler.respondToRequestChannel(response)
} else { } else {
switch response.BugleRoute { switch response.BugleRoute {
case binary.BugleRoute_PairEvent: case binary.BugleRoute_PairEvent:
r.client.handlePairingEvent(response) go r.client.handlePairingEvent(response)
case binary.BugleRoute_DataEvent: case binary.BugleRoute_DataEvent:
if skipCount > 0 { if r.skipCount > 0 {
skipCount-- r.skipCount--
r.client.Logger.Info().Any("action", response.Data.Action).Any("toSkip", skipCount).Msg("Skipped DataEvent") r.client.Logger.Debug().
Any("action", response.Data.Action).
Any("toSkip", r.skipCount).
Msg("Skipped DataEvent")
if response.Data.Decrypted != nil {
r.client.Logger.Trace().
Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())).
Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)).
Msg("Skipped event data")
}
return return
} }
r.client.handleUpdatesEvent(response) r.client.handleUpdatesEvent(response)

View file

@ -1,10 +0,0 @@
package libgm
import (
"go.mau.fi/mautrix-gmessages/libgm/binary"
"go.mau.fi/mautrix-gmessages/libgm/pblite"
)
func (c *Client) handleMessageEvent(res *pblite.Response, data *binary.Message) {
c.triggerEvent(data)
}

View file

@ -1,6 +1,7 @@
package pblite package pblite
import ( import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"go.mau.fi/mautrix-gmessages/libgm/binary" "go.mau.fi/mautrix-gmessages/libgm/binary"
@ -20,7 +21,8 @@ type RequestData struct {
Bool1 bool `json:"bool1,omitempty"` Bool1 bool `json:"bool1,omitempty"`
Bool2 bool `json:"bool2,omitempty"` Bool2 bool `json:"bool2,omitempty"`
EncryptedData []byte `json:"requestData,omitempty"` EncryptedData []byte `json:"requestData,omitempty"`
Decrypted interface{} `json:"decrypted,omitempty"` RawDecrypted []byte `json:"-,omitempty"`
Decrypted proto.Message `json:"decrypted,omitempty"`
Bool3 bool `json:"bool3,omitempty"` Bool3 bool `json:"bool3,omitempty"`
} }
@ -59,14 +61,19 @@ func DecodeAndDecryptInternalMessage(data []interface{}, cryptor *crypto.Cryptor
return nil, decodeErr return nil, decodeErr
} }
if internalRequestData.EncryptedData != nil { if internalRequestData.EncryptedData != nil {
var decryptedData = routes.Routes[internalRequestData.GetAction()].ResponseStruct.ProtoReflect().New().Interface() decryptedBytes, err := cryptor.Decrypt(internalRequestData.EncryptedData)
decryptErr := cryptor.DecryptAndDecodeData(internalRequestData.EncryptedData, decryptedData) if err != nil {
if decryptErr != nil { return nil, err
return nil, decryptErr
} }
resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, decryptedData) responseStruct := routes.Routes[internalRequestData.GetAction()].ResponseStruct
deserializedData := responseStruct.ProtoReflect().New().Interface()
err = proto.Unmarshal(decryptedBytes, deserializedData)
if err != nil {
return nil, err
}
resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, decryptedBytes, deserializedData)
} else { } else {
resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, nil) resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, nil, nil)
} }
} }
return resp, nil return resp, nil
@ -94,7 +101,7 @@ func newResponseFromPairEvent(internalMsg *binary.InternalMessageData, data *bin
return resp return resp
} }
func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalRequestData *binary.InternalRequestData, decrypted protoreflect.ProtoMessage) *Response { func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalRequestData *binary.InternalRequestData, rawData []byte, decrypted protoreflect.ProtoMessage) *Response {
resp := &Response{ resp := &Response{
ResponseId: internalMsg.GetResponseID(), ResponseId: internalMsg.GetResponseID(),
BugleRoute: internalMsg.GetBugleRoute(), BugleRoute: internalMsg.GetBugleRoute(),
@ -114,6 +121,7 @@ func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalR
Bool2: internalRequestData.GetBool2(), Bool2: internalRequestData.GetBool2(),
EncryptedData: internalRequestData.GetEncryptedData(), EncryptedData: internalRequestData.GetEncryptedData(),
Decrypted: decrypted, Decrypted: decrypted,
RawDecrypted: rawData,
Bool3: internalRequestData.GetBool3(), Bool3: internalRequestData.GetBool3(),
}, },
SignatureId: internalMsg.GetSignatureID(), SignatureId: internalMsg.GetSignatureID(),

View file

@ -24,6 +24,11 @@ type RPC struct {
conn io.ReadCloser conn io.ReadCloser
rpcSessionId string rpcSessionId string
listenID int listenID int
skipCount int
recentUpdates [32][32]byte
recentUpdatesPtr int
} }
func (r *RPC) ListenReceiveMessages(payload []byte) { func (r *RPC) ListenReceiveMessages(payload []byte) {
@ -142,7 +147,7 @@ func (r *RPC) startReadingData(rc io.ReadCloser) {
accumulatedData = []byte{} accumulatedData = []byte{}
//r.client.Logger.Info().Any("val", msgArr).Msg("MsgArr") //r.client.Logger.Info().Any("val", msgArr).Msg("MsgArr")
go r.HandleRPCMsg(msgArr) r.HandleRPCMsg(msgArr)
} }
} }
@ -155,8 +160,8 @@ func (r *RPC) isAcknowledgeMessage(data []byte) bool {
if parseErr != nil { if parseErr != nil {
panic(parseErr) panic(parseErr)
} }
skipCount = parsed.Container.Data.GetAckAmount().Count r.skipCount = int(parsed.Container.Data.GetAckAmount().Count)
r.client.Logger.Info().Any("count", skipCount).Msg("Messages To Skip") r.client.Logger.Info().Any("count", r.skipCount).Msg("Messages To Skip")
} else { } else {
return false return false
} }

View file

@ -23,10 +23,16 @@ func (c *Client) handleUpdatesEvent(res *pblite.Response) {
c.handleSettingsEvent(res, evt.SettingsEvent) c.handleSettingsEvent(res, evt.SettingsEvent)
case *binary.UpdateEvents_ConversationEvent: case *binary.UpdateEvents_ConversationEvent:
c.handleConversationEvent(res, evt.ConversationEvent.GetData()) if c.rpc.deduplicateUpdate(res) {
return
}
c.triggerEvent(evt.ConversationEvent.GetData())
case *binary.UpdateEvents_MessageEvent: case *binary.UpdateEvents_MessageEvent:
c.handleMessageEvent(res, evt.MessageEvent.GetData()) if c.rpc.deduplicateUpdate(res) {
return
}
c.triggerEvent(evt.MessageEvent.GetData())
case *binary.UpdateEvents_TypingEvent: case *binary.UpdateEvents_TypingEvent:
c.handleTypingEvent(res, evt.TypingEvent.GetData()) c.handleTypingEvent(res, evt.TypingEvent.GetData())

View file

@ -7,6 +7,21 @@ import (
"go.mau.fi/mautrix-gmessages/libgm/events" "go.mau.fi/mautrix-gmessages/libgm/events"
) )
func (c *Client) handleClientReady(newSessionId string) {
c.Logger.Info().Any("sessionId", newSessionId).Msg("Client is ready!")
conversations, convErr := c.Conversations.List(25)
if convErr != nil {
panic(convErr)
}
c.Logger.Debug().Any("conversations", conversations).Msg("got conversations")
notifyErr := c.Session.NotifyDittoActivity()
if notifyErr != nil {
panic(notifyErr)
}
readyEvt := events.NewClientReady(newSessionId, conversations)
c.triggerEvent(readyEvt)
}
func (c *Client) handleUserAlertEvent(res *pblite.Response, data *binary.UserAlertEvent) { func (c *Client) handleUserAlertEvent(res *pblite.Response, data *binary.UserAlertEvent) {
alertType := data.AlertType alertType := data.AlertType
switch alertType { switch alertType {
@ -17,20 +32,8 @@ func (c *Client) handleUserAlertEvent(res *pblite.Response, data *binary.UserAle
evt := events.NewBrowserActive(newSessionId) evt := events.NewBrowserActive(newSessionId)
c.triggerEvent(evt) c.triggerEvent(evt)
} else { } else {
c.Logger.Info().Any("sessionId", newSessionId).Msg("Client is ready!") go c.handleClientReady(newSessionId)
conversations, convErr := c.Conversations.List(25)
if convErr != nil {
panic(convErr)
}
c.Logger.Debug().Any("conversations", conversations).Msg("got conversations")
notifyErr := c.Session.NotifyDittoActivity()
if notifyErr != nil {
panic(notifyErr)
}
readyEvt := events.NewClientReady(newSessionId, conversations)
c.triggerEvent(readyEvt)
} }
case binary.AlertType_MOBILE_BATTERY_LOW: case binary.AlertType_MOBILE_BATTERY_LOW:
c.Logger.Info().Msg("[MOBILE_BATTERY_LOW] Mobile device is on low battery") c.Logger.Info().Msg("[MOBILE_BATTERY_LOW] Mobile device is on low battery")
evt := events.NewMobileBatteryLow() evt := events.NewMobileBatteryLow()

View file

@ -392,7 +392,11 @@ func (user *User) createClient() {
} }
} }
user.Client = libgm.NewClient(user.Session, user.zlog.With().Str("component", "libgm").Logger()) user.Client = libgm.NewClient(user.Session, user.zlog.With().Str("component", "libgm").Logger())
user.Client.SetEventHandler(user.HandleEvent) user.Client.SetEventHandler(user.syncHandleEvent)
}
func (user *User) syncHandleEvent(ev any) {
go user.HandleEvent(ev)
} }
func (user *User) Login(ctx context.Context) (<-chan string, error) { func (user *User) Login(ctx context.Context) (<-chan string, error) {
@ -555,7 +559,7 @@ func (user *User) HandleEvent(event interface{}) {
case *events.BrowserActive: case *events.BrowserActive:
user.zlog.Trace().Any("data", v).Msg("Browser active") user.zlog.Trace().Any("data", v).Msg("Browser active")
default: default:
user.zlog.Trace().Any("data", v).Msg("Unknown event") user.zlog.Trace().Any("data", v).Type("data_type", v).Msg("Unknown event")
} }
} }