diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index 2650a194..9af9f30e 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -60,9 +60,8 @@ public class Module : XmppStreamModule, Iq.Handler { } public class Connection : IOStream { - // TODO(hrxi): Fix reference cycle public class Input : InputStream { - private Connection connection; + private weak Connection connection; public Input(Connection connection) { this.connection = connection; } @@ -73,14 +72,14 @@ public class Connection : IOStream { return yield connection.read_async(buffer, io_priority, cancellable); } public override bool close(Cancellable? cancellable = null) throws IOError { - return connection.close_read(cancellable); + throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams"); } public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { return yield connection.close_read_async(io_priority, cancellable); } } public class Output : OutputStream { - private Connection connection; + private weak Connection connection; public Output(Connection connection) { this.connection = connection; } @@ -91,7 +90,7 @@ public class Connection : IOStream { return yield connection.write_async(buffer, io_priority, cancellable); } public override bool close(Cancellable? cancellable = null) throws IOError { - return connection.close_write(cancellable); + throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams"); } public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { return yield connection.close_write_async(io_priority, cancellable); @@ -263,13 +262,6 @@ public class Connection : IOStream { return buffer.length; } - public bool close_read(Cancellable? cancellable = null) { - input_closed = true; - if (!output_closed) { - return true; - } - return close_impl(cancellable); - } public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { input_closed = true; if (!output_closed) { @@ -277,13 +269,6 @@ public class Connection : IOStream { } return yield close_async_impl(io_priority, cancellable); } - public bool close_write(Cancellable? cancellable = null) { - output_closed = true; - if (!input_closed) { - return true; - } - return close_impl(cancellable); - } public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { output_closed = true; if (!input_closed) { @@ -292,7 +277,7 @@ public class Connection : IOStream { return yield close_async_impl(io_priority, cancellable); } delegate void OnClose(bool success); - private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) { + private bool close_impl(Cancellable? cancellable, OnClose on_close) { if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) { on_close(true); return true; diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index ae872ac6..ee7df994 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -184,7 +184,7 @@ public class Module : XmppStreamModule, Iq.Handler { if (transport == null || transport.transport_type() != type) { StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); - session.terminate(stream, reason); + session.terminate(stream, reason, "unsupported transports"); return; } @@ -310,7 +310,8 @@ public class Session { TransportParameters? transport = null; // ACTIVE - public IOStream? conn { get; private set; } + private Connection? connection; + public IOStream? conn { get { return connection; } } // Only interesting in INITIATE_SENT. // Signals that the session has been accepted by the peer. @@ -323,7 +324,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { @@ -333,7 +334,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -389,13 +390,17 @@ public class Session { throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); } transport.update_transport(transport_node); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR)); transport = null; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); state = State.ACTIVE; accepted(stream); } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + connection.on_terminated_by_jingle("remote terminated jingle session"); + state = ENDED; + stream.get_flag(Flag.IDENTITY).remove_session(sid); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } @@ -417,7 +422,7 @@ public class Session { Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER)); transport = null; state = State.ACTIVE; @@ -429,7 +434,7 @@ public class Session { } StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("decline", NS_URI)); - terminate(stream, reason); + terminate(stream, reason, "declined"); } public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { @@ -438,23 +443,24 @@ public class Session { if (application_reason != null) { reason.put_node(application_reason); } - terminate(stream, reason); + terminate(stream, reason, "application error"); } - public void close_connection(XmppStream stream) { - if (state != State.ACTIVE) { - return; // TODO(hrxi): what to do? - } - conn.close(); + public void on_connection_error(IOError error) { + // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session } - public void terminate(XmppStream stream, StanzaNode reason) { + public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { // TODO(hrxi): what to do? return; } if (state == State.ACTIVE) { - conn.close(); + if (local_reason != null) { + connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)"); + } else { + connection.on_terminated_by_jingle("local session-terminate"); + } } StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) @@ -472,6 +478,155 @@ public class Session { } } +public class Connection : IOStream { + public class Input : InputStream { + private weak Connection connection; + public Input(Connection connection) { + this.connection = connection; + } + public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections"); + } + public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.read_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_read(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_read_async(io_priority, cancellable); + } + } + public class Output : OutputStream { + private weak Connection connection; + public Output(Connection connection) { + this.connection = connection; + } + public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections"); + } + public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.write_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_write(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_write_async(io_priority, cancellable); + } + } + + private Input input; + private Output output; + public override InputStream input_stream { get { return input; } } + public override OutputStream output_stream { get { return output; } } + + private weak Session session; + private IOStream? inner = null; + private string? error = null; + + private class OnSetInnerCallback { + public SourceFunc callback; + public int io_priority; + } + + Gee.List callbacks = new ArrayList(); + + public Connection(Session session) { + this.input = new Input(this); + this.output = new Output(this); + this.session = session; + } + + public void set_inner(IOStream inner) { + assert(this.inner == null); + this.inner = inner; + foreach (OnSetInnerCallback c in callbacks) { + Idle.add((owned) c.callback, c.io_priority); + } + callbacks = null; + } + + public void on_terminated_by_jingle(string reason) { + if (error == null) { + close_async.begin(); + error = reason; + } + } + + private void check_for_errors() throws IOError { + if (error != null) { + throw new IOError.CLOSED(error); + } + } + private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError { + while (true) { + check_for_errors(); + if (inner != null) { + return; + } + SourceFunc callback = wait_and_check_for_errors.callback; + ulong id = cancellable.connect(() => callback()); + callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority}); + yield; + cancellable.disconnect(id); + } + } + private void handle_connection_error(IOError error) { + Session? strong = session; + if (strong != null) { + strong.on_connection_error(error); + } + } + + public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.read_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.write_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_read(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_read_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_write(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_write_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } +} + public class Flag : XmppStreamFlag { public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle"); diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index 2e636491..cce7b967 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -46,12 +46,7 @@ public class Module : Jingle.ContentType, XmppStreamModule { Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? - SourceFunc callback = offer_file_stream.callback; - session.accepted.connect((stream) => { - session.conn.input_stream.close(); - Idle.add((owned) callback); - }); - yield; + yield session.conn.input_stream.close_async(); // TODO(hrxi): catch errors yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET);