Add priority for and allow cancellation of outgoing stanzas

This commit is contained in:
Marvin W 2023-01-31 15:13:12 +01:00
parent 18321ed15c
commit d76e12b215
No known key found for this signature in database
GPG key ID: 072E9235DB996F2A
10 changed files with 85 additions and 57 deletions

View file

@ -118,7 +118,7 @@ public class Dino.HistorySync {
} }
} }
public async void fetch_everything(Account account, Jid mam_server, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { public async void fetch_everything(Account account, Jid mam_server, Cancellable? cancellable = null, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) {
debug("Fetch everything for %s %s", mam_server.to_string(), until_earliest_time != null ? @"(until $until_earliest_time)" : ""); debug("Fetch everything for %s %s", mam_server.to_string(), until_earliest_time != null ? @"(until $until_earliest_time)" : "");
RowOption latest_row_opt = db.mam_catchup.select() RowOption latest_row_opt = db.mam_catchup.select()
.with(db.mam_catchup.account_id, "=", account.id) .with(db.mam_catchup.account_id, "=", account.id)
@ -128,7 +128,7 @@ public class Dino.HistorySync {
.single().row(); .single().row();
Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null; Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null;
Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time); Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time, cancellable);
if (new_row != null) { if (new_row != null) {
current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id]; current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id];
@ -182,7 +182,7 @@ public class Dino.HistorySync {
} }
// Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise. // Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise.
public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time) { public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time, Cancellable? cancellable = null) {
debug("[%s | %s] Fetching latest page", account.bare_jid.to_string(), mam_server.to_string()); debug("[%s | %s] Fetching latest page", account.bare_jid.to_string(), mam_server.to_string());
int latest_row_id = -1; int latest_row_id = -1;
@ -203,7 +203,7 @@ public class Dino.HistorySync {
var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id); var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id);
PageRequestResult page_result = yield get_mam_page(account, query_params, null); PageRequestResult page_result = yield get_mam_page(account, query_params, null, cancellable);
debug("[%s | %s] Latest page result: %s", account.bare_jid.to_string(), mam_server.to_string(), page_result.page_result.to_string()); debug("[%s | %s] Latest page result: %s", account.bare_jid.to_string(), mam_server.to_string(), page_result.page_result.to_string());
if (page_result.page_result == PageResult.Error) { if (page_result.page_result == PageResult.Error) {
@ -299,7 +299,7 @@ public class Dino.HistorySync {
return null; return null;
} }
private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time) { private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time, Cancellable? cancellable = null) {
DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]); DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]);
string latest_id = range[db.mam_catchup.from_id]; string latest_id = range[db.mam_catchup.from_id];
debug("[%s | %s] Fetching before range < %s, %s", account.bare_jid.to_string(), mam_server.to_string(), latest_time.to_string(), latest_id); debug("[%s | %s] Fetching before range < %s, %s", account.bare_jid.to_string(), mam_server.to_string(), latest_time.to_string(), latest_id);
@ -314,18 +314,18 @@ public class Dino.HistorySync {
latest_time, latest_id latest_time, latest_id
); );
} }
yield fetch_query(account, query_params, range[db.mam_catchup.id]); yield fetch_query(account, query_params, range[db.mam_catchup.id], cancellable);
} }
/** /**
* Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned) * Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned)
* @return The last PageRequestResult result * @return The last PageRequestResult result
**/ **/
private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id) { private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id, Cancellable? cancellable = null) {
debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : "");
PageRequestResult? page_result = null; PageRequestResult? page_result = null;
do { do {
page_result = yield get_mam_page(account, query_params, page_result); page_result = yield get_mam_page(account, query_params, page_result, cancellable);
debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null); debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null);
if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result; if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result;
@ -360,7 +360,7 @@ public class Dino.HistorySync {
/** /**
* prev_page_result: null if this is the first page request * prev_page_result: null if this is the first page request
**/ **/
private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result) { private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result, Cancellable? cancellable = null) {
XmppStream stream = stream_interactor.get_stream(account); XmppStream stream = stream_interactor.get_stream(account);
Xmpp.MessageArchiveManagement.QueryResult query_result = null; Xmpp.MessageArchiveManagement.QueryResult query_result = null;
if (prev_page_result == null) { if (prev_page_result == null) {
@ -368,10 +368,10 @@ public class Dino.HistorySync {
} else { } else {
query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result); query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result);
} }
return yield process_query_result(account, query_params, query_result); return yield process_query_result(account, query_params, query_result, cancellable);
} }
private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result) { private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result, Cancellable? cancellable = null) {
PageResult page_result = PageResult.MorePagesAvailable; PageResult page_result = PageResult.MorePagesAvailable;
if (query_result.malformed || query_result.error) { if (query_result.malformed || query_result.error) {

View file

@ -109,7 +109,7 @@ public class MucManager : StreamInteractionModule, Object {
} else { } else {
// Fetch everything up to the last time the user actively joined // Fetch everything up to the last time the user actively joined
stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync
.fetch_everything.begin(account, jid.bare_jid, conversation.active_last_changed); .fetch_everything.begin(account, jid.bare_jid, null, conversation.active_last_changed);
} }
} }
} else if (res.muc_error != null) { } else if (res.muc_error != null) {

View file

@ -1,7 +1,7 @@
using Gee; using Gee;
public interface Xmpp.WriteNodeFunc : Object { public interface Xmpp.WriteNodeFunc : Object {
public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError; public abstract async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError;
} }
public abstract class Xmpp.IoXmppStream : XmppStream { public abstract class Xmpp.IoXmppStream : XmppStream {
@ -44,22 +44,22 @@ public abstract class Xmpp.IoXmppStream : XmppStream {
} }
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
public override void write(StanzaNode node) { public override void write(StanzaNode node, int io_priority = Priority.DEFAULT) {
write_async.begin(node, (obj, res) => { write_async.begin(node, io_priority, null, (obj, res) => {
try { try {
write_async.end(res); write_async.end(res);
} catch (Error e) { } } catch (Error e) { }
}); });
} }
public override async void write_async(StanzaNode node) throws IOError { public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
if (write_obj != null) { if (write_obj != null) {
yield write_obj.write_stanza(this, node); yield write_obj.write_stanza(this, node, io_priority, cancellable);
} else { } else {
StanzaWriter? writer = this.writer; StanzaWriter? writer = this.writer;
if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open"); if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open");
log.node("OUT", node, this); log.node("OUT", node, this);
yield ((!)writer).write_node(node); yield ((!)writer).write_node(node, io_priority, cancellable);
} }
} }

