diff --git a/libgm/conversation_handler.go b/libgm/conversation_handler.go deleted file mode 100644 index cbc1f1a..0000000 --- a/libgm/conversation_handler.go +++ /dev/null @@ -1,11 +0,0 @@ -package libgm - -import ( - "go.mau.fi/mautrix-gmessages/libgm/pblite" - - "go.mau.fi/mautrix-gmessages/libgm/binary" -) - -func (c *Client) handleConversationEvent(res *pblite.Response, data *binary.Conversation) { - c.triggerEvent(data) -} diff --git a/libgm/crypto/cryptor.go b/libgm/crypto/cryptor.go index ea8be5a..da071f7 100644 --- a/libgm/crypto/cryptor.go +++ b/libgm/crypto/cryptor.go @@ -88,18 +88,6 @@ func (c *Cryptor) Decrypt(encryptedData []byte) ([]byte, error) { return encryptedDataWithoutHMAC, nil } -func (c *Cryptor) DecryptAndDecodeData(encryptedData []byte, message proto.Message) error { - decryptedData, err := c.Decrypt(encryptedData) - if err != nil { - return err - } - err = binary.DecodeProtoMessage(decryptedData, message) - if err != nil { - return err - } - return nil -} - func (c *Cryptor) EncodeAndEncryptData(message proto.Message) ([]byte, error) { encodedData, encodeErr := binary.EncodeProtoMessage(message) if encodeErr != nil { diff --git a/libgm/event_handler.go b/libgm/event_handler.go index b0cb91c..3c0d84b 100644 --- a/libgm/event_handler.go +++ b/libgm/event_handler.go @@ -1,6 +1,8 @@ package libgm import ( + "crypto/sha256" + "encoding/base64" "encoding/json" "fmt" @@ -9,7 +11,35 @@ import ( "go.mau.fi/mautrix-gmessages/libgm/binary" ) -var skipCount int32 +func (r *RPC) deduplicateHash(hash [32]byte) bool { + const recentUpdatesLen = len(r.recentUpdates) + for i := r.recentUpdatesPtr + recentUpdatesLen - 1; i >= r.recentUpdatesPtr; i-- { + if r.recentUpdates[i%recentUpdatesLen] == hash { + return true + } + } + r.recentUpdates[r.recentUpdatesPtr] = hash + r.recentUpdatesPtr = (r.recentUpdatesPtr + 1) % recentUpdatesLen + return false +} + +func (r *RPC) deduplicateUpdate(response *pblite.Response) bool { + if response.Data.RawDecrypted != nil { + contentHash := sha256.Sum256(response.Data.RawDecrypted) + if r.deduplicateHash(contentHash) { + r.client.Logger.Trace().Hex("data_hash", contentHash[:]).Msg("Ignoring duplicate update") + return true + } + if r.client.Logger.Trace().Enabled() { + r.client.Logger.Trace(). + Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())). + Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)). + Hex("data_hash", contentHash[:]). + Msg("Got event") + } + } + return false +} func (r *RPC) HandleRPCMsg(msgArr []interface{}) { response, decodeErr := pblite.DecodeAndDecryptInternalMessage(msgArr, r.client.authData.Cryptor) @@ -29,15 +59,30 @@ func (r *RPC) HandleRPCMsg(msgArr []interface{}) { //r.client.Logger.Debug().Any("waiting", waitingForResponse).Msg("got request! waiting?") r.client.sessionHandler.addResponseAck(response.ResponseId) if waitingForResponse { + if response.Data.Decrypted != nil && r.client.Logger.Trace().Enabled() { + r.client.Logger.Trace(). + Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())). + Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)). + Msg("Got response") + } r.client.sessionHandler.respondToRequestChannel(response) } else { switch response.BugleRoute { case binary.BugleRoute_PairEvent: - r.client.handlePairingEvent(response) + go r.client.handlePairingEvent(response) case binary.BugleRoute_DataEvent: - if skipCount > 0 { - skipCount-- - r.client.Logger.Info().Any("action", response.Data.Action).Any("toSkip", skipCount).Msg("Skipped DataEvent") + if r.skipCount > 0 { + r.skipCount-- + r.client.Logger.Debug(). + Any("action", response.Data.Action). + Any("toSkip", r.skipCount). + Msg("Skipped DataEvent") + if response.Data.Decrypted != nil { + r.client.Logger.Trace(). + Str("proto_name", string(response.Data.Decrypted.ProtoReflect().Descriptor().FullName())). + Str("data", base64.StdEncoding.EncodeToString(response.Data.RawDecrypted)). + Msg("Skipped event data") + } return } r.client.handleUpdatesEvent(response) diff --git a/libgm/message_handler.go b/libgm/message_handler.go deleted file mode 100644 index 36e94f7..0000000 --- a/libgm/message_handler.go +++ /dev/null @@ -1,10 +0,0 @@ -package libgm - -import ( - "go.mau.fi/mautrix-gmessages/libgm/binary" - "go.mau.fi/mautrix-gmessages/libgm/pblite" -) - -func (c *Client) handleMessageEvent(res *pblite.Response, data *binary.Message) { - c.triggerEvent(data) -} diff --git a/libgm/pblite/internal.go b/libgm/pblite/internal.go index a8f1d45..6149320 100644 --- a/libgm/pblite/internal.go +++ b/libgm/pblite/internal.go @@ -1,6 +1,7 @@ package pblite import ( + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "go.mau.fi/mautrix-gmessages/libgm/binary" @@ -20,7 +21,8 @@ type RequestData struct { Bool1 bool `json:"bool1,omitempty"` Bool2 bool `json:"bool2,omitempty"` EncryptedData []byte `json:"requestData,omitempty"` - Decrypted interface{} `json:"decrypted,omitempty"` + RawDecrypted []byte `json:"-,omitempty"` + Decrypted proto.Message `json:"decrypted,omitempty"` Bool3 bool `json:"bool3,omitempty"` } @@ -59,14 +61,19 @@ func DecodeAndDecryptInternalMessage(data []interface{}, cryptor *crypto.Cryptor return nil, decodeErr } if internalRequestData.EncryptedData != nil { - var decryptedData = routes.Routes[internalRequestData.GetAction()].ResponseStruct.ProtoReflect().New().Interface() - decryptErr := cryptor.DecryptAndDecodeData(internalRequestData.EncryptedData, decryptedData) - if decryptErr != nil { - return nil, decryptErr + decryptedBytes, err := cryptor.Decrypt(internalRequestData.EncryptedData) + if err != nil { + return nil, err } - resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, decryptedData) + responseStruct := routes.Routes[internalRequestData.GetAction()].ResponseStruct + deserializedData := responseStruct.ProtoReflect().New().Interface() + err = proto.Unmarshal(decryptedBytes, deserializedData) + if err != nil { + return nil, err + } + resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, decryptedBytes, deserializedData) } else { - resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, nil) + resp = newResponseFromDataEvent(internalMessage.GetData(), internalRequestData, nil, nil) } } return resp, nil @@ -94,7 +101,7 @@ func newResponseFromPairEvent(internalMsg *binary.InternalMessageData, data *bin return resp } -func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalRequestData *binary.InternalRequestData, decrypted protoreflect.ProtoMessage) *Response { +func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalRequestData *binary.InternalRequestData, rawData []byte, decrypted protoreflect.ProtoMessage) *Response { resp := &Response{ ResponseId: internalMsg.GetResponseID(), BugleRoute: internalMsg.GetBugleRoute(), @@ -114,6 +121,7 @@ func newResponseFromDataEvent(internalMsg *binary.InternalMessageData, internalR Bool2: internalRequestData.GetBool2(), EncryptedData: internalRequestData.GetEncryptedData(), Decrypted: decrypted, + RawDecrypted: rawData, Bool3: internalRequestData.GetBool3(), }, SignatureId: internalMsg.GetSignatureID(), diff --git a/libgm/rpc.go b/libgm/rpc.go index e27a16f..7cf79a7 100644 --- a/libgm/rpc.go +++ b/libgm/rpc.go @@ -24,6 +24,11 @@ type RPC struct { conn io.ReadCloser rpcSessionId string listenID int + + skipCount int + + recentUpdates [32][32]byte + recentUpdatesPtr int } func (r *RPC) ListenReceiveMessages(payload []byte) { @@ -142,7 +147,7 @@ func (r *RPC) startReadingData(rc io.ReadCloser) { accumulatedData = []byte{} //r.client.Logger.Info().Any("val", msgArr).Msg("MsgArr") - go r.HandleRPCMsg(msgArr) + r.HandleRPCMsg(msgArr) } } @@ -155,8 +160,8 @@ func (r *RPC) isAcknowledgeMessage(data []byte) bool { if parseErr != nil { panic(parseErr) } - skipCount = parsed.Container.Data.GetAckAmount().Count - r.client.Logger.Info().Any("count", skipCount).Msg("Messages To Skip") + r.skipCount = int(parsed.Container.Data.GetAckAmount().Count) + r.client.Logger.Info().Any("count", r.skipCount).Msg("Messages To Skip") } else { return false } diff --git a/libgm/updates_handler.go b/libgm/updates_handler.go index 03c252a..fc5d4f8 100644 --- a/libgm/updates_handler.go +++ b/libgm/updates_handler.go @@ -23,10 +23,16 @@ func (c *Client) handleUpdatesEvent(res *pblite.Response) { c.handleSettingsEvent(res, evt.SettingsEvent) case *binary.UpdateEvents_ConversationEvent: - c.handleConversationEvent(res, evt.ConversationEvent.GetData()) + if c.rpc.deduplicateUpdate(res) { + return + } + c.triggerEvent(evt.ConversationEvent.GetData()) case *binary.UpdateEvents_MessageEvent: - c.handleMessageEvent(res, evt.MessageEvent.GetData()) + if c.rpc.deduplicateUpdate(res) { + return + } + c.triggerEvent(evt.MessageEvent.GetData()) case *binary.UpdateEvents_TypingEvent: c.handleTypingEvent(res, evt.TypingEvent.GetData()) diff --git a/libgm/useralert_handler.go b/libgm/useralert_handler.go index 27d85dd..0f9ae27 100644 --- a/libgm/useralert_handler.go +++ b/libgm/useralert_handler.go @@ -7,6 +7,21 @@ import ( "go.mau.fi/mautrix-gmessages/libgm/events" ) +func (c *Client) handleClientReady(newSessionId string) { + c.Logger.Info().Any("sessionId", newSessionId).Msg("Client is ready!") + conversations, convErr := c.Conversations.List(25) + if convErr != nil { + panic(convErr) + } + c.Logger.Debug().Any("conversations", conversations).Msg("got conversations") + notifyErr := c.Session.NotifyDittoActivity() + if notifyErr != nil { + panic(notifyErr) + } + readyEvt := events.NewClientReady(newSessionId, conversations) + c.triggerEvent(readyEvt) +} + func (c *Client) handleUserAlertEvent(res *pblite.Response, data *binary.UserAlertEvent) { alertType := data.AlertType switch alertType { @@ -17,20 +32,8 @@ func (c *Client) handleUserAlertEvent(res *pblite.Response, data *binary.UserAle evt := events.NewBrowserActive(newSessionId) c.triggerEvent(evt) } else { - c.Logger.Info().Any("sessionId", newSessionId).Msg("Client is ready!") - conversations, convErr := c.Conversations.List(25) - if convErr != nil { - panic(convErr) - } - c.Logger.Debug().Any("conversations", conversations).Msg("got conversations") - notifyErr := c.Session.NotifyDittoActivity() - if notifyErr != nil { - panic(notifyErr) - } - readyEvt := events.NewClientReady(newSessionId, conversations) - c.triggerEvent(readyEvt) + go c.handleClientReady(newSessionId) } - case binary.AlertType_MOBILE_BATTERY_LOW: c.Logger.Info().Msg("[MOBILE_BATTERY_LOW] Mobile device is on low battery") evt := events.NewMobileBatteryLow() diff --git a/user.go b/user.go index 2902f7f..5bbe115 100644 --- a/user.go +++ b/user.go @@ -392,7 +392,11 @@ func (user *User) createClient() { } } user.Client = libgm.NewClient(user.Session, user.zlog.With().Str("component", "libgm").Logger()) - user.Client.SetEventHandler(user.HandleEvent) + user.Client.SetEventHandler(user.syncHandleEvent) +} + +func (user *User) syncHandleEvent(ev any) { + go user.HandleEvent(ev) } func (user *User) Login(ctx context.Context) (<-chan string, error) { @@ -555,7 +559,7 @@ func (user *User) HandleEvent(event interface{}) { case *events.BrowserActive: user.zlog.Trace().Any("data", v).Msg("Browser active") default: - user.zlog.Trace().Any("data", v).Msg("Unknown event") + user.zlog.Trace().Any("data", v).Type("data_type", v).Msg("Unknown event") } }