Use a channel based API for SendIQ

This makes sending IQ more idiomatic Go, but more importantly it solves
a problem with contexts that were not being cancelled correctly with
the previous API.

As a side-effect of this change `Route.route` must now be invoked in a
go-routine to prevent deadlocks. This also allows for stanzas to be processed
in parallel, which can result in a nice performance win.
disco_info_form
Wichert Akkerman 5 years ago committed by Mickaël Rémond
parent 83bc8581fd
commit a0e74051fd

@ -231,18 +231,16 @@ func (c *Client) Send(packet stanza.Packet) error {
// forever for an IQ result. For example: // forever for an IQ result. For example:
// //
// ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second) // ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second)
// client.SendIQ(ctx, iq, func(s Sender, p stanza.Packet) { // result := <- client.SendIQ(ctx, iq)
// // Handle the result here
// })
// //
func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler IQResultHandlerFunc) (*IQResultRoute, error) { func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) {
if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" {
return nil, ErrCanOnlySendGetOrSetIq return nil, ErrCanOnlySendGetOrSetIq
} }
if err := c.Send(iq); err != nil { if err := c.Send(iq); err != nil {
return nil, err return nil, err
} }
return c.router.NewIQResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil return c.router.NewIQResultRoute(ctx, iq.Attrs.Id), nil
} }
// SendRaw sends an XMPP stanza as a string to the server. // SendRaw sends an XMPP stanza as a string to the server.
@ -295,7 +293,10 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) (err error)
state.Inbound++ state.Inbound++
} }
c.router.route(c, val) // Do normal route processing in a go-routine so we can immediately
// start receiving other stanzas. This also allows route handlers to
// send and receive more stanzas.
go c.router.route(c, val)
} }
} }

