|
|
|
@ -40,6 +40,12 @@ func (cm *ClientManager) Start() error {
|
|
|
|
|
case StateDisconnected:
|
|
|
|
|
// Reconnect on disconnection
|
|
|
|
|
cm.connect()
|
|
|
|
|
case StateStreamError:
|
|
|
|
|
cm.Client.Disconnect()
|
|
|
|
|
// Only try reconnecting if we have not been kicked by another session to avoid connection loop.
|
|
|
|
|
if e.StreamError != "conflict" {
|
|
|
|
|
cm.connect()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -62,8 +68,6 @@ func (cm *ClientManager) connect() error {
|
|
|
|
|
// TODO: Make it possible to define logger to log disconnect and reconnection attempts
|
|
|
|
|
cm.Metrics = initMetrics()
|
|
|
|
|
|
|
|
|
|
// TODO: Test for non recoverable errors (invalid username and password) and return an error
|
|
|
|
|
// to start caller. We do not want to retry on non recoverable errors.
|
|
|
|
|
if cm.Client.Session, err = cm.Client.Connect(); err != nil {
|
|
|
|
|
var actualErr ConnError
|
|
|
|
|
if xerrors.As(err, &actualErr) {
|
|
|
|
@ -72,7 +76,7 @@ func (cm *ClientManager) connect() error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
backoff.Wait()
|
|
|
|
|
} else {
|
|
|
|
|
} else { // We are connected, we can leave the retry loop
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|