Send correct bridge state if phone isn't responding

This commit is contained in:
Tulir Asokan 2023-07-20 00:58:39 +03:00
parent 157c9ff17a
commit 8e814bd05b
7 changed files with 167 additions and 51 deletions

View file

@ -34,6 +34,8 @@ const (
GMBrowserInactive status.BridgeStateErrorCode = "gm-browser-inactive" GMBrowserInactive status.BridgeStateErrorCode = "gm-browser-inactive"
GMBrowserInactiveTimeout status.BridgeStateErrorCode = "gm-browser-inactive-timeout" GMBrowserInactiveTimeout status.BridgeStateErrorCode = "gm-browser-inactive-timeout"
GMBrowserInactiveInactivity status.BridgeStateErrorCode = "gm-browser-inactive-inactivity" GMBrowserInactiveInactivity status.BridgeStateErrorCode = "gm-browser-inactive-inactivity"
GMPhoneNotResponding status.BridgeStateErrorCode = "gm-phone-not-responding"
) )
func init() { func init() {
@ -44,6 +46,7 @@ func init() {
GMBrowserInactive: "Google Messages opened in another browser", GMBrowserInactive: "Google Messages opened in another browser",
GMBrowserInactiveTimeout: "Google Messages disconnected due to timeout", GMBrowserInactiveTimeout: "Google Messages disconnected due to timeout",
GMBrowserInactiveInactivity: "Google Messages disconnected due to inactivity", GMBrowserInactiveInactivity: "Google Messages disconnected due to inactivity",
GMPhoneNotResponding: "Your phone is not responding, please check that it is connected to the internet",
}) })
} }

View file

