From 95596e25a5cdacc7634dd2b9ca0e98599ae7d1b1 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 5 Aug 2019 17:05:33 +0200 Subject: [PATCH 01/18] Add jingle connection for better interfacing with jingle (terminate etc.) --- .../module/xep/0047_in_band_bytestreams.vala | 25 +-- xmpp-vala/src/module/xep/0166_jingle.vala | 185 ++++++++++++++++-- .../module/xep/0234_jingle_file_transfer.vala | 7 +- 3 files changed, 176 insertions(+), 41 deletions(-) diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index 2650a194..9af9f30e 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -60,9 +60,8 @@ public class Module : XmppStreamModule, Iq.Handler { } public class Connection : IOStream { - // TODO(hrxi): Fix reference cycle public class Input : InputStream { - private Connection connection; + private weak Connection connection; public Input(Connection connection) { this.connection = connection; } @@ -73,14 +72,14 @@ public class Connection : IOStream { return yield connection.read_async(buffer, io_priority, cancellable); } public override bool close(Cancellable? cancellable = null) throws IOError { - return connection.close_read(cancellable); + throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams"); } public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { return yield connection.close_read_async(io_priority, cancellable); } } public class Output : OutputStream { - private Connection connection; + private weak Connection connection; public Output(Connection connection) { this.connection = connection; } @@ -91,7 +90,7 @@ public class Connection : IOStream { return yield connection.write_async(buffer, io_priority, cancellable); } public override bool close(Cancellable? cancellable = null) throws IOError { - return connection.close_write(cancellable); + throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams"); } public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { return yield connection.close_write_async(io_priority, cancellable); @@ -263,13 +262,6 @@ public class Connection : IOStream { return buffer.length; } - public bool close_read(Cancellable? cancellable = null) { - input_closed = true; - if (!output_closed) { - return true; - } - return close_impl(cancellable); - } public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { input_closed = true; if (!output_closed) { @@ -277,13 +269,6 @@ public class Connection : IOStream { } return yield close_async_impl(io_priority, cancellable); } - public bool close_write(Cancellable? cancellable = null) { - output_closed = true; - if (!input_closed) { - return true; - } - return close_impl(cancellable); - } public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { output_closed = true; if (!input_closed) { @@ -292,7 +277,7 @@ public class Connection : IOStream { return yield close_async_impl(io_priority, cancellable); } delegate void OnClose(bool success); - private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) { + private bool close_impl(Cancellable? cancellable, OnClose on_close) { if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) { on_close(true); return true; diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index ae872ac6..ee7df994 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -184,7 +184,7 @@ public class Module : XmppStreamModule, Iq.Handler { if (transport == null || transport.transport_type() != type) { StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); - session.terminate(stream, reason); + session.terminate(stream, reason, "unsupported transports"); return; } @@ -310,7 +310,8 @@ public class Session { TransportParameters? transport = null; // ACTIVE - public IOStream? conn { get; private set; } + private Connection? connection; + public IOStream? conn { get { return connection; } } // Only interesting in INITIATE_SENT. // Signals that the session has been accepted by the peer. @@ -323,7 +324,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { @@ -333,7 +334,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -389,13 +390,17 @@ public class Session { throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); } transport.update_transport(transport_node); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR)); transport = null; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); state = State.ACTIVE; accepted(stream); } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + connection.on_terminated_by_jingle("remote terminated jingle session"); + state = ENDED; + stream.get_flag(Flag.IDENTITY).remove_session(sid); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } @@ -417,7 +422,7 @@ public class Session { Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER)); transport = null; state = State.ACTIVE; @@ -429,7 +434,7 @@ public class Session { } StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("decline", NS_URI)); - terminate(stream, reason); + terminate(stream, reason, "declined"); } public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { @@ -438,23 +443,24 @@ public class Session { if (application_reason != null) { reason.put_node(application_reason); } - terminate(stream, reason); + terminate(stream, reason, "application error"); } - public void close_connection(XmppStream stream) { - if (state != State.ACTIVE) { - return; // TODO(hrxi): what to do? - } - conn.close(); + public void on_connection_error(IOError error) { + // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session } - public void terminate(XmppStream stream, StanzaNode reason) { + public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { // TODO(hrxi): what to do? return; } if (state == State.ACTIVE) { - conn.close(); + if (local_reason != null) { + connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)"); + } else { + connection.on_terminated_by_jingle("local session-terminate"); + } } StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) @@ -472,6 +478,155 @@ public class Session { } } +public class Connection : IOStream { + public class Input : InputStream { + private weak Connection connection; + public Input(Connection connection) { + this.connection = connection; + } + public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections"); + } + public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.read_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_read(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_read_async(io_priority, cancellable); + } + } + public class Output : OutputStream { + private weak Connection connection; + public Output(Connection connection) { + this.connection = connection; + } + public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections"); + } + public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.write_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_write(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_write_async(io_priority, cancellable); + } + } + + private Input input; + private Output output; + public override InputStream input_stream { get { return input; } } + public override OutputStream output_stream { get { return output; } } + + private weak Session session; + private IOStream? inner = null; + private string? error = null; + + private class OnSetInnerCallback { + public SourceFunc callback; + public int io_priority; + } + + Gee.List callbacks = new ArrayList(); + + public Connection(Session session) { + this.input = new Input(this); + this.output = new Output(this); + this.session = session; + } + + public void set_inner(IOStream inner) { + assert(this.inner == null); + this.inner = inner; + foreach (OnSetInnerCallback c in callbacks) { + Idle.add((owned) c.callback, c.io_priority); + } + callbacks = null; + } + + public void on_terminated_by_jingle(string reason) { + if (error == null) { + close_async.begin(); + error = reason; + } + } + + private void check_for_errors() throws IOError { + if (error != null) { + throw new IOError.CLOSED(error); + } + } + private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError { + while (true) { + check_for_errors(); + if (inner != null) { + return; + } + SourceFunc callback = wait_and_check_for_errors.callback; + ulong id = cancellable.connect(() => callback()); + callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority}); + yield; + cancellable.disconnect(id); + } + } + private void handle_connection_error(IOError error) { + Session? strong = session; + if (strong != null) { + strong.on_connection_error(error); + } + } + + public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.read_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.write_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_read(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_read_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_write(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_write_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } +} + public class Flag : XmppStreamFlag { public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle"); diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index 2e636491..cce7b967 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -46,12 +46,7 @@ public class Module : Jingle.ContentType, XmppStreamModule { Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? - SourceFunc callback = offer_file_stream.callback; - session.accepted.connect((stream) => { - session.conn.input_stream.close(); - Idle.add((owned) callback); - }); - yield; + yield session.conn.input_stream.close_async(); // TODO(hrxi): catch errors yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); From 1be1d471222b6d3763df1ad9b9d3b631ac8185f1 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 22 Jul 2019 10:41:26 +0200 Subject: [PATCH 02/18] Fix a couple of delegate copy warnigs "warning: copying delegates is not supported" --- xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala | 8 ++++---- xmpp-vala/src/module/xep/0166_jingle.vala | 2 +- .../src/module/xep/0313_message_archive_management.vala | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index 9af9f30e..5d59d18c 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -149,25 +149,25 @@ public class Connection : IOStream { output = new Output(this); } - public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { + public void set_read_callback(owned SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (read_callback != null) { throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); } if (cancellable != null) { read_callback_cancellable_id = cancellable.connect(trigger_read_callback); } - read_callback = callback; + read_callback = (owned)callback; read_callback_cancellable = cancellable; read_callback_priority = io_priority; } - public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { + public void set_write_callback(owned SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (write_callback != null) { throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); } if (cancellable != null) { write_callback_cancellable_id = cancellable.connect(trigger_write_callback); } - write_callback = callback; + write_callback = (owned)callback; write_callback_cancellable = cancellable; write_callback_priority = io_priority; } diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index ee7df994..4d9a472a 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -567,7 +567,7 @@ public class Connection : IOStream { } SourceFunc callback = wait_and_check_for_errors.callback; ulong id = cancellable.connect(() => callback()); - callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority}); + callbacks.add(new OnSetInnerCallback() { callback=(owned)callback, io_priority=io_priority}); yield; cancellable.disconnect(id); } diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 00f8f99b..674224c9 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -39,7 +39,7 @@ public class Module : XmppStreamModule { } StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node()); Iq.Stanza iq = new Iq.Stanza.set(query_node); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { page_through_results(stream, iq, on_finished); }); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { page_through_results(stream, iq, (owned)on_finished); }); } public override void attach(XmppStream stream) { @@ -69,7 +69,7 @@ public class Module : XmppStreamModule { ) ) ); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { page_through_results(stream, iq, on_finished); }); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { page_through_results(stream, iq, (owned)on_finished); }); } private void query_availability(XmppStream stream) { From 9bbcff4afed1d30b7ea476f38cf2971516f1ccc3 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sun, 21 Jul 2019 02:12:55 +0200 Subject: [PATCH 03/18] Fix human_readable in stanza errors --- xmpp-vala/src/module/stanza_error.vala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/xmpp-vala/src/module/stanza_error.vala b/xmpp-vala/src/module/stanza_error.vala index c108b02a..a5c3956c 100644 --- a/xmpp-vala/src/module/stanza_error.vala +++ b/xmpp-vala/src/module/stanza_error.vala @@ -75,7 +75,7 @@ namespace Xmpp { error_node.put_node(new StanzaNode.build("text", ERROR_NS_URI) .add_self_xmlns() .put_attribute("xml:lang", "en") - .put_node(new StanzaNode.text(text)) + .put_node(new StanzaNode.text(human_readable)) ); } } @@ -94,6 +94,9 @@ namespace Xmpp { public ErrorStanza.not_allowed(string? human_readable = null) { this.build(TYPE_CANCEL, CONDITION_NOT_ALLOWED, human_readable, null); } + public ErrorStanza.resource_constraint(string? human_readable = null) { + this.build(TYPE_WAIT, CONDITION_RESOURCE_CONSTRAINT, human_readable, null); + } public ErrorStanza.service_unavailable() { this.build(TYPE_CANCEL, CONDITION_SERVICE_UNAVAILABLE, null, null); } From 7fe6dda4c9bbc2da189c3818d233e3aa43c363b2 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 22 Jul 2019 21:35:29 +0200 Subject: [PATCH 04/18] Finish file transfer after receiving enough data This means that we no longer rely on the remote end to close the connection after sending the file, but additionally use the `` element from the initial file transfer `` to check whether the file transfer has been completed. This was motivated by Conversations not closing the connection for SOCKS5 file transfers. --- .../module/xep/0234_jingle_file_transfer.vala | 61 ++++++++++++++++--- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index cce7b967..dc9851d4 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -64,7 +64,7 @@ public class Parameters : Jingle.ContentParameters, Object { public int64 size { get; private set; } public StanzaNode original_description { get; private set; } - public Parameters(Module parent, StanzaNode original_description, string? media_type, string? name, int64? size) { + public Parameters(Module parent, StanzaNode original_description, string? media_type, string? name, int64 size) { this.parent = parent; this.original_description = original_description; this.media_type = media_type; @@ -86,12 +86,15 @@ public class Parameters : Jingle.ContentParameters, Object { string? size_raw = size_node != null ? size_node.get_string_content() : null; // TODO(hrxi): For some reason, the ?:-expression does not work due to a type error. //int64? size = size_raw != null ? int64.parse(size_raw) : null; // TODO(hrxi): this has no error handling - int64 size = -1; - if (size_raw != null) { - size = int64.parse(size_raw); - if (size < 0) { - throw new Jingle.IqError.BAD_REQUEST("negative file size is invalid"); - } + if (size_raw == null) { + // Jingle file transfers (XEP-0234) theoretically SHOULD send a + // file size, however, we do require it in order to reliably find + // the end of the file transfer. + throw new Jingle.IqError.BAD_REQUEST("file offer without file size"); + } + int64 size = int64.parse(size_raw); + if (size < 0) { + throw new Jingle.IqError.BAD_REQUEST("negative file size is invalid"); } return new Parameters(parent, description, media_type, name, size); @@ -102,6 +105,47 @@ public class Parameters : Jingle.ContentParameters, Object { } } +// Does nothing except wrapping an input stream to signal EOF after reading +// `max_size` bytes. +private class FileTransferInputStream : InputStream { + InputStream inner; + int64 remaining_size; + public FileTransferInputStream(InputStream inner, int64 max_size) { + this.inner = inner; + this.remaining_size = max_size; + } + private ssize_t update_remaining(ssize_t read) { + this.remaining_size -= read; + return read; + } + public override ssize_t read(uint8[] buffer_, Cancellable? cancellable = null) throws IOError { + unowned uint8[] buffer = buffer_; + if (remaining_size <= 0) { + return 0; + } + if (buffer.length > remaining_size) { + buffer = buffer[0:remaining_size]; + } + return update_remaining(inner.read(buffer, cancellable)); + } + public override async ssize_t read_async(uint8[]? buffer_, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + unowned uint8[] buffer = buffer_; + if (remaining_size <= 0) { + return 0; + } + if (buffer.length > remaining_size) { + buffer = buffer[0:remaining_size]; + } + return update_remaining(yield inner.read_async(buffer, io_priority, cancellable)); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return inner.close(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield inner.close_async(io_priority, cancellable); + } +} + public class FileTransfer : Object { Jingle.Session session; Parameters parameters; @@ -110,11 +154,12 @@ public class FileTransfer : Object { public string? file_name { get { return parameters.name; } } public int64 size { get { return parameters.size; } } - public InputStream? stream { get { return session.conn != null ? session.conn.input_stream : null; } } + public InputStream? stream { get; private set; } public FileTransfer(Jingle.Session session, Parameters parameters) { this.session = session; this.parameters = parameters; + this.stream = new FileTransferInputStream(session.conn.input_stream, parameters.size); } public void accept(XmppStream stream) { From 308d71b70325917e60dc0750f44bee9f2d58a1a4 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 22 Jul 2019 21:37:47 +0200 Subject: [PATCH 05/18] Close files involved in file transfers explicitly --- libdino/src/service/file_manager.vala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 50b38f01..5b89d1c2 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -269,8 +269,7 @@ public class FileManager : StreamInteractionModule, Object { } OutputStream os = file.create(FileCreateFlags.REPLACE_DESTINATION); - yield os.splice_async(input_stream, 0); - os.close(); + yield os.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); file_transfer.size = (int)file_meta.size; file_transfer.file_name = file_meta.file_name; file_transfer.path = file.get_basename(); @@ -327,8 +326,7 @@ public class FileManager : StreamInteractionModule, Object { string filename = Random.next_int().to_string("%x") + "_" + file_transfer.file_name; File file = File.new_for_path(Path.build_filename(get_storage_dir(), filename)); OutputStream os = file.create(FileCreateFlags.REPLACE_DESTINATION); - yield os.splice_async(file_transfer.input_stream, 0); - os.close(); + yield os.splice_async(file_transfer.input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); file_transfer.state = FileTransfer.State.COMPLETE; file_transfer.path = filename; file_transfer.input_stream = yield file.read_async(); From 77ff73a1ca5d2283dc5335c1048ccb3fce66e508 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 22 Jul 2019 21:40:30 +0200 Subject: [PATCH 06/18] Terminate the Jingle session after the file transfer is complete --- xmpp-vala/src/module/xep/0166_jingle.vala | 114 ++++++++++++++++++---- 1 file changed, 97 insertions(+), 17 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 4d9a472a..ac1dd10b 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -122,7 +122,7 @@ public class Module : XmppStreamModule, Iq.Handler { throw new Error.GENERAL("Couldn't determine own JID"); } TransportParameters transport_params = transport.create_transport_parameters(); - Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name, stream); StanzaNode content = new StanzaNode.build("content", NS_URI) .put_attribute("creator", "initiator") .put_attribute("name", content_name) @@ -177,7 +177,7 @@ public class Module : XmppStreamModule, Iq.Handler { ContentParameters content_params = content_type.parse_content_parameters(description); TransportType type = content_type.content_type_transport_type(); - Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name); + Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name, stream); stream.get_flag(Flag.IDENTITY).add_session(session); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); @@ -317,7 +317,9 @@ public class Session { // Signals that the session has been accepted by the peer. public signal void accepted(XmppStream stream); - public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) { + XmppStream hack; + + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_SENT; this.sid = sid; this.type_ = type; @@ -325,9 +327,10 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); + this.hack = hack; } - public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_RECEIVED; this.sid = sid; this.type_ = type; @@ -335,6 +338,7 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); + this.hack = hack; } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -447,12 +451,22 @@ public class Session { } public void on_connection_error(IOError error) { - // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session + // TODO(hrxi): where can we get an XmppStream from? + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-transport", NS_URI)) + .put_node(new StanzaNode.build("text", NS_URI) + .put_node(new StanzaNode.text(error.message)) + ); + terminate(hack, reason, "transport error: $(error.message)"); + } + public void on_connection_close() { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("success", NS_URI)); + terminate(hack, reason, "success"); } public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { - if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { - // TODO(hrxi): what to do? + if (state == State.ENDED) { return; } if (state == State.ACTIVE) { @@ -525,6 +539,9 @@ public class Connection : IOStream { private IOStream? inner = null; private string? error = null; + private bool read_closed = false; + private bool write_closed = false; + private class OnSetInnerCallback { public SourceFunc callback; public int io_priority; @@ -578,12 +595,19 @@ public class Connection : IOStream { strong.on_connection_error(error); } } + private void handle_connection_close() { + Session? strong = session; + if (strong != null) { + strong.on_connection_close(); + } + } public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); try { return yield inner.input_stream.read_async(buffer, io_priority, cancellable); } catch (IOError e) { + print("read_async error\n"); handle_connection_error(e); throw e; } @@ -593,37 +617,93 @@ public class Connection : IOStream { try { return yield inner.output_stream.write_async(buffer, io_priority, cancellable); } catch (IOError e) { + print("write_async error\n"); handle_connection_error(e); throw e; } } public bool close_read(Cancellable? cancellable = null) throws IOError { check_for_errors(); + if (read_closed) { + return true; + } close_read_async.begin(GLib.Priority.DEFAULT, cancellable); return true; } public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); - try { - return yield inner.input_stream.close_async(io_priority, cancellable); - } catch (IOError e) { - handle_connection_error(e); - throw e; + if (read_closed) { + return true; } + read_closed = true; + IOError error = null; + bool result = true; + try { + result = yield inner.input_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + print("input_stream.close_async error\n"); + if (error == null) { + error = e; + } + } + try { + result = (yield close_if_both_closed(io_priority, cancellable)) && result; + } catch (IOError e) { + print("close_if_both_closed error\n"); + if (error == null) { + error = e; + } + } + if (error != null) { + handle_connection_error(error); + throw error; + } + return result; } public bool close_write(Cancellable? cancellable = null) throws IOError { check_for_errors(); + if (write_closed) { + return true; + } close_write_async.begin(GLib.Priority.DEFAULT, cancellable); return true; } public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); - try { - return yield inner.output_stream.close_async(io_priority, cancellable); - } catch (IOError e) { - handle_connection_error(e); - throw e; + if (write_closed) { + return true; } + write_closed = true; + IOError error = null; + bool result = true; + try { + result = yield inner.output_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + print("output_stream.close_async error\n"); + if (error == null) { + error = e; + } + } + try { + result = (yield close_if_both_closed(io_priority, cancellable)) && result; + } catch (IOError e) { + print("close_if_both_closed error\n"); + if (error == null) { + error = e; + } + } + if (error != null) { + handle_connection_error(error); + throw error; + } + return result; + } + private async bool close_if_both_closed(int io_priority, Cancellable? cancellable = null) throws IOError { + if (read_closed && write_closed) { + handle_connection_close(); + //return yield inner.close_async(io_priority, cancellable); + } + return true; } } From 811408fcb5da3501b66d9a4d8a1da5a91da3a3da Mon Sep 17 00:00:00 2001 From: hrxi Date: Sun, 4 Aug 2019 11:48:14 +0200 Subject: [PATCH 07/18] Fix a warning --- libdino/src/service/jingle_file_transfers.vala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libdino/src/service/jingle_file_transfers.vala b/libdino/src/service/jingle_file_transfers.vala index 95c3266f..94fbce09 100644 --- a/libdino/src/service/jingle_file_transfers.vala +++ b/libdino/src/service/jingle_file_transfers.vala @@ -39,7 +39,11 @@ public class JingleFileProvider : FileProvider, Object { if (jingle_file_transfer == null) { throw new FileReceiveError.DOWNLOAD_FAILED("Transfer data not available anymore"); } - jingle_file_transfer.accept(stream); + try { + jingle_file_transfer.accept(stream); + } catch (IOError e) { + throw new FileReceiveError.DOWNLOAD_FAILED("Establishing connection did not work"); + } return jingle_file_transfer.stream; } From 642dac9aa0b90dd2f17df5dddd0e7914a7d306d3 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sat, 20 Jul 2019 23:14:40 +0200 Subject: [PATCH 08/18] Add support for Jingle SOCKS5 bytestreams (XEP-0260) --- libdino/src/service/module_manager.vala | 2 + xmpp-vala/CMakeLists.txt | 2 + .../module/xep/0065_socks5_bytestreams.vala | 83 +++ xmpp-vala/src/module/xep/0166_jingle.vala | 246 ++++++--- .../module/xep/0234_jingle_file_transfer.vala | 2 +- .../xep/0260_jingle_socks5_bytestreams.vala | 505 ++++++++++++++++++ .../xep/0261_jingle_in_band_bytestreams.vala | 35 +- 7 files changed, 790 insertions(+), 85 deletions(-) create mode 100644 xmpp-vala/src/module/xep/0065_socks5_bytestreams.vala create mode 100644 xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index 16bf5a60..6a07a146 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -78,8 +78,10 @@ public class ModuleManager { module_map[account].add(new StreamError.Module()); module_map[account].add(new Xep.InBandRegistration.Module()); module_map[account].add(new Xep.HttpFileUpload.Module()); + module_map[account].add(new Xep.Socks5Bytestreams.Module()); module_map[account].add(new Xep.InBandBytestreams.Module()); module_map[account].add(new Xep.Jingle.Module()); + module_map[account].add(new Xep.JingleSocks5Bytestreams.Module()); module_map[account].add(new Xep.JingleInBandBytestreams.Module()); module_map[account].add(new Xep.JingleFileTransfer.Module()); initialize_account_modules(account, module_map[account]); diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt index 528c84a6..a0c15579 100644 --- a/xmpp-vala/CMakeLists.txt +++ b/xmpp-vala/CMakeLists.txt @@ -55,6 +55,7 @@ SOURCES "src/module/xep/0049_private_xml_storage.vala" "src/module/xep/0054_vcard/module.vala" "src/module/xep/0060_pubsub.vala" + "src/module/xep/0065_socks5_bytestreams.vala" "src/module/xep/0066_out_of_band_data.vala" "src/module/xep/0077_in_band_registration.vala" "src/module/xep/0082_date_time_profiles.vala" @@ -68,6 +69,7 @@ SOURCES "src/module/xep/0199_ping.vala" "src/module/xep/0203_delayed_delivery.vala" "src/module/xep/0234_jingle_file_transfer.vala" + "src/module/xep/0260_jingle_socks5_bytestreams.vala" "src/module/xep/0261_jingle_in_band_bytestreams.vala" "src/module/xep/0280_message_carbons.vala" "src/module/xep/0313_message_archive_management.vala" diff --git a/xmpp-vala/src/module/xep/0065_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0065_socks5_bytestreams.vala new file mode 100644 index 00000000..1890aac3 --- /dev/null +++ b/xmpp-vala/src/module/xep/0065_socks5_bytestreams.vala @@ -0,0 +1,83 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.Socks5Bytestreams { + +internal const string NS_URI = "http://jabber.org/protocol/bytestreams"; + +public class Proxy : Object { + public string host { get; private set; } + public Jid jid { get; private set; } + public int port { get; private set; } + + public Proxy(string host, Jid jid, int port) { + this.host = host; + this.jid = jid; + this.port = port; + } +} + +public class Module : XmppStreamModule, Iq.Handler { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0065_socks5_bytestreams"); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + query_availability(stream); + } + public override void detach(XmppStream stream) { } + + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { } + + public Gee.List get_proxies(XmppStream stream) { + return stream.get_flag(Flag.IDENTITY).proxies; + } + + private void query_availability(XmppStream stream) { + stream.get_module(ServiceDiscovery.Module.IDENTITY).request_items(stream, stream.remote_name, (stream, items_result) => { + foreach (Xep.ServiceDiscovery.Item item in items_result.items) { + stream.get_module(ServiceDiscovery.Module.IDENTITY).request_info(stream, item.jid, (stream, info_result) => { + foreach (string feature in info_result.features) { + if (feature == NS_URI) { + StanzaNode query_ = new StanzaNode.build("query", NS_URI).add_self_xmlns(); + Iq.Stanza iq = new Iq.Stanza.get(query_) { to=item.jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + if (iq.is_error()) { + return; + } + StanzaNode? query = iq.stanza.get_subnode("query", NS_URI); + StanzaNode? stream_host = query != null ? query.get_subnode("streamhost", NS_URI) : null; + if (query == null || stream_host == null) { + return; + } + string? host = stream_host.get_attribute("host"); + string? jid_str = stream_host.get_attribute("jid"); + Jid? jid = jid_str != null ? Jid.parse(jid_str) : null; + int port = stream_host.get_attribute_int("port"); + if (host == null || jid == null || port <= 0 || port > 65535) { + return; + } + stream.get_flag(Flag.IDENTITY).proxies.add(new Proxy(host, jid, port)); + }); + } + } + }); + } + }); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "socks5_bytestreams"); + + public Gee.List proxies = new ArrayList(); + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + + +} diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index ac1dd10b..a251293d 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -11,7 +11,9 @@ public errordomain IqError { BAD_REQUEST, NOT_ACCEPTABLE, NOT_IMPLEMENTED, + UNSUPPORTED_INFO, OUT_OF_ORDER, + RESOURCE_CONSTRAINT, } void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { @@ -22,9 +24,14 @@ void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { error = new ErrorStanza.not_acceptable(iq_error.message); } else if (iq_error is IqError.NOT_IMPLEMENTED) { error = new ErrorStanza.feature_not_implemented(iq_error.message); + } else if (iq_error is IqError.UNSUPPORTED_INFO) { + StanzaNode unsupported_info = new StanzaNode.build("unsupported-info", ERROR_NS_URI).add_self_xmlns(); + error = new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_FEATURE_NOT_IMPLEMENTED, iq_error.message, unsupported_info); } else if (iq_error is IqError.OUT_OF_ORDER) { StanzaNode out_of_order = new StanzaNode.build("out-of-order", ERROR_NS_URI).add_self_xmlns(); error = new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, iq_error.message, out_of_order); + } else if (iq_error is IqError.RESOURCE_CONSTRAINT) { + error = new ErrorStanza.resource_constraint(iq_error.message); } else { assert_not_reached(); } @@ -40,7 +47,7 @@ public errordomain Error { TRANSPORT_ERROR, } -StanzaNode get_single_node_anyns(StanzaNode parent, string node_name) throws IqError { +StanzaNode? get_single_node_anyns(StanzaNode parent, string node_name) throws IqError { StanzaNode? result = null; foreach (StanzaNode child in parent.get_all_subnodes()) { if (child.name == node_name) { @@ -50,12 +57,51 @@ StanzaNode get_single_node_anyns(StanzaNode parent, string node_name) throws IqE result = child; } } - if (result == null) { - throw new IqError.BAD_REQUEST(@"missing $(node_name) node"); - } return result; } +class ContentNode { + public Role creator; + public string name; + public StanzaNode? description; + public StanzaNode? transport; +} + +ContentNode get_single_content_node(StanzaNode jingle) throws IqError { + Gee.List contents = jingle.get_subnodes("content"); + if (contents.size == 0) { + throw new IqError.BAD_REQUEST("missing content node"); + } + if (contents.size > 1) { + throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + } + StanzaNode content = contents[0]; + string? creator_str = content.get_attribute("creator"); + // Vala can't typecheck the ternary operator here. + Role? creator = null; + if (creator_str != null) { + creator = Role.parse(creator_str); + } else { + // TODO(hrxi): now, is the creator attribute optional or not (XEP-0166 + // Jingle)? + creator = Role.INITIATOR; + } + + string? name = content.get_attribute("name"); + StanzaNode? description = get_single_node_anyns(content, "description"); + StanzaNode? transport = get_single_node_anyns(content, "transport"); + if (name == null || creator == null) { + throw new IqError.BAD_REQUEST("missing name or creator"); + } + + return new ContentNode() { + creator=creator, + name=name, + description=description, + transport=transport + }; +} + public class Module : XmppStreamModule, Iq.Handler { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0166_jingle"); @@ -88,16 +134,21 @@ public class Module : XmppStreamModule, Iq.Handler { return transports[ns_uri]; } public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { + Transport? result = null; foreach (Transport transport in transports.values) { if (transport.transport_type() != type) { continue; } - // TODO(hrxi): prioritization if (transport.is_transport_available(stream, receiver_full_jid)) { - return transport; + if (result != null) { + if (result.transport_priority() >= transport.transport_priority()) { + continue; + } + } + result = transport; } } - return null; + return result; } private bool is_jingle_available(XmppStream stream, Jid full_jid) { @@ -121,8 +172,8 @@ public class Module : XmppStreamModule, Iq.Handler { if (my_jid == null) { throw new Error.GENERAL("Couldn't determine own JID"); } - TransportParameters transport_params = transport.create_transport_parameters(); - Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name, stream); + TransportParameters transport_params = transport.create_transport_parameters(stream, my_jid, receiver_full_jid); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, my_jid, receiver_full_jid, content_name, stream); StanzaNode content = new StanzaNode.build("content", NS_URI) .put_attribute("creator", "initiator") .put_attribute("name", content_name) @@ -146,38 +197,31 @@ public class Module : XmppStreamModule, Iq.Handler { } public void handle_session_initiate(XmppStream stream, string sid, StanzaNode jingle, Iq.Stanza iq) throws IqError { - Gee.List contents = jingle.get_subnodes("content"); - if (contents.size == 0) { - throw new IqError.BAD_REQUEST("missing content node"); + ContentNode content = get_single_content_node(jingle); + if (content.description == null || content.transport == null) { + throw new IqError.BAD_REQUEST("missing description or transport node"); } - if (contents.size > 1) { - throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; + if (my_jid == null) { + throw new IqError.RESOURCE_CONSTRAINT("Couldn't determine own JID"); } - StanzaNode content = contents[0]; - string? name = content.get_attribute("name"); - StanzaNode description = get_single_node_anyns(content, "description"); - StanzaNode transport_node = get_single_node_anyns(content, "transport"); - if (name == null) { - throw new IqError.BAD_REQUEST("missing name"); - } - - Transport? transport = get_transport(transport_node.ns_uri); + Transport? transport = get_transport(content.transport.ns_uri); TransportParameters? transport_params = null; if (transport != null) { - transport_params = transport.parse_transport_parameters(transport_node); + transport_params = transport.parse_transport_parameters(stream, my_jid, iq.from, content.transport); } else { // terminate the session below } - ContentType? content_type = get_content_type(description.ns_uri); + ContentType? content_type = get_content_type(content.description.ns_uri); if (content_type == null) { // TODO(hrxi): how do we signal an unknown content type? throw new IqError.NOT_IMPLEMENTED("unknown content type"); } - ContentParameters content_params = content_type.parse_content_parameters(description); + ContentParameters content_params = content_type.parse_content_parameters(content.description); TransportType type = content_type.content_type_transport_type(); - Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name, stream); + Session session = new Session.initiate_received(sid, type, transport_params, my_jid, iq.from, content.name, stream); stream.get_flag(Flag.IDENTITY).add_session(session); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); @@ -254,15 +298,20 @@ public interface Transport : Object { public abstract string transport_ns_uri(); public abstract bool is_transport_available(XmppStream stream, Jid full_jid); public abstract TransportType transport_type(); - public abstract TransportParameters create_transport_parameters(); - public abstract TransportParameters parse_transport_parameters(StanzaNode transport) throws IqError; + public abstract int transport_priority(); + public abstract TransportParameters create_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid); + public abstract TransportParameters parse_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, StanzaNode transport) throws IqError; } + +// Gets a null `stream` if connection setup was unsuccessful and another +// transport method should be tried. public interface TransportParameters : Object { public abstract string transport_ns_uri(); public abstract StanzaNode to_transport_stanza_node(); - public abstract void update_transport(StanzaNode transport) throws IqError; - public abstract IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Role role); + public abstract void on_transport_accept(StanzaNode transport) throws IqError; + public abstract void on_transport_info(StanzaNode transport) throws IqError; + public abstract void create_transport_connection(XmppStream stream, Session session); } public enum Role { @@ -276,6 +325,14 @@ public enum Role { } assert_not_reached(); } + + public static Role parse(string role) throws IqError { + switch (role) { + case "initiator": return INITIATOR; + case "responder": return RESPONDER; + } + throw new IqError.BAD_REQUEST(@"invalid role $(role)"); + } } public interface ContentType : Object { @@ -290,11 +347,12 @@ public interface ContentParameters : Object { public class Session { - // INITIATE_SENT -> ACTIVE -> ENDED - // INITIATE_RECEIVED -> ACTIVE -> ENDED + // INITIATE_SENT -> CONNECTING -> ACTIVE -> ENDED + // INITIATE_RECEIVED -> CONNECTING -> ACTIVE -> ENDED public enum State { INITIATE_SENT, INITIATE_RECEIVED, + CONNECTING, ACTIVE, ENDED, } @@ -303,38 +361,39 @@ public class Session { public string sid { get; private set; } public Type type_ { get; private set; } + public Jid local_full_jid { get; private set; } public Jid peer_full_jid { get; private set; } + public Role content_creator { get; private set; } public string content_name { get; private set; } - // INITIATE_SENT | INITIATE_RECEIVED + private Connection connection; + public IOStream conn { get { return connection; } } + + // INITIATE_SENT | INITIATE_RECEIVED | CONNECTING TransportParameters? transport = null; - // ACTIVE - private Connection? connection; - public IOStream? conn { get { return connection; } } - - // Only interesting in INITIATE_SENT. - // Signals that the session has been accepted by the peer. - public signal void accepted(XmppStream stream); - XmppStream hack; - public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name, XmppStream hack) { + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid local_full_jid, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_SENT; this.sid = sid; this.type_ = type; + this.local_full_jid = local_full_jid; this.peer_full_jid = peer_full_jid; + this.content_creator = Role.INITIATOR; this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); this.hack = hack; } - public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name, XmppStream hack) { + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_RECEIVED; this.sid = sid; this.type_ = type; + this.local_full_jid = local_full_jid; this.peer_full_jid = peer_full_jid; + this.content_creator = Role.INITIATOR; this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); @@ -352,6 +411,9 @@ public class Session { case "session-terminate": handle_session_terminate(stream, jingle, iq); break; + case "transport-info": + handle_transport_info(stream, jingle, iq); + return; case "content-accept": case "content-add": case "content-modify": @@ -359,7 +421,6 @@ public class Session { case "content-remove": case "security-info": case "transport-accept": - case "transport-info": case "transport-reject": case "transport-replace": throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); @@ -379,36 +440,83 @@ public class Session { if (!responder.is_full()) { throw new IqError.BAD_REQUEST("invalid responder JID"); } - Gee.List contents = jingle.get_subnodes("content"); - if (contents.size == 0) { - // TODO(hrxi): here and below, should we terminate the session? - throw new IqError.BAD_REQUEST("missing content node"); + ContentNode content = get_single_content_node(jingle); + verify_content(content); + if (content.description == null || content.transport == null) { + throw new IqError.BAD_REQUEST("missing description or transport node"); } - if (contents.size > 1) { - throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); - } - StanzaNode content = contents[0]; - StanzaNode description = get_single_node_anyns(content, "description"); - StanzaNode transport_node = get_single_node_anyns(content, "transport"); - if (transport_node.ns_uri != transport.transport_ns_uri()) { + if (content.transport.ns_uri != transport.transport_ns_uri()) { throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); } - transport.update_transport(transport_node); - connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR)); - transport = null; + transport.on_transport_accept(content.transport); + StanzaNode description = content.description; // TODO(hrxi): handle this :P + state = State.CONNECTING; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); - state = State.ACTIVE; - accepted(stream); + transport.create_transport_connection(stream, this); + } + void connection_created(XmppStream stream, IOStream? conn) { + if (state != State.CONNECTING) { + return; + } + if (conn != null) { + state = State.ACTIVE; + transport = null; + connection.set_inner(conn); + } else { + // TODO(hrxi): try negotiating other transports… + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-transport", NS_URI)); + terminate(stream, reason, "failed transport"); + } } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { connection.on_terminated_by_jingle("remote terminated jingle session"); - state = ENDED; + state = State.ENDED; stream.get_flag(Flag.IDENTITY).remove_session(sid); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } - + void handle_transport_info(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + if (state != State.INITIATE_RECEIVED && state != State.INITIATE_SENT && state != State.CONNECTING) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + throw new IqError.UNSUPPORTED_INFO("transport-info unsupported after connection setup"); + } + ContentNode content = get_single_content_node(jingle); + verify_content(content); + if (content.description != null || content.transport == null) { + throw new IqError.BAD_REQUEST("unexpected description node or missing transport node"); + } + transport.on_transport_info(content.transport); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + } + void verify_content(ContentNode content) throws IqError { + if (content.name != content_name || content.creator != content_creator) { + throw new IqError.BAD_REQUEST("unknown content"); + } + } + public void set_transport_connection(XmppStream stream, IOStream? conn) { + if (state != State.CONNECTING) { + return; + } + connection_created(stream, conn); + } + public void send_transport_info(XmppStream stream, StanzaNode transport) { + if (state != State.CONNECTING) { + return; + } + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "transport-info") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(transport) + ); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + } public void accept(XmppStream stream, StanzaNode description) { if (state != State.INITIATE_RECEIVED) { return; // TODO(hrxi): what to do? @@ -426,10 +534,8 @@ public class Session { Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER)); - transport = null; - - state = State.ACTIVE; + state = State.CONNECTING; + transport.create_transport_connection(stream, this); } public void reject(XmppStream stream) { @@ -607,7 +713,6 @@ public class Connection : IOStream { try { return yield inner.input_stream.read_async(buffer, io_priority, cancellable); } catch (IOError e) { - print("read_async error\n"); handle_connection_error(e); throw e; } @@ -617,7 +722,6 @@ public class Connection : IOStream { try { return yield inner.output_stream.write_async(buffer, io_priority, cancellable); } catch (IOError e) { - print("write_async error\n"); handle_connection_error(e); throw e; } @@ -641,7 +745,6 @@ public class Connection : IOStream { try { result = yield inner.input_stream.close_async(io_priority, cancellable); } catch (IOError e) { - print("input_stream.close_async error\n"); if (error == null) { error = e; } @@ -649,7 +752,6 @@ public class Connection : IOStream { try { result = (yield close_if_both_closed(io_priority, cancellable)) && result; } catch (IOError e) { - print("close_if_both_closed error\n"); if (error == null) { error = e; } @@ -679,7 +781,6 @@ public class Connection : IOStream { try { result = yield inner.output_stream.close_async(io_priority, cancellable); } catch (IOError e) { - print("output_stream.close_async error\n"); if (error == null) { error = e; } @@ -687,7 +788,6 @@ public class Connection : IOStream { try { result = (yield close_if_both_closed(io_priority, cancellable)) && result; } catch (IOError e) { - print("close_if_both_closed error\n"); if (error == null) { error = e; } diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index dc9851d4..867a26e3 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -162,7 +162,7 @@ public class FileTransfer : Object { this.stream = new FileTransferInputStream(session.conn.input_stream, parameters.size); } - public void accept(XmppStream stream) { + public void accept(XmppStream stream) throws IOError { session.accept(stream, parameters.original_description); session.conn.output_stream.close(); } diff --git a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala new file mode 100644 index 00000000..abe5e0a9 --- /dev/null +++ b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala @@ -0,0 +1,505 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.JingleSocks5Bytestreams { + +private const string NS_URI = "urn:xmpp:jingle:transports:s5b:1"; + +public class Module : Jingle.Transport, XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0260_jingle_socks5_bytestreams"); + + public override void attach(XmppStream stream) { + stream.get_module(Jingle.Module.IDENTITY).register_transport(this); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + } + public override void detach(XmppStream stream) { } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } + + public bool is_transport_available(XmppStream stream, Jid full_jid) { + bool? result = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + return result != null && result; + } + + public string transport_ns_uri() { + return NS_URI; + } + public Jingle.TransportType transport_type() { + return Jingle.TransportType.STREAMING; + } + public int transport_priority() { + return 1; + } + private Gee.List get_local_candidates(XmppStream stream) { + Gee.List result = new ArrayList(); + int i = 1 << 15; + foreach (Socks5Bytestreams.Proxy proxy in stream.get_module(Socks5Bytestreams.Module.IDENTITY).get_proxies(stream)) { + result.add(new Candidate.proxy(random_uuid(), proxy, i)); + i -= 1; + } + return result; + } + public Jingle.TransportParameters create_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid) { + Parameters result = new Parameters.create(local_full_jid, peer_full_jid, random_uuid()); + result.local_candidates.add_all(get_local_candidates(stream)); + return result; + } + public Jingle.TransportParameters parse_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, StanzaNode transport) throws Jingle.IqError { + Parameters result = Parameters.parse(local_full_jid, peer_full_jid, transport); + result.local_candidates.add_all(get_local_candidates(stream)); + return result; + } +} + +public enum CandidateType { + ASSISTED, + DIRECT, + PROXY, + TUNNEL; + + public static CandidateType parse(string type) throws Jingle.IqError { + switch (type) { + case "assisted": return CandidateType.ASSISTED; + case "direct": return CandidateType.DIRECT; + case "proxy": return CandidateType.PROXY; + case "tunnel": return CandidateType.TUNNEL; + } + throw new Jingle.IqError.BAD_REQUEST(@"unknown candidate type $(type)"); + } + + public string to_string() { + switch (this) { + case ASSISTED: return "assisted"; + case DIRECT: return "direct"; + case PROXY: return "proxy"; + case TUNNEL: return "tunnel"; + } + assert_not_reached(); + } + + private int type_preference_impl() { + switch (this) { + case ASSISTED: return 120; + case DIRECT: return 126; + case PROXY: return 10; + case TUNNEL: return 110; + } + assert_not_reached(); + } + public int type_preference() { + return type_preference_impl() << 16; + } +} + +public class Candidate : Socks5Bytestreams.Proxy { + public string cid { get; private set; } + public int priority { get; private set; } + public CandidateType type_ { get; private set; } + + private Candidate(string cid, string host, Jid jid, int port, int priority, CandidateType type) { + base(host, jid, port); + this.cid = cid; + this.priority = priority; + this.type_ = type; + } + + public Candidate.build(string cid, string host, Jid jid, int port, int local_priority, CandidateType type) { + this(cid, host, jid, port, type.type_preference() + local_priority, type); + } + public Candidate.proxy(string cid, Socks5Bytestreams.Proxy proxy, int local_priority) { + this.build(cid, proxy.host, proxy.jid, proxy.port, local_priority, CandidateType.PROXY); + } + + public static Candidate parse(StanzaNode candidate) throws Jingle.IqError { + string? cid = candidate.get_attribute("cid"); + string? host = candidate.get_attribute("host"); + string? jid_str = candidate.get_attribute("jid"); + Jid? jid = jid_str != null ? Jid.parse(jid_str) : null; + int port = candidate.get_attribute("port") != null ? candidate.get_attribute_int("port") : 1080; + int priority = candidate.get_attribute_int("priority"); + string? type_str = candidate.get_attribute("type"); + CandidateType type = type_str != null ? CandidateType.parse(type_str) : CandidateType.DIRECT; + + if (cid == null || host == null || jid == null || port <= 0 || priority <= 0) { + throw new Jingle.IqError.BAD_REQUEST("missing or invalid cid, host, jid or port"); + } + + return new Candidate(cid, host, jid, port, priority, type); + } + public StanzaNode to_xml() { + return new StanzaNode.build("candidate", NS_URI) + .put_attribute("cid", cid) + .put_attribute("host", host) + .put_attribute("jid", jid.to_string()) + .put_attribute("port", port.to_string()) + .put_attribute("priority", priority.to_string()) + .put_attribute("type", type_.to_string()); + } +} + +bool bytes_equal(uint8[] a, uint8[] b) { + if (a.length != b.length) { + return false; + } + for (int i = 0; i < a.length; i++) { + if (a[i] != b[i]) { + return false; + } + } + return true; +} + +class Parameters : Jingle.TransportParameters, Object { + public Jingle.Role role { get; private set; } + public string sid { get; private set; } + public string remote_dstaddr { get; private set; } + public string local_dstaddr { get; private set; } + public Gee.List local_candidates = new ArrayList(); + public Gee.List remote_candidates = new ArrayList(); + + Jid peer_full_jid; + + bool remote_sent_selected_candidate = false; + Candidate? remote_selected_candidate = null; + bool local_determined_selected_candidate = false; + Candidate? local_selected_candidate = null; + SocketConnection? local_selected_candidate_conn = null; + weak Jingle.Session? session = null; + XmppStream? hack = null; + + string? waiting_for_activation_cid = null; + SourceFunc waiting_for_activation_callback; + + private static string calculate_dstaddr(string sid, Jid first_jid, Jid second_jid) { + string hashed = sid + first_jid.to_string() + second_jid.to_string(); + return Checksum.compute_for_string(ChecksumType.SHA1, hashed); + } + private Parameters(Jingle.Role role, string sid, Jid local_full_jid, Jid peer_full_jid, string? remote_dstaddr) { + this.role = role; + this.sid = sid; + this.local_dstaddr = calculate_dstaddr(sid, local_full_jid, peer_full_jid); + this.remote_dstaddr = remote_dstaddr ?? calculate_dstaddr(sid, peer_full_jid, local_full_jid); + + this.peer_full_jid = peer_full_jid; + } + public Parameters.create(Jid local_full_jid, Jid peer_full_jid, string sid) { + this(Jingle.Role.INITIATOR, sid, local_full_jid, peer_full_jid, null); + } + public static Parameters parse(Jid local_full_jid, Jid peer_full_jid, StanzaNode transport) throws Jingle.IqError { + string? dstaddr = transport.get_attribute("dstaddr"); + string? mode = transport.get_attribute("mode"); + string? sid = transport.get_attribute("sid"); + if (mode != null && mode != "tcp") { + throw new Jingle.IqError.BAD_REQUEST(@"unknown transport method $(mode)"); + } + if (dstaddr != null && dstaddr.length > 255) { + throw new Jingle.IqError.BAD_REQUEST("too long dstaddr"); + } + Parameters result = new Parameters(Jingle.Role.RESPONDER, sid, local_full_jid, peer_full_jid, dstaddr); + //result.remote_candidates.add(new Candidate("b", "0.0.0.0", new Jid("a@b/c"), 1234, 2000000000, CandidateType.PROXY)); + foreach (StanzaNode candidate in transport.get_subnodes("candidate", NS_URI)) { + result.remote_candidates.add(Candidate.parse(candidate)); + } + return result; + } + public string transport_ns_uri() { + return NS_URI; + } + public StanzaNode to_transport_stanza_node() { + StanzaNode transport = new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("dstaddr", local_dstaddr); + + if (role == Jingle.Role.INITIATOR) { + // Must not be included by the responder according to XEP-0260. + transport.put_attribute("mode", "tcp"); + } + + transport.put_attribute("sid", sid); + foreach (Candidate candidate in local_candidates) { + transport.put_node(candidate.to_xml()); + } + return transport; + } + public void on_transport_accept(StanzaNode transport) throws Jingle.IqError { + throw new Jingle.IqError.BAD_REQUEST("blurb"); + } + public void on_transport_info(StanzaNode transport) throws Jingle.IqError { + StanzaNode? candidate_error = transport.get_subnode("candidate-error", NS_URI); + StanzaNode? candidate_used = transport.get_subnode("candidate-used", NS_URI); + StanzaNode? activated = transport.get_subnode("activated", NS_URI); + int num_children = 0; + if (candidate_error != null) { num_children += 1; } + if (candidate_used != null) { num_children += 1; } + if (activated != null) { num_children += 1; } + if (num_children == 0) { + throw new Jingle.IqError.UNSUPPORTED_INFO("unknown transport-info"); + } else if (num_children > 1) { + throw new Jingle.IqError.BAD_REQUEST("transport-info with more than one child"); + } + if (candidate_error != null) { + handle_remote_candidate(null); + } + if (candidate_used != null) { + string? cid = candidate_used.get_attribute("cid"); + if (cid == null) { + throw new Jingle.IqError.BAD_REQUEST("missing cid"); + } + handle_remote_candidate(cid); + } + if (activated != null) { + string? cid = activated.get_attribute("cid"); + if (cid == null) { + throw new Jingle.IqError.BAD_REQUEST("missing cid"); + } + handle_activated(cid); + } + } + private void handle_remote_candidate(string? cid) throws Jingle.IqError { + if (remote_sent_selected_candidate) { + throw new Jingle.IqError.BAD_REQUEST("remote candidate already specified"); + } + Candidate? candidate = null; + if (cid != null) { + foreach (Candidate c in local_candidates) { + if (c.cid == cid) { + candidate = c; + break; + } + } + if (candidate == null) { + throw new Jingle.IqError.BAD_REQUEST("unknown cid"); + } + } + remote_sent_selected_candidate = true; + remote_selected_candidate = candidate; + try_completing_negotiation(); + } + private void handle_activated(string cid) throws Jingle.IqError { + if (waiting_for_activation_cid == null || cid != waiting_for_activation_cid) { + throw new Jingle.IqError.BAD_REQUEST("unexpected proxy activation message"); + } + Idle.add((owned)waiting_for_activation_callback); + waiting_for_activation_cid = null; + } + private void try_completing_negotiation() { + if (!remote_sent_selected_candidate || !local_determined_selected_candidate) { + return; + } + + Candidate? remote = remote_selected_candidate; + Candidate? local = local_selected_candidate; + + int num_candidates = 0; + if (remote != null) { num_candidates += 1; } + if (local != null) { num_candidates += 1; } + + if (num_candidates == 0) { + // Notify Jingle of the failed transport. + session.set_transport_connection(hack, null); + return; + } + + bool remote_wins; + if (num_candidates == 1) { + remote_wins = remote != null; + } else { + if (local.priority < remote.priority) { + remote_wins = true; + } else if (local.priority > remote.priority) { + remote_wins = false; + } else { + // equal priority -> XEP-0260 says that the initiator wins + remote_wins = role != Jingle.Role.INITIATOR; + } + } + + if (!remote_wins) { + if (local_selected_candidate.type_ != CandidateType.PROXY) { + Jingle.Session? strong = session; + if (strong == null) { + return; + } + strong.set_transport_connection(hack, local_selected_candidate_conn); + } else { + wait_for_remote_activation.begin(local_selected_candidate, local_selected_candidate_conn); + } + } else { + connect_to_local_candidate.begin(remote_selected_candidate); + } + } + public async void wait_for_remote_activation(Candidate candidate, SocketConnection conn) { + waiting_for_activation_cid = candidate.cid; + waiting_for_activation_callback = wait_for_remote_activation.callback; + yield; + + Jingle.Session? strong = session; + if (strong == null) { + return; + } + strong.set_transport_connection(hack, conn); + } + public async void connect_to_local_candidate(Candidate candidate) { + try { + SocketConnection conn = yield connect_to_socks5(candidate, local_dstaddr); + + bool activation_error = false; + SourceFunc callback = connect_to_local_candidate.callback; + StanzaNode query = new StanzaNode.build("query", Socks5Bytestreams.NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("activate", Socks5Bytestreams.NS_URI) + .put_node(new StanzaNode.text(peer_full_jid.to_string())) + ); + Iq.Stanza iq = new Iq.Stanza.set(query) { to=candidate.jid }; + hack.get_module(Iq.Module.IDENTITY).send_iq(hack, iq, (stream, iq) => { + activation_error = iq.is_error(); + Idle.add((owned)callback); + }); + yield; + + if (activation_error) { + throw new IOError.PROXY_FAILED("activation iq error"); + } + + Jingle.Session? strong = session; + if (strong == null) { + return; + } + strong.send_transport_info(hack, new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("activated", NS_URI) + .put_attribute("cid", candidate.cid) + ) + ); + + strong.set_transport_connection(hack, conn); + } catch (Error e) { + Jingle.Session? strong = session; + if (strong == null) { + return; + } + strong.send_transport_info(hack, new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("proxy-error", NS_URI)) + ); + strong.set_transport_connection(hack, null); + } + } + public async SocketConnection connect_to_socks5(Candidate candidate, string dstaddr) throws Error { + SocketClient socket_client = new SocketClient() { timeout=3 }; + + string address = @"[$(candidate.host)]:$(candidate.port)"; + + size_t written; + size_t read; + uint8[] read_buffer = new uint8[1024]; + ByteArray write_buffer = new ByteArray(); + + SocketConnection conn = yield socket_client.connect_to_host_async(address, 0); + + // 05 SOCKS version 5 + // 01 number of authentication methods: 1 + // 00 nop authentication + yield conn.output_stream.write_all_async({0x05, 0x01, 0x00}, GLib.Priority.DEFAULT, null, out written); + + yield conn.input_stream.read_all_async(read_buffer[0:2], GLib.Priority.DEFAULT, null, out read); + // 05 SOCKS version 5 + // 01 success + if (read_buffer[0] != 0x05 || read_buffer[1] != 0x00) { + throw new IOError.PROXY_FAILED("wanted 05 00, got %02x %02x".printf(read_buffer[0], read_buffer[1])); + } + + // 05 SOCKS version 5 + // 01 connect + // 00 reserved + // 03 address type: domain name + // ?? length of the domain + // .. domain + // 00 port 0 (upper half) + // 00 port 0 (lower half) + write_buffer.append({0x05, 0x01, 0x00, 0x03}); + write_buffer.append({(uint8)dstaddr.length}); + write_buffer.append(dstaddr.data); + write_buffer.append({0x00, 0x00}); + yield conn.output_stream.write_all_async(write_buffer.data, GLib.Priority.DEFAULT, null, out written); + + yield conn.input_stream.read_all_async(read_buffer[0:write_buffer.len], GLib.Priority.DEFAULT, null, out read); + // 05 SOCKS version 5 + // 00 success + // 00 reserved + // 03 address type: domain name + // ?? length of the domain + // .. domain + // 00 port 0 (upper half) + // 00 port 0 (lower half) + if (read_buffer[0] != 0x05 || read_buffer[1] != 0x00 || read_buffer[3] != 0x03) { + throw new IOError.PROXY_FAILED("wanted 05 00 ?? 03, got %02x %02x %02x %02x".printf(read_buffer[0], read_buffer[1], read_buffer[2], read_buffer[3])); + } + if (read_buffer[4] != (uint8)dstaddr.length) { + throw new IOError.PROXY_FAILED("wanted %02x for length, got %02x".printf(dstaddr.length, read_buffer[4])); + } + if (!bytes_equal(read_buffer[5:5+dstaddr.length], dstaddr.data)) { + string repr = ((string)read_buffer[5:5+dstaddr.length]).make_valid().escape(); + throw new IOError.PROXY_FAILED(@"wanted dstaddr $(dstaddr), got $(repr)"); + } + if (read_buffer[5+dstaddr.length] != 0x00 || read_buffer[5+dstaddr.length+1] != 0x00) { + throw new IOError.PROXY_FAILED("wanted port 00 00, got %02x %02x".printf(read_buffer[5+dstaddr.length], read_buffer[5+dstaddr.length+1])); + } + + return conn; + } + public async void try_connecting_to_candidates(XmppStream stream, Jingle.Session session) throws Error { + remote_candidates.sort((c1, c2) => { + // sort from priorities from high to low + if (c1.priority < c2.priority) { return 1; } + if (c1.priority > c2.priority) { return -1; } + return 0; + }); + foreach (Candidate candidate in remote_candidates) { + if (remote_selected_candidate != null && remote_selected_candidate.priority > candidate.priority) { + // Don't try candidates with lower priority than the one the + // peer already selected. + break; + } + try { + SocketConnection conn = yield connect_to_socks5(candidate, remote_dstaddr); + + local_determined_selected_candidate = true; + local_selected_candidate = candidate; + local_selected_candidate_conn = conn; + session.send_transport_info(stream, new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("candidate-used", NS_URI) + .put_attribute("cid", candidate.cid) + ) + ); + try_completing_negotiation(); + return; + } catch (Error e) { + // An error in the connection establishment isn't fatal, just + // try the next candidate or respond that none of the + // candidates work. + } + } + local_determined_selected_candidate = true; + local_selected_candidate = null; + session.send_transport_info(stream, new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("candidate-error", NS_URI)) + ); + } + public void create_transport_connection(XmppStream stream, Jingle.Session session) { + this.session = session; + this.hack = stream; + try_connecting_to_candidates.begin(stream, session); + } +} + +} diff --git a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala index dc2e8d7c..1a810ee8 100644 --- a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala @@ -30,28 +30,38 @@ public class Module : Jingle.Transport, XmppStreamModule { public Jingle.TransportType transport_type() { return Jingle.TransportType.STREAMING; } - public Jingle.TransportParameters create_transport_parameters() { - return new Parameters(random_uuid(), DEFAULT_BLOCKSIZE); + public int transport_priority() { + return 0; } - public Jingle.TransportParameters parse_transport_parameters(StanzaNode transport) throws Jingle.IqError { - return Parameters.parse(transport); + public Jingle.TransportParameters create_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid) { + return new Parameters.create(peer_full_jid, random_uuid()); + } + public Jingle.TransportParameters parse_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, StanzaNode transport) throws Jingle.IqError { + return Parameters.parse(peer_full_jid, transport); } } class Parameters : Jingle.TransportParameters, Object { + public Jingle.Role role { get; private set; } + public Jid peer_full_jid { get; private set; } public string sid { get; private set; } public int block_size { get; private set; } - public Parameters(string sid, int block_size) { + private Parameters(Jingle.Role role, Jid peer_full_jid, string sid, int block_size) { + this.role = role; + this.peer_full_jid = peer_full_jid; this.sid = sid; this.block_size = block_size; } - public static Parameters parse(StanzaNode transport) throws Jingle.IqError { + public Parameters.create(Jid peer_full_jid, string sid) { + this(Jingle.Role.INITIATOR, peer_full_jid, sid, DEFAULT_BLOCKSIZE); + } + public static Parameters parse(Jid peer_full_jid, StanzaNode transport) throws Jingle.IqError { string? sid = transport.get_attribute("sid"); int block_size = transport.get_attribute_int("block-size"); if (sid == null || block_size <= 0 || block_size > MAX_BLOCKSIZE) { throw new Jingle.IqError.BAD_REQUEST("missing or invalid sid or blocksize"); } - return new Parameters(sid, block_size); + return new Parameters(Jingle.Role.RESPONDER, peer_full_jid, sid, block_size); } public string transport_ns_uri() { return NS_URI; @@ -62,15 +72,18 @@ class Parameters : Jingle.TransportParameters, Object { .put_attribute("block-size", block_size.to_string()) .put_attribute("sid", sid); } - public void update_transport(StanzaNode transport) throws Jingle.IqError { - Parameters other = Parameters.parse(transport); + public void on_transport_accept(StanzaNode transport) throws Jingle.IqError { + Parameters other = Parameters.parse(peer_full_jid, transport); if (other.sid != sid || other.block_size > block_size) { throw new Jingle.IqError.NOT_ACCEPTABLE("invalid IBB sid or block_size"); } block_size = other.block_size; } - public IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Jingle.Role role) { - return InBandBytestreams.Connection.create(stream, peer_full_jid, sid, block_size, role == Jingle.Role.INITIATOR); + public void on_transport_info(StanzaNode transport) throws Jingle.IqError { + throw new Jingle.IqError.UNSUPPORTED_INFO("transport-info not supported for IBBs"); + } + public void create_transport_connection(XmppStream stream, Jingle.Session session) { + session.set_transport_connection(stream, InBandBytestreams.Connection.create(stream, peer_full_jid, sid, block_size, role == Jingle.Role.INITIATOR)); } } From 94794666d7ac7555a60ba5bb6d7382d776327cb7 Mon Sep 17 00:00:00 2001 From: hrxi Date: Tue, 6 Aug 2019 15:35:27 +0200 Subject: [PATCH 09/18] Factor out the session-terminate handler --- xmpp-vala/src/module/xep/0166_jingle.vala | 60 +++++++++++++---------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index a251293d..351d8e0c 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -102,16 +102,20 @@ ContentNode get_single_content_node(StanzaNode jingle) throws IqError { }; } +// This module can only be attached to one stream at a time. public class Module : XmppStreamModule, Iq.Handler { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0166_jingle"); private HashMap content_types = new HashMap(); private HashMap transports = new HashMap(); + private XmppStream? current_stream = null; + public override void attach(XmppStream stream) { stream.add_flag(new Flag()); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); + current_stream = stream; } public override void detach(XmppStream stream) { } @@ -173,7 +177,7 @@ public class Module : XmppStreamModule, Iq.Handler { throw new Error.GENERAL("Couldn't determine own JID"); } TransportParameters transport_params = transport.create_transport_parameters(stream, my_jid, receiver_full_jid); - Session session = new Session.initiate_sent(random_uuid(), type, transport_params, my_jid, receiver_full_jid, content_name, stream); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, my_jid, receiver_full_jid, content_name, send_terminate_and_remove_session); StanzaNode content = new StanzaNode.build("content", NS_URI) .put_attribute("creator", "initiator") .put_attribute("name", content_name) @@ -221,20 +225,34 @@ public class Module : XmppStreamModule, Iq.Handler { ContentParameters content_params = content_type.parse_content_parameters(content.description); TransportType type = content_type.content_type_transport_type(); - Session session = new Session.initiate_received(sid, type, transport_params, my_jid, iq.from, content.name, stream); + Session session = new Session.initiate_received(sid, type, transport_params, my_jid, iq.from, content.name, send_terminate_and_remove_session); stream.get_flag(Flag.IDENTITY).add_session(session); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); if (transport == null || transport.transport_type() != type) { StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); - session.terminate(stream, reason, "unsupported transports"); + session.terminate(reason, "unsupported transports"); return; } content_params.on_session_initiate(stream, session); } + private void send_terminate_and_remove_session(Jid to, string sid, StanzaNode reason) { + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-terminate") + .put_attribute("sid", sid) + .put_node(reason); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=to }; + current_stream.get_module(Iq.Module.IDENTITY).send_iq(current_stream, iq); + + // Immediately remove the session from the open sessions as per the + // XEP, don't wait for confirmation. + current_stream.get_flag(Flag.IDENTITY).remove_session(sid); + } + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { try { handle_iq_set(stream, iq); @@ -294,6 +312,8 @@ public enum Senders { } } +public delegate void SessionTerminate(Jid to, string sid, StanzaNode reason); + public interface Transport : Object { public abstract string transport_ns_uri(); public abstract bool is_transport_available(XmppStream stream, Jid full_jid); @@ -372,9 +392,9 @@ public class Session { // INITIATE_SENT | INITIATE_RECEIVED | CONNECTING TransportParameters? transport = null; - XmppStream hack; + SessionTerminate session_terminate_handler; - public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid local_full_jid, Jid peer_full_jid, string content_name, XmppStream hack) { + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { this.state = State.INITIATE_SENT; this.sid = sid; this.type_ = type; @@ -384,10 +404,10 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); - this.hack = hack; + this.session_terminate_handler = (owned)session_terminate_handler; } - public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, XmppStream hack) { + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { this.state = State.INITIATE_RECEIVED; this.sid = sid; this.type_ = type; @@ -397,7 +417,7 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); - this.hack = hack; + this.session_terminate_handler = (owned)session_terminate_handler; } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -466,7 +486,7 @@ public class Session { // TODO(hrxi): try negotiating other transports… StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("failed-transport", NS_URI)); - terminate(stream, reason, "failed transport"); + terminate(reason, "failed transport"); } } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -544,7 +564,7 @@ public class Session { } StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("decline", NS_URI)); - terminate(stream, reason, "declined"); + terminate(reason, "declined"); } public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { @@ -553,7 +573,7 @@ public class Session { if (application_reason != null) { reason.put_node(application_reason); } - terminate(stream, reason, "application error"); + terminate(reason, "application error"); } public void on_connection_error(IOError error) { @@ -563,15 +583,15 @@ public class Session { .put_node(new StanzaNode.build("text", NS_URI) .put_node(new StanzaNode.text(error.message)) ); - terminate(hack, reason, "transport error: $(error.message)"); + terminate(reason, "transport error: $(error.message)"); } public void on_connection_close() { StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("success", NS_URI)); - terminate(hack, reason, "success"); + terminate(reason, "success"); } - public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { + public void terminate(StanzaNode reason, string? local_reason) { if (state == State.ENDED) { return; } @@ -583,18 +603,8 @@ public class Session { } } - StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) - .add_self_xmlns() - .put_attribute("action", "session-terminate") - .put_attribute("sid", sid) - .put_node(reason); - Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - + session_terminate_handler(peer_full_jid, sid, reason); state = State.ENDED; - // Immediately remove the session from the open sessions as per the - // XEP, don't wait for confirmation. - stream.get_flag(Flag.IDENTITY).remove_session(sid); } } From 1b1fac0bb567d7e51423ac384fa37c2c0a41bc33 Mon Sep 17 00:00:00 2001 From: hrxi Date: Tue, 6 Aug 2019 15:37:49 +0200 Subject: [PATCH 10/18] Implement detach --- xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala | 4 +++- xmpp-vala/src/module/xep/0166_jingle.vala | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index 5d59d18c..caf5309d 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -14,7 +14,9 @@ public class Module : XmppStreamModule, Iq.Handler { stream.add_flag(new Flag()); stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); } - public override void detach(XmppStream stream) { } + public override void detach(XmppStream stream) { + stream.get_module(Iq.Module.IDENTITY).unregister_from_namespace(NS_URI, this); + } public void on_iq_set(XmppStream stream, Iq.Stanza iq) { // the iq module ensures that there's only one child node diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 351d8e0c..150fed8c 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -117,7 +117,9 @@ public class Module : XmppStreamModule, Iq.Handler { stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); current_stream = stream; } - public override void detach(XmppStream stream) { } + public override void detach(XmppStream stream) { + stream.get_module(Iq.Module.IDENTITY).unregister_from_namespace(NS_URI, this); + } public void register_content_type(ContentType content_type) { content_types[content_type.content_type_ns_uri()] = content_type; From e1c98a0fd94d0536c231ce79c2af63d891bf4c16 Mon Sep 17 00:00:00 2001 From: hrxi Date: Tue, 6 Aug 2019 16:45:48 +0200 Subject: [PATCH 11/18] Forgot to add outgoing Jingle SOCKS5 transfer --- .../src/module/xep/0260_jingle_socks5_bytestreams.vala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala index abe5e0a9..78f47cfb 100644 --- a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala @@ -159,6 +159,7 @@ class Parameters : Jingle.TransportParameters, Object { public Gee.List local_candidates = new ArrayList(); public Gee.List remote_candidates = new ArrayList(); + Jid local_full_jid; Jid peer_full_jid; bool remote_sent_selected_candidate = false; @@ -182,6 +183,7 @@ class Parameters : Jingle.TransportParameters, Object { this.local_dstaddr = calculate_dstaddr(sid, local_full_jid, peer_full_jid); this.remote_dstaddr = remote_dstaddr ?? calculate_dstaddr(sid, peer_full_jid, local_full_jid); + this.local_full_jid = local_full_jid; this.peer_full_jid = peer_full_jid; } public Parameters.create(Jid local_full_jid, Jid peer_full_jid, string sid) { @@ -224,7 +226,12 @@ class Parameters : Jingle.TransportParameters, Object { return transport; } public void on_transport_accept(StanzaNode transport) throws Jingle.IqError { - throw new Jingle.IqError.BAD_REQUEST("blurb"); + Parameters other = Parameters.parse(local_full_jid, peer_full_jid, transport); + if (other.sid != sid) { + throw new Jingle.IqError.BAD_REQUEST("invalid sid"); + } + remote_candidates = other.remote_candidates; + remote_dstaddr = other.remote_dstaddr; } public void on_transport_info(StanzaNode transport) throws Jingle.IqError { StanzaNode? candidate_error = transport.get_subnode("candidate-error", NS_URI); From 4e0adcd2b4cae09b8ede7cf4f357e447afd1e723 Mon Sep 17 00:00:00 2001 From: hrxi Date: Tue, 6 Aug 2019 20:20:43 +0200 Subject: [PATCH 12/18] Fix destination address of IBB error codes --- .../module/xep/0047_in_band_bytestreams.vala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index caf5309d..9aa2d98c 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -25,28 +25,28 @@ public class Module : XmppStreamModule, Iq.Handler { node = (node != null) ? node : iq.stanza.get_subnode("data", NS_URI); node = (node != null) ? node : iq.stanza.get_subnode("close", NS_URI); if (node == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action")) { to=iq.from }); return; } string? sid = node.get_attribute("sid"); if (sid == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid")) { to=iq.from }); return; } Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid); if (node.name == "open") { if (conn == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection")) { to=iq.from }); return; } if (conn.state != Connection.State.WAITING_FOR_CONNECT) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection")) { to=iq.from }); return; } conn.handle_open(stream, node, iq); } else { if (conn == null || conn.state != Connection.State.CONNECTED) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found())); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found()) { to=iq.from }); return; } if (node.name == "close") { @@ -373,17 +373,17 @@ public class Connection : IOStream { string? stanza = open.get_attribute("stanza"); if (block_size < 0 || (stanza != null && stanza != "iq" && stanza != "message")) { set_error("invalid open"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza")) { to=iq.from }); return; } if (stanza != null && stanza != "iq") { set_error("invalid open"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB")) { to=iq.from }); return; } if (block_size > this.block_size) { set_error("invalid open"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null)) { to=iq.from }); return; } this.block_size = block_size; @@ -395,7 +395,7 @@ public class Connection : IOStream { assert(state == State.CONNECTED); if (input_closed) { set_error("unexpected data"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data")) { to=iq.from }); return; } int seq = data.get_attribute_int("seq"); @@ -404,12 +404,12 @@ public class Connection : IOStream { uint8[] content = Base64.decode(data.get_string_content()); if (content.length > block_size) { set_error("data longer than negotiated block size"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size"))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size")) { to=iq.from }); return; } if (seq < 0 || seq != remote_seq) { set_error("out of order data packets"); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null)) { to=iq.from }); return; } remote_seq = (remote_seq + 1) % SEQ_MODULUS; From 2327dc783c56c7b82ad82c9e9ba5670807a467d2 Mon Sep 17 00:00:00 2001 From: hrxi Date: Tue, 6 Aug 2019 20:30:18 +0200 Subject: [PATCH 13/18] Send Jingle errors to the right JID --- xmpp-vala/src/module/xep/0166_jingle.vala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 150fed8c..34816fcd 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -35,7 +35,7 @@ void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { } else { assert_not_reached(); } - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, error)); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, error) { to=iq.from }); } public errordomain Error { @@ -274,7 +274,7 @@ public class Module : XmppStreamModule, Iq.Handler { if (action == "session-initiate") { if (session != null) { // TODO(hrxi): Info leak if other clients use predictable session IDs? - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_CONFLICT, "session ID already in use", null))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_CONFLICT, "session ID already in use", null)) { to=iq.from }); return; } handle_session_initiate(stream, sid, jingle, iq); @@ -282,7 +282,7 @@ public class Module : XmppStreamModule, Iq.Handler { } if (session == null) { StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns(); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session))); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session)) { to=iq.from }); return; } session.handle_iq_set(stream, action, jingle, iq); From 9a1e9864d6ea5bc967a69b2245ead55ba7c05812 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 5 Aug 2019 20:54:45 +0200 Subject: [PATCH 14/18] Fall back to IBB if S5B does not work out This mostly happens if connectivity to the candidates cannot be established. --- xmpp-vala/src/module/xep/0166_jingle.vala | 207 ++++++++++++++---- .../xep/0260_jingle_socks5_bytestreams.vala | 22 +- 2 files changed, 189 insertions(+), 40 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 34816fcd..f5d112aa 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -139,12 +139,15 @@ public class Module : XmppStreamModule, Iq.Handler { } return transports[ns_uri]; } - public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { + public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid, Set blacklist) { Transport? result = null; foreach (Transport transport in transports.values) { if (transport.transport_type() != type) { continue; } + if (transport.transport_ns_uri() in blacklist) { + continue; + } if (transport.is_transport_available(stream, receiver_full_jid)) { if (result != null) { if (result.transport_priority() >= transport.transport_priority()) { @@ -163,14 +166,14 @@ public class Module : XmppStreamModule, Iq.Handler { } public bool is_available(XmppStream stream, TransportType type, Jid full_jid) { - return is_jingle_available(stream, full_jid) && select_transport(stream, type, full_jid) != null; + return is_jingle_available(stream, full_jid) && select_transport(stream, type, full_jid, Set.empty()) != null; } public Session create_session(XmppStream stream, TransportType type, Jid receiver_full_jid, Senders senders, string content_name, StanzaNode description) throws Error { if (!is_jingle_available(stream, receiver_full_jid)) { throw new Error.NO_SHARED_PROTOCOLS("No Jingle support"); } - Transport? transport = select_transport(stream, type, receiver_full_jid); + Transport? transport = select_transport(stream, type, receiver_full_jid, Set.empty()); if (transport == null) { throw new Error.NO_SHARED_PROTOCOLS("No suitable transports"); } @@ -369,11 +372,13 @@ public interface ContentParameters : Object { public class Session { - // INITIATE_SENT -> CONNECTING -> ACTIVE -> ENDED - // INITIATE_RECEIVED -> CONNECTING -> ACTIVE -> ENDED + // INITIATE_SENT -> CONNECTING -> [REPLACING_TRANSPORT -> CONNECTING ->]... ACTIVE -> ENDED + // INITIATE_RECEIVED -> CONNECTING -> [WAITING_FOR_TRANSPORT_REPLACE -> CONNECTING ->].. ACTIVE -> ENDED public enum State { INITIATE_SENT, + REPLACING_TRANSPORT, INITIATE_RECEIVED, + WAITING_FOR_TRANSPORT_REPLACE, CONNECTING, ACTIVE, ENDED, @@ -381,8 +386,9 @@ public class Session { public State state { get; private set; } + public Role role { get; private set; } public string sid { get; private set; } - public Type type_ { get; private set; } + public TransportType type_ { get; private set; } public Jid local_full_jid { get; private set; } public Jid peer_full_jid { get; private set; } public Role content_creator { get; private set; } @@ -392,25 +398,30 @@ public class Session { public IOStream conn { get { return connection; } } // INITIATE_SENT | INITIATE_RECEIVED | CONNECTING + Set tried_transport_methods = new HashSet(); TransportParameters? transport = null; SessionTerminate session_terminate_handler; - public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { + public Session.initiate_sent(string sid, TransportType type, TransportParameters transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { this.state = State.INITIATE_SENT; + this.role = Role.INITIATOR; this.sid = sid; this.type_ = type; this.local_full_jid = local_full_jid; this.peer_full_jid = peer_full_jid; this.content_creator = Role.INITIATOR; this.content_name = content_name; + this.tried_transport_methods = new HashSet(); + this.tried_transport_methods.add(transport.transport_ns_uri()); this.transport = transport; this.connection = new Connection(this); this.session_terminate_handler = (owned)session_terminate_handler; } - public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { + public Session.initiate_received(string sid, TransportType type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { this.state = State.INITIATE_RECEIVED; + this.role = Role.RESPONDER; this.sid = sid; this.type_ = type; this.local_full_jid = local_full_jid; @@ -418,39 +429,87 @@ public class Session { this.content_creator = Role.INITIATOR; this.content_name = content_name; this.transport = transport; + this.tried_transport_methods = new HashSet(); + if (transport != null) { + this.tried_transport_methods.add(transport.transport_ns_uri()); + } this.connection = new Connection(this); this.session_terminate_handler = (owned)session_terminate_handler; } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { + // Validate action. switch (action) { case "session-accept": - if (state != State.INITIATE_SENT) { - throw new IqError.OUT_OF_ORDER("got session-accept while not waiting for one"); - } - handle_session_accept(stream, jingle, iq); - break; case "session-terminate": - handle_session_terminate(stream, jingle, iq); - break; + case "transport-accept": + case "transport-reject": + case "transport-replace": case "transport-info": - handle_transport_info(stream, jingle, iq); - return; + break; case "content-accept": case "content-add": case "content-modify": case "content-reject": case "content-remove": case "security-info": - case "transport-accept": - case "transport-reject": - case "transport-replace": - throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); default: throw new IqError.BAD_REQUEST("invalid action"); } + ContentNode? content = null; + StanzaNode? transport = null; + // Do some pre-processing. + if (action != "session-terminate") { + content = get_single_content_node(jingle); + verify_content(content); + switch (action) { + case "transport-accept": + case "transport-reject": + case "transport-replace": + case "transport-info": + switch (state) { + case State.INITIATE_SENT: + case State.REPLACING_TRANSPORT: + case State.INITIATE_RECEIVED: + case State.WAITING_FOR_TRANSPORT_REPLACE: + case State.CONNECTING: + break; + default: + throw new IqError.OUT_OF_ORDER("transport-* unsupported after connection setup"); + } + // TODO(hrxi): What to do with description nodes? + if (content.transport == null) { + throw new IqError.BAD_REQUEST("missing transport node"); + } + transport = content.transport; + break; + } + } + switch (action) { + case "session-accept": + if (state != State.INITIATE_SENT) { + throw new IqError.OUT_OF_ORDER("got session-accept while not waiting for one"); + } + handle_session_accept(stream, content, jingle, iq); + break; + case "session-terminate": + handle_session_terminate(stream, jingle, iq); + break; + case "transport-accept": + handle_transport_accept(stream, transport, jingle, iq); + break; + case "transport-reject": + handle_transport_reject(stream, jingle, iq); + break; + case "transport-replace": + handle_transport_replace(stream, transport, jingle, iq); + break; + case "transport-info": + handle_transport_info(stream, transport, jingle, iq); + break; + } } - void handle_session_accept(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + void handle_session_accept(XmppStream stream, ContentNode content, StanzaNode jingle, Iq.Stanza iq) throws IqError { string? responder_str = jingle.get_attribute("responder"); Jid responder; if (responder_str != null) { @@ -462,8 +521,6 @@ public class Session { if (!responder.is_full()) { throw new IqError.BAD_REQUEST("invalid responder JID"); } - ContentNode content = get_single_content_node(jingle); - verify_content(content); if (content.description == null || content.transport == null) { throw new IqError.BAD_REQUEST("missing description or transport node"); } @@ -472,8 +529,9 @@ public class Session { } transport.on_transport_accept(content.transport); StanzaNode description = content.description; // TODO(hrxi): handle this :P - state = State.CONNECTING; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + + state = State.CONNECTING; transport.create_transport_connection(stream, this); } void connection_created(XmppStream stream, IOStream? conn) { @@ -483,12 +541,14 @@ public class Session { if (conn != null) { state = State.ACTIVE; transport = null; + tried_transport_methods.clear(); connection.set_inner(conn); } else { - // TODO(hrxi): try negotiating other transports… - StanzaNode reason = new StanzaNode.build("reason", NS_URI) - .put_node(new StanzaNode.build("failed-transport", NS_URI)); - terminate(reason, "failed transport"); + if (role == Role.INITIATOR) { + select_new_transport(stream); + } else { + state = State.WAITING_FOR_TRANSPORT_REPLACE; + } } } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -499,17 +559,88 @@ public class Session { stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } - void handle_transport_info(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { - if (state != State.INITIATE_RECEIVED && state != State.INITIATE_SENT && state != State.CONNECTING) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); - throw new IqError.UNSUPPORTED_INFO("transport-info unsupported after connection setup"); + void select_new_transport(XmppStream stream) { + Transport? new_transport = stream.get_module(Module.IDENTITY).select_transport(stream, type_, peer_full_jid, tried_transport_methods); + if (new_transport == null) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-transport", NS_URI)); + terminate(reason, "failed transport"); + return; } - ContentNode content = get_single_content_node(jingle); - verify_content(content); - if (content.description != null || content.transport == null) { - throw new IqError.BAD_REQUEST("unexpected description node or missing transport node"); + tried_transport_methods.add(new_transport.transport_ns_uri()); + transport = new_transport.create_transport_parameters(stream, local_full_jid, peer_full_jid); + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "transport-replace") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(transport.to_transport_stanza_node()) + ); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + state = State.REPLACING_TRANSPORT; + } + void handle_transport_accept(XmppStream stream, StanzaNode transport_node, StanzaNode jingle, Iq.Stanza iq) throws IqError { + if (state != State.REPLACING_TRANSPORT) { + throw new IqError.OUT_OF_ORDER("no outstanding transport-replace request"); } - transport.on_transport_info(content.transport); + if (transport_node.ns_uri != transport.transport_ns_uri()) { + throw new IqError.BAD_REQUEST("transport-accept with unnegotiated transport method"); + } + transport.on_transport_accept(transport_node); + state = State.CONNECTING; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + transport.create_transport_connection(stream, this); + } + void handle_transport_reject(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + if (state != State.REPLACING_TRANSPORT) { + throw new IqError.OUT_OF_ORDER("no outstanding transport-replace request"); + } + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + select_new_transport(stream); + } + void handle_transport_replace(XmppStream stream, StanzaNode transport_node, StanzaNode jingle, Iq.Stanza iq) throws IqError { + Transport? transport = stream.get_module(Module.IDENTITY).get_transport(transport_node.ns_uri); + TransportParameters? parameters = null; + if (transport != null) { + // Just parse the transport info for the errors. + parameters = transport.parse_transport_parameters(stream, local_full_jid, peer_full_jid, transport_node); + } + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + if (state != State.WAITING_FOR_TRANSPORT_REPLACE || transport == null) { + StanzaNode jingle_response = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "transport-reject") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(transport_node) + ); + Iq.Stanza iq_response = new Iq.Stanza.set(jingle_response) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq_response); + return; + } + this.transport = parameters; + StanzaNode jingle_response = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "transport-accept") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(this.transport.to_transport_stanza_node()) + ); + Iq.Stanza iq_response = new Iq.Stanza.set(jingle_response) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq_response); + + state = State.CONNECTING; + this.transport.create_transport_connection(stream, this); + } + void handle_transport_info(XmppStream stream, StanzaNode transport, StanzaNode jingle, Iq.Stanza iq) throws IqError { + this.transport.on_transport_info(transport); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); } void verify_content(ContentNode content) throws IqError { diff --git a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala index 78f47cfb..79c62d68 100644 --- a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala @@ -172,6 +172,7 @@ class Parameters : Jingle.TransportParameters, Object { string? waiting_for_activation_cid = null; SourceFunc waiting_for_activation_callback; + bool waiting_for_activation_error = false; private static string calculate_dstaddr(string sid, Jid first_jid, Jid second_jid) { string hashed = sid + first_jid.to_string() + second_jid.to_string(); @@ -200,7 +201,6 @@ class Parameters : Jingle.TransportParameters, Object { throw new Jingle.IqError.BAD_REQUEST("too long dstaddr"); } Parameters result = new Parameters(Jingle.Role.RESPONDER, sid, local_full_jid, peer_full_jid, dstaddr); - //result.remote_candidates.add(new Candidate("b", "0.0.0.0", new Jid("a@b/c"), 1234, 2000000000, CandidateType.PROXY)); foreach (StanzaNode candidate in transport.get_subnodes("candidate", NS_URI)) { result.remote_candidates.add(Candidate.parse(candidate)); } @@ -237,10 +237,12 @@ class Parameters : Jingle.TransportParameters, Object { StanzaNode? candidate_error = transport.get_subnode("candidate-error", NS_URI); StanzaNode? candidate_used = transport.get_subnode("candidate-used", NS_URI); StanzaNode? activated = transport.get_subnode("activated", NS_URI); + StanzaNode? proxy_error = transport.get_subnode("proxy-error", NS_URI); int num_children = 0; if (candidate_error != null) { num_children += 1; } if (candidate_used != null) { num_children += 1; } if (activated != null) { num_children += 1; } + if (proxy_error != null) { num_children += 1; } if (num_children == 0) { throw new Jingle.IqError.UNSUPPORTED_INFO("unknown transport-info"); } else if (num_children > 1) { @@ -263,6 +265,9 @@ class Parameters : Jingle.TransportParameters, Object { } handle_activated(cid); } + if (proxy_error != null) { + handle_proxy_error(); + } } private void handle_remote_candidate(string? cid) throws Jingle.IqError { if (remote_sent_selected_candidate) { @@ -291,6 +296,15 @@ class Parameters : Jingle.TransportParameters, Object { Idle.add((owned)waiting_for_activation_callback); waiting_for_activation_cid = null; } + private void handle_proxy_error() throws Jingle.IqError { + if (waiting_for_activation_cid == null) { + throw new Jingle.IqError.BAD_REQUEST("unexpected proxy error message"); + } + Idle.add((owned)waiting_for_activation_callback); + waiting_for_activation_cid = null; + waiting_for_activation_error = true; + + } private void try_completing_negotiation() { if (!remote_sent_selected_candidate || !local_determined_selected_candidate) { return; @@ -346,7 +360,11 @@ class Parameters : Jingle.TransportParameters, Object { if (strong == null) { return; } - strong.set_transport_connection(hack, conn); + if (!waiting_for_activation_error) { + strong.set_transport_connection(hack, conn); + } else { + strong.set_transport_connection(hack, null); + } } public async void connect_to_local_candidate(Candidate candidate) { try { From 6494d7a45dabd180767890a310886146d83ae3be Mon Sep 17 00:00:00 2001 From: hrxi Date: Thu, 8 Aug 2019 17:12:02 +0200 Subject: [PATCH 15/18] Fix race condition involving `session-terminate` The Jingle file transfer (XEP-0234) specifies that the receiver of the file transfer is the one to terminate the session. Otherwise, there might be a race condition between the XMPP stream and out-of-band SOCKS5 connections. --- xmpp-vala/src/module/xep/0166_jingle.vala | 12 +++++++++--- .../src/module/xep/0234_jingle_file_transfer.vala | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index f5d112aa..2e38b164 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -397,6 +397,8 @@ public class Session { private Connection connection; public IOStream conn { get { return connection; } } + public bool terminate_on_connection_close { get; set; } + // INITIATE_SENT | INITIATE_RECEIVED | CONNECTING Set tried_transport_methods = new HashSet(); TransportParameters? transport = null; @@ -417,6 +419,7 @@ public class Session { this.transport = transport; this.connection = new Connection(this); this.session_terminate_handler = (owned)session_terminate_handler; + this.terminate_on_connection_close = true; } public Session.initiate_received(string sid, TransportType type, TransportParameters? transport, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { @@ -435,6 +438,7 @@ public class Session { } this.connection = new Connection(this); this.session_terminate_handler = (owned)session_terminate_handler; + this.terminate_on_connection_close = true; } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -719,9 +723,11 @@ public class Session { terminate(reason, "transport error: $(error.message)"); } public void on_connection_close() { - StanzaNode reason = new StanzaNode.build("reason", NS_URI) - .put_node(new StanzaNode.build("success", NS_URI)); - terminate(reason, "success"); + if (terminate_on_connection_close) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("success", NS_URI)); + terminate(reason, "success"); + } } public void terminate(StanzaNode reason, string? local_reason) { diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index 867a26e3..43c212f5 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -45,6 +45,7 @@ public class Module : Jingle.ContentType, XmppStreamModule { Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? + session.terminate_on_connection_close = false; yield session.conn.input_stream.close_async(); From 34d7b5f515d120a80b8730dadb0a66326b8d0c4a Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 9 Aug 2019 14:46:12 +0200 Subject: [PATCH 16/18] Fix Jingle connection code when `cancellable` is `null` --- xmpp-vala/src/module/xep/0166_jingle.vala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 2e38b164..e0a96cc6 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -838,10 +838,15 @@ public class Connection : IOStream { return; } SourceFunc callback = wait_and_check_for_errors.callback; - ulong id = cancellable.connect(() => callback()); + ulong id = 0; + if (cancellable != null) { + id = cancellable.connect(() => callback()); + } callbacks.add(new OnSetInnerCallback() { callback=(owned)callback, io_priority=io_priority}); yield; - cancellable.disconnect(id); + if (cancellable != null) { + cancellable.disconnect(id); + } } } private void handle_connection_error(IOError error) { From 6083f446b47e258e0381c0c22755dbfa881c7df7 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sat, 24 Aug 2019 13:30:23 +0200 Subject: [PATCH 17/18] Fix candidate selection for equal priority XEP-0260 states that the candidate selected (offered) by the initiator wins, not the one that was chosen by the initiator (i.e. offered by the responder). --- xmpp-vala/src/module/xep/0166_jingle.vala | 2 +- xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index e0a96cc6..d6dbcd9e 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -720,7 +720,7 @@ public class Session { .put_node(new StanzaNode.build("text", NS_URI) .put_node(new StanzaNode.text(error.message)) ); - terminate(reason, "transport error: $(error.message)"); + terminate(reason, @"transport error: $(error.message)"); } public void on_connection_close() { if (terminate_on_connection_close) { diff --git a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala index 79c62d68..c17dc0b3 100644 --- a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala @@ -332,8 +332,9 @@ class Parameters : Jingle.TransportParameters, Object { } else if (local.priority > remote.priority) { remote_wins = false; } else { - // equal priority -> XEP-0260 says that the initiator wins - remote_wins = role != Jingle.Role.INITIATOR; + // equal priority -> XEP-0260 says that the candidate offered + // by the initiator wins, so the one that the remote chose + remote_wins = role == Jingle.Role.INITIATOR; } } From 6028fd15a81a084b63311bc61f7b48d9f3d00746 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 26 Aug 2019 17:30:47 +0200 Subject: [PATCH 18/18] Don't error on Jingle file transfer hash session-info --- xmpp-vala/src/module/xep/0166_jingle.vala | 34 ++++++++++++++++--- .../module/xep/0234_jingle_file_transfer.vala | 13 +++++++ .../xep/0260_jingle_socks5_bytestreams.vala | 2 ++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index d6dbcd9e..06e3d5c8 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -47,12 +47,16 @@ public errordomain Error { TRANSPORT_ERROR, } -StanzaNode? get_single_node_anyns(StanzaNode parent, string node_name) throws IqError { +StanzaNode? get_single_node_anyns(StanzaNode parent, string? node_name = null) throws IqError { StanzaNode? result = null; foreach (StanzaNode child in parent.get_all_subnodes()) { - if (child.name == node_name) { + if (node_name == null || child.name == node_name) { if (result != null) { - throw new IqError.BAD_REQUEST(@"multiple $(node_name) nodes"); + if (node_name != null) { + throw new IqError.BAD_REQUEST(@"multiple $(node_name) nodes"); + } else { + throw new IqError.BAD_REQUEST(@"expected single subnode"); + } } result = child; } @@ -364,6 +368,7 @@ public interface ContentType : Object { public abstract string content_type_ns_uri(); public abstract TransportType content_type_transport_type(); public abstract ContentParameters parse_content_parameters(StanzaNode description) throws IqError; + public abstract void handle_content_session_info(XmppStream stream, Session session, StanzaNode info, Iq.Stanza iq) throws IqError; } public interface ContentParameters : Object { @@ -445,25 +450,28 @@ public class Session { // Validate action. switch (action) { case "session-accept": + case "session-info": case "session-terminate": case "transport-accept": + case "transport-info": case "transport-reject": case "transport-replace": - case "transport-info": break; case "content-accept": case "content-add": case "content-modify": case "content-reject": case "content-remove": + case "description-info": case "security-info": + throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); default: throw new IqError.BAD_REQUEST("invalid action"); } ContentNode? content = null; StanzaNode? transport = null; // Do some pre-processing. - if (action != "session-terminate") { + if (action != "session-info" && action != "session-terminate") { content = get_single_content_node(jingle); verify_content(content); switch (action) { @@ -496,6 +504,9 @@ public class Session { } handle_session_accept(stream, content, jingle, iq); break; + case "session-info": + handle_session_info(stream, jingle, iq); + break; case "session-terminate": handle_session_terminate(stream, jingle, iq); break; @@ -563,6 +574,19 @@ public class Session { stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } + void handle_session_info(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + StanzaNode? info = get_single_node_anyns(jingle); + if (info == null) { + // Jingle session ping + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + return; + } + ContentType? content_type = stream.get_module(Module.IDENTITY).get_content_type(info.ns_uri); + if (content_type == null) { + throw new IqError.UNSUPPORTED_INFO("unknown session-info namespace"); + } + content_type.handle_content_session_info(stream, this, info, iq); + } void select_new_transport(XmppStream stream) { Transport? new_transport = stream.get_module(Module.IDENTITY).select_transport(stream, type_, peer_full_jid, tried_transport_methods); if (new_transport == null) { diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index 43c212f5..25fe3ce4 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -24,6 +24,19 @@ public class Module : Jingle.ContentType, XmppStreamModule { public Jingle.ContentParameters parse_content_parameters(StanzaNode description) throws Jingle.IqError { return Parameters.parse(this, description); } + public void handle_content_session_info(XmppStream stream, Jingle.Session session, StanzaNode info, Iq.Stanza iq) throws Jingle.IqError { + switch (info.name) { + case "received": + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + break; + case "checksum": + // TODO(hrxi): handle hash + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + break; + default: + throw new Jingle.IqError.UNSUPPORTED_INFO(@"unsupported file transfer info $(info.name)"); + } + } public signal void file_incoming(XmppStream stream, FileTransfer file_transfer); diff --git a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala index c17dc0b3..e0542207 100644 --- a/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0260_jingle_socks5_bytestreams.vala @@ -477,6 +477,8 @@ class Parameters : Jingle.TransportParameters, Object { throw new IOError.PROXY_FAILED("wanted port 00 00, got %02x %02x".printf(read_buffer[5+dstaddr.length], read_buffer[5+dstaddr.length+1])); } + conn.get_socket().set_timeout(0); + return conn; } public async void try_connecting_to_candidates(XmppStream stream, Jingle.Session session) throws Error {