From 21f6a549db77e39b5f643f609a25ed4f22b5a8c1 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Mon, 28 Oct 2019 21:21:35 +0100 Subject: [PATCH 1/8] Always add an id to IQ queries --- go.mod | 1 + go.sum | 3 +++ stanza/iq.go | 8 +++++++- stanza/iq_test.go | 18 ++++++++++++++++++ 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 3298cfa..f31fe40 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/google/go-cmp v0.3.1 + github.com/google/uuid v1.1.1 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 nhooyr.io/websocket v1.6.5 ) diff --git a/go.sum b/go.sum index cf05d14..ae38d07 100644 --- a/go.sum +++ b/go.sum @@ -21,9 +21,12 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190908185732-236ed259b199/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/knq/sysutil v0.0.0-20181215143952-f05b59f0f307/go.mod h1:BjPj+aVjl9FW/cCGiF3nGh5v+9Gd3VCgBQbod/GlMaQ= diff --git a/stanza/iq.go b/stanza/iq.go index f1663bc..923cf28 100644 --- a/stanza/iq.go +++ b/stanza/iq.go @@ -2,6 +2,8 @@ package stanza import ( "encoding/xml" + + "github.com/google/uuid" ) /* @@ -31,8 +33,12 @@ type IQPayload interface { } func NewIQ(a Attrs) IQ { - // TODO generate IQ ID if not set // TODO ensure that type is set, as it is required + if a.Id == "" { + if id, err := uuid.NewRandom(); err == nil { + a.Id = id.String() + } + } return IQ{ XMLName: xml.Name{Local: "iq"}, Attrs: a, diff --git a/stanza/iq_test.go b/stanza/iq_test.go index 04a868a..93f7ebb 100644 --- a/stanza/iq_test.go +++ b/stanza/iq_test.go @@ -34,6 +34,24 @@ func TestUnmarshalIqs(t *testing.T) { } } +func TestGenerateIqId(t *testing.T) { + t.Parallel() + iq := stanza.NewIQ(stanza.Attrs{Id: "1"}) + if iq.Id != "1" { + t.Errorf("NewIQ replaced id with %s", iq.Id) + } + + iq = stanza.NewIQ(stanza.Attrs{}) + if iq.Id != "1" { + t.Error("NewIQ did not generate an Id") + } + + otherIq := stanza.NewIQ(stanza.Attrs{}) + if iq.Id == otherIq.Id { + t.Errorf("NewIQ generated two identical ids: %s", iq.Id) + } +} + func TestGenerateIq(t *testing.T) { iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: "admin@localhost", To: "test@localhost", Id: "1"}) payload := stanza.DiscoInfo{ From 8e1dac6ffae0f80e9f57662fa08a46e9a012e250 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Mon, 28 Oct 2019 21:48:01 +0100 Subject: [PATCH 2/8] Add IQ result routes to the Router These are used to quickly match IQ result stanzas and invoke a handler for them. IQ result routes take precendence of normal routes. --- router.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 5 deletions(-) diff --git a/router.go b/router.go index 21f0035..bf6d5e6 100644 --- a/router.go +++ b/router.go @@ -1,6 +1,7 @@ package xmpp import ( + "context" "encoding/xml" "strings" @@ -25,16 +26,26 @@ TODO: Automatically reply to IQ that do not match any route, to comply to XMPP s type Router struct { // Routes to be matched, in order. routes []*Route + + iqResultRoutes map[string]*IqResultRoute } // NewRouter returns a new router instance. func NewRouter() *Router { - return &Router{} + return &Router{ + iqResultRoutes: make(map[string]*IqResultRoute), + } } // route is called by the XMPP client to dispatch stanza received using the set up routes. // It is also used by test, but is not supposed to be used directly by users of the library. func (r *Router) route(s Sender, p stanza.Packet) { + iq, isIq := p.(stanza.IQ) + if isIq { + if route, ok := r.iqResultRoutes[iq.Id]; ok { + route.handler.HandlePacket(s, p) + } + } var match RouteMatch if r.Match(p, &match) { @@ -42,11 +53,10 @@ func (r *Router) route(s Sender, p stanza.Packet) { match.Handler.HandlePacket(s, p) return } + // If there is no match and we receive an iq set or get, we need to send a reply - if iq, ok := p.(stanza.IQ); ok { - if iq.Type == stanza.IQTypeGet || iq.Type == stanza.IQTypeSet { - iqNotImplemented(s, iq) - } + if isIq && (iq.Type == stanza.IQTypeGet || iq.Type == stanza.IQTypeSet) { + iqNotImplemented(s, iq) } } @@ -68,6 +78,28 @@ func (r *Router) NewRoute() *Route { return route } +// NewIqResultRoute register a route that will catch an IQ result stanza with +// the given Id. The route will only match ones, after which it will automatically +// be unregistered +func (r *Router) NewIqResultRoute(ctx context.Context, id string) *IqResultRoute { + route := &IqResultRoute{ + context: ctx, + matched: make(chan struct{}), + } + r.iqResultRoutes[id] = route + go func() { + select { + case <-route.context.Done(): + if route.timeoutHandler != nil { + route.timeoutHandler(route.context.Err()) + } + case <-route.matched: + } + delete(r.iqResultRoutes, id) + }() + return route +} + func (r *Router) Match(p stanza.Packet, match *RouteMatch) bool { for _, route := range r.routes { if route.Match(p, match) { @@ -89,6 +121,40 @@ func (r *Router) HandleFunc(name string, f func(s Sender, p stanza.Packet)) *Rou return r.NewRoute().Packet(name).HandlerFunc(f) } +// HandleIqResult register a temporary route +func (r *Router) HandleIqResult(id string, handler Handler) *IqResultRoute { + return r.NewIqResultRoute(context.Background(), id).Handler(handler) +} + +func (r *Router) HandleFuncIqResult(id string, f func(s Sender, p stanza.Packet)) *IqResultRoute { + return r.NewIqResultRoute(context.Background(), id).HandlerFunc(f) +} + +// ============================================================================ +// IqResultRoute +type TimeoutHandlerFunc func(err error) + +type IqResultRoute struct { + context context.Context + matched chan struct{} + handler Handler + timeoutHandler TimeoutHandlerFunc +} + +func (r *IqResultRoute) Handler(handler Handler) *IqResultRoute { + r.handler = handler + return r +} + +func (r *IqResultRoute) HandlerFunc(f HandlerFunc) *IqResultRoute { + return r.Handler(f) +} + +func (r *IqResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IqResultRoute { + r.timeoutHandler = f + return r +} + // ============================================================================ // Route type Handler interface { From 6a25856e85f51db610ac2ffc2d1969c6b629b439 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 10:34:23 +0100 Subject: [PATCH 3/8] We need to lock isResultRoutes The map is updated from multiple goroutines, so it needs to be locked. --- router.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/router.go b/router.go index bf6d5e6..df3b19f 100644 --- a/router.go +++ b/router.go @@ -4,6 +4,7 @@ import ( "context" "encoding/xml" "strings" + "sync" "gosrc.io/xmpp/stanza" ) @@ -27,7 +28,8 @@ type Router struct { // Routes to be matched, in order. routes []*Route - iqResultRoutes map[string]*IqResultRoute + iqResultRoutes map[string]*IqResultRoute + iqResultRouteLock sync.RWMutex } // NewRouter returns a new router instance. @@ -42,8 +44,16 @@ func NewRouter() *Router { func (r *Router) route(s Sender, p stanza.Packet) { iq, isIq := p.(stanza.IQ) if isIq { - if route, ok := r.iqResultRoutes[iq.Id]; ok { + r.iqResultRouteLock.RLock() + route, ok := r.iqResultRoutes[iq.Id] + r.iqResultRouteLock.RUnlock() + if ok { + r.iqResultRouteLock.Lock() + delete(r.iqResultRoutes, iq.Id) + r.iqResultRouteLock.Unlock() + close(route.matched) route.handler.HandlePacket(s, p) + return } } @@ -86,16 +96,20 @@ func (r *Router) NewIqResultRoute(ctx context.Context, id string) *IqResultRoute context: ctx, matched: make(chan struct{}), } + r.iqResultRouteLock.Lock() r.iqResultRoutes[id] = route + r.iqResultRouteLock.Unlock() go func() { select { case <-route.context.Done(): + r.iqResultRouteLock.Lock() + delete(r.iqResultRoutes, id) + r.iqResultRouteLock.Unlock() if route.timeoutHandler != nil { route.timeoutHandler(route.context.Err()) } case <-route.matched: } - delete(r.iqResultRoutes, id) }() return route } From 070934743f88d0a209a1290418518864e062988b Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 10:34:33 +0100 Subject: [PATCH 4/8] Add tests for iq result routes --- router_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/router_test.go b/router_test.go index 98a4697..138999c 100644 --- a/router_test.go +++ b/router_test.go @@ -2,7 +2,9 @@ package xmpp import ( "bytes" + "context" "encoding/xml" + "runtime" "testing" "gosrc.io/xmpp/stanza" @@ -11,6 +13,58 @@ import ( // ============================================================================ // Test route & matchers +func TestIqResultRoutes(t *testing.T) { + t.Parallel() + router := NewRouter() + + if router.iqResultRoutes == nil { + t.Fatal("NewRouter does not initialize isResultRoutes") + } + + // Check other IQ does not matcah + conn := NewSenderMock() + iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "4321"}) + router.NewIqResultRoute(context.Background(), "1234").HandlerFunc(func(s Sender, p stanza.Packet) { + _ = s.SendRaw(successFlag) + }) + if conn.String() == successFlag { + t.Fatal("IQ result with wrong ID was matched") + } + + // Check if the IQ handler was called + conn = NewSenderMock() + iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) + router.route(conn, iq) + if conn.String() != successFlag { + t.Fatal("IQ result was not matched") + } + + // The match must only happen once, so we if receive the same package again it + // must not be matched. + conn = NewSenderMock() + router.route(conn, iq) + if conn.String() == successFlag { + t.Fatal("IQ result was matched twice") + } + + // After cancelling a route it should no longer match + conn = NewSenderMock() + ctx, cancel := context.WithCancel(context.Background()) + iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) + router.NewIqResultRoute(ctx, "1234").HandlerFunc(func(s Sender, p stanza.Packet) { + _ = s.SendRaw(successFlag) + }).TimeoutHandlerFunc(func(err error) { + conn.SendRaw(cancelledFlag) + }) + cancel() + // Yield the processor so the cancellation goroutine is triggered + runtime.Gosched() + router.route(conn, iq) + if conn.String() != cancelledFlag { + t.Fatal("IQ result route was matched after cancellation") + } +} + func TestNameMatcher(t *testing.T) { router := NewRouter() router.HandleFunc("message", func(s Sender, p stanza.Packet) { @@ -211,7 +265,8 @@ func TestCatchallMatcher(t *testing.T) { // ============================================================================ // SenderMock -var successFlag = "matched" +const successFlag = "matched" +const cancelledFlag = "cancelled" type SenderMock struct { buffer *bytes.Buffer From 8088e3fa7ef780198752aa2bf626b7b5ccbd2b83 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 10:49:01 +0100 Subject: [PATCH 5/8] Add Client.SendIQ method --- client.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/client.go b/client.go index 686519a..32b2394 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package xmpp import ( + "context" "encoding/xml" "errors" "fmt" @@ -82,6 +83,8 @@ func (em EventManager) streamError(error, desc string) { // Client // ============================================================================ +var ErrCanOnlySendGetOrSetIq = errors.New("SendIQ can only send get and set IQ stanzas") + // Client is the main structure used to connect as a client on an XMPP // server. type Client struct { @@ -221,6 +224,27 @@ func (c *Client) Send(packet stanza.Packet) error { return c.sendWithWriter(c.transport, data) } +// SendIQ sends an IQ set or get stanza to the server. If a result is received +// the provided handler function will automatically be called. +// +// The provided context should have a timeout to prevent the client from waiting +// forever for an IQ result. For example: +// +// ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second) +// client.SendIQ(ctx, iq, func(s Sender, p stanza.Packet) { +// // Handle the result here +// }) +// +func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler HandlerFunc) (*IqResultRoute, error) { + if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { + return nil, ErrCanOnlySendGetOrSetIq + } + if err := c.Send(iq); err != nil { + return nil, err + } + return c.router.NewIqResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil +} + // SendRaw sends an XMPP stanza as a string to the server. // It can be invalid XML or XMPP content. In that case, the server will // disconnect the client. It is up to the user of this method to From 83bc8581fd264568c9c5d9348c5909a5ad3f2e85 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 11:02:41 +0100 Subject: [PATCH 6/8] Cleanup the IQ result route API Simplify the API in several ways: - provide the context to the IQ result handler, making it possible to pass in extra context and handle timeouts within the handler. - pass the stanza in as an IQ type, removing the need to always type-cast it in the handler - remove Router.HandleIqResult and Router.HandleFuncIqResult. Since the router is private to Client nobody would ever use these, and they do not really make things simpler anyway. --- client.go | 4 +-- router.go | 84 ++++++++++++++++++++++++++++++-------------------- router_test.go | 8 ++--- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/client.go b/client.go index 32b2394..c8000bd 100644 --- a/client.go +++ b/client.go @@ -235,14 +235,14 @@ func (c *Client) Send(packet stanza.Packet) error { // // Handle the result here // }) // -func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler HandlerFunc) (*IqResultRoute, error) { +func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler IQResultHandlerFunc) (*IQResultRoute, error) { if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { return nil, ErrCanOnlySendGetOrSetIq } if err := c.Send(iq); err != nil { return nil, err } - return c.router.NewIqResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil + return c.router.NewIQResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil } // SendRaw sends an XMPP stanza as a string to the server. diff --git a/router.go b/router.go index df3b19f..05f4b09 100644 --- a/router.go +++ b/router.go @@ -28,14 +28,14 @@ type Router struct { // Routes to be matched, in order. routes []*Route - iqResultRoutes map[string]*IqResultRoute - iqResultRouteLock sync.RWMutex + IQResultRoutes map[string]*IQResultRoute + IQResultRouteLock sync.RWMutex } // NewRouter returns a new router instance. func NewRouter() *Router { return &Router{ - iqResultRoutes: make(map[string]*IqResultRoute), + IQResultRoutes: make(map[string]*IQResultRoute), } } @@ -44,15 +44,15 @@ func NewRouter() *Router { func (r *Router) route(s Sender, p stanza.Packet) { iq, isIq := p.(stanza.IQ) if isIq { - r.iqResultRouteLock.RLock() - route, ok := r.iqResultRoutes[iq.Id] - r.iqResultRouteLock.RUnlock() + r.IQResultRouteLock.RLock() + route, ok := r.IQResultRoutes[iq.Id] + r.IQResultRouteLock.RUnlock() if ok { - r.iqResultRouteLock.Lock() - delete(r.iqResultRoutes, iq.Id) - r.iqResultRouteLock.Unlock() + r.IQResultRouteLock.Lock() + delete(r.IQResultRoutes, iq.Id) + r.IQResultRouteLock.Unlock() close(route.matched) - route.handler.HandlePacket(s, p) + route.handler.HandleIQ(route.context, s, iq) return } } @@ -88,29 +88,31 @@ func (r *Router) NewRoute() *Route { return route } -// NewIqResultRoute register a route that will catch an IQ result stanza with +// NewIQResultRoute register a route that will catch an IQ result stanza with // the given Id. The route will only match ones, after which it will automatically // be unregistered -func (r *Router) NewIqResultRoute(ctx context.Context, id string) *IqResultRoute { - route := &IqResultRoute{ +func (r *Router) NewIQResultRoute(ctx context.Context, id string) *IQResultRoute { + route := &IQResultRoute{ context: ctx, matched: make(chan struct{}), } - r.iqResultRouteLock.Lock() - r.iqResultRoutes[id] = route - r.iqResultRouteLock.Unlock() + r.IQResultRouteLock.Lock() + r.IQResultRoutes[id] = route + r.IQResultRouteLock.Unlock() + go func() { select { case <-route.context.Done(): - r.iqResultRouteLock.Lock() - delete(r.iqResultRoutes, id) - r.iqResultRouteLock.Unlock() + r.IQResultRouteLock.Lock() + delete(r.IQResultRoutes, id) + r.IQResultRouteLock.Unlock() if route.timeoutHandler != nil { route.timeoutHandler(route.context.Err()) } case <-route.matched: } }() + return route } @@ -135,42 +137,56 @@ func (r *Router) HandleFunc(name string, f func(s Sender, p stanza.Packet)) *Rou return r.NewRoute().Packet(name).HandlerFunc(f) } -// HandleIqResult register a temporary route -func (r *Router) HandleIqResult(id string, handler Handler) *IqResultRoute { - return r.NewIqResultRoute(context.Background(), id).Handler(handler) -} - -func (r *Router) HandleFuncIqResult(id string, f func(s Sender, p stanza.Packet)) *IqResultRoute { - return r.NewIqResultRoute(context.Background(), id).HandlerFunc(f) -} - // ============================================================================ -// IqResultRoute + +// TimeoutHandlerFunc is a function type for handling IQ result timeouts. type TimeoutHandlerFunc func(err error) -type IqResultRoute struct { +// IQResultRoute is a temporary route to match IQ result stanzas +type IQResultRoute struct { context context.Context matched chan struct{} - handler Handler + handler IQResultHandler timeoutHandler TimeoutHandlerFunc } -func (r *IqResultRoute) Handler(handler Handler) *IqResultRoute { +// Handler adds an IQ handler to the route. +func (r *IQResultRoute) Handler(handler IQResultHandler) *IQResultRoute { r.handler = handler return r } -func (r *IqResultRoute) HandlerFunc(f HandlerFunc) *IqResultRoute { +// HandlerFunc updates the route to call a handler function when an IQ result is received. +func (r *IQResultRoute) HandlerFunc(f IQResultHandlerFunc) *IQResultRoute { return r.Handler(f) } -func (r *IqResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IqResultRoute { +// TimeoutHandlerFunc registers a function that will be called automatically when +// the IQ result route is cancelled (most likely due to a timeout on the context). +func (r *IQResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IQResultRoute { r.timeoutHandler = f return r } +// ============================================================================ +// IQ result handler + +// IQResultHandler is a utility interface for IQ result handlers +type IQResultHandler interface { + HandleIQ(ctx context.Context, s Sender, iq stanza.IQ) +} + +// IQResultHandlerFunc is an adapter to allow using functions as IQ result handlers. +type IQResultHandlerFunc func(ctx context.Context, s Sender, iq stanza.IQ) + +// HandleIQ is a proxy function to implement IQResultHandler using a function. +func (f IQResultHandlerFunc) HandleIQ(ctx context.Context, s Sender, iq stanza.IQ) { + f(ctx, s, iq) +} + // ============================================================================ // Route + type Handler interface { HandlePacket(s Sender, p stanza.Packet) } diff --git a/router_test.go b/router_test.go index 138999c..b63553d 100644 --- a/router_test.go +++ b/router_test.go @@ -13,18 +13,18 @@ import ( // ============================================================================ // Test route & matchers -func TestIqResultRoutes(t *testing.T) { +func TestIQResultRoutes(t *testing.T) { t.Parallel() router := NewRouter() - if router.iqResultRoutes == nil { + if router.IQResultRoutes == nil { t.Fatal("NewRouter does not initialize isResultRoutes") } // Check other IQ does not matcah conn := NewSenderMock() iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "4321"}) - router.NewIqResultRoute(context.Background(), "1234").HandlerFunc(func(s Sender, p stanza.Packet) { + router.NewIQResultRoute(context.Background(), "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { _ = s.SendRaw(successFlag) }) if conn.String() == successFlag { @@ -51,7 +51,7 @@ func TestIqResultRoutes(t *testing.T) { conn = NewSenderMock() ctx, cancel := context.WithCancel(context.Background()) iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) - router.NewIqResultRoute(ctx, "1234").HandlerFunc(func(s Sender, p stanza.Packet) { + router.NewIQResultRoute(ctx, "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { _ = s.SendRaw(successFlag) }).TimeoutHandlerFunc(func(err error) { conn.SendRaw(cancelledFlag) From a0e74051fd3abb57a1507b791fc89f46a3cdc474 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 11:37:49 +0100 Subject: [PATCH 7/8] Use a channel based API for SendIQ This makes sending IQ more idiomatic Go, but more importantly it solves a problem with contexts that were not being cancelled correctly with the previous API. As a side-effect of this change `Route.route` must now be invoked in a go-routine to prevent deadlocks. This also allows for stanzas to be processed in parallel, which can result in a nice performance win. --- client.go | 13 ++++++----- router.go | 57 ++++++++++++++++------------------------------ router_test.go | 61 +++++++++++++++++++++----------------------------- 3 files changed, 51 insertions(+), 80 deletions(-) diff --git a/client.go b/client.go index c8000bd..a7e6c7d 100644 --- a/client.go +++ b/client.go @@ -231,18 +231,16 @@ func (c *Client) Send(packet stanza.Packet) error { // forever for an IQ result. For example: // // ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second) -// client.SendIQ(ctx, iq, func(s Sender, p stanza.Packet) { -// // Handle the result here -// }) +// result := <- client.SendIQ(ctx, iq) // -func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler IQResultHandlerFunc) (*IQResultRoute, error) { +func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) { if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { return nil, ErrCanOnlySendGetOrSetIq } if err := c.Send(iq); err != nil { return nil, err } - return c.router.NewIQResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil + return c.router.NewIQResultRoute(ctx, iq.Attrs.Id), nil } // SendRaw sends an XMPP stanza as a string to the server. @@ -295,7 +293,10 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) (err error) state.Inbound++ } - c.router.route(c, val) + // Do normal route processing in a go-routine so we can immediately + // start receiving other stanzas. This also allows route handlers to + // send and receive more stanzas. + go c.router.route(c, val) } } diff --git a/router.go b/router.go index 05f4b09..23a134e 100644 --- a/router.go +++ b/router.go @@ -51,8 +51,8 @@ func (r *Router) route(s Sender, p stanza.Packet) { r.IQResultRouteLock.Lock() delete(r.IQResultRoutes, iq.Id) r.IQResultRouteLock.Unlock() - close(route.matched) - route.handler.HandleIQ(route.context, s, iq) + route.result <- iq + close(route.result) return } } @@ -91,29 +91,22 @@ func (r *Router) NewRoute() *Route { // NewIQResultRoute register a route that will catch an IQ result stanza with // the given Id. The route will only match ones, after which it will automatically // be unregistered -func (r *Router) NewIQResultRoute(ctx context.Context, id string) *IQResultRoute { - route := &IQResultRoute{ - context: ctx, - matched: make(chan struct{}), - } +func (r *Router) NewIQResultRoute(ctx context.Context, id string) chan stanza.IQ { + route := NewIQResultRoute(ctx) r.IQResultRouteLock.Lock() r.IQResultRoutes[id] = route r.IQResultRouteLock.Unlock() + // Start a go function to make sure the route is unregistered when the context + // is done. go func() { - select { - case <-route.context.Done(): - r.IQResultRouteLock.Lock() - delete(r.IQResultRoutes, id) - r.IQResultRouteLock.Unlock() - if route.timeoutHandler != nil { - route.timeoutHandler(route.context.Err()) - } - case <-route.matched: - } + <-route.context.Done() + r.IQResultRouteLock.Lock() + delete(r.IQResultRoutes, id) + r.IQResultRouteLock.Unlock() }() - return route + return route.result } func (r *Router) Match(p stanza.Packet, match *RouteMatch) bool { @@ -144,28 +137,16 @@ type TimeoutHandlerFunc func(err error) // IQResultRoute is a temporary route to match IQ result stanzas type IQResultRoute struct { - context context.Context - matched chan struct{} - handler IQResultHandler - timeoutHandler TimeoutHandlerFunc -} - -// Handler adds an IQ handler to the route. -func (r *IQResultRoute) Handler(handler IQResultHandler) *IQResultRoute { - r.handler = handler - return r -} - -// HandlerFunc updates the route to call a handler function when an IQ result is received. -func (r *IQResultRoute) HandlerFunc(f IQResultHandlerFunc) *IQResultRoute { - return r.Handler(f) + context context.Context + result chan stanza.IQ } -// TimeoutHandlerFunc registers a function that will be called automatically when -// the IQ result route is cancelled (most likely due to a timeout on the context). -func (r *IQResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IQResultRoute { - r.timeoutHandler = f - return r +// NewIQResultRoute creates a new IQResultRoute instance +func NewIQResultRoute(ctx context.Context) *IQResultRoute { + return &IQResultRoute{ + context: ctx, + result: make(chan stanza.IQ), + } } // ============================================================================ diff --git a/router_test.go b/router_test.go index b63553d..f9725ba 100644 --- a/router_test.go +++ b/router_test.go @@ -4,8 +4,8 @@ import ( "bytes" "context" "encoding/xml" - "runtime" "testing" + "time" "gosrc.io/xmpp/stanza" ) @@ -16,52 +16,41 @@ import ( func TestIQResultRoutes(t *testing.T) { t.Parallel() router := NewRouter() + conn := NewSenderMock() if router.IQResultRoutes == nil { t.Fatal("NewRouter does not initialize isResultRoutes") } - // Check other IQ does not matcah - conn := NewSenderMock() - iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "4321"}) - router.NewIQResultRoute(context.Background(), "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { - _ = s.SendRaw(successFlag) - }) - if conn.String() == successFlag { - t.Fatal("IQ result with wrong ID was matched") - } - // Check if the IQ handler was called - conn = NewSenderMock() - iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) - router.route(conn, iq) - if conn.String() != successFlag { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) + res := router.NewIQResultRoute(ctx, "1234") + go router.route(conn, iq) + select { + case <-ctx.Done(): t.Fatal("IQ result was not matched") + case <-res: + // Success } - // The match must only happen once, so we if receive the same package again it - // must not be matched. - conn = NewSenderMock() - router.route(conn, iq) - if conn.String() == successFlag { - t.Fatal("IQ result was matched twice") + // The match must only happen once, so the id should no longer be in IQResultRoutes + if _, ok := router.IQResultRoutes[iq.Attrs.Id]; ok { + t.Fatal("IQ ID was not removed from the route map") } - // After cancelling a route it should no longer match - conn = NewSenderMock() - ctx, cancel := context.WithCancel(context.Background()) - iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) - router.NewIQResultRoute(ctx, "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { - _ = s.SendRaw(successFlag) - }).TimeoutHandlerFunc(func(err error) { - conn.SendRaw(cancelledFlag) - }) - cancel() - // Yield the processor so the cancellation goroutine is triggered - runtime.Gosched() - router.route(conn, iq) - if conn.String() != cancelledFlag { - t.Fatal("IQ result route was matched after cancellation") + // Check other IQ does not matcah + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + iq.Attrs.Id = "4321" + res = router.NewIQResultRoute(ctx, "1234") + go router.route(conn, iq) + select { + case <-ctx.Done(): + // Success + case <-res: + t.Fatal("IQ result with wrong ID was matched") } } From eda5c23c54ecb484fedf1f375d2a25a6047f1dbe Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Thu, 31 Oct 2019 20:08:39 +0100 Subject: [PATCH 8/8] Add SendIQ to StreamClient and Sender This makes it possible to use SendIQ from PostConnect and route handlers. --- component.go | 20 ++++++++++++++++++++ stream_manager.go | 3 +++ 2 files changed, 23 insertions(+) diff --git a/component.go b/component.go index 18be3bc..d459c00 100644 --- a/component.go +++ b/component.go @@ -1,6 +1,7 @@ package xmpp import ( + "context" "crypto/sha1" "encoding/hex" "encoding/xml" @@ -158,6 +159,25 @@ func (c *Component) Send(packet stanza.Packet) error { return nil } +// SendIQ sends an IQ set or get stanza to the server. If a result is received +// the provided handler function will automatically be called. +// +// The provided context should have a timeout to prevent the client from waiting +// forever for an IQ result. For example: +// +// ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second) +// result := <- client.SendIQ(ctx, iq) +// +func (c *Component) SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) { + if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { + return nil, ErrCanOnlySendGetOrSetIq + } + if err := c.Send(iq); err != nil { + return nil, err + } + return c.router.NewIQResultRoute(ctx, iq.Attrs.Id), nil +} + // SendRaw sends an XMPP stanza as a string to the server. // It can be invalid XML or XMPP content. In that case, the server will // disconnect the component. It is up to the user of this method to diff --git a/stream_manager.go b/stream_manager.go index c21d79a..1011f6e 100644 --- a/stream_manager.go +++ b/stream_manager.go @@ -1,6 +1,7 @@ package xmpp import ( + "context" "errors" "sync" "time" @@ -26,6 +27,7 @@ type StreamClient interface { Connect() error Resume(state SMState) error Send(packet stanza.Packet) error + SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) SendRaw(packet string) error Disconnect() SetHandler(handler EventHandler) @@ -35,6 +37,7 @@ type StreamClient interface { // It is mostly use in callback to pass a limited subset of the stream client interface type Sender interface { Send(packet stanza.Packet) error + SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) SendRaw(packet string) error }