Refactor ClientManager into a more generic StreamManager

This commit is contained in:
Mickael Remond 2019-06-08 18:09:22 +02:00 committed by Mickaël Rémond
parent 54dfa60f12
commit 021f6d3740
5 changed files with 49 additions and 27 deletions

View file

@ -28,7 +28,7 @@ func main() {
// If you pass the client to a connection manager, it will handle the reconnect policy // If you pass the client to a connection manager, it will handle the reconnect policy
// for you automatically. // for you automatically.
cm := xmpp.NewClientManager(client, nil) cm := xmpp.NewStreamManager(client, nil)
err = cm.Start() err = cm.Start()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -107,7 +107,7 @@ func connectXmpp(jid string, password string, address string) (client *xmpp.Clie
return return
} }
if _, err = client.Connect(); err != nil { if err = client.Connect(); err != nil {
return return
} }
return return

View file

@ -11,6 +11,7 @@ import (
) )
//============================================================================= //=============================================================================
// EventManager
// ConnState represents the current connection state. // ConnState represents the current connection state.
type ConnState = uint8 type ConnState = uint8
@ -133,18 +134,18 @@ func checkAddress(addr string) (string, error) {
} }
// Connect triggers actual TCP connection, based on previously defined parameters. // Connect triggers actual TCP connection, based on previously defined parameters.
func (c *Client) Connect() (*Session, error) { func (c *Client) Connect() error {
var err error var err error
c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second) c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second)
if err != nil { if err != nil {
return nil, err return err
} }
c.updateState(StateConnected) c.updateState(StateConnected)
// Connection is ok, we now open XMPP session // Connection is ok, we now open XMPP session
if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil { if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil {
return c.Session, err return err
} }
c.updateState(StateSessionEstablished) c.updateState(StateSessionEstablished)
@ -157,7 +158,7 @@ func (c *Client) Connect() (*Session, error) {
// Start the receiver go routine // Start the receiver go routine
go c.recv() go c.recv()
return c.Session, err return err
} }
func (c *Client) Disconnect() { func (c *Client) Disconnect() {
@ -166,6 +167,17 @@ func (c *Client) Disconnect() {
_ = c.conn.Close() _ = c.conn.Close()
} }
func (c *Client) SetHandler(handler EventHandler) {
c.Handler = handler
}
// Recv abstracts receiving preparsed XMPP packets from a channel.
// Channel allow client to receive / dispatch packets in for range loop.
// TODO: Deprecate this function in favor of reading directly from the RecvChannel ?
func (c *Client) Recv() <-chan interface{} {
return c.RecvChannel
}
func (c *Client) recv() (err error) { func (c *Client) recv() (err error) {
for { for {
val, err := next(c.Session.decoder) val, err := next(c.Session.decoder)
@ -187,13 +199,6 @@ func (c *Client) recv() (err error) {
} }
} }
// Recv abstracts receiving preparsed XMPP packets from a channel.
// Channel allow client to receive / dispatch packets in for range loop.
// TODO: Deprecate this function in favor of reading directly from the RecvChannel
func (c *Client) Recv() <-chan interface{} {
return c.RecvChannel
}
// Send marshalls XMPP stanza and sends it to the server. // Send marshalls XMPP stanza and sends it to the server.
func (c *Client) Send(packet Packet) error { func (c *Client) Send(packet Packet) error {
data, err := xml.Marshal(packet) data, err := xml.Marshal(packet)

View file

@ -31,7 +31,7 @@ func TestClient_Connect(t *testing.T) {
t.Errorf("connect create XMPP client: %s", err) t.Errorf("connect create XMPP client: %s", err)
} }
if _, err = client.Connect(); err != nil { if err = client.Connect(); err != nil {
t.Errorf("XMPP connection failed: %s", err) t.Errorf("XMPP connection failed: %s", err)
} }
@ -52,7 +52,7 @@ func TestClient_NoInsecure(t *testing.T) {
t.Errorf("cannot create XMPP client: %s", err) t.Errorf("cannot create XMPP client: %s", err)
} }
if _, err = client.Connect(); err == nil { if err = client.Connect(); err == nil {
// When insecure is not allowed: // When insecure is not allowed:
t.Errorf("should fail as insecure connection is not allowed and server does not support TLS") t.Errorf("should fail as insecure connection is not allowed and server does not support TLS")
} }

View file

@ -6,11 +6,26 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
type PostConnect func(c *Client) // The Fluux XMPP lib can manage client or component XMPP streams.
// The StreamManager handles the stream workflow handling the common
// stream events and doing the right operations.
//
// It can handle:
// - Connection
// - Stream establishment workflow
// - Reconnection strategies, with exponential backoff. It also takes into account
// permanent errors to avoid useless reconnection loops.
// - Metrics processing
// ClientManager supervises an XMPP client connection. Its role is to handle connection events and type StreamSession interface {
Connect() error
Disconnect()
SetHandler(handler EventHandler)
}
// StreamManager supervises an XMPP client connection. Its role is to handle connection events and
// apply reconnection strategy. // apply reconnection strategy.
type ClientManager struct { type StreamManager struct {
Client *Client Client *Client
Session *Session Session *Session
PostConnect PostConnect PostConnect PostConnect
@ -19,18 +34,20 @@ type ClientManager struct {
Metrics *Metrics Metrics *Metrics
} }
// NewClientManager creates a new client manager structure, intended to support type PostConnect func(c *Client)
// NewStreamManager creates a new StreamManager structure, intended to support
// handling XMPP client state event changes and auto-trigger reconnection // handling XMPP client state event changes and auto-trigger reconnection
// based on ClientManager configuration. // based on StreamManager configuration.
func NewClientManager(client *Client, pc PostConnect) *ClientManager { func NewStreamManager(client *Client, pc PostConnect) *StreamManager {
return &ClientManager{ return &StreamManager{
Client: client, Client: client,
PostConnect: pc, PostConnect: pc,
} }
} }
// Start launch the connection loop // Start launch the connection loop
func (cm *ClientManager) Start() error { func (cm *StreamManager) Start() error {
cm.Client.Handler = func(e Event) { cm.Client.Handler = func(e Event) {
switch e.State { switch e.State {
case StateConnected: case StateConnected:
@ -53,14 +70,14 @@ func (cm *ClientManager) Start() error {
} }
// Stop cancels pending operations and terminates existing XMPP client. // Stop cancels pending operations and terminates existing XMPP client.
func (cm *ClientManager) Stop() { func (cm *StreamManager) Stop() {
// Remove on disconnect handler to avoid triggering reconnect // Remove on disconnect handler to avoid triggering reconnect
cm.Client.Handler = nil cm.Client.Handler = nil
cm.Client.Disconnect() cm.Client.Disconnect()
} }
// connect manages the reconnection loop and apply the define backoff to avoid overloading the server. // connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
func (cm *ClientManager) connect() error { func (cm *StreamManager) connect() error {
var backoff Backoff // TODO: Group backoff calculation features with connection manager? var backoff Backoff // TODO: Group backoff calculation features with connection manager?
for { for {
@ -68,7 +85,7 @@ func (cm *ClientManager) connect() error {
// TODO: Make it possible to define logger to log disconnect and reconnection attempts // TODO: Make it possible to define logger to log disconnect and reconnection attempts
cm.Metrics = initMetrics() cm.Metrics = initMetrics()
if cm.Client.Session, err = cm.Client.Connect(); err != nil { if err = cm.Client.Connect(); err != nil {
var actualErr ConnError var actualErr ConnError
if xerrors.As(err, &actualErr) { if xerrors.As(err, &actualErr) {
if actualErr.Permanent { if actualErr.Permanent {
@ -87,7 +104,7 @@ func (cm *ClientManager) connect() error {
return nil return nil
} }
// Client Metrics // Stream Metrics
// ============================================================================ // ============================================================================
type Metrics struct { type Metrics struct {