@ -56,6 +56,8 @@ type Client struct {
skipCount int skipCount int
disconnecting bool disconnecting bool
pingShortCircuit chan struct{}
recentUpdates [8]updateDedupItem recentUpdates [8]updateDedupItem
recentUpdatesPtr int recentUpdatesPtr int
@ -77,13 +79,14 @@ func NewAuthData() *AuthData {
func NewClient(authData *AuthData, logger zerolog.Logger) *Client { func NewClient(authData *AuthData, logger zerolog.Logger) *Client {
sessionHandler := &SessionHandler{ sessionHandler := &SessionHandler{
responseWaiters: make(map[string]chan<- *IncomingRPCMessage), responseWaiters: make(map[string]chan<- *IncomingRPCMessage),
responseTimeout: time.Duration(5000) * time.Millisecond,
} }
cli := &Client{ cli := &Client{
AuthData: authData, AuthData: authData,
Logger: logger, Logger: logger,
sessionHandler: sessionHandler, sessionHandler: sessionHandler,
http: &http.Client{}, http: &http.Client{},
pingShortCircuit: make(chan struct{}),
} }
sessionHandler.client = cli sessionHandler.client = cli
cli.FetchConfigVersion() cli.FetchConfigVersion()
@ -127,17 +130,33 @@ func (c *Client) Connect() error {
c.updateWebEncryptionKey(webEncryptionKeyResponse.GetKey()) c.updateWebEncryptionKey(webEncryptionKeyResponse.GetKey())
go c.doLongPoll(true) go c.doLongPoll(true)
c.sessionHandler.startAckInterval() c.sessionHandler.startAckInterval()
go c.postConnect()
return nil
}
bugleRes, bugleErr := c.IsBugleDefault() func (c *Client) postConnect() {
if bugleErr != nil { err := c.SetActiveSession()
return fmt.Errorf("failed to check bugle default: %w", err) if err != nil {
c.Logger.Err(err).Msg("Failed to set active session")
return
}
doneChan := make(chan struct{})
go func() {
select {
case <-time.After(5 * time.Second):
c.Logger.Warn().Msg("Checking bugle default on connect is taking long")
case <-doneChan:
}
}()
bugleRes, err := c.IsBugleDefault()
close(doneChan)
if err != nil {
c.Logger.Err(err).Msg("Failed to check bugle default")
return
} }
c.Logger.Debug().Bool("bugle_default", bugleRes.Success).Msg("Got is bugle default response on connect") c.Logger.Debug().Bool("bugle_default", bugleRes.Success).Msg("Got is bugle default response on connect")
sessionErr := c.SetActiveSession()
if sessionErr != nil {
return fmt.Errorf("failed to set active session: %w", err)
}
return nil
} }
func (c *Client) Disconnect() { func (c *Client) Disconnect() {

View file

@ -35,3 +35,7 @@ type ListenTemporaryError struct {
} }
type ListenRecovered struct{} type ListenRecovered struct{}
type PhoneNotResponding struct{}
type PhoneRespondingAgain struct{}

View file

@ -20,21 +20,84 @@ import (
"go.mau.fi/mautrix-gmessages/libgm/util" "go.mau.fi/mautrix-gmessages/libgm/util"
) )
func (c *Client) doDittoPinger(log *zerolog.Logger, dittoPing chan struct{}, stopPinger chan struct{}) {
notResponding := false
exit := false
onRespond := func() {
if notResponding {
log.Debug().Msg("Ditto ping succeeded, phone is back online")
c.triggerEvent(&events.PhoneRespondingAgain{})
notResponding = false
}
}
doPing := func() {
pingChan, err := c.NotifyDittoActivity()
if err != nil {
log.Err(err).Msg("Error notifying ditto activity")
return
}
select {
case <-pingChan:
onRespond()
return
case <-time.After(15 * time.Second):
log.Warn().Msg("Ditto ping is taking long, phone may be offline")
c.triggerEvent(&events.PhoneNotResponding{})
notResponding = true
case <-stopPinger:
exit = true
return
}
select {
case <-pingChan:
onRespond()
case <-stopPinger:
exit = true
return
}
}
for !exit {
select {
case <-c.pingShortCircuit:
log.Debug().Msg("Ditto ping wait short-circuited")
doPing()
case <-dittoPing:
log.Trace().Msg("Doing normal ditto ping")
doPing()
case <-stopPinger:
return
}
}
}
func (c *Client) doLongPoll(loggedIn bool) { func (c *Client) doLongPoll(loggedIn bool) {
c.listenID++ c.listenID++
listenID := c.listenID listenID := c.listenID
errored := true errored := true
listenReqID := uuid.NewString() listenReqID := uuid.NewString()
log := c.Logger.With().Int("listen_id", listenID).Logger()
defer func() {
log.Debug().Msg("Long polling stopped")
}()
log.Debug().Str("listen_uuid", listenReqID).Msg("Long polling starting")
dittoPing := make(chan struct{}, 1)
stopDittoPinger := make(chan struct{})
defer close(stopDittoPinger)
go c.doDittoPinger(&log, dittoPing, stopDittoPinger)
for c.listenID == listenID { for c.listenID == listenID {
err := c.refreshAuthToken() err := c.refreshAuthToken()
if err != nil { if err != nil {
c.Logger.Err(err).Msg("Error refreshing auth token") log.Err(err).Msg("Error refreshing auth token")
if loggedIn { if loggedIn {
c.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)}) c.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)})
} }
return return
} }
c.Logger.Debug().Msg("Starting new long-polling request") log.Debug().Msg("Starting new long-polling request")
payload := &gmproto.ReceiveMessagesRequest{ payload := &gmproto.ReceiveMessagesRequest{
Auth: &gmproto.AuthMessage{ Auth: &gmproto.AuthMessage{
RequestID: listenReqID, RequestID: listenReqID,
@ -51,12 +114,12 @@ func (c *Client) doLongPoll(loggedIn bool) {
c.triggerEvent(&events.ListenTemporaryError{Error: err}) c.triggerEvent(&events.ListenTemporaryError{Error: err})
} }
errored = true errored = true
c.Logger.Err(err).Msg("Error making listen request, retrying in 5 seconds") log.Err(err).Msg("Error making listen request, retrying in 5 seconds")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
continue continue
} }
if resp.StatusCode >= 400 && resp.StatusCode < 500 { if resp.StatusCode >= 400 && resp.StatusCode < 500 {
c.Logger.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request") log.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request")
if loggedIn { if loggedIn {
c.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp}}) c.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp}})
} }
@ -66,7 +129,7 @@ func (c *Client) doLongPoll(loggedIn bool) {
c.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp}}) c.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp}})
} }
errored = true errored = true
c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds") log.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
continue continue
} }
@ -76,22 +139,21 @@ func (c *Client) doLongPoll(loggedIn bool) {
c.triggerEvent(&events.ListenRecovered{}) c.triggerEvent(&events.ListenRecovered{})
} }
} }
c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened") log.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened")
c.longPollingConn = resp.Body c.longPollingConn = resp.Body
if c.AuthData.Browser != nil { if loggedIn {
go func() { select {
err := c.NotifyDittoActivity() case dittoPing <- struct{}{}:
if err != nil { default:
c.Logger.Err(err).Msg("Error notifying ditto activity") log.Debug().Msg("Ditto pinger is still waiting for previous ping, skipping new ping")
} }
}()
} }
c.readLongPoll(resp.Body) c.readLongPoll(&log, resp.Body)
c.longPollingConn = nil c.longPollingConn = nil
} }
} }
func (c *Client) readLongPoll(rc io.ReadCloser) { func (c *Client) readLongPoll(log *zerolog.Logger, rc io.ReadCloser) {
defer rc.Close() defer rc.Close()
c.disconnecting = false c.disconnecting = false
reader := bufio.NewReader(rc) reader := bufio.NewReader(rc)
@ -99,10 +161,10 @@ func (c *Client) readLongPoll(rc io.ReadCloser) {
var accumulatedData []byte var accumulatedData []byte
n, err := reader.Read(buf[:2]) n, err := reader.Read(buf[:2])
if err != nil { if err != nil {
c.Logger.Err(err).Msg("Error reading opening bytes") log.Err(err).Msg("Error reading opening bytes")
return return
} else if n != 2 || string(buf[:2]) != "[[" { } else if n != 2 || string(buf[:2]) != "[[" {
c.Logger.Err(err).Msg("Opening is not [[") log.Err(err).Msg("Opening is not [[")
return return
} }
var expectEOF bool var expectEOF bool
@ -111,19 +173,19 @@ func (c *Client) readLongPoll(rc io.ReadCloser) {
if err != nil { if err != nil {
var logEvt *zerolog.Event var logEvt *zerolog.Event
if (errors.Is(err, io.EOF) && expectEOF) || c.disconnecting { if (errors.Is(err, io.EOF) && expectEOF) || c.disconnecting {
logEvt = c.Logger.Debug() logEvt = log.Debug()
} else { } else {
logEvt = c.Logger.Warn() logEvt = log.Warn()
} }
logEvt.Err(err).Msg("Stopped reading data from server") logEvt.Err(err).Msg("Stopped reading data from server")
return return
} else if expectEOF { } else if expectEOF {
c.Logger.Warn().Msg("Didn't get EOF after stream end marker") log.Warn().Msg("Didn't get EOF after stream end marker")
} }
chunk := buf[:n] chunk := buf[:n]
if len(accumulatedData) == 0 { if len(accumulatedData) == 0 {
if len(chunk) == 2 && string(chunk) == "]]" { if len(chunk) == 2 && string(chunk) == "]]" {
c.Logger.Debug().Msg("Got stream end marker") log.Debug().Msg("Got stream end marker")
expectEOF = true expectEOF = true
continue continue
} }
@ -131,7 +193,7 @@ func (c *Client) readLongPoll(rc io.ReadCloser) {
} }
accumulatedData = append(accumulatedData, chunk...) accumulatedData = append(accumulatedData, chunk...)
if !json.Valid(accumulatedData) { if !json.Valid(accumulatedData) {
c.Logger.Trace().Bytes("data", chunk).Msg("Invalid JSON, reading next chunk") log.Trace().Bytes("data", chunk).Msg("Invalid JSON, reading next chunk")
continue continue
} }
currentBlock := accumulatedData currentBlock := accumulatedData
@ -139,21 +201,21 @@ func (c *Client) readLongPoll(rc io.ReadCloser) {
msg := &gmproto.LongPollingPayload{} msg := &gmproto.LongPollingPayload{}
err = pblite.Unmarshal(currentBlock, msg) err = pblite.Unmarshal(currentBlock, msg)
if err != nil { if err != nil {
c.Logger.Err(err).Msg("Error deserializing pblite message") log.Err(err).Msg("Error deserializing pblite message")
continue continue
} }
switch { switch {
case msg.GetData() != nil: case msg.GetData() != nil:
c.HandleRPCMsg(msg.GetData()) c.HandleRPCMsg(msg.GetData())
case msg.GetAck() != nil: case msg.GetAck() != nil:
c.Logger.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message") log.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message")
c.skipCount = int(msg.GetAck().GetCount()) c.skipCount = int(msg.GetAck().GetCount())
case msg.GetStartRead() != nil: case msg.GetStartRead() != nil:
c.Logger.Trace().Msg("Got startRead message") log.Trace().Msg("Got startRead message")
case msg.GetHeartbeat() != nil: case msg.GetHeartbeat() != nil:
c.Logger.Trace().Msg("Got heartbeat message") log.Trace().Msg("Got heartbeat message")
default: default:
c.Logger.Warn(). log.Warn().
Str("data", base64.StdEncoding.EncodeToString(currentBlock)). Str("data", base64.StdEncoding.EncodeToString(currentBlock)).
Msg("Got unknown message") Msg("Got unknown message")
} }
@ -162,7 +224,7 @@ func (c *Client) readLongPoll(rc io.ReadCloser) {
func (c *Client) closeLongPolling() { func (c *Client) closeLongPolling() {
if conn := c.longPollingConn; conn != nil { if conn := c.longPollingConn; conn != nil {
c.Logger.Debug().Msg("Closing long polling connection manually") c.Logger.Debug().Int("current_listen_id", c.listenID).Msg("Closing long polling connection manually")
c.listenID++ c.listenID++
c.disconnecting = true c.disconnecting = true
_ = conn.Close() _ = conn.Close()

View file

@ -121,15 +121,13 @@ func (c *Client) SetActiveSession() error {
} }
func (c *Client) IsBugleDefault() (*gmproto.IsBugleDefaultResponse, error) { func (c *Client) IsBugleDefault() (*gmproto.IsBugleDefaultResponse, error) {
c.sessionHandler.ResetSessionID()
actionType := gmproto.ActionType_IS_BUGLE_DEFAULT actionType := gmproto.ActionType_IS_BUGLE_DEFAULT
return typedResponse[*gmproto.IsBugleDefaultResponse](c.sessionHandler.sendMessage(actionType, nil)) return typedResponse[*gmproto.IsBugleDefaultResponse](c.sessionHandler.sendMessage(actionType, nil))
} }
func (c *Client) NotifyDittoActivity() error { func (c *Client) NotifyDittoActivity() (<-chan *IncomingRPCMessage, error) {
payload := &gmproto.NotifyDittoActivityRequest{Success: true} return c.sessionHandler.sendAsyncMessage(SendMessageParams{
actionType := gmproto.ActionType_NOTIFY_DITTO_ACTIVITY Action: gmproto.ActionType_NOTIFY_DITTO_ACTIVITY,
Data: &gmproto.NotifyDittoActivityRequest{Success: true},
_, err := c.sessionHandler.sendMessage(actionType, payload) })
return err
} }

View file

@ -25,8 +25,6 @@ type SessionHandler struct {
ackTicker *time.Ticker ackTicker *time.Ticker
sessionID string sessionID string
responseTimeout time.Duration
} }
func (s *SessionHandler) ResetSessionID() { func (s *SessionHandler) ResetSessionID() {
@ -124,7 +122,17 @@ func (s *SessionHandler) sendMessageWithParams(params SendMessageParams) (*Incom
return nil, err return nil, err
} }
// TODO add timeout select {
case resp := <-ch:
return resp, nil
case <-time.After(5 * time.Second):
// Notify the pinger in order to trigger an event that the phone isn't responding
select {
case s.client.pingShortCircuit <- struct{}{}:
default:
}
}
// TODO hard timeout?
return <-ch, nil return <-ch, nil
} }

30
user.go
View file

@ -71,6 +71,7 @@ type User struct {
browserInactiveType status.BridgeStateErrorCode browserInactiveType status.BridgeStateErrorCode
batteryLow bool batteryLow bool
mobileData bool mobileData bool
phoneResponding bool
ready bool ready bool
batteryLowAlertSent time.Time batteryLowAlertSent time.Time
pollErrorAlertSent bool pollErrorAlertSent bool
@ -254,6 +255,7 @@ func (br *GMBridge) NewUser(dbUser *database.User) *User {
} }
user.log = maulogadapt.ZeroAsMau(&user.zlog) user.log = maulogadapt.ZeroAsMau(&user.zlog)
user.longPollingError = errors.New("not connected") user.longPollingError = errors.New("not connected")
user.phoneResponding = true
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID) user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser
@ -615,6 +617,12 @@ func (user *User) HandleEvent(event interface{}) {
go user.sendMarkdownBridgeAlert(false, "Reconnected to Google Messages") go user.sendMarkdownBridgeAlert(false, "Reconnected to Google Messages")
user.pollErrorAlertSent = false user.pollErrorAlertSent = false
} }
case *events.PhoneNotResponding:
user.phoneResponding = false
user.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
case *events.PhoneRespondingAgain:
user.phoneResponding = true
user.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
case *events.PairSuccessful: case *events.PairSuccessful:
user.Session = user.Client.AuthData user.Session = user.Client.AuthData
user.PhoneID = v.GetMobile().GetSourceID() user.PhoneID = v.GetMobile().GetSourceID()
@ -652,13 +660,18 @@ func (user *User) HandleEvent(event interface{}) {
} }
func (user *User) aggressiveSetActive() { func (user *User) aggressiveSetActive() {
sleepTimes := []int{2, 5, 10, 30} sleepTimes := []int{5, 10, 30}
for i := 0; i < 4; i++ { for i := 0; i < 3; i++ {
sleep := time.Duration(sleepTimes[i]) * time.Second sleep := time.Duration(sleepTimes[i]) * time.Second
user.zlog.Info(). user.zlog.Info().
Int("sleep_seconds", int(sleep.Seconds())). Int("sleep_seconds", int(sleep.Seconds())).
Msg("Aggressively reactivating after sleep") Msg("Aggressively reactivating bridge session after sleep")
time.Sleep(sleep) time.Sleep(sleep)
if user.browserInactiveType == "" {
user.zlog.Info().Msg("Bridge session became active on its own, not reactivating")
return
}
user.zlog.Info().Msg("Now reactivating bridge session")
err := user.Client.SetActiveSession() err := user.Client.SetActiveSession()
if err != nil { if err != nil {
user.zlog.Warn().Err(err).Msg("Failed to set self as active session") user.zlog.Warn().Err(err).Msg("Failed to set self as active session")
@ -689,6 +702,7 @@ func (user *User) handleUserAlert(v *gmproto.UserAlertEvent) {
user.browserInactiveType = GMBrowserInactive user.browserInactiveType = GMBrowserInactive
becameInactive = true becameInactive = true
case gmproto.AlertType_BROWSER_ACTIVE: case gmproto.AlertType_BROWSER_ACTIVE:
// TODO check if session ID changed?
user.pollErrorAlertSent = false user.pollErrorAlertSent = false
user.browserInactiveType = "" user.browserInactiveType = ""
user.ready = true user.ready = true
@ -741,13 +755,21 @@ func (user *User) FillBridgeState(state status.BridgeState) status.BridgeState {
state.StateEvent = status.StateConnecting state.StateEvent = status.StateConnecting
state.Error = GMConnecting state.Error = GMConnecting
} }
if !user.phoneResponding {
state.StateEvent = status.StateBadCredentials
state.Error = GMPhoneNotResponding
}
if user.longPollingError != nil { if user.longPollingError != nil {
state.StateEvent = status.StateTransientDisconnect state.StateEvent = status.StateTransientDisconnect
state.Error = GMListenError state.Error = GMListenError
state.Info["go_error"] = user.longPollingError.Error() state.Info["go_error"] = user.longPollingError.Error()
} }
if user.browserInactiveType != "" { if user.browserInactiveType != "" {
state.StateEvent = status.StateTransientDisconnect if user.bridge.Config.GoogleMessages.AggressiveReconnect {
state.StateEvent = status.StateTransientDisconnect
} else {
state.StateEvent = status.StateBadCredentials
}
state.Error = user.browserInactiveType state.Error = user.browserInactiveType
} }
} }