From 021f6d374022fe8793ef44a77bbc4f39f3b270bf Mon Sep 17 00:00:00 2001 From: Mickael Remond Date: Sat, 8 Jun 2019 18:09:22 +0200 Subject: [PATCH] Refactor ClientManager into a more generic StreamManager --- _examples/xmpp_echo/xmpp_echo.go | 2 +- _examples/xmpp_jukebox/xmpp_jukebox.go | 2 +- client.go | 27 ++++++++++------- client_test.go | 4 +-- client_manager.go => stream_manager.go | 41 ++++++++++++++++++-------- 5 files changed, 49 insertions(+), 27 deletions(-) rename client_manager.go => stream_manager.go (73%) diff --git a/_examples/xmpp_echo/xmpp_echo.go b/_examples/xmpp_echo/xmpp_echo.go index 0677e0c..f87cdab 100644 --- a/_examples/xmpp_echo/xmpp_echo.go +++ b/_examples/xmpp_echo/xmpp_echo.go @@ -28,7 +28,7 @@ func main() { // If you pass the client to a connection manager, it will handle the reconnect policy // for you automatically. - cm := xmpp.NewClientManager(client, nil) + cm := xmpp.NewStreamManager(client, nil) err = cm.Start() if err != nil { log.Fatal(err) diff --git a/_examples/xmpp_jukebox/xmpp_jukebox.go b/_examples/xmpp_jukebox/xmpp_jukebox.go index b020399..f5e373f 100644 --- a/_examples/xmpp_jukebox/xmpp_jukebox.go +++ b/_examples/xmpp_jukebox/xmpp_jukebox.go @@ -107,7 +107,7 @@ func connectXmpp(jid string, password string, address string) (client *xmpp.Clie return } - if _, err = client.Connect(); err != nil { + if err = client.Connect(); err != nil { return } return diff --git a/client.go b/client.go index c3a2802..f9d572a 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( ) //============================================================================= +// EventManager // ConnState represents the current connection state. type ConnState = uint8 @@ -133,18 +134,18 @@ func checkAddress(addr string) (string, error) { } // Connect triggers actual TCP connection, based on previously defined parameters. -func (c *Client) Connect() (*Session, error) { +func (c *Client) Connect() error { var err error c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second) if err != nil { - return nil, err + return err } c.updateState(StateConnected) // Connection is ok, we now open XMPP session if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil { - return c.Session, err + return err } c.updateState(StateSessionEstablished) @@ -157,7 +158,7 @@ func (c *Client) Connect() (*Session, error) { // Start the receiver go routine go c.recv() - return c.Session, err + return err } func (c *Client) Disconnect() { @@ -166,6 +167,17 @@ func (c *Client) Disconnect() { _ = c.conn.Close() } +func (c *Client) SetHandler(handler EventHandler) { + c.Handler = handler +} + +// Recv abstracts receiving preparsed XMPP packets from a channel. +// Channel allow client to receive / dispatch packets in for range loop. +// TODO: Deprecate this function in favor of reading directly from the RecvChannel ? +func (c *Client) Recv() <-chan interface{} { + return c.RecvChannel +} + func (c *Client) recv() (err error) { for { val, err := next(c.Session.decoder) @@ -187,13 +199,6 @@ func (c *Client) recv() (err error) { } } -// Recv abstracts receiving preparsed XMPP packets from a channel. -// Channel allow client to receive / dispatch packets in for range loop. -// TODO: Deprecate this function in favor of reading directly from the RecvChannel -func (c *Client) Recv() <-chan interface{} { - return c.RecvChannel -} - // Send marshalls XMPP stanza and sends it to the server. func (c *Client) Send(packet Packet) error { data, err := xml.Marshal(packet) diff --git a/client_test.go b/client_test.go index 85b9868..b7fb1ac 100644 --- a/client_test.go +++ b/client_test.go @@ -31,7 +31,7 @@ func TestClient_Connect(t *testing.T) { t.Errorf("connect create XMPP client: %s", err) } - if _, err = client.Connect(); err != nil { + if err = client.Connect(); err != nil { t.Errorf("XMPP connection failed: %s", err) } @@ -52,7 +52,7 @@ func TestClient_NoInsecure(t *testing.T) { t.Errorf("cannot create XMPP client: %s", err) } - if _, err = client.Connect(); err == nil { + if err = client.Connect(); err == nil { // When insecure is not allowed: t.Errorf("should fail as insecure connection is not allowed and server does not support TLS") } diff --git a/client_manager.go b/stream_manager.go similarity index 73% rename from client_manager.go rename to stream_manager.go index 75599be..fd73ef2 100644 --- a/client_manager.go +++ b/stream_manager.go @@ -6,11 +6,26 @@ import ( "golang.org/x/xerrors" ) -type PostConnect func(c *Client) +// The Fluux XMPP lib can manage client or component XMPP streams. +// The StreamManager handles the stream workflow handling the common +// stream events and doing the right operations. +// +// It can handle: +// - Connection +// - Stream establishment workflow +// - Reconnection strategies, with exponential backoff. It also takes into account +// permanent errors to avoid useless reconnection loops. +// - Metrics processing -// ClientManager supervises an XMPP client connection. Its role is to handle connection events and +type StreamSession interface { + Connect() error + Disconnect() + SetHandler(handler EventHandler) +} + +// StreamManager supervises an XMPP client connection. Its role is to handle connection events and // apply reconnection strategy. -type ClientManager struct { +type StreamManager struct { Client *Client Session *Session PostConnect PostConnect @@ -19,18 +34,20 @@ type ClientManager struct { Metrics *Metrics } -// NewClientManager creates a new client manager structure, intended to support +type PostConnect func(c *Client) + +// NewStreamManager creates a new StreamManager structure, intended to support // handling XMPP client state event changes and auto-trigger reconnection -// based on ClientManager configuration. -func NewClientManager(client *Client, pc PostConnect) *ClientManager { - return &ClientManager{ +// based on StreamManager configuration. +func NewStreamManager(client *Client, pc PostConnect) *StreamManager { + return &StreamManager{ Client: client, PostConnect: pc, } } // Start launch the connection loop -func (cm *ClientManager) Start() error { +func (cm *StreamManager) Start() error { cm.Client.Handler = func(e Event) { switch e.State { case StateConnected: @@ -53,14 +70,14 @@ func (cm *ClientManager) Start() error { } // Stop cancels pending operations and terminates existing XMPP client. -func (cm *ClientManager) Stop() { +func (cm *StreamManager) Stop() { // Remove on disconnect handler to avoid triggering reconnect cm.Client.Handler = nil cm.Client.Disconnect() } // connect manages the reconnection loop and apply the define backoff to avoid overloading the server. -func (cm *ClientManager) connect() error { +func (cm *StreamManager) connect() error { var backoff Backoff // TODO: Group backoff calculation features with connection manager? for { @@ -68,7 +85,7 @@ func (cm *ClientManager) connect() error { // TODO: Make it possible to define logger to log disconnect and reconnection attempts cm.Metrics = initMetrics() - if cm.Client.Session, err = cm.Client.Connect(); err != nil { + if err = cm.Client.Connect(); err != nil { var actualErr ConnError if xerrors.As(err, &actualErr) { if actualErr.Permanent { @@ -87,7 +104,7 @@ func (cm *ClientManager) connect() error { return nil } -// Client Metrics +// Stream Metrics // ============================================================================ type Metrics struct {