Don't drop missed events after reconnect

This commit is contained in:
Tulir Asokan 2023-09-07 17:29:48 +03:00
parent 1b7f876b44
commit 29320b06fa
2 changed files with 28 additions and 15 deletions

View file

@ -13,6 +13,8 @@ import (
type IncomingRPCMessage struct { type IncomingRPCMessage struct {
*gmproto.IncomingRPCMessage *gmproto.IncomingRPCMessage
IsOld bool
Pair *gmproto.RPCPairData Pair *gmproto.RPCPairData
Message *gmproto.RPCMessageData Message *gmproto.RPCMessageData
@ -105,7 +107,7 @@ func (c *Client) deduplicateHash(id string, hash [32]byte) bool {
func (c *Client) logContent(res *IncomingRPCMessage, thingID string, contentHash []byte) { func (c *Client) logContent(res *IncomingRPCMessage, thingID string, contentHash []byte) {
if c.Logger.Trace().Enabled() && (res.DecryptedData != nil || res.DecryptedMessage != nil) { if c.Logger.Trace().Enabled() && (res.DecryptedData != nil || res.DecryptedMessage != nil) {
evt := c.Logger.Trace() evt := c.Logger.Trace().Bool("is_old", res.IsOld)
if res.DecryptedMessage != nil { if res.DecryptedMessage != nil {
evt.Str("proto_name", string(res.DecryptedMessage.ProtoReflect().Descriptor().FullName())) evt.Str("proto_name", string(res.DecryptedMessage.ProtoReflect().Descriptor().FullName()))
} }
@ -126,7 +128,11 @@ func (c *Client) deduplicateUpdate(id string, msg *IncomingRPCMessage) bool {
if msg.DecryptedData != nil { if msg.DecryptedData != nil {
contentHash := sha256.Sum256(msg.DecryptedData) contentHash := sha256.Sum256(msg.DecryptedData)
if c.deduplicateHash(id, contentHash) { if c.deduplicateHash(id, contentHash) {
c.Logger.Trace().Str("thing_id", id).Hex("data_hash", contentHash[:]).Msg("Ignoring duplicate update") c.Logger.Trace().
Str("thing_id", id).
Hex("data_hash", contentHash[:]).
Bool("is_old", msg.IsOld).
Msg("Ignoring duplicate update")
return true return true
} }
c.logContent(msg, id, contentHash[:]) c.logContent(msg, id, contentHash[:])
@ -151,17 +157,7 @@ func (c *Client) HandleRPCMsg(rawMsg *gmproto.IncomingRPCMessage) {
case gmproto.BugleRoute_DataEvent: case gmproto.BugleRoute_DataEvent:
if c.skipCount > 0 { if c.skipCount > 0 {
c.skipCount-- c.skipCount--
c.Logger.Debug(). msg.IsOld = true
Any("action", msg.Message.GetAction()).
Int("remaining_skip_count", c.skipCount).
Msg("Skipped DataEvent")
if msg.DecryptedMessage != nil {
c.Logger.Trace().
Str("proto_name", string(msg.DecryptedMessage.ProtoReflect().Descriptor().FullName())).
Str("data", base64.StdEncoding.EncodeToString(msg.DecryptedData)).
Msg("Skipped event data")
}
return
} }
c.handleUpdatesEvent(msg) c.handleUpdatesEvent(msg)
} }
@ -169,7 +165,8 @@ func (c *Client) HandleRPCMsg(rawMsg *gmproto.IncomingRPCMessage) {
type WrappedMessage struct { type WrappedMessage struct {
*gmproto.Message *gmproto.Message
Data []byte IsOld bool
Data []byte
} }
func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) { func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
@ -177,18 +174,25 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
case gmproto.ActionType_GET_UPDATES: case gmproto.ActionType_GET_UPDATES:
data, ok := msg.DecryptedMessage.(*gmproto.UpdateEvents) data, ok := msg.DecryptedMessage.(*gmproto.UpdateEvents)
if !ok { if !ok {
c.Logger.Error().Type("data_type", msg.DecryptedMessage).Msg("Unexpected data type in GET_UPDATES event") c.Logger.Error().
Type("data_type", msg.DecryptedMessage).
Bool("is_old", msg.IsOld).
Msg("Unexpected data type in GET_UPDATES event")
return return
} }
switch evt := data.Event.(type) { switch evt := data.Event.(type) {
case *gmproto.UpdateEvents_UserAlertEvent: case *gmproto.UpdateEvents_UserAlertEvent:
c.logContent(msg, "", nil) c.logContent(msg, "", nil)
if msg.IsOld {
return
}
c.triggerEvent(evt.UserAlertEvent) c.triggerEvent(evt.UserAlertEvent)
case *gmproto.UpdateEvents_SettingsEvent: case *gmproto.UpdateEvents_SettingsEvent:
c.Logger.Debug(). c.Logger.Debug().
Str("data", base64.StdEncoding.EncodeToString(msg.DecryptedData)). Str("data", base64.StdEncoding.EncodeToString(msg.DecryptedData)).
Bool("is_old", msg.IsOld).
Msg("Got settings event") Msg("Got settings event")
c.triggerEvent(evt.SettingsEvent) c.triggerEvent(evt.SettingsEvent)
@ -196,6 +200,9 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
for _, part := range evt.ConversationEvent.GetData() { for _, part := range evt.ConversationEvent.GetData() {
if c.deduplicateUpdate(part.GetConversationID(), msg) { if c.deduplicateUpdate(part.GetConversationID(), msg) {
return return
} else if msg.IsOld {
c.Logger.Debug().Str("conv_id", part.ConversationID).Msg("Ignoring old conversation event")
continue
} }
c.triggerEvent(part) c.triggerEvent(part)
} }
@ -207,12 +214,16 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
} }
c.triggerEvent(&WrappedMessage{ c.triggerEvent(&WrappedMessage{
Message: part, Message: part,
IsOld: msg.IsOld,
Data: msg.DecryptedData, Data: msg.DecryptedData,
}) })
} }
case *gmproto.UpdateEvents_TypingEvent: case *gmproto.UpdateEvents_TypingEvent:
c.logContent(msg, "", nil) c.logContent(msg, "", nil)
if msg.IsOld {
return
}
c.triggerEvent(evt.TypingEvent.GetData()) c.triggerEvent(evt.TypingEvent.GetData())
default: default:
@ -224,6 +235,7 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
c.Logger.Debug(). c.Logger.Debug().
Str("request_id", msg.Message.SessionID). Str("request_id", msg.Message.SessionID).
Str("action_type", msg.Message.Action.String()). Str("action_type", msg.Message.Action.String()).
Bool("is_old", msg.IsOld).
Msg("Got unexpected response") Msg("Got unexpected response")
} }
} }

View file

@ -638,6 +638,7 @@ func (user *User) syncHandleEvent(event any) {
Str("message_status", v.GetMessageStatus().GetStatus().String()). Str("message_status", v.GetMessageStatus().GetStatus().String()).
Int64("message_ts", v.GetTimestamp()). Int64("message_ts", v.GetTimestamp()).
Str("tmp_id", v.GetTmpID()). Str("tmp_id", v.GetTmpID()).
Bool("is_old", v.IsOld).
Msg("Received message") Msg("Received message")
portal := user.GetPortalByID(v.GetConversationID()) portal := user.GetPortalByID(v.GetConversationID())
portal.messages <- PortalMessage{evt: v.Message, source: user, raw: v.Data} portal.messages <- PortalMessage{evt: v.Message, source: user, raw: v.Data}