View file

@ -12,11 +12,11 @@ public class StanzaWriter {
this.output = output; this.output = output;
} }
public async void write_node(StanzaNode node) throws IOError { public async void write_node(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield write_data(node.to_xml().data); yield write_data(node.to_xml().data, io_priority, cancellable);
} }
public async void write_nodes(StanzaNode node1, StanzaNode node2) throws IOError { public async void write_nodes(StanzaNode node1, StanzaNode node2, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
var data1 = node1.to_xml().data; var data1 = node1.to_xml().data;
var data2 = node2.to_xml().data; var data2 = node2.to_xml().data;
@ -29,21 +29,21 @@ public class StanzaWriter {
concat[i++] = datum; concat[i++] = datum;
} }
yield write_data(concat); yield write_data(concat, io_priority, cancellable);
} }
public async void write(string s) throws IOError { public async void write(string s, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield write_data(s.data); yield write_data(s.data, io_priority, cancellable);
} }
private async void write_data(owned uint8[] data) throws IOError { private async void write_data(owned uint8[] data, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
if (running) { if (running) {
queue.push_tail(new SourceFuncWrapper(write_data.callback)); queue.push_tail(new SourceFuncWrapper(write_data.callback));
yield; yield;
} }
running = true; running = true;
try { try {
yield output.write_all_async(data, 0, null, null); yield output.write_all_async(data, io_priority, cancellable, null);
} catch (IOError e) { } catch (IOError e) {
cancel(); cancel();
throw e; throw e;

View file

@ -37,9 +37,9 @@ public abstract class Xmpp.XmppStream {
public abstract async StanzaNode read() throws IOError; public abstract async StanzaNode read() throws IOError;
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
public abstract void write(StanzaNode node); public abstract void write(StanzaNode node, int io_priority = Priority.DEFAULT);
public abstract async void write_async(StanzaNode node) throws IOError; public abstract async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError;
public abstract async void setup() throws IOError; public abstract async void setup() throws IOError;

View file

@ -12,22 +12,25 @@ namespace Xmpp.Iq {
private HashMap<string, ResponseListener> responseListeners = new HashMap<string, ResponseListener>(); private HashMap<string, ResponseListener> responseListeners = new HashMap<string, ResponseListener>();
private HashMap<string, ArrayList<Handler>> namespaceRegistrants = new HashMap<string, ArrayList<Handler>>(); private HashMap<string, ArrayList<Handler>> namespaceRegistrants = new HashMap<string, ArrayList<Handler>>();
public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq) { public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
assert(iq.type_ == Iq.Stanza.TYPE_GET || iq.type_ == Iq.Stanza.TYPE_SET); assert(iq.type_ == Iq.Stanza.TYPE_GET || iq.type_ == Iq.Stanza.TYPE_SET);
preprocess_outgoing_iq_set_get(stream, iq);
Iq.Stanza? return_stanza = null; Iq.Stanza? return_stanza = null;
send_iq(stream, iq, (_, result_iq) => { responseListeners[iq.id] = new ResponseListener((_, result_iq) => {
return_stanza = result_iq; return_stanza = result_iq;
Idle.add(send_iq_async.callback); Idle.add(send_iq_async.callback);
}); });
stream.write_async(iq.stanza, io_priority, cancellable);
yield; yield;
cancellable.set_error_if_cancelled();
return return_stanza; return return_stanza;
} }
public delegate void OnResult(XmppStream stream, Iq.Stanza iq); public delegate void OnResult(XmppStream stream, Iq.Stanza iq);
public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) { public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null, int io_priority = Priority.DEFAULT) {
preprocess_outgoing_iq_set_get(stream, iq); preprocess_outgoing_iq_set_get(stream, iq);
stream.write(iq.stanza); stream.write(iq.stanza, io_priority);
if (listener != null) { if (listener != null) {
responseListeners[iq.id] = new ResponseListener((owned) listener); responseListeners[iq.id] = new ResponseListener((owned) listener);
} }

View file

@ -13,32 +13,51 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
public string? session_id { get; set; default=null; } public string? session_id { get; set; default=null; }
public Gee.List<XmppStreamFlag> flags = null; public Gee.List<XmppStreamFlag> flags = null;
private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>(); private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>();
private Gee.List<QueueItem> node_queue = new ArrayList<QueueItem>(); private Gee.Queue<QueueItem> node_queue = new PriorityQueue<QueueItem>((a, b) => {
return a.io_priority - b.io_priority;
});
private class QueueItem { private class QueueItem {
public StanzaNode node; public StanzaNode node;
public Promise<IOError?> promise; public int io_priority;
public Cancellable? cancellable;
public Promise<void*> promise;
public QueueItem(StanzaNode node, Promise<IOError?> promise) { public QueueItem(StanzaNode node, int io_priority, Cancellable? cancellable) {
this.node = node; this.node = node;
this.promise = promise; this.io_priority = io_priority;
this.cancellable = cancellable;
this.promise = new Promise<void*>();
} }
} }
public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { public async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
var promise = new Promise<IOError?>(); var future = enqueue_stanza(stream, node, io_priority, cancellable);
node_queue.add(new QueueItem(node, promise));
check_queue(stream);
try { try {
yield promise.future.wait_async(); yield future.wait_async();
} catch (FutureError e) { } catch (FutureError e) {
throw new IOError.FAILED("Future returned error %i".printf(e.code)); if (e is FutureError.ABANDON_PROMISE) {
throw new IOError.FAILED("Future abandoned: %s".printf(e.message));
} else if (e is FutureError.EXCEPTION) {
if (future.exception is IOError) {
throw (IOError) future.exception;
} else {
throw new IOError.FAILED("Unknown error %s".printf(future.exception.message));
}
} else {
throw new IOError.FAILED("Unknown future error: %s".printf(e.message));
}
} }
} }
internal async void write_node(XmppStream stream, StanzaNode node) { private Future<void*> enqueue_stanza(XmppStream stream, StanzaNode node, int io_priority, Cancellable? cancellable) {
var queue_item = new QueueItem(node, io_priority, cancellable);
node_queue.offer(queue_item);
check_queue(stream);
return queue_item.promise.future;
}
internal async void write_node(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) {
StanzaWriter? writer = ((IoXmppStream)stream).writer; StanzaWriter? writer = ((IoXmppStream)stream).writer;
if (writer == null) return; if (writer == null) return;
try { try {
@ -46,22 +65,28 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
if (node.name == "message" || node.name == "iq" || node.name == "presence") { if (node.name == "message" || node.name == "iq" || node.name == "presence") {
var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns(); var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns();
stream.log.node("OUT", r_node, stream); stream.log.node("OUT", r_node, stream);
yield ((!)writer).write_nodes(node, r_node); yield ((!)writer).write_nodes(node, r_node, io_priority, cancellable);
} else { } else {
yield ((!)writer).write_node(node); yield ((!)writer).write_node(node, io_priority, cancellable);
} }
} catch (IOError e) { } } catch (IOError e) { }
} }
private void check_queue(XmppStream stream) { private void check_queue(XmppStream stream) {
while (!node_queue.is_empty && in_flight_stanzas.size < 10) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
QueueItem queue_item = node_queue.remove_at(0); QueueItem queue_item = node_queue.poll();
try {
queue_item.cancellable.set_error_if_cancelled();
} catch (IOError e) {
queue_item.promise.set_exception(e);
continue;
}
StanzaNode node = queue_item.node; StanzaNode node = queue_item.node;
if (node.name == "message" || node.name == "iq" || node.name == "presence") { if (node.name == "message" || node.name == "iq" || node.name == "presence") {
in_flight_stanzas[++h_outbound] = queue_item; in_flight_stanzas[++h_outbound] = queue_item;
} }
write_node.begin(stream, node); write_node.begin(stream, node, queue_item.io_priority, queue_item.cancellable);
} }
} }

View file

@ -9,7 +9,7 @@ namespace Xmpp.Xep.Ping {
public async Iq.Stanza send_ping(XmppStream stream, Jid jid) { public async Iq.Stanza send_ping(XmppStream stream, Jid jid) {
StanzaNode ping_node = new StanzaNode.build("ping", NS_URI).add_self_xmlns(); StanzaNode ping_node = new StanzaNode.build("ping", NS_URI).add_self_xmlns();
Iq.Stanza iq = new Iq.Stanza.get(ping_node) { to=jid }; Iq.Stanza iq = new Iq.Stanza.get(ping_node) { to=jid };
return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq); return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.HIGH);
} }
public override void attach(XmppStream stream) { public override void attach(XmppStream stream) {
@ -23,7 +23,7 @@ namespace Xmpp.Xep.Ping {
} }
public async void on_iq_get(XmppStream stream, Iq.Stanza iq) { public async void on_iq_get(XmppStream stream, Iq.Stanza iq) {
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq), null, Priority.HIGH);
} }
public override string get_ns() { return NS_URI; } public override string get_ns() { return NS_URI; }

