Handle conversations moving into spam
This commit is contained in:
parent
4229f85d09
commit
97a3c97b8e
3 changed files with 72 additions and 5 deletions
|
@ -63,6 +63,9 @@ const (
|
||||||
SELECT conv_id, conv_receiver, id, mxid, mx_room, sender, timestamp, status FROM message
|
SELECT conv_id, conv_receiver, id, mxid, mx_room, sender, timestamp, status FROM message
|
||||||
WHERE mxid=$1
|
WHERE mxid=$1
|
||||||
`
|
`
|
||||||
|
deleteAllInChat = `
|
||||||
|
DELETE FROM message WHERE conv_id=$1 AND conv_receiver=$2
|
||||||
|
`
|
||||||
)
|
)
|
||||||
|
|
||||||
func (mq *MessageQuery) GetByID(ctx context.Context, receiver int, messageID string) (*Message, error) {
|
func (mq *MessageQuery) GetByID(ctx context.Context, receiver int, messageID string) (*Message, error) {
|
||||||
|
@ -81,6 +84,11 @@ func (mq *MessageQuery) GetLastInChatWithMXID(ctx context.Context, chat Key) (*M
|
||||||
return get[*Message](mq, ctx, getLastMessageInChatWithMXIDQuery, chat.ID, chat.Receiver)
|
return get[*Message](mq, ctx, getLastMessageInChatWithMXIDQuery, chat.ID, chat.Receiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mq *MessageQuery) DeleteAllInChat(ctx context.Context, chat Key) error {
|
||||||
|
_, err := mq.db.Conn(ctx).ExecContext(ctx, deleteAllInChat, chat.ID, chat.Receiver)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
type MediaPart struct {
|
type MediaPart struct {
|
||||||
EventID id.EventID `json:"mxid,omitempty"`
|
EventID id.EventID `json:"mxid,omitempty"`
|
||||||
PendingMedia bool `json:"pending_media,omitempty"`
|
PendingMedia bool `json:"pending_media,omitempty"`
|
||||||
|
|
25
portal.go
25
portal.go
|
@ -258,6 +258,8 @@ type Portal struct {
|
||||||
|
|
||||||
messages chan PortalMessage
|
messages chan PortalMessage
|
||||||
matrixMessages chan PortalMatrixMessage
|
matrixMessages chan PortalMatrixMessage
|
||||||
|
|
||||||
|
cancelCreation atomic.Pointer[context.CancelCauseFunc]
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1898,6 +1900,29 @@ func (portal *Portal) Delete() {
|
||||||
portal.bridge.portalsLock.Unlock()
|
portal.bridge.portalsLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (portal *Portal) RemoveMXID(ctx context.Context) {
|
||||||
|
portal.bridge.portalsLock.Lock()
|
||||||
|
if len(portal.MXID) == 0 {
|
||||||
|
portal.bridge.portalsLock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(portal.bridge.portalsByMXID, portal.MXID)
|
||||||
|
portal.MXID = ""
|
||||||
|
portal.NameSet = false
|
||||||
|
portal.AvatarSet = false
|
||||||
|
portal.InSpace = false
|
||||||
|
portal.Encrypted = false
|
||||||
|
portal.bridge.portalsLock.Unlock()
|
||||||
|
err := portal.bridge.DB.Message.DeleteAllInChat(ctx, portal.Key)
|
||||||
|
if err != nil {
|
||||||
|
portal.zlog.Err(err).Msg("Failed to delete messages from database")
|
||||||
|
}
|
||||||
|
err = portal.Update(ctx)
|
||||||
|
if err != nil {
|
||||||
|
portal.zlog.Err(err).Msg("Failed to remove portal mxid from database")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (portal *Portal) Cleanup() {
|
func (portal *Portal) Cleanup() {
|
||||||
if len(portal.MXID) == 0 {
|
if len(portal.MXID) == 0 {
|
||||||
return
|
return
|
||||||
|
|
44
user.go
44
user.go
|
@ -785,13 +785,24 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) {
|
||||||
Str("data_source", source).
|
Str("data_source", source).
|
||||||
Interface("conversation_data", convCopy).
|
Interface("conversation_data", convCopy).
|
||||||
Logger()
|
Logger()
|
||||||
|
if cancel := portal.cancelCreation.Load(); cancel != nil {
|
||||||
|
if updateType == gmproto.ConversationStatus_SPAM_FOLDER || updateType == gmproto.ConversationStatus_BLOCKED_FOLDER {
|
||||||
|
(*cancel)(fmt.Errorf("conversation was moved to spam"))
|
||||||
|
} else {
|
||||||
|
log.Debug().Msg("Conversation creation is still pending, ignoring new sync event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
if portal.MXID != "" {
|
if portal.MXID != "" {
|
||||||
switch updateType {
|
switch updateType {
|
||||||
// TODO also delete if blocked?
|
|
||||||
case gmproto.ConversationStatus_DELETED:
|
case gmproto.ConversationStatus_DELETED:
|
||||||
log.Info().Msg("Got delete event, cleaning up portal")
|
log.Info().Msg("Got delete event, cleaning up portal")
|
||||||
portal.Delete()
|
portal.Delete()
|
||||||
portal.Cleanup()
|
portal.Cleanup()
|
||||||
|
case gmproto.ConversationStatus_SPAM_FOLDER, gmproto.ConversationStatus_BLOCKED_FOLDER:
|
||||||
|
log.Info().Msg("Got spam/block event, cleaning up portal")
|
||||||
|
portal.Cleanup()
|
||||||
|
portal.RemoveMXID(context.TODO())
|
||||||
default:
|
default:
|
||||||
if v.Participants == nil {
|
if v.Participants == nil {
|
||||||
log.Debug().Msg("Not syncing conversation with nil participants")
|
log.Debug().Msg("Not syncing conversation with nil participants")
|
||||||
|
@ -816,10 +827,33 @@ func (user *User) syncConversation(v *gmproto.Conversation, source string) {
|
||||||
log.Debug().Msg("Not syncing conversation with nil participants")
|
log.Debug().Msg("Not syncing conversation with nil participants")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debug().Msg("Creating portal for conversation")
|
if source == "event" {
|
||||||
err := portal.CreateMatrixRoom(user, v, source == "sync")
|
go func() {
|
||||||
if err != nil {
|
ctx, cancel := context.WithCancelCause(context.TODO())
|
||||||
log.Err(err).Msg("Error creating Matrix room from conversation event")
|
cancelPtr := &cancel
|
||||||
|
defer func() {
|
||||||
|
portal.cancelCreation.CompareAndSwap(cancelPtr, nil)
|
||||||
|
cancel(nil)
|
||||||
|
}()
|
||||||
|
portal.cancelCreation.Store(cancelPtr)
|
||||||
|
log.Debug().Msg("Creating portal for conversation in 5 seconds")
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Debug().Err(ctx.Err()).Msg("Portal creation was cancelled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := portal.CreateMatrixRoom(user, v, source == "sync")
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("Error creating Matrix room from conversation event")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
log.Debug().Msg("Creating portal for conversation")
|
||||||
|
err := portal.CreateMatrixRoom(user, v, source == "sync")
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("Error creating Matrix room from conversation event")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debug().Msg("Not creating portal for conversation")
|
log.Debug().Msg("Not creating portal for conversation")
|
||||||
|
|
Loading…
Reference in a new issue