diff --git a/_examples/xmpp_component/xmpp_component.go b/_examples/xmpp_component/xmpp_component.go index 4b76a82..ad209ca 100644 --- a/_examples/xmpp_component/xmpp_component.go +++ b/_examples/xmpp_component/xmpp_component.go @@ -8,31 +8,36 @@ import ( ) func main() { - component := MyComponent{Name: "Test Component", Category: "gateway", Type: "service"} - component.xmpp = &xmpp.Component{Host: "service.localhost", Secret: "mypass"} - if err := component.xmpp.Connect("localhost:8888"); err != nil { + opts := xmpp.ComponentOptions{ + Domain: "service.localhost", + Secret: "mypass", + Address: "localhost:8888", + Name: "Test Component", + Category: "gateway", + Type: "service", + } + component, err := xmpp.NewComponent(opts) + if err != nil { + log.Fatalf("%+v", err) + } + + // If you pass the component to a connection manager, it will handle the reconnect policy + // for you automatically. + cm := xmpp.NewStreamManager(component, nil) + err = cm.Start() + if err != nil { log.Fatal(err) } - for { - packet, err := component.xmpp.ReadPacket() - if err != nil { - fmt.Println("read error", err) - return - } - + // Iterator to receive packets coming from our XMPP connection + for packet := range component.Recv() { switch p := packet.(type) { case xmpp.IQ: switch inner := p.Payload[0].(type) { - case *xmpp.DiscoInfo: - fmt.Println("Disco Info") - if p.Type == "get" { - DiscoResult(component, p.PacketAttrs, inner) - } case *xmpp.DiscoItems: fmt.Println("DiscoItems") if p.Type == "get" { - DiscoItems(component, p.PacketAttrs, inner) + discoItems(component, p.PacketAttrs, inner) } default: fmt.Println("ignoring iq packet", inner) @@ -42,7 +47,7 @@ func main() { Type: "cancel", } reply := p.MakeError(xError) - _ = component.xmpp.Send(&reply) + _ = component.Send(&reply) } case xmpp.Message: @@ -57,39 +62,7 @@ func main() { } } -type MyComponent struct { - Name string - // Typical categories and types: https://xmpp.org/registrar/disco-categories.html - Category string - Type string - - xmpp *xmpp.Component -} - -func DiscoResult(c MyComponent, attrs xmpp.PacketAttrs, info *xmpp.DiscoInfo) { - iq := xmpp.NewIQ("result", attrs.To, attrs.From, attrs.Id, "en") - var identity xmpp.Identity - if info.Node == "" { - identity = xmpp.Identity{ - Name: c.Name, - Category: c.Category, - Type: c.Type, - } - } - - payload := xmpp.DiscoInfo{ - Identity: identity, - Features: []xmpp.Feature{ - {Var: xmpp.NSDiscoInfo}, - {Var: xmpp.NSDiscoItems}, - }, - } - iq.AddPayload(&payload) - - _ = c.xmpp.Send(iq) -} - -func DiscoItems(c MyComponent, attrs xmpp.PacketAttrs, items *xmpp.DiscoItems) { +func discoItems(c *xmpp.Component, attrs xmpp.PacketAttrs, items *xmpp.DiscoItems) { iq := xmpp.NewIQ("result", attrs.To, attrs.From, attrs.Id, "en") var payload xmpp.DiscoItems @@ -101,5 +74,5 @@ func DiscoItems(c MyComponent, attrs xmpp.PacketAttrs, items *xmpp.DiscoItems) { } } iq.AddPayload(&payload) - _ = c.xmpp.Send(iq) + _ = c.Send(iq) } diff --git a/component.go b/component.go index d897837..4a42281 100644 --- a/component.go +++ b/component.go @@ -9,16 +9,47 @@ import ( "io" "net" "time" + + "gosrc.io/xmpp" ) const componentStreamOpen = "" +type ComponentOptions struct { + // ================================= + // Component Connection Info + + // Domain is the XMPP server subdomain that the component will handle + Domain string + // Secret is the "password" used by the XMPP server to secure component access + Secret string + // Address is the XMPP Host and port to connect to. Host is of + // the form 'serverhost:port' i.e "localhost:8888" + Address string + + // ================================= + // Component discovery + + // Component human readable name, that will be shown in XMPP discovery + Name string + // Typical categories and types: https://xmpp.org/registrar/disco-categories.html + Category string + Type string + + // ================================= + // Communication with developer client / StreamManager + + // Packet channel + RecvChannel chan interface{} + // Track and broadcast connection state + EventManager +} + // Component implements an XMPP extension allowing to extend XMPP server // using external components. Component specifications are defined // in XEP-0114, XEP-0355 and XEP-0356. type Component struct { - Host string - Secret string + ComponentOptions // TCP level connection conn net.Conn @@ -28,18 +59,22 @@ type Component struct { decoder *xml.Decoder } +func NewComponent(opts ComponentOptions) (*Component, error) { + return &Component{ComponentOptions: opts}, nil +} + // Connect triggers component connection to XMPP server component port. -// TODO Helper to prepare connection string -func (c *Component) Connect(connStr string) error { +// TODO: Failed handshake should be a permanent error +func (c *Component) Connect() error { var conn net.Conn var err error - if conn, err = net.DialTimeout("tcp", connStr, time.Duration(5)*time.Second); err != nil { + if conn, err = net.DialTimeout("tcp", c.Address, time.Duration(5)*time.Second); err != nil { return err } c.conn = conn // 1. Send stream open tag - if _, err := fmt.Fprintf(conn, componentStreamOpen, c.Host, NSComponent, NSStream); err != nil { + if _, err := fmt.Fprintf(conn, componentStreamOpen, c.Domain, NSComponent, NSStream); err != nil { return errors.New("cannot send stream open " + err.Error()) } c.decoder = xml.NewDecoder(conn) @@ -65,16 +100,59 @@ func (c *Component) Connect(connStr string) error { case StreamError: return errors.New("handshake failed " + v.Error.Local) case Handshake: + // Start the receiver go routine + go c.recv() return nil default: - return errors.New("unexpected packet, got " + v.Name()) + return errors.New("expecting handshake result, got " + v.Name()) } } -// ReadPacket reads next incoming XMPP packet -func (c *Component) ReadPacket() (Packet, error) { - // TODO use defined interface Packet - return next(c.decoder) +func (c *Component) Disconnect() { + _ = c.SendRaw("") + // TODO: Add a way to wait for stream close acknowledgement from the server for clean disconnect + _ = c.conn.Close() +} + +func (c *Component) SetHandler(handler EventHandler) { + c.Handler = handler +} + +// Recv abstracts receiving preparsed XMPP packets from a channel. +// Channel allow client to receive / dispatch packets in for range loop. +// TODO: Deprecate this function in favor of reading directly from the RecvChannel ? +func (c *Component) Recv() <-chan interface{} { + return c.RecvChannel +} + +func (c *Component) recv() (err error) { + for { + val, err := next(c.decoder) + if err != nil { + c.updateState(StateDisconnected) + return err + } + + // Handle stream errors + switch p := val.(type) { + case StreamError: + c.RecvChannel <- val + close(c.RecvChannel) + c.streamError(p.Error.Local, p.Text) + return errors.New("stream error: " + p.Error.Local) + case xmpp.IQ: + switch inner := p.Payload[0].(type) { + // Our component module handle disco info but can let component implementation + // handle disco items queries + case *xmpp.DiscoInfo: + if p.Type == "get" { + c.discoResult(p.PacketAttrs, inner) + } + } + break + } + c.RecvChannel <- val + } } // Send marshalls XMPP stanza and sends it to the server. @@ -142,3 +220,28 @@ func (handshakeDecoder) decode(p *xml.Decoder, se xml.StartElement) (Handshake, err := p.DecodeElement(&packet, &se) return packet, err } + +// Service discovery + +func (c *Component) discoResult(attrs xmpp.PacketAttrs, info *xmpp.DiscoInfo) { + iq := xmpp.NewIQ("result", attrs.To, attrs.From, attrs.Id, "en") + var identity xmpp.Identity + if info.Node == "" { + identity = xmpp.Identity{ + Name: c.Name, + Category: c.Category, + Type: c.Type, + } + } + + payload := xmpp.DiscoInfo{ + Identity: identity, + Features: []xmpp.Feature{ + {Var: xmpp.NSDiscoInfo}, + {Var: xmpp.NSDiscoItems}, + }, + } + iq.AddPayload(&payload) + + _ = c.xmpp.Send(iq) +}