Include raw protobuf data in events for debugging

This commit is contained in:
Tulir Asokan 2023-08-24 14:48:03 +03:00
parent 2ec1dcda6c
commit 0c5be59fad
4 changed files with 35 additions and 15 deletions

View file

@ -142,10 +142,10 @@ func (portal *Portal) forwardBackfill(ctx context.Context, user *User, after tim
for i := len(resp.Messages) - 1; i >= 0; i-- { for i := len(resp.Messages) - 1; i >= 0; i-- {
evt := resp.Messages[i] evt := resp.Messages[i]
isTooOld := !time.UnixMicro(evt.Timestamp).After(after) isTooOld := !time.UnixMicro(evt.Timestamp).After(after)
if portal.handleExistingMessage(ctx, user, evt, isTooOld) || isTooOld { if portal.handleExistingMessage(ctx, user, evt, isTooOld, nil) || isTooOld {
continue continue
} }
c := portal.convertGoogleMessage(ctx, user, evt, batchSending) c := portal.convertGoogleMessage(ctx, user, evt, batchSending, nil)
if c == nil { if c == nil {
continue continue
} }

View file

@ -166,6 +166,11 @@ func (c *Client) HandleRPCMsg(rawMsg *gmproto.IncomingRPCMessage) {
} }
} }
type WrappedMessage struct {
*gmproto.Message
Data []byte
}
func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) { func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
switch msg.Message.Action { switch msg.Message.Action {
case gmproto.ActionType_GET_UPDATES: case gmproto.ActionType_GET_UPDATES:
@ -194,7 +199,10 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
if c.deduplicateUpdate(evt.MessageEvent.GetData().GetMessageID(), msg) { if c.deduplicateUpdate(evt.MessageEvent.GetData().GetMessageID(), msg) {
return return
} }
c.triggerEvent(evt.MessageEvent.GetData()) c.triggerEvent(&WrappedMessage{
Message: evt.MessageEvent.GetData(),
Data: msg.DecryptedData,
})
case *gmproto.UpdateEvents_TypingEvent: case *gmproto.UpdateEvents_TypingEvent:
c.logContent(msg, "", nil) c.logContent(msg, "", nil)

View file

@ -19,6 +19,7 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"errors" "errors"
"fmt" "fmt"
"image" "image"
@ -214,6 +215,7 @@ const recentlyHandledLength = 100
type PortalMessage struct { type PortalMessage struct {
evt *gmproto.Message evt *gmproto.Message
source *User source *User
raw []byte
} }
type PortalMatrixMessage struct { type PortalMatrixMessage struct {
@ -278,7 +280,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
defer portal.forwardBackfillLock.Unlock() defer portal.forwardBackfillLock.Unlock()
switch { switch {
case msg.evt != nil: case msg.evt != nil:
portal.handleMessage(msg.source, msg.evt) portal.handleMessage(msg.source, msg.evt, msg.raw)
default: default:
portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message") portal.zlog.Warn().Interface("portal_message", msg).Msg("Unexpected PortalMessage with no message")
} }
@ -420,11 +422,12 @@ func (portal *Portal) redactMessage(ctx context.Context, msg *database.Message)
msg.MXID = "" msg.MXID = ""
} }
func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *User, dbMsg *database.Message, evt *gmproto.Message) { func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *User, dbMsg *database.Message, evt *gmproto.Message, raw []byte) {
log := *zerolog.Ctx(ctx) log := *zerolog.Ctx(ctx)
newStatus := evt.GetMessageStatus().GetStatus() newStatus := evt.GetMessageStatus().GetStatus()
chatIDChanged := dbMsg.Chat.ID != portal.ID chatIDChanged := dbMsg.Chat.ID != portal.ID
if dbMsg.Status.Type == newStatus && !chatIDChanged && !(dbMsg.Status.HasPendingMediaParts() && !hasInProgressMedia(evt)) { if dbMsg.Status.Type == newStatus && !chatIDChanged && !(dbMsg.Status.HasPendingMediaParts() && !hasInProgressMedia(evt)) {
log.Debug().Msg("Nothing changed in message update, just syncing reactions")
portal.syncReactions(ctx, source, dbMsg, evt.Reactions) portal.syncReactions(ctx, source, dbMsg, evt.Reactions)
return return
} }
@ -461,7 +464,7 @@ func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *U
dbMsg.Status.MediaStatus != downloadPendingStatusMessage(newStatus), dbMsg.Status.MediaStatus != downloadPendingStatusMessage(newStatus),
dbMsg.Status.HasPendingMediaParts() && !hasInProgressMedia(evt), dbMsg.Status.HasPendingMediaParts() && !hasInProgressMedia(evt),
dbMsg.Status.PartCount != len(evt.MessageInfo): dbMsg.Status.PartCount != len(evt.MessageInfo):
converted := portal.convertGoogleMessage(ctx, source, evt, false) converted := portal.convertGoogleMessage(ctx, source, evt, false, raw)
dbMsg.Status.MediaStatus = converted.MediaStatus dbMsg.Status.MediaStatus = converted.MediaStatus
if dbMsg.Status.MediaParts == nil { if dbMsg.Status.MediaParts == nil {
dbMsg.Status.MediaParts = make(map[string]database.MediaPart) dbMsg.Status.MediaParts = make(map[string]database.MediaPart)
@ -556,11 +559,11 @@ func (portal *Portal) handleExistingMessageUpdate(ctx context.Context, source *U
portal.syncReactions(ctx, source, dbMsg, evt.Reactions) portal.syncReactions(ctx, source, dbMsg, evt.Reactions)
} }
func (portal *Portal) handleExistingMessage(ctx context.Context, source *User, evt *gmproto.Message, outgoingOnly bool) bool { func (portal *Portal) handleExistingMessage(ctx context.Context, source *User, evt *gmproto.Message, outgoingOnly bool, raw []byte) bool {
log := zerolog.Ctx(ctx) log := zerolog.Ctx(ctx)
if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil { if existingMsg := portal.isOutgoingMessage(evt); existingMsg != nil {
log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message") log.Debug().Str("event_id", existingMsg.MXID.String()).Msg("Got echo for outgoing message")
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt, raw)
return true return true
} else if outgoingOnly { } else if outgoingOnly {
return false return false
@ -569,7 +572,7 @@ func (portal *Portal) handleExistingMessage(ctx context.Context, source *User, e
if err != nil { if err != nil {
log.Err(err).Msg("Failed to check if message is duplicate") log.Err(err).Msg("Failed to check if message is duplicate")
} else if existingMsg != nil { } else if existingMsg != nil {
portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt) portal.handleExistingMessageUpdate(ctx, source, existingMsg, evt, raw)
return true return true
} }
return false return false
@ -583,7 +586,7 @@ func idToInt(id string) int {
return i return i
} }
func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) { func (portal *Portal) handleMessage(source *User, evt *gmproto.Message, raw []byte) {
eventTS := time.UnixMicro(evt.GetTimestamp()) eventTS := time.UnixMicro(evt.GetTimestamp())
if eventTS.After(portal.lastMessageTS) { if eventTS.After(portal.lastMessageTS) {
portal.lastMessageTS = eventTS portal.lastMessageTS = eventTS
@ -596,7 +599,7 @@ func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) {
Str("action", "handle google message"). Str("action", "handle google message").
Logger() Logger()
ctx := log.WithContext(context.TODO()) ctx := log.WithContext(context.TODO())
if portal.handleExistingMessage(ctx, source, evt, false) { if portal.handleExistingMessage(ctx, source, evt, false, raw) {
return return
} }
switch evt.GetMessageStatus().GetStatus() { switch evt.GetMessageStatus().GetStatus() {
@ -617,7 +620,7 @@ func (portal *Portal) handleMessage(source *User, evt *gmproto.Message) {
} }
} }
converted := portal.convertGoogleMessage(ctx, source, evt, false) converted := portal.convertGoogleMessage(ctx, source, evt, false, raw)
eventIDs := portal.sendMessageParts(ctx, converted, nil) eventIDs := portal.sendMessageParts(ctx, converted, nil)
if len(eventIDs) > 0 { if len(eventIDs) > 0 {
portal.sendDeliveryReceipt(eventIDs[len(eventIDs)-1]) portal.sendDeliveryReceipt(eventIDs[len(eventIDs)-1])
@ -811,7 +814,7 @@ func (portal *Portal) shouldIgnoreStatus(status gmproto.MessageStatusType) bool
} }
} }
func (portal *Portal) convertGoogleMessage(ctx context.Context, source *User, evt *gmproto.Message, backfill bool) *ConvertedMessage { func (portal *Portal) convertGoogleMessage(ctx context.Context, source *User, evt *gmproto.Message, backfill bool, raw []byte) *ConvertedMessage {
log := zerolog.Ctx(ctx) log := zerolog.Ctx(ctx)
var cm ConvertedMessage var cm ConvertedMessage
@ -923,6 +926,14 @@ func (portal *Portal) convertGoogleMessage(ctx context.Context, source *User, ev
if portal.bridge.Config.Bridge.CaptionInMessage { if portal.bridge.Config.Bridge.CaptionInMessage {
cm.MergeCaption() cm.MergeCaption()
} }
if raw != nil && base64.StdEncoding.EncodedLen(len(raw)) < 8192 {
extra := cm.Parts[0].Extra
if extra == nil {
extra = make(map[string]any)
}
extra["fi.mau.gmessages.raw_debug_data"] = base64.StdEncoding.EncodeToString(raw)
cm.Parts[0].Extra = extra
}
return &cm return &cm
} }

View file

@ -603,7 +603,8 @@ func (user *User) syncHandleEvent(event any) {
}() }()
case *gmproto.Conversation: case *gmproto.Conversation:
go user.syncConversation(v, "event") go user.syncConversation(v, "event")
case *gmproto.Message: //case *gmproto.Message:
case *libgm.WrappedMessage:
user.zlog.Debug(). user.zlog.Debug().
Str("conversation_id", v.GetConversationID()). Str("conversation_id", v.GetConversationID()).
Str("participant_id", v.GetParticipantID()). Str("participant_id", v.GetParticipantID()).
@ -613,7 +614,7 @@ func (user *User) syncHandleEvent(event any) {
Str("tmp_id", v.GetTmpID()). Str("tmp_id", v.GetTmpID()).
Msg("Received message") Msg("Received message")
portal := user.GetPortalByID(v.GetConversationID()) portal := user.GetPortalByID(v.GetConversationID())
portal.messages <- PortalMessage{evt: v, source: user} portal.messages <- PortalMessage{evt: v.Message, source: user, raw: v.Data}
case *gmproto.UserAlertEvent: case *gmproto.UserAlertEvent:
user.handleUserAlert(v) user.handleUserAlert(v)
default: default: