Merge response waiting methods with sending

This commit is contained in:
Tulir Asokan 2023-07-19 01:20:32 +03:00
parent 6caf7e89fb
commit 3e2348447a
2 changed files with 44 additions and 48 deletions

View file

@ -1,48 +0,0 @@
package libgm
import (
"encoding/base64"
)
func (s *SessionHandler) waitResponse(requestID string) chan *IncomingRPCMessage {
ch := make(chan *IncomingRPCMessage, 1)
s.responseWaitersLock.Lock()
s.responseWaiters[requestID] = ch
s.responseWaitersLock.Unlock()
return ch
}
func (s *SessionHandler) cancelResponse(requestID string, ch chan *IncomingRPCMessage) {
s.responseWaitersLock.Lock()
close(ch)
delete(s.responseWaiters, requestID)
s.responseWaitersLock.Unlock()
}
func (s *SessionHandler) receiveResponse(msg *IncomingRPCMessage) bool {
if msg.Message == nil {
return false
}
requestID := msg.Message.SessionID
s.responseWaitersLock.Lock()
ch, ok := s.responseWaiters[requestID]
if !ok {
s.responseWaitersLock.Unlock()
return false
}
delete(s.responseWaiters, requestID)
s.responseWaitersLock.Unlock()
evt := s.client.Logger.Trace().
Str("request_id", requestID)
if evt.Enabled() {
if msg.DecryptedData != nil {
evt.Str("data", base64.StdEncoding.EncodeToString(msg.DecryptedData))
}
if msg.DecryptedMessage != nil {
evt.Str("proto_name", string(msg.DecryptedMessage.ProtoReflect().Descriptor().FullName()))
}
}
evt.Msg("Received response")
ch <- msg
return true
}

View file

@ -1,6 +1,7 @@
package libgm package libgm
import ( import (
"encoding/base64"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -72,6 +73,49 @@ func typedResponse[T proto.Message](resp *IncomingRPCMessage, err error) (casted
return return
} }
func (s *SessionHandler) waitResponse(requestID string) chan *IncomingRPCMessage {
ch := make(chan *IncomingRPCMessage, 1)
s.responseWaitersLock.Lock()
s.responseWaiters[requestID] = ch
s.responseWaitersLock.Unlock()
return ch
}
func (s *SessionHandler) cancelResponse(requestID string, ch chan *IncomingRPCMessage) {
s.responseWaitersLock.Lock()
close(ch)
delete(s.responseWaiters, requestID)
s.responseWaitersLock.Unlock()
}
func (s *SessionHandler) receiveResponse(msg *IncomingRPCMessage) bool {
if msg.Message == nil {
return false
}
requestID := msg.Message.SessionID
s.responseWaitersLock.Lock()
ch, ok := s.responseWaiters[requestID]
if !ok {
s.responseWaitersLock.Unlock()
return false
}
delete(s.responseWaiters, requestID)
s.responseWaitersLock.Unlock()
evt := s.client.Logger.Trace().
Str("request_id", requestID)
if evt.Enabled() {
if msg.DecryptedData != nil {
evt.Str("data", base64.StdEncoding.EncodeToString(msg.DecryptedData))
}
if msg.DecryptedMessage != nil {
evt.Str("proto_name", string(msg.DecryptedMessage.ProtoReflect().Descriptor().FullName()))
}
}
evt.Msg("Received response")
ch <- msg
return true
}
func (s *SessionHandler) sendMessageWithParams(params SendMessageParams) (*IncomingRPCMessage, error) { func (s *SessionHandler) sendMessageWithParams(params SendMessageParams) (*IncomingRPCMessage, error) {
ch, err := s.sendAsyncMessage(params) ch, err := s.sendAsyncMessage(params)
if err != nil { if err != nil {