go-xmpp/client.go

359 lines
9.9 KiB
Go
Raw Normal View History

package xmpp
import (
2019-10-29 09:49:01 +00:00
"context"
"encoding/xml"
"errors"
"io"
"net"
2016-02-17 12:45:39 +00:00
"time"
"gosrc.io/xmpp/stanza"
)
//=============================================================================
// EventManager
// ConnState represents the current connection state.
type ConnState = uint8
// This is a the list of events happening on the connection that the
// client can be notified about.
const (
InitialPresence = "<presence/>"
StateDisconnected ConnState = iota
StateConnected
StateSessionEstablished
StateStreamError
StatePermanentError
)
// Event is a structure use to convey event changes related to client state. This
// is for example used to notify the client when the client get disconnected.
type Event struct {
State ConnState
Description string
StreamError string
SMState SMState
}
// SMState holds Stream Management information regarding the session that can be
// used to resume session after disconnect
type SMState struct {
// Stream Management ID
Id string
// Inbound stanza count
Inbound uint
// TODO Store location for IP affinity
// TODO Store max and timestamp, to check if we should retry resumption or not
}
// EventHandler is use to pass events about state of the connection to
// client implementation.
type EventHandler func(Event) error
type EventManager struct {
// Store current state
CurrentState ConnState
// Callback used to propagate connection state changes
Handler EventHandler
}
func (em *EventManager) updateState(state ConnState) {
em.CurrentState = state
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState})
}
}
func (em *EventManager) disconnected(state SMState) {
em.CurrentState = StateDisconnected
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, SMState: state})
}
}
func (em *EventManager) streamError(error, desc string) {
em.CurrentState = StateStreamError
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, StreamError: error, Description: desc})
}
}
// Client
// ============================================================================
2019-10-29 09:49:01 +00:00
var ErrCanOnlySendGetOrSetIq = errors.New("SendIQ can only send get and set IQ stanzas")
2018-02-13 21:07:15 +00:00
// Client is the main structure used to connect as a client on an XMPP
2016-02-15 10:05:44 +00:00
// server.
type Client struct {
// Store user defined options and states
2018-09-26 14:25:04 +00:00
config Config
// Session gather data that can be accessed by users of this library
Session *Session
2019-10-06 17:37:56 +00:00
transport Transport
2019-06-18 10:34:25 +00:00
// Router is used to dispatch packets
router *Router
// Track and broadcast connection state
EventManager
// Handle errors from client execution
ErrorHandler func(error)
}
/*
Setting up the client / Checking the parameters
*/
2018-09-26 14:25:04 +00:00
// NewClient generates a new XMPP client, based on Config passed as parameters.
2018-09-23 16:43:46 +00:00
// If host is not specified, the DNS SRV should be used to find the host from the domainpart of the JID.
2016-02-13 16:01:06 +00:00
// Default the port to 5222.
func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, err error) {
if config.KeepaliveInterval == 0 {
config.KeepaliveInterval = time.Second * 30
}
// Parse JID
if config.parsedJid, err = NewJid(config.Jid); err != nil {
err = errors.New("missing jid")
return nil, NewConnError(err, true)
}
if config.Credential.secret == "" {
err = errors.New("missing credential")
return nil, NewConnError(err, true)
}
2019-07-27 22:19:32 +00:00
// Fallback to jid domain
if config.Address == "" {
config.Address = config.parsedJid.Domain
2019-07-27 16:22:04 +00:00
2019-07-27 22:19:32 +00:00
// Fetch SRV DNS-Entries
_, srvEntries, err := net.LookupSRV("xmpp-client", "tcp", config.parsedJid.Domain)
2019-07-27 16:22:04 +00:00
if err == nil && len(srvEntries) > 0 {
2019-07-27 22:19:32 +00:00
// If we found matching DNS records, use the entry with highest weight
bestSrv := srvEntries[0]
for _, srv := range srvEntries {
if srv.Priority <= bestSrv.Priority && srv.Weight >= bestSrv.Weight {
bestSrv = srv
2019-07-27 16:22:04 +00:00
config.Address = ensurePort(srv.Target, int(srv.Port))
}
}
}
}
c = new(Client)
c.config = config
2019-06-18 10:34:25 +00:00
c.router = r
c.ErrorHandler = errorHandler
2018-09-26 14:25:04 +00:00
if c.config.ConnectTimeout == 0 {
c.config.ConnectTimeout = 15 // 15 second as default
2016-02-17 12:45:39 +00:00
}
if config.TransportConfiguration.Domain == "" {
config.TransportConfiguration.Domain = config.parsedJid.Domain
}
c.config.TransportConfiguration.ConnectTimeout = c.config.ConnectTimeout
c.transport = NewClientTransport(c.config.TransportConfiguration)
if config.StreamLogger != nil {
c.transport.LogTraffic(config.StreamLogger)
}
return
}
2016-02-13 16:01:06 +00:00
// Connect triggers actual TCP connection, based on previously defined parameters.
// Connect simply triggers resumption, with an empty session state.
func (c *Client) Connect() error {
var state SMState
return c.Resume(state)
}
// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state.
func (c *Client) Resume(state SMState) error {
var err error
2016-02-17 12:45:39 +00:00
streamId, err := c.transport.Connect()
2016-02-17 12:45:39 +00:00
if err != nil {
return err
}
c.updateState(StateConnected)
// Client is ok, we now open XMPP session
2019-10-06 17:37:56 +00:00
if c.Session, err = NewSession(c.transport, c.config, state); err != nil {
// Try to get the stream close tag from the server.
go func() {
for {
val, err := stanza.NextPacket(c.transport.GetDecoder())
if err != nil {
c.ErrorHandler(err)
c.disconnected(state)
return
}
switch val.(type) {
case stanza.StreamClosePacket:
// TCP messages should arrive in order, so we can expect to get nothing more after this occurs
c.transport.ReceivedStreamClose()
return
}
}
}()
c.Disconnect()
return err
}
c.Session.StreamId = streamId
c.updateState(StateSessionEstablished)
// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
// Start the receiver go routine
state = c.Session.SMState
go c.recv(state, keepaliveQuit)
// We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
2016-01-06 16:08:51 +00:00
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
err = c.sendWithWriter(c.transport, []byte(InitialPresence))
return err
}
func (c *Client) Disconnect() error {
2019-10-11 04:24:47 +00:00
if c.transport != nil {
return c.transport.Close()
}
// No transport so no connection.
return nil
}
func (c *Client) SetHandler(handler EventHandler) {
c.Handler = handler
}
// Send marshals XMPP stanza and sends it to the server.
func (c *Client) Send(packet stanza.Packet) error {
2019-10-06 17:37:56 +00:00
conn := c.transport
if conn == nil {
return errors.New("client is not connected")
}
2018-01-26 08:55:39 +00:00
data, err := xml.Marshal(packet)
if err != nil {
return errors.New("cannot marshal packet " + err.Error())
}
return c.sendWithWriter(c.transport, data)
2018-01-26 08:55:39 +00:00
}
2019-10-29 09:49:01 +00:00
// SendIQ sends an IQ set or get stanza to the server. If a result is received
// the provided handler function will automatically be called.
//
// The provided context should have a timeout to prevent the client from waiting
// forever for an IQ result. For example:
//
// ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second)
// result := <- client.SendIQ(ctx, iq)
2019-10-29 09:49:01 +00:00
//
func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) {
if iq.Attrs.Type != stanza.IQTypeSet && iq.Attrs.Type != stanza.IQTypeGet {
2019-10-29 09:49:01 +00:00
return nil, ErrCanOnlySendGetOrSetIq
}
if err := c.Send(iq); err != nil {
return nil, err
}
return c.router.NewIQResultRoute(ctx, iq.Attrs.Id), nil
2019-10-29 09:49:01 +00:00
}
2018-01-26 08:55:39 +00:00
// SendRaw sends an XMPP stanza as a string to the server.
// It can be invalid XML or XMPP content. In that case, the server will
// disconnect the client. It is up to the user of this method to
// carefully craft the XML content to produce valid XMPP.
func (c *Client) SendRaw(packet string) error {
2019-10-06 17:37:56 +00:00
conn := c.transport
if conn == nil {
return errors.New("client is not connected")
}
return c.sendWithWriter(c.transport, []byte(packet))
}
func (c *Client) sendWithWriter(writer io.Writer, packet []byte) error {
var err error
_, err = writer.Write(packet)
return err
}
// ============================================================================
// Go routines
// Loop: Receive data from server
func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
for {
val, err := stanza.NextPacket(c.transport.GetDecoder())
if err != nil {
c.ErrorHandler(err)
close(keepaliveQuit)
c.disconnected(state)
return
}
// Handle stream errors
switch packet := val.(type) {
case stanza.StreamError:
c.router.route(c, val)
2019-06-18 10:34:25 +00:00
close(keepaliveQuit)
c.streamError(packet.Error.Local, packet.Text)
c.ErrorHandler(errors.New("stream error: " + packet.Error.Local))
// We don't return here, because we want to wait for the stream close tag from the server, or timeout.
c.Disconnect()
// Process Stream management nonzas
case stanza.SMRequest:
answer := stanza.SMAnswer{XMLName: xml.Name{
Space: stanza.NSStreamManagement,
Local: "a",
}, H: state.Inbound}
err = c.Send(answer)
if err != nil {
c.ErrorHandler(err)
return
}
case stanza.StreamClosePacket:
// TCP messages should arrive in order, so we can expect to get nothing more after this occurs
c.transport.ReceivedStreamClose()
return
default:
state.Inbound++
}
// Do normal route processing in a go-routine so we can immediately
// start receiving other stanzas. This also allows route handlers to
// send and receive more stanzas.
go c.router.route(c, val)
}
}
// Loop: send whitespace keepalive to server
// This is use to keep the connection open, but also to detect connection loss
// and trigger proper client connection shutdown.
func keepalive(transport Transport, interval time.Duration, quit <-chan struct{}) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
if err := transport.Ping(); err != nil {
// When keepalive fails, we force close the transport. In all cases, the recv will also fail.
ticker.Stop()
2019-10-06 17:37:56 +00:00
_ = transport.Close()
return
}
case <-quit:
ticker.Stop()
return
}
}
}