@ -51,8 +51,8 @@ func (r *Router) route(s Sender, p stanza.Packet) {
r.IQResultRouteLock.Lock() r.IQResultRouteLock.Lock()
delete(r.IQResultRoutes, iq.Id) delete(r.IQResultRoutes, iq.Id)
r.IQResultRouteLock.Unlock() r.IQResultRouteLock.Unlock()
close(route.matched) route.result <- iq
route.handler.HandleIQ(route.context, s, iq) close(route.result)
return return
} }
} }
@ -91,29 +91,22 @@ func (r *Router) NewRoute() *Route {
// NewIQResultRoute register a route that will catch an IQ result stanza with // NewIQResultRoute register a route that will catch an IQ result stanza with
// the given Id. The route will only match ones, after which it will automatically // the given Id. The route will only match ones, after which it will automatically
// be unregistered // be unregistered
func (r *Router) NewIQResultRoute(ctx context.Context, id string) *IQResultRoute { func (r *Router) NewIQResultRoute(ctx context.Context, id string) chan stanza.IQ {
route := &IQResultRoute{ route := NewIQResultRoute(ctx)
context: ctx,
matched: make(chan struct{}),
}
r.IQResultRouteLock.Lock() r.IQResultRouteLock.Lock()
r.IQResultRoutes[id] = route r.IQResultRoutes[id] = route
r.IQResultRouteLock.Unlock() r.IQResultRouteLock.Unlock()
// Start a go function to make sure the route is unregistered when the context
// is done.
go func() { go func() {
select { <-route.context.Done()
case <-route.context.Done(): r.IQResultRouteLock.Lock()
r.IQResultRouteLock.Lock() delete(r.IQResultRoutes, id)
delete(r.IQResultRoutes, id) r.IQResultRouteLock.Unlock()
r.IQResultRouteLock.Unlock()
if route.timeoutHandler != nil {
route.timeoutHandler(route.context.Err())
}
case <-route.matched:
}
}() }()
return route return route.result
} }
func (r *Router) Match(p stanza.Packet, match *RouteMatch) bool { func (r *Router) Match(p stanza.Packet, match *RouteMatch) bool {
@ -144,28 +137,16 @@ type TimeoutHandlerFunc func(err error)
// IQResultRoute is a temporary route to match IQ result stanzas // IQResultRoute is a temporary route to match IQ result stanzas
type IQResultRoute struct { type IQResultRoute struct {
context context.Context context context.Context
matched chan struct{} result chan stanza.IQ
handler IQResultHandler
timeoutHandler TimeoutHandlerFunc
}
// Handler adds an IQ handler to the route.
func (r *IQResultRoute) Handler(handler IQResultHandler) *IQResultRoute {
r.handler = handler
return r
}
// HandlerFunc updates the route to call a handler function when an IQ result is received.
func (r *IQResultRoute) HandlerFunc(f IQResultHandlerFunc) *IQResultRoute {
return r.Handler(f)
} }
// TimeoutHandlerFunc registers a function that will be called automatically when // NewIQResultRoute creates a new IQResultRoute instance
// the IQ result route is cancelled (most likely due to a timeout on the context). func NewIQResultRoute(ctx context.Context) *IQResultRoute {
func (r *IQResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IQResultRoute { return &IQResultRoute{
r.timeoutHandler = f context: ctx,
return r result: make(chan stanza.IQ),
}
} }
// ============================================================================ // ============================================================================

@ -4,8 +4,8 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/xml" "encoding/xml"
"runtime"
"testing" "testing"
"time"
"gosrc.io/xmpp/stanza" "gosrc.io/xmpp/stanza"
) )
@ -16,52 +16,41 @@ import (
func TestIQResultRoutes(t *testing.T) { func TestIQResultRoutes(t *testing.T) {
t.Parallel() t.Parallel()
router := NewRouter() router := NewRouter()
conn := NewSenderMock()
if router.IQResultRoutes == nil { if router.IQResultRoutes == nil {
t.Fatal("NewRouter does not initialize isResultRoutes") t.Fatal("NewRouter does not initialize isResultRoutes")
} }
// Check other IQ does not matcah
conn := NewSenderMock()
iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "4321"})
router.NewIQResultRoute(context.Background(), "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) {
_ = s.SendRaw(successFlag)
})
if conn.String() == successFlag {
t.Fatal("IQ result with wrong ID was matched")
}
// Check if the IQ handler was called // Check if the IQ handler was called
conn = NewSenderMock() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) defer cancel()
router.route(conn, iq) iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"})
if conn.String() != successFlag { res := router.NewIQResultRoute(ctx, "1234")
go router.route(conn, iq)
select {
case <-ctx.Done():
t.Fatal("IQ result was not matched") t.Fatal("IQ result was not matched")
case <-res:
// Success
} }
// The match must only happen once, so we if receive the same package again it // The match must only happen once, so the id should no longer be in IQResultRoutes
// must not be matched. if _, ok := router.IQResultRoutes[iq.Attrs.Id]; ok {
conn = NewSenderMock() t.Fatal("IQ ID was not removed from the route map")
router.route(conn, iq)
if conn.String() == successFlag {
t.Fatal("IQ result was matched twice")
} }
// After cancelling a route it should no longer match // Check other IQ does not matcah
conn = NewSenderMock() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
ctx, cancel := context.WithCancel(context.Background()) defer cancel()
iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) iq.Attrs.Id = "4321"
router.NewIQResultRoute(ctx, "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { res = router.NewIQResultRoute(ctx, "1234")
_ = s.SendRaw(successFlag) go router.route(conn, iq)
}).TimeoutHandlerFunc(func(err error) { select {
conn.SendRaw(cancelledFlag) case <-ctx.Done():
}) // Success
cancel() case <-res:
// Yield the processor so the cancellation goroutine is triggered t.Fatal("IQ result with wrong ID was matched")
runtime.Gosched()
router.route(conn, iq)
if conn.String() != cancelledFlag {
t.Fatal("IQ result route was matched after cancellation")
} }
} }

Loading…
Cancel
Save