From 1d1adb0c48afed92c0a8585d389968f5fc0966d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?CORNIERE=20R=C3=A9mi?= Date: Tue, 14 Jan 2020 23:13:13 +0100 Subject: [PATCH] Example pubsub code cleanup --- .../xmpp_pubsub_client/xmpp_ps_client.go | 166 +++++++++++------- 1 file changed, 102 insertions(+), 64 deletions(-) diff --git a/_examples/xmpp_pubsub_client/xmpp_ps_client.go b/_examples/xmpp_pubsub_client/xmpp_ps_client.go index eba9bbd..14f0fb0 100644 --- a/_examples/xmpp_pubsub_client/xmpp_ps_client.go +++ b/_examples/xmpp_pubsub_client/xmpp_ps_client.go @@ -32,10 +32,10 @@ func main() { router.NewRoute().Packet("message"). HandlerFunc(func(s xmpp.Sender, p stanza.Packet) { data, _ := xml.Marshal(p) - fmt.Println("Received a publication ! => \n" + string(data)) + log.Println("Received a message ! => \n" + string(data)) }) - client, err := xmpp.NewClient(config, router, func(err error) { fmt.Println(err) }) + client, err := xmpp.NewClient(config, router, func(err error) { log.Println(err) }) if err != nil { log.Fatalf("%+v", err) } @@ -49,11 +49,45 @@ func main() { // ========================== // Create a node + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + createNode(ctx, cancel, client) + + // ============================= + // Configure the node. This can also be done in a single message with the creation + configureNode(ctx, cancel, client) + + // ==================================== + // Subscribe to this node : + subToNode(ctx, cancel, client) + + // ========================== + // Publish to that node + pubToNode(ctx, cancel, client) + + // ============================= + // Let's purge the node : + purgeRq, _ := stanza.NewPurgeAllItems(serviceName, nodeName) + purgeCh, err := client.SendIQ(ctx, purgeRq) + select { + case purgeResp := <-purgeCh: + if purgeResp.Error != nil { + cancel() + log.Fatalf("error while purging node : %s", purgeResp.Error.Text) + } + log.Println("node successfully purged") + case <-time.After(1000 * time.Millisecond): + cancel() + log.Fatal("No iq response was received in time while purging node") + } + + cancel() +} + +func createNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) { rqCreate, err := stanza.NewCreateNode(serviceName, nodeName) if err != nil { log.Fatalf("%+v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) createCh, err := client.SendIQ(ctx, rqCreate) if err != nil { log.Fatalf("%+v", err) @@ -67,20 +101,73 @@ func main() { if respCr.Error.Reason != "conflict" { log.Fatalf("%+v", respCr.Error.Text) } - fmt.Println(respCr.Error.Text) + log.Println(respCr.Error.Text) } else { fmt.Print("successfully created channel") } case <-time.After(100 * time.Millisecond): cancel() - log.Fatal("No iq response was received in time") + log.Fatal("No iq response was received in time while creating node") } } } +} - // ==================================== - // Now let's subscribe to this node : - rqSubscribe, _ := stanza.NewSubRq(serviceName, stanza.SubInfo{ +func configureNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) { + // First, ask for a form with the config options + confRq, _ := stanza.NewConfigureNode(serviceName, nodeName) + confReqCh, err := client.SendIQ(ctx, confRq) + if err != nil { + log.Fatalf("could not send iq : %v", err) + } + select { + case confForm := <-confReqCh: + // If the request was successful, we now have a form with configuration options to update + fields, err := confForm.GetFormFields() + if err != nil { + log.Fatal("No config fields found !") + } + + // These are some common fields expected to be present. Change processing to your liking + if fields["pubsub#max_payload_size"] != nil { + fields["pubsub#max_payload_size"].ValuesList[0] = "100000" + } + + if fields["pubsub#notification_type"] != nil { + fields["pubsub#notification_type"].ValuesList[0] = "headline" + } + + // Send the modified fields as a form + submitConf, err := stanza.NewFormSubmissionOwner(serviceName, + nodeName, + []*stanza.Field{ + fields["pubsub#max_payload_size"], + fields["pubsub#notification_type"], + }) + + c, _ := client.SendIQ(ctx, submitConf) + select { + case confResp := <-c: + if confResp.Error != nil { + cancel() + log.Fatalf("node configuration failed : %s", confResp.Error.Text) + } + log.Println("node configuration was successful") + return + + case <-time.After(300 * time.Millisecond): + cancel() + log.Fatal("No iq response was received in time while configuring the node") + } + + case <-time.After(300 * time.Millisecond): + cancel() + log.Fatal("No iq response was received in time while asking for the config form") + } +} + +func subToNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) { + rqSubscribe, err := stanza.NewSubRq(serviceName, stanza.SubInfo{ Node: nodeName, Jid: userJID, }) @@ -91,15 +178,15 @@ func main() { if subRespCh != nil { select { case <-subRespCh: - fmt.Println("Subscribed to the service") - case <-time.After(100 * time.Millisecond): + log.Println("Subscribed to the service") + case <-time.After(300 * time.Millisecond): cancel() - log.Fatal("No iq response was received in time") + log.Fatal("No iq response was received in time while subscribing") } } +} - // ========================== - // Publish to that node +func pubToNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) { pub, err := stanza.NewPublishItemRq(serviceName, nodeName, "", stanza.Item{ Publisher: "testuser2", Any: &stanza.Node{ @@ -166,59 +253,10 @@ func main() { if pubRespCh != nil { select { case <-pubRespCh: - fmt.Println("Published item to the service") - case <-time.After(100 * time.Millisecond): - cancel() - log.Fatal("No iq response was received in time") - } - } - - // ============================= - // Let's purge the node now : - purgeRq, _ := stanza.NewPurgeAllItems(serviceName, nodeName) - client.SendIQ(ctx, purgeRq) - - // ============================= - // Configure the node : - confRq, _ := stanza.NewConfigureNode(serviceName, nodeName) - confReqCh, err := client.SendIQ(ctx, confRq) - select { - case confForm := <-confReqCh: - fields, err := confForm.GetFormFields() - if err != nil { - log.Fatal("No config fields found !") - } - - // These are some common fields expected to be present. Change processing to your liking - if fields["pubsub#max_payload_size"] != nil { - fields["pubsub#max_payload_size"].ValuesList[0] = "100000" - } - - if fields["pubsub#notification_type"] != nil { - fields["pubsub#notification_type"].ValuesList[0] = "headline" - } - - submitConf, err := stanza.NewFormSubmissionOwner(serviceName, - nodeName, - []*stanza.Field{ - fields["pubsub#max_payload_size"], - fields["pubsub#notification_type"], - }) - - c, _ := client.SendIQ(ctx, submitConf) - select { - case <-c: - fmt.Println("node configuration was successful") + log.Println("Published item to the service") case <-time.After(300 * time.Millisecond): cancel() - log.Fatal("No iq response was received in time") - + log.Fatal("No iq response was received in time while publishing") } - - case <-time.After(300 * time.Millisecond): - cancel() - log.Fatal("No iq response was received in time") } - - cancel() }