Add Client Manager to monitor connection state and trigger reconnect (#39)

- Support for exponential backoff on reconnect to be gentle on the server.
- Clean up client by moving metrics and retry strategy to the connection manager.
- Update echo_client to use client manager
- Fix echo client XMPP message matching

Fixes #21
Improvements for #8
disco_info_form
Mickaël Rémond 5 years ago committed by GitHub
parent 6cdadc95e9
commit 2f391fde80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,101 @@
/*
Interesting reference on backoff:
- Exponential Backoff And Jitter (AWS Blog):
https://www.awsarchitectureblog.com/2015/03/backoff.html
We use Jitter as a default for exponential backoff, as the goal of
this module is not to provide precise 'ticks', but good behaviour to
implement retries that are helping the server to recover faster in
case of congestion.
It can be used in several ways:
- Using duration to get next sleep time.
- Using ticker channel to trigger callback function on tick
The functions for Backoff are not threadsafe, but you can:
- Keep the attempt counter on your end and use DurationForAttempt(int)
- Use lock in your own code to protect the Backoff structure.
TODO: Implement Backoff Ticker channel
TODO: Implement throttler interface. Throttler could be used to implement various reconnect strategies.
*/
package xmpp // import "gosrc.io/xmpp"
import (
"math"
"math/rand"
"time"
)
const (
defaultBase int = 20 // Backoff base, in ms
defaultFactor int = 2
defaultCap int = 180000 // 3 minutes
)
// Backoff can provide increasing duration with the number of attempt
// performed. The structure is used to support exponential backoff on
// connection attempts to avoid hammering the server we are connecting
// to.
type Backoff struct {
NoJitter bool
Base int
Factor int
Cap int
lastDuration int
attempt int
}
// Duration returns the duration to apply to the current attempt.
func (b *Backoff) Duration() time.Duration {
d := b.DurationForAttempt(b.attempt)
b.attempt++
return d
}
// Wait sleeps for backoff duration for current attempt.
func (b *Backoff) Wait() {
time.Sleep(b.Duration())
}
// DurationForAttempt returns a duration for an attempt number, in a stateless way.
func (b *Backoff) DurationForAttempt(attempt int) time.Duration {
b.setDefault()
expBackoff := math.Min(float64(b.Cap), float64(b.Base)*math.Pow(float64(b.Factor), float64(b.attempt)))
d := int(math.Trunc(expBackoff))
if !b.NoJitter {
d = rand.Intn(d)
}
return time.Duration(d) * time.Millisecond
}
// Reset sets back the number of attempts to 0. This is to be called after a successfull operation has been performed,
// to reset the exponential backoff interval.
func (b *Backoff) Reset() {
b.attempt = 0
}
func (b *Backoff) setDefault() {
if b.Base == 0 {
b.Base = defaultBase
}
if b.Cap == 0 {
b.Cap = defaultCap
}
if b.Factor == 0 {
b.Factor = defaultFactor
}
}
/*
We use full jitter as default for now as it seems to provide good behaviour for reconnect.
Base is the default interval between attempts (if backoff Factor was equal to 1)
Attempt is the number of retry for operation. If we start attempt at 0, first sleep equals base.
Cap is the maximum sleep time duration we tolerate between attempts
*/

@ -0,0 +1,24 @@
package xmpp_test
import (
"testing"
"time"
"gosrc.io/xmpp"
)
func TestDurationForAttempt_NoJitter(t *testing.T) {
b := xmpp.Backoff{Base: 25, NoJitter: true}
bInMS := time.Duration(b.Base) * time.Millisecond
if b.DurationForAttempt(0) != bInMS {
t.Errorf("incorrect default duration for attempt #0 (%d) = %d", b.DurationForAttempt(0)/time.Millisecond, bInMS/time.Millisecond)
}
var prevDuration, d time.Duration
for i := 0; i < 10; i++ {
d = b.DurationForAttempt(i)
if !(d >= prevDuration) {
t.Errorf("duration should be increasing between attempts. #%d (%d) > %d", i, d, prevDuration)
}
prevDuration = d
}
}

@ -10,37 +10,43 @@ import (
"time" "time"
) )
// Client Metrics //=============================================================================
// ============================================================================
type Metrics struct { // ConnState represents the current connection state.
startTime time.Time type ConnState = uint8
// ConnectTime returns the duration between client initiation of the TCP/IP
// connection to the server and actual TCP/IP session establishment.
// This time includes DNS resolution and can be slightly higher if the DNS
// resolution result was not in cache.
ConnectTime time.Duration
// LoginTime returns the between client initiation of the TCP/IP
// connection to the server and the return of the login result.
// This includes ConnectTime, but also XMPP level protocol negociation
// like starttls.
LoginTime time.Duration
}
// initMetrics set metrics with default value and define the starting point // This is a the list of events happening on the connection that the
// for duration calculation (connect time, login time, etc). // client can be notified about.
func initMetrics() *Metrics { const (
return &Metrics{ StateDisconnected ConnState = iota
startTime: time.Now(), StateConnected
} StateSessionEstablished
)
// Event is a structure use to convey event changes related to client state. This
// is for example used to notify the client when the client get disconnected.
type Event struct {
State ConnState
Description string
} }
func (m *Metrics) setConnectTime() { // EventHandler is use to pass events about state of the connection to
m.ConnectTime = time.Since(m.startTime) // client implementation.
type EventHandler func(Event)
type EventManager struct {
// Store current state
CurrentState ConnState
// Callback used to propagate connection state changes
Handler EventHandler
} }
func (m *Metrics) setLoginTime() { func (em EventManager) updateState(state ConnState) {
m.LoginTime = time.Since(m.startTime) em.CurrentState = state
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState})
}
} }
// Client // Client
@ -49,14 +55,16 @@ func (m *Metrics) setLoginTime() {
// Client is the main structure used to connect as a client on an XMPP // Client is the main structure used to connect as a client on an XMPP
// server. // server.
type Client struct { type Client struct {
// Store user defined options // Store user defined options and states
config Config config Config
// Session gather data that can be accessed by users of this library // Session gather data that can be accessed by users of this library
Session *Session Session *Session
// TCP level connection / can be replaced by a TLS session after starttls // TCP level connection / can be replaced by a TLS session after starttls
conn net.Conn conn net.Conn
// store low level metrics // Packet channel
Metrics *Metrics RecvChannel chan interface{}
// Track and broadcast connection state
EventManager
} }
/* /*
@ -89,6 +97,10 @@ func NewClient(config Config) (c *Client, err error) {
if c.config.ConnectTimeout == 0 { if c.config.ConnectTimeout == 0 {
c.config.ConnectTimeout = 15 // 15 second as default c.config.ConnectTimeout = 15 // 15 second as default
} }
// Create a default channel that developer can override
c.RecvChannel = make(chan interface{})
return return
} }
@ -112,59 +124,55 @@ func checkAddress(addr string) (string, error) {
// Connect triggers actual TCP connection, based on previously defined parameters. // Connect triggers actual TCP connection, based on previously defined parameters.
func (c *Client) Connect() (*Session, error) { func (c *Client) Connect() (*Session, error) {
var tcpconn net.Conn
var err error var err error
// TODO: Refactor = abstract retry loop in capped exponential back-off function c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second)
var try = 0
var success bool
c.Metrics = initMetrics()
for try <= c.config.Retry && !success {
if tcpconn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second); err == nil {
c.Metrics.setConnectTime()
success = true
}
try++
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.updateState(StateConnected)
// Connection is ok, we now open XMPP session // Connection is ok, we now open XMPP session
c.conn = tcpconn
if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil { if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil {
return c.Session, err return c.Session, err
} }
c.updateState(StateSessionEstablished)
c.Metrics.setLoginTime()
// We're connected and can now receive and send messages. // We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online") //fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
// TODO: Do we always want to send initial presence automatically ? // TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ? // Do we need an option to avoid that or do we rely on client to send the presence itself ?
fmt.Fprintf(c.Session.socketProxy, "<presence/>") fmt.Fprintf(c.Session.socketProxy, "<presence/>")
// Start the receiver go routine
go c.recv()
return c.Session, err return c.Session, err
} }
func (c *Client) recv(receiver chan<- interface{}) (err error) { func (c *Client) Disconnect() {
_ = c.SendRaw("</stream:stream>")
// TODO: Add a way to wait for stream close acknowledgement from the server for clean disconnect
_ = c.conn.Close()
}
func (c *Client) recv() (err error) {
for { for {
val, err := next(c.Session.decoder) val, err := next(c.Session.decoder)
if err != nil { if err != nil {
close(receiver) c.updateState(StateDisconnected)
return err return err
} }
receiver <- val c.RecvChannel <- val
val = nil val = nil
} }
panic("unreachable")
} }
// Recv abstracts receiving preparsed XMPP packets from a channel. // Recv abstracts receiving preparsed XMPP packets from a channel.
// Channel allow client to receive / dispatch packets in for range loop. // Channel allow client to receive / dispatch packets in for range loop.
// TODO: Deprecate this function in favor of reading directly from the RecvChannel
func (c *Client) Recv() <-chan interface{} { func (c *Client) Recv() <-chan interface{} {
ch := make(chan interface{}) return c.RecvChannel
go c.recv(ch)
return ch
} }
// Send marshalls XMPP stanza and sends it to the server. // Send marshalls XMPP stanza and sends it to the server.
@ -185,8 +193,9 @@ func (c *Client) Send(packet Packet) error {
// disconnect the client. It is up to the user of this method to // disconnect the client. It is up to the user of this method to
// carefully craft the XML content to produce valid XMPP. // carefully craft the XML content to produce valid XMPP.
func (c *Client) SendRaw(packet string) error { func (c *Client) SendRaw(packet string) error {
fmt.Fprintf(c.Session.socketProxy, packet) // TODO handle errors var err error
return nil _, err = fmt.Fprintf(c.Session.socketProxy, packet)
return err
} }
func xmlEscape(s string) string { func xmlEscape(s string) string {

@ -0,0 +1,106 @@
package xmpp // import "gosrc.io/xmpp"
import (
"log"
"time"
)
type PostConnect func(c *Client)
// ClientManager supervises an XMPP client connection. Its role is to handle connection events and
// apply reconnection strategy.
type ClientManager struct {
Client *Client
Session *Session
PostConnect PostConnect
// Store low level metrics
Metrics *Metrics
}
// NewClientManager creates a new client manager structure, intended to support
// handling XMPP client state event changes and auto-trigger reconnection
// based on ClientManager configuration.
func NewClientManager(client *Client, pc PostConnect) *ClientManager {
return &ClientManager{
Client: client,
PostConnect: pc,
}
}
// Start launch the connection loop
func (cm *ClientManager) Start() {
cm.Client.Handler = func(e Event) {
switch e.State {
case StateConnected:
cm.Metrics.setConnectTime()
case StateSessionEstablished:
cm.Metrics.setLoginTime()
case StateDisconnected:
// Reconnect on disconnection
cm.connect()
}
}
cm.connect()
}
// Stop cancels pending operations and terminates existing XMPP client.
func (cm *ClientManager) Stop() {
// Remove on disconnect handler to avoid triggering reconnect
cm.Client.Handler = nil
cm.Client.Disconnect()
}
// connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
func (cm *ClientManager) connect() {
var backoff Backoff // TODO: Group backoff calculation features with connection manager?
for {
var err error
cm.Metrics = initMetrics()
if cm.Client.Session, err = cm.Client.Connect(); err != nil {
log.Printf("Connection error: %v\n", err)
backoff.Wait()
} else {
break
}
}
if cm.PostConnect != nil {
cm.PostConnect(cm.Client)
}
}
// Client Metrics
// ============================================================================
type Metrics struct {
startTime time.Time
// ConnectTime returns the duration between client initiation of the TCP/IP
// connection to the server and actual TCP/IP session establishment.
// This time includes DNS resolution and can be slightly higher if the DNS
// resolution result was not in cache.
ConnectTime time.Duration
// LoginTime returns the between client initiation of the TCP/IP
// connection to the server and the return of the login result.
// This includes ConnectTime, but also XMPP level protocol negociation
// like starttls.
LoginTime time.Duration
}
// initMetrics set metrics with default value and define the starting point
// for duration calculation (connect time, login time, etc).
func initMetrics() *Metrics {
return &Metrics{
startTime: time.Now(),
}
}
func (m *Metrics) setConnectTime() {
m.ConnectTime = time.Since(m.startTime)
}
func (m *Metrics) setLoginTime() {
m.LoginTime = time.Since(m.startTime)
}

@ -26,17 +26,14 @@ func main() {
log.Fatal("Error: ", err) log.Fatal("Error: ", err)
} }
session, err := client.Connect() cm := xmpp.NewClientManager(client, nil)
if err != nil { cm.Start()
log.Fatal("Error: ", err) // connection can be stopped with cm.Stop().
}
fmt.Println("Stream opened, we have streamID = ", session.StreamId)
// Iterator to receive packets coming from our XMPP connection // Iterator to receive packets coming from our XMPP connection
for packet := range client.Recv() { for packet := range client.Recv() {
switch packet := packet.(type) { switch packet := packet.(type) {
case *xmpp.Message: case xmpp.Message:
_, _ = fmt.Fprintf(os.Stdout, "Body = %s - from = %s\n", packet.Body, packet.From) _, _ = fmt.Fprintf(os.Stdout, "Body = %s - from = %s\n", packet.Body, packet.From)
reply := xmpp.Message{PacketAttrs: xmpp.PacketAttrs{To: packet.From}, Body: packet.Body} reply := xmpp.Message{PacketAttrs: xmpp.PacketAttrs{To: packet.From}, Body: packet.Body}
_ = client.Send(reply) _ = client.Send(reply)
@ -47,6 +44,4 @@ func main() {
} }
// TODO create default command line client to send message or to send an arbitrary XMPP sequence from a file, // TODO create default command line client to send message or to send an arbitrary XMPP sequence from a file,
// (using templates ?) // (using templates ?)
// TODO: autoreconnect when connection is lost

@ -101,8 +101,7 @@ func playSCURL(p *mpg123.Player, rawURL string) {
func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) { func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) {
xmppConfig := xmpp.Config{Address: address, xmppConfig := xmpp.Config{Address: address,
Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true, Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true}
Retry: 10}
if client, err = xmpp.NewClient(xmppConfig); err != nil { if client, err = xmpp.NewClient(xmppConfig); err != nil {
return return

@ -12,7 +12,6 @@ type Config struct {
Password string Password string
PacketLogger *os.File // Used for debugging PacketLogger *os.File // Used for debugging
Lang string // TODO: should default to 'en' Lang string // TODO: should default to 'en'
Retry int // Number of retries for connect
ConnectTimeout int // Connection timeout in seconds. Default to 15 ConnectTimeout int // Connection timeout in seconds. Default to 15
// Insecure can be set to true to allow to open a session without TLS. If TLS // Insecure can be set to true to allow to open a session without TLS. If TLS
// is supported on the server, we will still try to use it. // is supported on the server, we will still try to use it.

@ -39,7 +39,6 @@ func initDecoder(p *xml.Decoder) (sessionID string, err error) {
return return
} }
} }
panic("unreachable")
} }
// Scan XML token stream to find next StartElement. // Scan XML token stream to find next StartElement.
@ -47,7 +46,7 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
for { for {
t, err := p.Token() t, err := p.Token()
if err == io.EOF { if err == io.EOF {
return xml.StartElement{}, nil return xml.StartElement{}, errors.New("connection closed")
} }
if err != nil { if err != nil {
return xml.StartElement{}, fmt.Errorf("nextStart %s", err) return xml.StartElement{}, fmt.Errorf("nextStart %s", err)
@ -57,7 +56,6 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
return t, nil return t, nil
} }
} }
panic("unreachable")
} }
// next scans XML token stream for next element and then assign a structure to decode // next scans XML token stream for next element and then assign a structure to decode

Loading…
Cancel
Save