From 4d4710463dbcab1369a1efd46ce54b82437c73f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20R=C3=A9mond?= Date: Tue, 11 Jun 2019 15:29:08 +0200 Subject: [PATCH] Add basic support for keep-alive (#48) Fix #35 This should also help with #8 --- client.go | 79 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/client.go b/client.go index a426b48..7d5a424 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,6 @@ package xmpp // import "gosrc.io/xmpp" import ( - "bytes" "encoding/xml" "errors" "fmt" @@ -155,8 +154,11 @@ func (c *Client) Connect() error { // Do we need an option to avoid that or do we rely on client to send the presence itself ? fmt.Fprintf(c.Session.socketProxy, "") + // Start the keepalive go routine + keepaliveQuit := make(chan struct{}) + go keepalive(c.conn, keepaliveQuit) // Start the receiver go routine - go c.recv() + go c.recv(keepaliveQuit) return err } @@ -178,28 +180,7 @@ func (c *Client) Recv() <-chan Packet { return c.RecvChannel } -func (c *Client) recv() (err error) { - for { - val, err := next(c.Session.decoder) - if err != nil { - c.updateState(StateDisconnected) - return err - } - - // Handle stream errors - switch packet := val.(type) { - case StreamError: - c.RecvChannel <- val - close(c.RecvChannel) - c.streamError(packet.Error.Local, packet.Text) - return errors.New("stream error: " + packet.Error.Local) - } - - c.RecvChannel <- val - } -} - -// Send marshalls XMPP stanza and sends it to the server. +// Send marshals XMPP stanza and sends it to the server. func (c *Client) Send(packet Packet) error { data, err := xml.Marshal(packet) if err != nil { @@ -222,8 +203,50 @@ func (c *Client) SendRaw(packet string) error { return err } -func xmlEscape(s string) string { - var b bytes.Buffer - xml.Escape(&b, []byte(s)) - return b.String() +// ============================================================================ +// Go routines + +// Loop: Receive data from server +func (c *Client) recv(keepaliveQuit chan<- struct{}) (err error) { + for { + val, err := next(c.Session.decoder) + if err != nil { + close(keepaliveQuit) + c.updateState(StateDisconnected) + return err + } + + // Handle stream errors + switch packet := val.(type) { + case StreamError: + c.RecvChannel <- val + close(c.RecvChannel) + c.streamError(packet.Error.Local, packet.Text) + return errors.New("stream error: " + packet.Error.Local) + } + + c.RecvChannel <- val + } +} + +// Loop: send whitespace keepalive to server +// This is use to keep the connection open, but also to detect connection loss +// and trigger proper client connection shutdown. +func keepalive(conn net.Conn, quit <-chan struct{}) { + // TODO: Make keepalive interval configurable + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + if n, err := fmt.Fprintf(conn, "\n"); err != nil || n != 1 { + // When keep alive fails, we force close the connection. In all cases, the recv will also fail. + ticker.Stop() + _ = conn.Close() + return + } + case <-quit: + ticker.Stop() + return + } + } }