View file

@ -61,20 +61,20 @@ namespace Xmpp.MessageArchiveManagement.V2 {
return MessageArchiveManagement.create_base_query(stream, MessageArchiveManagement.NS_URI_2, mam_params.query_id, fields); return MessageArchiveManagement.create_base_query(stream, MessageArchiveManagement.NS_URI_2, mam_params.query_id, fields);
} }
public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params) { public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params, Cancellable? cancellable = null) {
var query_node = create_base_query(stream, mam_params); var query_node = create_base_query(stream, mam_params);
if (!mam_params.use_ns2_extended) { if (!mam_params.use_ns2_extended) {
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(mam_params.end_id)); query_node.put_node(ResultSetManagement.create_set_rsm_node_before(mam_params.end_id));
} }
return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node); return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable);
} }
public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result) { public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result, Cancellable? cancellable = null) {
var query_node = create_base_query(stream, mam_params); var query_node = create_base_query(stream, mam_params);
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(prev_result.first)); query_node.put_node(ResultSetManagement.create_set_rsm_node_before(prev_result.first));
return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node); return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable);
} }
} }

View file

@ -71,7 +71,7 @@ public class Module : XmppStreamModule {
return query_node; return query_node;
} }
internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node) { internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node, Cancellable? cancellable = null) {
var res = new QueryResult(); var res = new QueryResult();
if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; } if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; }
@ -79,7 +79,7 @@ public class Module : XmppStreamModule {
// Build and send query // Build and send query
Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server }; Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server };
Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq); Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.LOW, cancellable);
// Parse the response IQ into a QueryResult. // Parse the response IQ into a QueryResult.
StanzaNode? fin_node = result_iq.stanza.get_subnode("fin", ns); StanzaNode? fin_node = result_iq.stanza.get_subnode("fin", ns);