Fix deduplicating updates that flip back and forth

This commit is contained in:
Tulir Asokan 2023-07-19 22:29:30 +03:00
parent d410ba1e0c
commit ecae11f6b2
3 changed files with 28 additions and 15 deletions

View file

@ -41,6 +41,11 @@ const RefreshTachyonBuffer = 1 * time.Hour
type Proxy func(*http.Request) (*url.URL, error) type Proxy func(*http.Request) (*url.URL, error)
type EventHandler func(evt any) type EventHandler func(evt any)
type updateDedupItem struct {
id string
hash [32]byte
}
type Client struct { type Client struct {
Logger zerolog.Logger Logger zerolog.Logger
evHandler EventHandler evHandler EventHandler
@ -51,7 +56,7 @@ type Client struct {
skipCount int skipCount int
disconnecting bool disconnecting bool
recentUpdates [8][32]byte recentUpdates [8]updateDedupItem
recentUpdatesPtr int recentUpdatesPtr int
conversationsFetchedOnce bool conversationsFetchedOnce bool

View file

@ -77,19 +77,23 @@ func (c *Client) decryptInternalMessage(data *gmproto.IncomingRPCMessage) (*Inco
return msg, nil return msg, nil
} }
func (c *Client) deduplicateHash(hash [32]byte) bool { func (c *Client) deduplicateHash(id string, hash [32]byte) bool {
const recentUpdatesLen = len(c.recentUpdates) const recentUpdatesLen = len(c.recentUpdates)
for i := c.recentUpdatesPtr + recentUpdatesLen - 1; i >= c.recentUpdatesPtr; i-- { for i := c.recentUpdatesPtr + recentUpdatesLen - 1; i >= c.recentUpdatesPtr; i-- {
if c.recentUpdates[i%recentUpdatesLen] == hash { if c.recentUpdates[i%recentUpdatesLen].id == id {
return true if c.recentUpdates[i%recentUpdatesLen].hash == hash {
return true
} else {
break
}
} }
} }
c.recentUpdates[c.recentUpdatesPtr] = hash c.recentUpdates[c.recentUpdatesPtr] = updateDedupItem{id: id, hash: hash}
c.recentUpdatesPtr = (c.recentUpdatesPtr + 1) % recentUpdatesLen c.recentUpdatesPtr = (c.recentUpdatesPtr + 1) % recentUpdatesLen
return false return false
} }
func (c *Client) logContent(res *IncomingRPCMessage) { func (c *Client) logContent(res *IncomingRPCMessage, thingID string, contentHash []byte) {
if c.Logger.Trace().Enabled() && (res.DecryptedData != nil || res.DecryptedMessage != nil) { if c.Logger.Trace().Enabled() && (res.DecryptedData != nil || res.DecryptedMessage != nil) {
evt := c.Logger.Trace() evt := c.Logger.Trace()
if res.DecryptedMessage != nil { if res.DecryptedMessage != nil {
@ -97,6 +101,10 @@ func (c *Client) logContent(res *IncomingRPCMessage) {
} }
if res.DecryptedData != nil { if res.DecryptedData != nil {
evt.Str("data", base64.StdEncoding.EncodeToString(res.DecryptedData)) evt.Str("data", base64.StdEncoding.EncodeToString(res.DecryptedData))
if contentHash != nil {
evt.Str("thing_id", thingID)
evt.Hex("data_hash", contentHash)
}
} else { } else {
evt.Str("data", "<null>") evt.Str("data", "<null>")
} }
@ -104,14 +112,14 @@ func (c *Client) logContent(res *IncomingRPCMessage) {
} }
} }
func (c *Client) deduplicateUpdate(msg *IncomingRPCMessage) bool { func (c *Client) deduplicateUpdate(id string, msg *IncomingRPCMessage) bool {
if msg.DecryptedData != nil { if msg.DecryptedData != nil {
contentHash := sha256.Sum256(msg.DecryptedData) contentHash := sha256.Sum256(msg.DecryptedData)
if c.deduplicateHash(contentHash) { if c.deduplicateHash(id, contentHash) {
c.Logger.Trace().Hex("data_hash", contentHash[:]).Msg("Ignoring duplicate update") c.Logger.Trace().Str("thing_id", id).Hex("data_hash", contentHash[:]).Msg("Ignoring duplicate update")
return true return true
} }
c.logContent(msg) c.logContent(msg, id, contentHash[:])
} }
return false return false
} }

View file

@ -16,27 +16,27 @@ func (c *Client) handleUpdatesEvent(msg *IncomingRPCMessage) {
switch evt := data.Event.(type) { switch evt := data.Event.(type) {
case *gmproto.UpdateEvents_UserAlertEvent: case *gmproto.UpdateEvents_UserAlertEvent:
c.logContent(msg) c.logContent(msg, "", nil)
c.handleUserAlertEvent(msg, evt.UserAlertEvent) c.handleUserAlertEvent(msg, evt.UserAlertEvent)
case *gmproto.UpdateEvents_SettingsEvent: case *gmproto.UpdateEvents_SettingsEvent:
c.logContent(msg) c.logContent(msg, "", nil)
c.triggerEvent(evt.SettingsEvent) c.triggerEvent(evt.SettingsEvent)
case *gmproto.UpdateEvents_ConversationEvent: case *gmproto.UpdateEvents_ConversationEvent:
if c.deduplicateUpdate(msg) { if c.deduplicateUpdate(evt.ConversationEvent.GetData().GetConversationID(), msg) {
return return
} }
c.triggerEvent(evt.ConversationEvent.GetData()) c.triggerEvent(evt.ConversationEvent.GetData())
case *gmproto.UpdateEvents_MessageEvent: case *gmproto.UpdateEvents_MessageEvent:
if c.deduplicateUpdate(msg) { if c.deduplicateUpdate(evt.MessageEvent.GetData().GetMessageID(), msg) {
return return
} }
c.triggerEvent(evt.MessageEvent.GetData()) c.triggerEvent(evt.MessageEvent.GetData())
case *gmproto.UpdateEvents_TypingEvent: case *gmproto.UpdateEvents_TypingEvent:
c.logContent(msg) c.logContent(msg, "", nil)
c.triggerEvent(evt.TypingEvent.GetData()) c.triggerEvent(evt.TypingEvent.GetData())
default: default: