From 97a3c97b8e75cc6e04d0893dfe2b39ce6aadb7be Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 21 Aug 2023 19:42:57 +0300 Subject: [PATCH] Handle conversations moving into spam --- database/message.go | 8 ++++++++ portal.go | 25 +++++++++++++++++++++++++ user.go | 44 +++++++++++++++++++++++++++++++++++++++----- 3 files changed, 72 insertions(+), 5 deletions(-) diff --git a/database/message.go b/database/message.go index f7c9650..0dc66ba 100644 --- a/database/message.go +++ b/database/message.go @@ -63,6 +63,9 @@ const ( SELECT conv_id, conv_receiver, id, mxid, mx_room, sender, timestamp, status FROM message WHERE mxid=$1 ` + deleteAllInChat = ` + DELETE FROM message WHERE conv_id=$1 AND conv_receiver=$2 + ` ) func (mq *MessageQuery) GetByID(ctx context.Context, receiver int, messageID string) (*Message, error) { @@ -81,6 +84,11 @@ func (mq *MessageQuery) GetLastInChatWithMXID(ctx context.Context, chat Key) (*M return get[*Message](mq, ctx, getLastMessageInChatWithMXIDQuery, chat.ID, chat.Receiver) } +func (mq *MessageQuery) DeleteAllInChat(ctx context.Context, chat Key) error { + _, err := mq.db.Conn(ctx).ExecContext(ctx, deleteAllInChat, chat.ID, chat.Receiver) + return err +} + type MediaPart struct { EventID id.EventID `json:"mxid,omitempty"` PendingMedia bool `json:"pending_media,omitempty"` diff --git a/portal.go b/portal.go index 91121e2..4f7b454 100644 --- a/portal.go +++ b/portal.go @@ -258,6 +258,8 @@ type Portal struct { messages chan PortalMessage matrixMessages chan PortalMatrixMessage + + cancelCreation atomic.Pointer[context.CancelCauseFunc] } var ( @@ -1898,6 +1900,29 @@ func (portal *Portal) Delete() { portal.bridge.portalsLock.Unlock() } +func (portal *Portal) RemoveMXID(ctx context.Context) { + portal.bridge.portalsLock.Lock() + if len(portal.MXID) == 0 { + portal.bridge.portalsLock.Unlock() + return + } + delete(portal.bridge.portalsByMXID, portal.MXID) + portal.MXID = "" + portal.NameSet = false + portal.AvatarSet = false + portal.InSpace = false + portal.Encrypted = false + portal.bridge.portalsLock.Unlock() + err := portal.bridge.DB.Message.DeleteAllInChat(ctx, portal.Key) + if err != nil { + portal.zlog.Err(err).Msg("Failed to delete messages from database") + } + err = portal.Update(ctx) + if err != nil { + portal.zlog.Err(err).Msg("Failed to remove portal mxid from database") + } +} + func (portal *Portal) Cleanup() { if len(portal.MXID) == 0 { return diff --git a/user.go b/user.go index 22bf873..b2a201e 100644 --- a/user.go +++ b/user.go @@ -785,13 +785,24 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) { Str("data_source", source). Interface("conversation_data", convCopy). Logger() + if cancel := portal.cancelCreation.Load(); cancel != nil { + if updateType == gmproto.ConversationStatus_SPAM_FOLDER || updateType == gmproto.ConversationStatus_BLOCKED_FOLDER { + (*cancel)(fmt.Errorf("conversation was moved to spam")) + } else { + log.Debug().Msg("Conversation creation is still pending, ignoring new sync event") + return + } + } if portal.MXID != "" { switch updateType { - // TODO also delete if blocked? case gmproto.ConversationStatus_DELETED: log.Info().Msg("Got delete event, cleaning up portal") portal.Delete() portal.Cleanup() + case gmproto.ConversationStatus_SPAM_FOLDER, gmproto.ConversationStatus_BLOCKED_FOLDER: + log.Info().Msg("Got spam/block event, cleaning up portal") + portal.Cleanup() + portal.RemoveMXID(context.TODO()) default: if v.Participants == nil { log.Debug().Msg("Not syncing conversation with nil participants") @@ -816,10 +827,33 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) { log.Debug().Msg("Not syncing conversation with nil participants") return } - log.Debug().Msg("Creating portal for conversation") - err := portal.CreateMatrixRoom(user, v, source == "sync") - if err != nil { - log.Err(err).Msg("Error creating Matrix room from conversation event") + if source == "event" { + go func() { + ctx, cancel := context.WithCancelCause(context.TODO()) + cancelPtr := &cancel + defer func() { + portal.cancelCreation.CompareAndSwap(cancelPtr, nil) + cancel(nil) + }() + portal.cancelCreation.Store(cancelPtr) + log.Debug().Msg("Creating portal for conversation in 5 seconds") + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + log.Debug().Err(ctx.Err()).Msg("Portal creation was cancelled") + return + } + err := portal.CreateMatrixRoom(user, v, source == "sync") + if err != nil { + log.Err(err).Msg("Error creating Matrix room from conversation event") + } + }() + } else { + log.Debug().Msg("Creating portal for conversation") + err := portal.CreateMatrixRoom(user, v, source == "sync") + if err != nil { + log.Err(err).Msg("Error creating Matrix room from conversation event") + } } } else { log.Debug().Msg("Not creating portal for conversation")