diff --git a/bridgestate.go b/bridgestate.go index f67ba1a..88405d1 100644 --- a/bridgestate.go +++ b/bridgestate.go @@ -34,6 +34,8 @@ const ( GMBrowserInactive status.BridgeStateErrorCode = "gm-browser-inactive" GMBrowserInactiveTimeout status.BridgeStateErrorCode = "gm-browser-inactive-timeout" GMBrowserInactiveInactivity status.BridgeStateErrorCode = "gm-browser-inactive-inactivity" + + GMPhoneNotResponding status.BridgeStateErrorCode = "gm-phone-not-responding" ) func init() { @@ -44,6 +46,7 @@ func init() { GMBrowserInactive: "Google Messages opened in another browser", GMBrowserInactiveTimeout: "Google Messages disconnected due to timeout", GMBrowserInactiveInactivity: "Google Messages disconnected due to inactivity", + GMPhoneNotResponding: "Your phone is not responding, please check that it is connected to the internet", }) } diff --git a/libgm/client.go b/libgm/client.go index 03808b6..e9358dd 100644 --- a/libgm/client.go +++ b/libgm/client.go @@ -56,6 +56,8 @@ type Client struct { skipCount int disconnecting bool + pingShortCircuit chan struct{} + recentUpdates [8]updateDedupItem recentUpdatesPtr int @@ -77,13 +79,14 @@ func NewAuthData() *AuthData { func NewClient(authData *AuthData, logger zerolog.Logger) *Client { sessionHandler := &SessionHandler{ responseWaiters: make(map[string]chan<- *IncomingRPCMessage), - responseTimeout: time.Duration(5000) * time.Millisecond, } cli := &Client{ AuthData: authData, Logger: logger, sessionHandler: sessionHandler, http: &http.Client{}, + + pingShortCircuit: make(chan struct{}), } sessionHandler.client = cli cli.FetchConfigVersion() @@ -127,17 +130,33 @@ func (c *Client) Connect() error { c.updateWebEncryptionKey(webEncryptionKeyResponse.GetKey()) go c.doLongPoll(true) c.sessionHandler.startAckInterval() + go c.postConnect() + return nil +} - bugleRes, bugleErr := c.IsBugleDefault() - if bugleErr != nil { - return fmt.Errorf("failed to check bugle default: %w", err) +func (c *Client) postConnect() { + err := c.SetActiveSession() + if err != nil { + c.Logger.Err(err).Msg("Failed to set active session") + return + } + + doneChan := make(chan struct{}) + go func() { + select { + case <-time.After(5 * time.Second): + c.Logger.Warn().Msg("Checking bugle default on connect is taking long") + case <-doneChan: + } + }() + bugleRes, err := c.IsBugleDefault() + close(doneChan) + if err != nil { + c.Logger.Err(err).Msg("Failed to check bugle default") + return } c.Logger.Debug().Bool("bugle_default", bugleRes.Success).Msg("Got is bugle default response on connect") - sessionErr := c.SetActiveSession() - if sessionErr != nil { - return fmt.Errorf("failed to set active session: %w", err) - } - return nil + } func (c *Client) Disconnect() { diff --git a/libgm/events/ready.go b/libgm/events/ready.go index a45175d..0e30210 100644 --- a/libgm/events/ready.go +++ b/libgm/events/ready.go @@ -35,3 +35,7 @@ type ListenTemporaryError struct { } type ListenRecovered struct{} + +type PhoneNotResponding struct{} + +type PhoneRespondingAgain struct{} diff --git a/libgm/longpoll.go b/libgm/longpoll.go index 2e00861..c150d7f 100644 --- a/libgm/longpoll.go +++ b/libgm/longpoll.go @@ -20,21 +20,84 @@ import ( "go.mau.fi/mautrix-gmessages/libgm/util" ) +func (c *Client) doDittoPinger(log *zerolog.Logger, dittoPing chan struct{}, stopPinger chan struct{}) { + notResponding := false + exit := false + onRespond := func() { + if notResponding { + log.Debug().Msg("Ditto ping succeeded, phone is back online") + c.triggerEvent(&events.PhoneRespondingAgain{}) + notResponding = false + } + } + doPing := func() { + pingChan, err := c.NotifyDittoActivity() + if err != nil { + log.Err(err).Msg("Error notifying ditto activity") + return + } + select { + case <-pingChan: + onRespond() + return + case <-time.After(15 * time.Second): + log.Warn().Msg("Ditto ping is taking long, phone may be offline") + c.triggerEvent(&events.PhoneNotResponding{}) + notResponding = true + case <-stopPinger: + exit = true + return + } + select { + case <-pingChan: + onRespond() + case <-stopPinger: + exit = true + return + } + } + for !exit { + select { + case <-c.pingShortCircuit: + log.Debug().Msg("Ditto ping wait short-circuited") + doPing() + case <-dittoPing: + log.Trace().Msg("Doing normal ditto ping") + doPing() + case <-stopPinger: + return + } + } +} + func (c *Client) doLongPoll(loggedIn bool) { c.listenID++ listenID := c.listenID errored := true listenReqID := uuid.NewString() + + log := c.Logger.With().Int("listen_id", listenID).Logger() + defer func() { + log.Debug().Msg("Long polling stopped") + }() + log.Debug().Str("listen_uuid", listenReqID).Msg("Long polling starting") + + dittoPing := make(chan struct{}, 1) + stopDittoPinger := make(chan struct{}) + + defer close(stopDittoPinger) + go c.doDittoPinger(&log, dittoPing, stopDittoPinger) + for c.listenID == listenID { err := c.refreshAuthToken() if err != nil { - c.Logger.Err(err).Msg("Error refreshing auth token") + log.Err(err).Msg("Error refreshing auth token") if loggedIn { c.triggerEvent(&events.ListenFatalError{Error: fmt.Errorf("failed to refresh auth token: %w", err)}) } return } - c.Logger.Debug().Msg("Starting new long-polling request") + log.Debug().Msg("Starting new long-polling request") payload := &gmproto.ReceiveMessagesRequest{ Auth: &gmproto.AuthMessage{ RequestID: listenReqID, @@ -51,12 +114,12 @@ func (c *Client) doLongPoll(loggedIn bool) { c.triggerEvent(&events.ListenTemporaryError{Error: err}) } errored = true - c.Logger.Err(err).Msg("Error making listen request, retrying in 5 seconds") + log.Err(err).Msg("Error making listen request, retrying in 5 seconds") time.Sleep(5 * time.Second) continue } if resp.StatusCode >= 400 && resp.StatusCode < 500 { - c.Logger.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request") + log.Error().Int("status_code", resp.StatusCode).Msg("Error making listen request") if loggedIn { c.triggerEvent(&events.ListenFatalError{Error: events.HTTPError{Action: "polling", Resp: resp}}) } @@ -66,7 +129,7 @@ func (c *Client) doLongPoll(loggedIn bool) { c.triggerEvent(&events.ListenTemporaryError{Error: events.HTTPError{Action: "polling", Resp: resp}}) } errored = true - c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds") + log.Debug().Int("statusCode", resp.StatusCode).Msg("5xx error in long polling, retrying in 5 seconds") time.Sleep(5 * time.Second) continue } @@ -76,22 +139,21 @@ func (c *Client) doLongPoll(loggedIn bool) { c.triggerEvent(&events.ListenRecovered{}) } } - c.Logger.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened") + log.Debug().Int("statusCode", resp.StatusCode).Msg("Long polling opened") c.longPollingConn = resp.Body - if c.AuthData.Browser != nil { - go func() { - err := c.NotifyDittoActivity() - if err != nil { - c.Logger.Err(err).Msg("Error notifying ditto activity") - } - }() + if loggedIn { + select { + case dittoPing <- struct{}{}: + default: + log.Debug().Msg("Ditto pinger is still waiting for previous ping, skipping new ping") + } } - c.readLongPoll(resp.Body) + c.readLongPoll(&log, resp.Body) c.longPollingConn = nil } } -func (c *Client) readLongPoll(rc io.ReadCloser) { +func (c *Client) readLongPoll(log *zerolog.Logger, rc io.ReadCloser) { defer rc.Close() c.disconnecting = false reader := bufio.NewReader(rc) @@ -99,10 +161,10 @@ func (c *Client) readLongPoll(rc io.ReadCloser) { var accumulatedData []byte n, err := reader.Read(buf[:2]) if err != nil { - c.Logger.Err(err).Msg("Error reading opening bytes") + log.Err(err).Msg("Error reading opening bytes") return } else if n != 2 || string(buf[:2]) != "[[" { - c.Logger.Err(err).Msg("Opening is not [[") + log.Err(err).Msg("Opening is not [[") return } var expectEOF bool @@ -111,19 +173,19 @@ func (c *Client) readLongPoll(rc io.ReadCloser) { if err != nil { var logEvt *zerolog.Event if (errors.Is(err, io.EOF) && expectEOF) || c.disconnecting { - logEvt = c.Logger.Debug() + logEvt = log.Debug() } else { - logEvt = c.Logger.Warn() + logEvt = log.Warn() } logEvt.Err(err).Msg("Stopped reading data from server") return } else if expectEOF { - c.Logger.Warn().Msg("Didn't get EOF after stream end marker") + log.Warn().Msg("Didn't get EOF after stream end marker") } chunk := buf[:n] if len(accumulatedData) == 0 { if len(chunk) == 2 && string(chunk) == "]]" { - c.Logger.Debug().Msg("Got stream end marker") + log.Debug().Msg("Got stream end marker") expectEOF = true continue } @@ -131,7 +193,7 @@ func (c *Client) readLongPoll(rc io.ReadCloser) { } accumulatedData = append(accumulatedData, chunk...) if !json.Valid(accumulatedData) { - c.Logger.Trace().Bytes("data", chunk).Msg("Invalid JSON, reading next chunk") + log.Trace().Bytes("data", chunk).Msg("Invalid JSON, reading next chunk") continue } currentBlock := accumulatedData @@ -139,21 +201,21 @@ func (c *Client) readLongPoll(rc io.ReadCloser) { msg := &gmproto.LongPollingPayload{} err = pblite.Unmarshal(currentBlock, msg) if err != nil { - c.Logger.Err(err).Msg("Error deserializing pblite message") + log.Err(err).Msg("Error deserializing pblite message") continue } switch { case msg.GetData() != nil: c.HandleRPCMsg(msg.GetData()) case msg.GetAck() != nil: - c.Logger.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message") + log.Debug().Int32("count", msg.GetAck().GetCount()).Msg("Got startup ack count message") c.skipCount = int(msg.GetAck().GetCount()) case msg.GetStartRead() != nil: - c.Logger.Trace().Msg("Got startRead message") + log.Trace().Msg("Got startRead message") case msg.GetHeartbeat() != nil: - c.Logger.Trace().Msg("Got heartbeat message") + log.Trace().Msg("Got heartbeat message") default: - c.Logger.Warn(). + log.Warn(). Str("data", base64.StdEncoding.EncodeToString(currentBlock)). Msg("Got unknown message") } @@ -162,7 +224,7 @@ func (c *Client) readLongPoll(rc io.ReadCloser) { func (c *Client) closeLongPolling() { if conn := c.longPollingConn; conn != nil { - c.Logger.Debug().Msg("Closing long polling connection manually") + c.Logger.Debug().Int("current_listen_id", c.listenID).Msg("Closing long polling connection manually") c.listenID++ c.disconnecting = true _ = conn.Close() diff --git a/libgm/methods.go b/libgm/methods.go index a3fe6b5..fa90129 100644 --- a/libgm/methods.go +++ b/libgm/methods.go @@ -121,15 +121,13 @@ func (c *Client) SetActiveSession() error { } func (c *Client) IsBugleDefault() (*gmproto.IsBugleDefaultResponse, error) { - c.sessionHandler.ResetSessionID() actionType := gmproto.ActionType_IS_BUGLE_DEFAULT return typedResponse[*gmproto.IsBugleDefaultResponse](c.sessionHandler.sendMessage(actionType, nil)) } -func (c *Client) NotifyDittoActivity() error { - payload := &gmproto.NotifyDittoActivityRequest{Success: true} - actionType := gmproto.ActionType_NOTIFY_DITTO_ACTIVITY - - _, err := c.sessionHandler.sendMessage(actionType, payload) - return err +func (c *Client) NotifyDittoActivity() (<-chan *IncomingRPCMessage, error) { + return c.sessionHandler.sendAsyncMessage(SendMessageParams{ + Action: gmproto.ActionType_NOTIFY_DITTO_ACTIVITY, + Data: &gmproto.NotifyDittoActivityRequest{Success: true}, + }) } diff --git a/libgm/session_handler.go b/libgm/session_handler.go index e03ace1..b2e8978 100644 --- a/libgm/session_handler.go +++ b/libgm/session_handler.go @@ -25,8 +25,6 @@ type SessionHandler struct { ackTicker *time.Ticker sessionID string - - responseTimeout time.Duration } func (s *SessionHandler) ResetSessionID() { @@ -124,7 +122,17 @@ func (s *SessionHandler) sendMessageWithParams(params SendMessageParams) (*Incom return nil, err } - // TODO add timeout + select { + case resp := <-ch: + return resp, nil + case <-time.After(5 * time.Second): + // Notify the pinger in order to trigger an event that the phone isn't responding + select { + case s.client.pingShortCircuit <- struct{}{}: + default: + } + } + // TODO hard timeout? return <-ch, nil } diff --git a/user.go b/user.go index 6c17028..37cb03b 100644 --- a/user.go +++ b/user.go @@ -71,6 +71,7 @@ type User struct { browserInactiveType status.BridgeStateErrorCode batteryLow bool mobileData bool + phoneResponding bool ready bool batteryLowAlertSent time.Time pollErrorAlertSent bool @@ -254,6 +255,7 @@ func (br *GMBridge) NewUser(dbUser *database.User) *User { } user.log = maulogadapt.ZeroAsMau(&user.zlog) user.longPollingError = errors.New("not connected") + user.phoneResponding = true user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID) user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser @@ -615,6 +617,12 @@ func (user *User) HandleEvent(event interface{}) { go user.sendMarkdownBridgeAlert(false, "Reconnected to Google Messages") user.pollErrorAlertSent = false } + case *events.PhoneNotResponding: + user.phoneResponding = false + user.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) + case *events.PhoneRespondingAgain: + user.phoneResponding = true + user.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) case *events.PairSuccessful: user.Session = user.Client.AuthData user.PhoneID = v.GetMobile().GetSourceID() @@ -652,13 +660,18 @@ func (user *User) HandleEvent(event interface{}) { } func (user *User) aggressiveSetActive() { - sleepTimes := []int{2, 5, 10, 30} - for i := 0; i < 4; i++ { + sleepTimes := []int{5, 10, 30} + for i := 0; i < 3; i++ { sleep := time.Duration(sleepTimes[i]) * time.Second user.zlog.Info(). Int("sleep_seconds", int(sleep.Seconds())). - Msg("Aggressively reactivating after sleep") + Msg("Aggressively reactivating bridge session after sleep") time.Sleep(sleep) + if user.browserInactiveType == "" { + user.zlog.Info().Msg("Bridge session became active on its own, not reactivating") + return + } + user.zlog.Info().Msg("Now reactivating bridge session") err := user.Client.SetActiveSession() if err != nil { user.zlog.Warn().Err(err).Msg("Failed to set self as active session") @@ -689,6 +702,7 @@ func (user *User) handleUserAlert(v *gmproto.UserAlertEvent) { user.browserInactiveType = GMBrowserInactive becameInactive = true case gmproto.AlertType_BROWSER_ACTIVE: + // TODO check if session ID changed? user.pollErrorAlertSent = false user.browserInactiveType = "" user.ready = true @@ -741,13 +755,21 @@ func (user *User) FillBridgeState(state status.BridgeState) status.BridgeState { state.StateEvent = status.StateConnecting state.Error = GMConnecting } + if !user.phoneResponding { + state.StateEvent = status.StateBadCredentials + state.Error = GMPhoneNotResponding + } if user.longPollingError != nil { state.StateEvent = status.StateTransientDisconnect state.Error = GMListenError state.Info["go_error"] = user.longPollingError.Error() } if user.browserInactiveType != "" { - state.StateEvent = status.StateTransientDisconnect + if user.bridge.Config.GoogleMessages.AggressiveReconnect { + state.StateEvent = status.StateTransientDisconnect + } else { + state.StateEvent = status.StateBadCredentials + } state.Error = user.browserInactiveType } }