Improve stream management queue

This commit is contained in:
fiaxh 2020-07-21 15:48:42 +02:00
parent 7309c6f3ac
commit c887240fdc
3 changed files with 22 additions and 21 deletions

View file

@ -1,6 +1,8 @@
namespace Xmpp { namespace Xmpp {
public class StanzaWriter { public class StanzaWriter {
public signal void cancel();
private OutputStream output; private OutputStream output;
private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>(); private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>();
@ -43,6 +45,7 @@ public class StanzaWriter {
try { try {
yield output.write_all_async(data, 0, null, null); yield output.write_all_async(data, 0, null, null);
} catch (GLib.Error e) { } catch (GLib.Error e) {
cancel();
throw new XmlError.IO(@"IOError in GLib: $(e.message)"); throw new XmlError.IO(@"IOError in GLib: $(e.message)");
} finally { } finally {
SourceFuncWrapper? sfw = queue.pop_head(); SourceFuncWrapper? sfw = queue.pop_head();

View file

@ -97,6 +97,8 @@ public class XmppStream {
this.stream = stream; this.stream = stream;
reader = new StanzaReader.for_stream(stream.input_stream); reader = new StanzaReader.for_stream(stream.input_stream);
writer = new StanzaWriter.for_stream(stream.output_stream); writer = new StanzaWriter.for_stream(stream.output_stream);
writer.cancel.connect(reader.cancel);
require_setup(); require_setup();
} }
@ -123,7 +125,9 @@ public class XmppStream {
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
public void write(StanzaNode node) { public void write(StanzaNode node) {
write_async.begin(node, (obj, res) => { write_async.begin(node, (obj, res) => {
write_async.end(res); try {
write_async.end(res);
} catch (Error e) { }
}); });
} }

View file

@ -26,25 +26,21 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
} }
public async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError { public async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError {
if (stream.has_flag(Flag.IDENTITY)) { var promise = new Promise<IOError?>();
var promise = new Promise<IOError?>();
node_queue.add(new QueueItem(node, promise)); node_queue.add(new QueueItem(node, promise));
check_queue(stream); check_queue(stream);
try { try {
yield promise.future.wait_async(); yield promise.future.wait_async();
} catch (FutureError e) { } catch (FutureError e) {
throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); throw new IOStreamError.WRITE("Future returned error %i".printf(e.code));
}
} else {
yield write_node(stream, node);
} }
} }
internal async void write_node(XmppStream stream, StanzaNode node) throws IOError { internal async void write_node(XmppStream stream, StanzaNode node) {
StanzaWriter? writer = stream.writer; StanzaWriter? writer = stream.writer;
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); if (writer == null) return;
try { try {
stream.log.node("OUT", node, stream); stream.log.node("OUT", node, stream);
if (node.name == "message" || node.name == "iq" || node.name == "presence") { if (node.name == "message" || node.name == "iq" || node.name == "presence") {
@ -54,12 +50,10 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
} else { } else {
yield ((!)writer).write_node(node); yield ((!)writer).write_node(node);
} }
} catch (XmlError e) { } catch (XmlError e) { }
throw new IOStreamError.WRITE(e.message);
}
} }
private void check_queue(XmppStream stream) throws IOError { private void check_queue(XmppStream stream) {
while (!node_queue.is_empty && in_flight_stanzas.size < 10) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
QueueItem queue_item = node_queue.remove_at(0); QueueItem queue_item = node_queue.remove_at(0);
StanzaNode node = queue_item.node; StanzaNode node = queue_item.node;
@ -115,7 +109,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns()
.put_attribute("h", h_inbound.to_string()) .put_attribute("h", h_inbound.to_string())
.put_attribute("previd", session_id); .put_attribute("previd", session_id);
write_node(stream, node); write_node.begin(stream, node);
stream.add_flag(new Flag()); stream.add_flag(new Flag());
} }
} }
@ -124,7 +118,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
private void check_enable(XmppStream stream) { private void check_enable(XmppStream stream) {
if (stream_has_sm_feature(stream) && session_id == null) { if (stream_has_sm_feature(stream) && session_id == null) {
StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true");
write_node(stream, node); write_node.begin(stream, node);
stream.add_flag(new Flag()); stream.add_flag(new Flag());
h_outbound = 0; h_outbound = 0;
} }
@ -174,7 +168,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
private void send_ack(XmppStream stream) { private void send_ack(XmppStream stream) {
StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string());
write_node(stream, node); write_node.begin(stream, node);
} }
private void handle_ack(XmppStream stream, StanzaNode node) { private void handle_ack(XmppStream stream, StanzaNode node) {