gmessages/libgm/session_handler.go

167 lines
4.2 KiB
Go
Raw Normal View History

2023-06-30 11:05:33 +00:00
package libgm
2023-06-30 09:54:08 +00:00
import (
"fmt"
2023-07-15 12:57:07 +00:00
"sync"
2023-06-30 09:54:08 +00:00
"time"
2023-07-16 10:23:44 +00:00
"github.com/google/uuid"
2023-06-30 09:54:08 +00:00
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
"go.mau.fi/mautrix-gmessages/libgm/pblite"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/binary"
"go.mau.fi/mautrix-gmessages/libgm/payload"
"go.mau.fi/mautrix-gmessages/libgm/routes"
2023-06-30 09:54:08 +00:00
"go.mau.fi/mautrix-gmessages/libgm/util"
)
2023-07-15 22:45:57 +00:00
type SessionHandler struct {
client *Client
2023-06-30 09:54:08 +00:00
2023-07-15 22:45:57 +00:00
responseWaiters map[string]chan<- *pblite.Response
responseWaitersLock sync.Mutex
2023-06-30 09:54:08 +00:00
2023-07-15 12:57:07 +00:00
ackMapLock sync.Mutex
ackMap []string
ackTicker *time.Ticker
2023-06-30 09:54:08 +00:00
2023-07-15 22:45:57 +00:00
sessionID string
2023-06-30 09:54:08 +00:00
responseTimeout time.Duration
}
2023-07-15 22:45:57 +00:00
func (s *SessionHandler) ResetSessionID() {
2023-07-16 10:23:44 +00:00
s.sessionID = uuid.NewString()
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
func (s *SessionHandler) sendMessageNoResponse(actionType binary.ActionType, encryptedData proto.Message) error {
_, payload, _, err := s.buildMessage(actionType, encryptedData)
if err != nil {
return err
}
2023-07-15 23:21:53 +00:00
_, err = s.client.rpc.sendMessageRequest(util.SendMessageURL, payload)
2023-07-15 22:45:57 +00:00
return err
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
func (s *SessionHandler) sendAsyncMessage(actionType binary.ActionType, encryptedData proto.Message) (<-chan *pblite.Response, error) {
requestID, payload, _, buildErr := s.buildMessage(actionType, encryptedData)
if buildErr != nil {
2023-07-15 22:45:57 +00:00
return nil, buildErr
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
ch := s.waitResponse(requestID)
2023-07-15 23:21:53 +00:00
_, reqErr := s.client.rpc.sendMessageRequest(util.SendMessageURL, payload)
if reqErr != nil {
2023-07-15 22:45:57 +00:00
s.cancelResponse(requestID, ch)
return nil, reqErr
}
return ch, nil
}
func (s *SessionHandler) sendMessage(actionType binary.ActionType, encryptedData proto.Message) (*pblite.Response, error) {
ch, err := s.sendAsyncMessage(actionType, encryptedData)
if err != nil {
return nil, err
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
// TODO add timeout
return <-ch, nil
}
2023-06-30 09:54:08 +00:00
func (s *SessionHandler) buildMessage(actionType binary.ActionType, encryptedData proto.Message) (string, []byte, binary.ActionType, error) {
2023-07-15 22:45:57 +00:00
var requestID string
pairedDevice := s.client.authData.DevicePair.Mobile
2023-07-15 22:45:57 +00:00
sessionId := s.client.sessionHandler.sessionID
token := s.client.authData.TachyonAuthToken
2023-06-30 09:54:08 +00:00
routeInfo, ok := routes.Routes[actionType]
if !ok {
return "", nil, 0, fmt.Errorf("failed to build message: could not find route %d", actionType)
2023-06-30 09:54:08 +00:00
}
if routeInfo.UseSessionID {
2023-07-15 22:45:57 +00:00
requestID = s.sessionID
} else {
2023-07-16 10:23:44 +00:00
requestID = uuid.NewString()
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
tmpMessage := payload.NewSendMessageBuilder(token, pairedDevice, requestID, sessionId).SetRoute(routeInfo.Action).SetSessionId(s.sessionID)
if encryptedData != nil {
2023-07-16 11:36:13 +00:00
tmpMessage.SetEncryptedProtoMessage(encryptedData, s.client.authData.RequestCrypto)
2023-06-30 09:54:08 +00:00
}
if routeInfo.UseTTL {
2023-07-16 11:36:13 +00:00
tmpMessage.SetTTL(s.client.authData.TachyonTTL)
2023-06-30 09:54:08 +00:00
}
message, buildErr := tmpMessage.Build()
if buildErr != nil {
return "", nil, 0, buildErr
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
return requestID, message, routeInfo.Action, nil
2023-06-30 09:54:08 +00:00
}
2023-07-15 22:45:57 +00:00
func (s *SessionHandler) queueMessageAck(messageID string) {
2023-07-15 12:57:07 +00:00
s.ackMapLock.Lock()
defer s.ackMapLock.Unlock()
2023-07-15 22:45:57 +00:00
if !slices.Contains(s.ackMap, messageID) {
s.ackMap = append(s.ackMap, messageID)
s.client.Logger.Trace().Any("message_id", messageID).Msg("Queued ack for message")
} else {
s.client.Logger.Trace().Any("message_id", messageID).Msg("Ack for message was already queued")
2023-06-30 09:54:08 +00:00
}
}
// startAckInterval starts, or restarts, the periodic flush of queued message
// acks: sendAckRequest is called from a background goroutine every 5 seconds.
//
// If a ticker already exists it is reset and reused rather than replaced.
// Stopping a time.Ticker does not close its channel, so a goroutine ranging
// over a stopped ticker never returns; replacing the ticker on every call
// would strand one such goroutine per restart. Resetting keeps a single
// goroutine serving a single ticker, and also revives a ticker that has been
// stopped.
func (s *SessionHandler) startAckInterval() {
	const ackInterval = 5 * time.Second

	if s.ackTicker != nil {
		// The goroutine started for this ticker is still receiving from its
		// channel, so restarting the ticker is enough to resume the flushes.
		s.ackTicker.Reset(ackInterval)
		return
	}

	ticker := time.NewTicker(ackInterval)
	s.ackTicker = ticker
	go func() {
		for range ticker.C {
			s.sendAckRequest()
		}
	}()
}
func (s *SessionHandler) sendAckRequest() {
2023-07-15 12:57:07 +00:00
s.ackMapLock.Lock()
dataToAck := s.ackMap
s.ackMap = nil
2023-07-15 12:57:07 +00:00
s.ackMapLock.Unlock()
if len(dataToAck) == 0 {
return
}
ackMessages := make([]*binary.AckMessageData, len(dataToAck))
for i, reqID := range dataToAck {
ackMessages[i] = &binary.AckMessageData{
RequestID: reqID,
Device: s.client.authData.DevicePair.Browser,
}
}
2023-06-30 09:54:08 +00:00
ackMessagePayload := &binary.AckMessagePayload{
AuthData: &binary.AuthMessage{
2023-07-16 10:23:44 +00:00
RequestID: uuid.NewString(),
TachyonAuthToken: s.client.authData.TachyonAuthToken,
ConfigVersion: payload.ConfigMessage,
2023-06-30 09:54:08 +00:00
},
EmptyArr: &binary.EmptyArr{},
Acks: ackMessages,
2023-06-30 09:54:08 +00:00
}
jsonData, err := pblite.Marshal(ackMessagePayload)
2023-06-30 09:54:08 +00:00
if err != nil {
2023-07-09 20:32:19 +00:00
panic(err)
2023-06-30 09:54:08 +00:00
}
2023-07-15 23:21:53 +00:00
_, err = s.client.rpc.sendMessageRequest(util.AckMessagesURL, jsonData)
2023-06-30 09:54:08 +00:00
if err != nil {
2023-07-09 20:32:19 +00:00
panic(err)
2023-06-30 09:54:08 +00:00
}
2023-07-15 15:49:28 +00:00
s.client.Logger.Debug().Strs("message_ids", dataToAck).Msg("Sent acks")
2023-06-30 09:54:08 +00:00
}