diff --git a/libdino/CMakeLists.txt b/libdino/CMakeLists.txt index 6d37ef15..73386d0a 100644 --- a/libdino/CMakeLists.txt +++ b/libdino/CMakeLists.txt @@ -35,6 +35,7 @@ SOURCES src/service/database.vala src/service/entity_capabilities_storage.vala src/service/file_manager.vala + src/service/jingle_file_manager.vala src/service/message_processor.vala src/service/message_storage.vala src/service/module_manager.vala diff --git a/libdino/src/application.vala b/libdino/src/application.vala index da098fb4..396aa91f 100644 --- a/libdino/src/application.vala +++ b/libdino/src/application.vala @@ -37,6 +37,7 @@ public interface Dino.Application : GLib.Application { RosterManager.start(stream_interactor, db); ChatInteraction.start(stream_interactor); FileManager.start(stream_interactor, db); + JingleFileManager.start(stream_interactor); ContentItemStore.start(stream_interactor, db); NotificationEvents.start(stream_interactor); SearchProcessor.start(stream_interactor, db); diff --git a/libdino/src/entity/file_transfer.vala b/libdino/src/entity/file_transfer.vala index 6b1492d6..68234a48 100644 --- a/libdino/src/entity/file_transfer.vala +++ b/libdino/src/entity/file_transfer.vala @@ -53,6 +53,7 @@ public class FileTransfer : Object { } public string path { get; set; } public string? mime_type { get; set; } + // TODO(hrxi): expand to 64 bit public int size { get; set; } public State state { get; set; } diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 6f8ccee4..7665936c 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -66,6 +66,7 @@ public class FileManager : StreamInteractionModule, Object { foreach (FileSender file_sender in file_senders) { if (file_sender.can_send(conversation, file_transfer)) { file_sender.send_file(conversation, file_transfer); + break; } } received_file(file_transfer, conversation); @@ -120,7 +121,9 @@ public class FileManager : StreamInteractionModule, Object { } public void add_sender(FileSender file_sender) { - file_senders.add(file_sender); + // Order file_senders in reverse order of adding them -- HTTP is added + // later than Jingle. + file_senders.insert(0, file_sender); file_sender.upload_available.connect((account) => { upload_available(account); }); diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala new file mode 100644 index 00000000..595afae0 --- /dev/null +++ b/libdino/src/service/jingle_file_manager.vala @@ -0,0 +1,122 @@ +using Gdk; +using Gee; + +using Xmpp; +using Dino.Entities; + +namespace Dino { + +public class JingleFileManager : StreamInteractionModule, FileProvider, FileSender, Object { + public static ModuleIdentity IDENTITY = new ModuleIdentity("jingle_files"); + public string id { get { return IDENTITY.id; } } + + private StreamInteractor stream_interactor; + private HashMap file_transfers + = new HashMap(); + + public static void start(StreamInteractor stream_interactor) { + JingleFileManager m = new JingleFileManager(stream_interactor); + stream_interactor.add_module(m); + } + + private JingleFileManager(StreamInteractor stream_interactor) { + this.stream_interactor = stream_interactor; + + stream_interactor.get_module(FileManager.IDENTITY).add_sender(this); + stream_interactor.get_module(FileManager.IDENTITY).add_provider(this); + stream_interactor.stream_negotiated.connect(on_stream_negotiated); + } + + private void on_stream_negotiated(Account account, XmppStream stream) { + stream_interactor.module_manager.get_module(account, Xmpp.Xep.JingleFileTransfer.Module.IDENTITY).file_incoming.connect((stream, jingle_file_transfer) => { + Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jingle_file_transfer.peer.bare_jid, account); + if (conversation == null) { + // TODO(hrxi): What to do? + return; + } + string id = random_uuid(); + + FileTransfer file_transfer = new FileTransfer(); + file_transfer.account = account; + file_transfer.counterpart = jingle_file_transfer.peer.bare_jid; + file_transfer.ourpart = account.bare_jid; + file_transfer.encryption = Encryption.NONE; + file_transfer.time = new DateTime.now_utc(); + file_transfer.local_time = new DateTime.now_utc(); + file_transfer.direction = FileTransfer.DIRECTION_RECEIVED; + file_transfer.file_name = jingle_file_transfer.file_name; + file_transfer.size = (int)jingle_file_transfer.size; + file_transfer.state = FileTransfer.State.NOT_STARTED; + file_transfer.provider = 1; + file_transfer.info = id; + file_transfers[id] = jingle_file_transfer; + + file_incoming(file_transfer, conversation); + }); + } + + async void get_meta_info(FileTransfer file_transfer) { + // In Jingle, all the metadata is provided up-front, so there's no more + // metadata to get. + } + async void download(FileTransfer file_transfer, File file_) { + // TODO(hrxi) What should happen if `stream == null`? + XmppStream? stream = stream_interactor.get_stream(file_transfer.account); + Xmpp.Xep.JingleFileTransfer.FileTransfer jingle_file_transfer = file_transfers[file_transfer.info]; + jingle_file_transfer.accept(stream); + file_transfer.input_stream = jingle_file_transfer.stream; + + // TODO(hrxi): BEGIN: Copied from plugins/http-files/src/file_provider.vala + foreach (IncomingFileProcessor processor in stream_interactor.get_module(FileManager.IDENTITY).incoming_processors) { + if (processor.can_process(file_transfer)) { + processor.process(file_transfer); + } + } + + // TODO(hrxi): should this be an &&? + File file = file_; + if (file_transfer.encryption == Encryption.PGP || file.get_path().has_suffix(".pgp")) { + file = File.new_for_path(file.get_path().substring(0, file.get_path().length - 4)); + } + // TODO(hrxi): END: Copied from plugins/http-files/src/file_provider.vala + + try { + OutputStream os = file.create(FileCreateFlags.REPLACE_DESTINATION); + yield os.splice_async(file_transfer.input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); + file_transfer.path = file.get_basename(); + file_transfer.input_stream = yield file.read_async(); + + file_transfer.state = FileTransfer.State.COMPLETE; + } catch (Error e) { + file_transfer.state = FileTransfer.State.FAILED; + return; + } + } + + public bool is_upload_available(Conversation conversation) { + // TODO(hrxi) Here and in `send_file`: What should happen if `stream == null`? + XmppStream? stream = stream_interactor.get_stream(conversation.account); + foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) { + if (stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { + return true; + } + } + return false; + } + public bool can_send(Conversation conversation, FileTransfer file_transfer) { + return file_transfer.encryption != Encryption.OMEMO; + } + public void send_file(Conversation conversation, FileTransfer file_transfer) { + XmppStream? stream = stream_interactor.get_stream(file_transfer.account); + foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) { + // TODO(hrxi): Prioritization of transports (and resources?). + if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { + continue; + } + stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream.begin(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); + return; + } + } +} + +} diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index 41a2c6a0..16bf5a60 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -78,6 +78,10 @@ public class ModuleManager { module_map[account].add(new StreamError.Module()); module_map[account].add(new Xep.InBandRegistration.Module()); module_map[account].add(new Xep.HttpFileUpload.Module()); + module_map[account].add(new Xep.InBandBytestreams.Module()); + module_map[account].add(new Xep.Jingle.Module()); + module_map[account].add(new Xep.JingleInBandBytestreams.Module()); + module_map[account].add(new Xep.JingleFileTransfer.Module()); initialize_account_modules(account, module_map[account]); } } diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt index faff9e79..528c84a6 100644 --- a/xmpp-vala/CMakeLists.txt +++ b/xmpp-vala/CMakeLists.txt @@ -49,8 +49,9 @@ SOURCES "src/module/xep/0045_muc/flag.vala" "src/module/xep/0045_muc/module.vala" "src/module/xep/0045_muc/status_code.vala" - "src/module/xep/0048_bookmarks/module.vala" + "src/module/xep/0047_in_band_bytestreams.vala" "src/module/xep/0048_bookmarks/conference.vala" + "src/module/xep/0048_bookmarks/module.vala" "src/module/xep/0049_private_xml_storage.vala" "src/module/xep/0054_vcard/module.vala" "src/module/xep/0060_pubsub.vala" @@ -60,11 +61,14 @@ SOURCES "src/module/xep/0084_user_avatars.vala" "src/module/xep/0085_chat_state_notifications.vala" "src/module/xep/0115_entitiy_capabilities.vala" + "src/module/xep/0166_jingle.vala" + "src/module/xep/0184_message_delivery_receipts.vala" "src/module/xep/0191_blocking_command.vala" "src/module/xep/0198_stream_management.vala" "src/module/xep/0199_ping.vala" - "src/module/xep/0184_message_delivery_receipts.vala" "src/module/xep/0203_delayed_delivery.vala" + "src/module/xep/0234_jingle_file_transfer.vala" + "src/module/xep/0261_jingle_in_band_bytestreams.vala" "src/module/xep/0280_message_carbons.vala" "src/module/xep/0313_message_archive_management.vala" "src/module/xep/0333_chat_markers.vala" diff --git a/xmpp-vala/src/module/message/stanza.vala b/xmpp-vala/src/module/message/stanza.vala index 640f2796..053c44dd 100644 --- a/xmpp-vala/src/module/message/stanza.vala +++ b/xmpp-vala/src/module/message/stanza.vala @@ -18,7 +18,7 @@ public class MessageStanza : Xmpp.Stanza { public string body { get { StanzaNode? body_node = stanza.get_subnode(NODE_BODY); - return body_node == null? null : body_node.get_string_content(); + return body_node == null ? null : body_node.get_string_content(); } set { StanzaNode? body_node = stanza.get_subnode(NODE_BODY); @@ -65,4 +65,4 @@ public abstract class MessageFlag : Object { public abstract string get_id(); } -} \ No newline at end of file +} diff --git a/xmpp-vala/src/module/stanza_error.vala b/xmpp-vala/src/module/stanza_error.vala index 651e8d2b..c108b02a 100644 --- a/xmpp-vala/src/module/stanza_error.vala +++ b/xmpp-vala/src/module/stanza_error.vala @@ -82,8 +82,8 @@ namespace Xmpp { public ErrorStanza.bad_request(string? human_readable = null) { this.build(TYPE_MODIFY, CONDITION_BAD_REQUEST, human_readable, null); } - public ErrorStanza.feature_not_implemented(StanzaNode? application_condition = null) { - this.build(TYPE_MODIFY, CONDITION_FEATURE_NOT_IMPLEMENTED, null, application_condition); + public ErrorStanza.feature_not_implemented(string? human_readable = null) { + this.build(TYPE_MODIFY, CONDITION_FEATURE_NOT_IMPLEMENTED, human_readable, null); } public ErrorStanza.item_not_found(StanzaNode? application_condition = null) { this.build(TYPE_CANCEL, CONDITION_ITEM_NOT_FOUND, null, application_condition); @@ -91,6 +91,9 @@ namespace Xmpp { public ErrorStanza.not_acceptable(string? human_readable = null) { this.build(TYPE_MODIFY, CONDITION_NOT_ACCEPTABLE, human_readable, null); } + public ErrorStanza.not_allowed(string? human_readable = null) { + this.build(TYPE_CANCEL, CONDITION_NOT_ALLOWED, human_readable, null); + } public ErrorStanza.service_unavailable() { this.build(TYPE_CANCEL, CONDITION_SERVICE_UNAVAILABLE, null, null); } diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala new file mode 100644 index 00000000..89247780 --- /dev/null +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -0,0 +1,468 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.InBandBytestreams { + +private const string NS_URI = "http://jabber.org/protocol/ibb"; +private const int SEQ_MODULUS = 65536; + +public class Module : XmppStreamModule, Iq.Handler { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0047_in_band_bytestreams"); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); + } + public override void detach(XmppStream stream) { } + + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { + // the iq module ensures that there's only one child node + StanzaNode? node = null; + node = (node != null) ? node : iq.stanza.get_subnode("open", NS_URI); + node = (node != null) ? node : iq.stanza.get_subnode("data", NS_URI); + node = (node != null) ? node : iq.stanza.get_subnode("close", NS_URI); + if (node == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action"))); + return; + } + string? sid = node.get_attribute("sid"); + if (sid == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid"))); + return; + } + Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid); + if (node.name == "open") { + if (conn == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection"))); + return; + } + if (conn.state != WAITING_FOR_CONNECT) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection"))); + return; + } + conn.handle_open(stream, node, iq); + } else { + if (conn == null || conn.state != Connection.State.CONNECTED) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found())); + return; + } + if (node.name == "close") { + conn.handle_close(stream, node, iq); + } else { + conn.handle_data(stream, node, iq); + } + } + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public class Connection : IOStream { + // TODO(hrxi): Fix reference cycle + public class Input : InputStream { + private Connection connection; + public Input(Connection connection) { + this.connection = connection; + } + public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async reads on in-band bytestreams"); + } + public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.read_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_read(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_read_async(io_priority, cancellable); + } + } + public class Output : OutputStream { + private Connection connection; + public Output(Connection connection) { + this.connection = connection; + } + public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async writes on in-band bytestreams"); + } + public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.write_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_write(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_write_async(io_priority, cancellable); + } + } + + private Input input; + private Output output; + public override InputStream input_stream { get { return input; } } + public override OutputStream output_stream { get { return output; } } + + public enum State { + WAITING_FOR_CONNECT, + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED, + ERROR, + } + public State state { get; private set; } + Jid receiver_full_jid; + public string sid { get; private set; } + int block_size; + int local_seq = 0; + int remote_ack = 0; + internal int remote_seq = 0; + + bool input_closed = false; + bool output_closed = false; + + // ERROR + string? error = null; + + XmppStream stream; + + int read_callback_priority; + Cancellable? read_callback_cancellable = null; + ulong read_callback_cancellable_id; + SourceFunc? read_callback = null; + int write_callback_priority; + SourceFunc? write_callback = null; + ulong write_callback_cancellable_id; + Cancellable? write_callback_cancellable = null; + // Need `Bytes` instead of `uint8[]` because the latter doesn't work in + // parameter position of `LinkedList`. + LinkedList received = new LinkedList(); + + private Connection(XmppStream stream, Jid receiver_full_jid, string sid, int block_size, bool initiate) { + this.stream = stream; + this.receiver_full_jid = receiver_full_jid; + this.sid = sid; + this.block_size = block_size; + this.state = initiate ? State.CONNECTING : State.WAITING_FOR_CONNECT; + + input = new Input(this); + output = new Output(this); + } + + public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { + if (read_callback != null) { + throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); + } + if (cancellable != null) { + read_callback_cancellable_id = cancellable.connect(trigger_read_callback); + } + read_callback = callback; + read_callback_cancellable = cancellable; + read_callback_priority = io_priority; + } + public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { + if (write_callback != null) { + throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); + } + if (cancellable != null) { + write_callback_cancellable_id = cancellable.connect(trigger_write_callback); + } + write_callback = callback; + write_callback_cancellable = cancellable; + write_callback_priority = io_priority; + } + public void trigger_read_callback() { + if (read_callback != null) { + Idle.add((owned) read_callback, read_callback_priority); + read_callback = null; + if (read_callback_cancellable != null) { + read_callback_cancellable.disconnect(read_callback_cancellable_id); + } + read_callback_cancellable = null; + } + } + public void trigger_write_callback() { + if (write_callback != null) { + Idle.add((owned) write_callback, write_callback_priority); + write_callback = null; + if (write_callback_cancellable != null) { + write_callback_cancellable.disconnect(write_callback_cancellable_id); + } + write_callback_cancellable = null; + } + } + + public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + while (true) { + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + if (input_closed) { + return 0; + } + Bytes? chunk = received.poll(); + if (chunk != null) { + int read = int.min(buffer.length, chunk.length); + for (int i = 0; i < read; i++) { + buffer[i] = chunk[i]; + } + if (buffer.length < chunk.length) { + received.offer_head(chunk[buffer.length:chunk.length]); + } + return read; + } + if (state == DISCONNECTED) { + return 0; + } + set_read_callback(read_async.callback, cancellable, io_priority); + yield; + } + } + + public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + while (state == WAITING_FOR_CONNECT || state == CONNECTING) { + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + set_write_callback(write_async.callback, cancellable, io_priority); + yield; + } + throw_if_closed(); + assert(state == CONNECTED); + // TODO(hrxi): merging? + int seq = local_seq; + local_seq = (local_seq + 1) % SEQ_MODULUS; + if (buffer.length > block_size) { + buffer = buffer[0:block_size]; + } + StanzaNode data = new StanzaNode.build("data", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_attribute("seq", seq.to_string()) + .put_node(new StanzaNode.text(Base64.encode(buffer))); + Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; + set_write_callback(write_async.callback, cancellable, io_priority); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + if (iq.is_error()) { + set_error("sending failed"); + } else if (remote_ack != seq) { + set_error("out of order acks"); + } else { + remote_ack = (remote_ack + 1) % SEQ_MODULUS; + if (local_seq == remote_ack) { + trigger_write_callback(); + } + } + }); + yield; + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + throw_if_error(); + return buffer.length; + } + + public bool close_read(Cancellable? cancellable = null) { + input_closed = true; + if (!output_closed) { + return true; + } + return close_impl(cancellable); + } + public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + input_closed = true; + if (!output_closed) { + return true; + } + return yield close_async_impl(io_priority, cancellable); + } + public bool close_write(Cancellable? cancellable = null) { + output_closed = true; + if (!input_closed) { + return true; + } + return close_impl(cancellable); + } + public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + output_closed = true; + if (!input_closed) { + return true; + } + return yield close_async_impl(io_priority, cancellable); + } + delegate void OnClose(bool success); + private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) { + if (state == DISCONNECTING || state == DISCONNECTED || state == ERROR) { + on_close(true); + return true; + } + if (state == WAITING_FOR_CONNECT) { + state = DISCONNECTED; + stream.get_flag(Flag.IDENTITY).remove_connection(this); + trigger_read_callback(); + on_close(true); + return true; + } + state = DISCONNECTING; + StanzaNode close = new StanzaNode.build("close", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid); + Iq.Stanza iq = new Iq.Stanza.set(close) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + assert(state == DISCONNECTING); + if (iq.is_error()) { + set_error("disconnecting failed"); + } else { + state = DISCONNECTED; + } + stream.get_flag(Flag.IDENTITY).remove_connection(this); + trigger_read_callback(); + on_close(!iq.is_error()); + }); + return true; + } + private async bool close_async_impl(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + SourceFunc callback = close_async_impl.callback; + close_impl(cancellable, () => { Idle.add((owned) callback); }); + yield; + throw_if_error(); + return true; + } + + public static Connection create(XmppStream stream, Jid receiver_full_jid, string sid, int block_size, bool initiate) { + Connection conn = new Connection(stream, receiver_full_jid, sid, block_size, initiate); + if (initiate) { + StanzaNode open = new StanzaNode.build("open", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", block_size.to_string()) + .put_attribute("sid", sid); + + Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + if (conn.state != CONNECTING) { + assert(conn.state != CONNECTED); + return; + } + if (!iq.is_error()) { + conn.state = CONNECTED; + stream.get_flag(Flag.IDENTITY).add_connection(conn); + conn.trigger_write_callback(); + } else { + conn.set_error("connection failed"); + } + }); + } else { + stream.get_flag(Flag.IDENTITY).add_connection(conn); + } + return conn; + } + + void throw_if_error() throws IOError { + if (state == ERROR) { + throw new IOError.FAILED(error); + } + } + + void throw_if_closed() throws IOError { + throw_if_error(); + if (state == DISCONNECTING || state == DISCONNECTED) { + throw new IOError.CLOSED("can't read/write on a closed connection"); + } + } + + void set_error(string error) { + if (state != WAITING_FOR_CONNECT && state != DISCONNECTING && state != DISCONNECTED && state != ERROR) { + close_async.begin(); + } + state = ERROR; + this.error = error; + stream.get_flag(Flag.IDENTITY).remove_connection(this); + } + + public void handle_open(XmppStream stream, StanzaNode open, Iq.Stanza iq) { + assert(state == WAITING_FOR_CONNECT); + int block_size = open.get_attribute_int("block-size"); + string? stanza = open.get_attribute("stanza"); + if (block_size < 0 || (stanza != null && stanza != "iq" && stanza != "message")) { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza"))); + return; + } + if (stanza != null && stanza != "iq") { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB"))); + return; + } + if (block_size > this.block_size) { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null))); + return; + } + this.block_size = block_size; + state = CONNECTED; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + trigger_write_callback(); + } + public void handle_data(XmppStream stream, StanzaNode data, Iq.Stanza iq) { + assert(state == CONNECTED); + if (input_closed) { + set_error("unexpected data"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data"))); + return; + } + int seq = data.get_attribute_int("seq"); + // TODO(hrxi): return an error on malformed base64 (need to do this + // according to the xep) + uint8[] content = Base64.decode(data.get_string_content()); + if (content.length > block_size) { + set_error("data longer than negotiated block size"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size"))); + return; + } + if (seq < 0 || seq != remote_seq) { + set_error("out of order data packets"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null))); + return; + } + remote_seq = (remote_seq + 1) % SEQ_MODULUS; + + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + if (content.length != 0) { + received.offer(new Bytes.take(content)); + trigger_read_callback(); + } + } + public void handle_close(XmppStream stream, StanzaNode close, Iq.Stanza iq) { + assert(state == CONNECTED); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + stream.get_flag(Flag.IDENTITY).remove_connection(this); + input_closed = true; + output_closed = true; + state = DISCONNECTED; + + trigger_read_callback(); + } +} + + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "in_band_bytestreams"); + + private HashMap active = new HashMap(); + + public void add_connection(Connection conn) { + active[conn.sid] = conn; + } + public Connection? get_connection(string sid) { + return active.has_key(sid) ? active[sid] : null; + } + public void remove_connection(Connection conn) { + active.unset(conn.sid); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +} diff --git a/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala b/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala index c00d8f86..7f80490b 100644 --- a/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala +++ b/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala @@ -36,7 +36,7 @@ public class Conference : Object { public string? nick { get { StanzaNode? nick_node = stanza_node.get_subnode(NODE_NICK); - return nick_node == null? null : nick_node.get_string_content(); + return nick_node == null ? null : nick_node.get_string_content(); } set { StanzaNode? nick_node = stanza_node.get_subnode(NODE_NICK); @@ -56,7 +56,7 @@ public class Conference : Object { public string? password { get { StanzaNode? password_node = stanza_node.get_subnode(NODE_PASSWORD); - return password_node == null? null : password_node.get_string_content(); + return password_node == null ? null : password_node.get_string_content(); } set { StanzaNode? password_node = stanza_node.get_subnode(NODE_PASSWORD); diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala new file mode 100644 index 00000000..7413ff4f --- /dev/null +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -0,0 +1,494 @@ +using Gee; +using Xmpp.Xep; +using Xmpp; + +namespace Xmpp.Xep.Jingle { + +private const string NS_URI = "urn:xmpp:jingle:1"; +private const string ERROR_NS_URI = "urn:xmpp:jingle:errors:1"; + +public errordomain IqError { + BAD_REQUEST, + NOT_ACCEPTABLE, + NOT_IMPLEMENTED, + OUT_OF_ORDER, +} + +void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { + ErrorStanza error; + if (iq_error is IqError.BAD_REQUEST) { + error = new ErrorStanza.bad_request(iq_error.message); + } else if (iq_error is IqError.NOT_ACCEPTABLE) { + error = new ErrorStanza.not_acceptable(iq_error.message); + } else if (iq_error is IqError.NOT_IMPLEMENTED) { + error = new ErrorStanza.feature_not_implemented(iq_error.message); + } else if (iq_error is IqError.OUT_OF_ORDER) { + StanzaNode out_of_order = new StanzaNode.build("out-of-order", ERROR_NS_URI).add_self_xmlns(); + error = new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, iq_error.message, out_of_order); + } else { + assert_not_reached(); + } + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, error)); +} + +public errordomain Error { + GENERAL, + BAD_REQUEST, + INVALID_PARAMETERS, + UNSUPPORTED_TRANSPORT, + NO_SHARED_PROTOCOLS, + TRANSPORT_ERROR, +} + +StanzaNode get_single_node_anyns(StanzaNode parent, string node_name) throws IqError { + StanzaNode? result = null; + foreach (StanzaNode child in parent.get_all_subnodes()) { + if (child.name == node_name) { + if (result != null) { + throw new IqError.BAD_REQUEST(@"multiple $(node_name) nodes"); + } + result = child; + } + } + if (result == null) { + throw new IqError.BAD_REQUEST(@"missing $(node_name) node"); + } + return result; +} + +public class Module : XmppStreamModule, Iq.Handler { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0166_jingle"); + + private HashMap content_types = new HashMap(); + private HashMap transports = new HashMap(); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); + } + public override void detach(XmppStream stream) { } + + public void register_content_type(ContentType content_type) { + content_types[content_type.content_type_ns_uri()] = content_type; + } + public ContentType? get_content_type(string ns_uri) { + if (!content_types.has_key(ns_uri)) { + return null; + } + return content_types[ns_uri]; + } + public void register_transport(Transport transport) { + transports[transport.transport_ns_uri()] = transport; + } + public Transport? get_transport(string ns_uri) { + if (!transports.has_key(ns_uri)) { + return null; + } + return transports[ns_uri]; + } + public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { + foreach (Transport transport in transports.values) { + if (transport.transport_type() != type) { + continue; + } + // TODO(hrxi): prioritization + if (transport.is_transport_available(stream, receiver_full_jid)) { + return transport; + } + } + return null; + } + + private bool is_jingle_available(XmppStream stream, Jid full_jid) { + bool? has_jingle = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + return has_jingle != null && has_jingle; + } + + public bool is_available(XmppStream stream, TransportType type, Jid full_jid) { + return is_jingle_available(stream, full_jid) && select_transport(stream, type, full_jid) != null; + } + + public Session create_session(XmppStream stream, TransportType type, Jid receiver_full_jid, Senders senders, string content_name, StanzaNode description) throws Error { + if (!is_jingle_available(stream, receiver_full_jid)) { + throw new Error.NO_SHARED_PROTOCOLS("No Jingle support"); + } + Transport? transport = select_transport(stream, type, receiver_full_jid); + if (transport == null) { + throw new Error.NO_SHARED_PROTOCOLS("No suitable transports"); + } + Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; + if (my_jid == null) { + throw new Error.GENERAL("Couldn't determine own JID"); + } + TransportParameters transport_params = transport.create_transport_parameters(); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name); + StanzaNode content = new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_attribute("senders", senders.to_string()) + .put_node(description) + .put_node(transport_params.to_transport_stanza_node()); + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-initiate") + .put_attribute("initiator", my_jid.to_string()) + .put_attribute("sid", session.sid) + .put_node(content); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=receiver_full_jid }; + + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + // TODO(hrxi): handle errors + stream.get_flag(Flag.IDENTITY).add_session(session); + }); + + return session; + } + + public void handle_session_initiate(XmppStream stream, string sid, StanzaNode jingle, Iq.Stanza iq) throws IqError { + Gee.List contents = jingle.get_subnodes("content"); + if (contents.size == 0) { + throw new IqError.BAD_REQUEST("missing content node"); + } + if (contents.size > 1) { + throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + } + StanzaNode content = contents[0]; + string? name = content.get_attribute("name"); + StanzaNode description = get_single_node_anyns(content, "description"); + StanzaNode transport_node = get_single_node_anyns(content, "transport"); + if (name == null) { + throw new IqError.BAD_REQUEST("missing name"); + } + + Transport? transport = get_transport(transport_node.ns_uri); + TransportParameters? transport_params = null; + if (transport != null) { + transport_params = transport.parse_transport_parameters(transport_node); + } else { + // terminate the session below + } + + ContentType? content_type = get_content_type(description.ns_uri); + if (content_type == null) { + // TODO(hrxi): how do we signal an unknown content type? + throw new IqError.NOT_IMPLEMENTED("unknown content type"); + } + ContentParameters content_params = content_type.parse_content_parameters(description); + + TransportType type = content_type.content_type_transport_type(); + Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name); + stream.get_flag(Flag.IDENTITY).add_session(session); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + + if (transport == null || transport.transport_type() != type) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); + session.terminate(stream, reason); + return; + } + + content_params.on_session_initiate(stream, session); + } + + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { + try { + handle_iq_set(stream, iq); + } catch (IqError e) { + send_iq_error(e, stream, iq); + } + } + + public void handle_iq_set(XmppStream stream, Iq.Stanza iq) throws IqError { + StanzaNode? jingle = iq.stanza.get_subnode("jingle", NS_URI); + string? sid = jingle != null ? jingle.get_attribute("sid") : null; + string? action = jingle != null ? jingle.get_attribute("action") : null; + if (jingle == null || sid == null || action == null) { + throw new IqError.BAD_REQUEST("missing jingle node, sid or action"); + } + Session? session = stream.get_flag(Flag.IDENTITY).get_session(sid); + if (action == "session-initiate") { + if (session != null) { + // TODO(hrxi): Info leak if other clients use predictable session IDs? + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_CONFLICT, "session ID already in use", null))); + return; + } + handle_session_initiate(stream, sid, jingle, iq); + return; + } + if (session == null) { + StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns(); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session))); + return; + } + session.handle_iq_set(stream, action, jingle, iq); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public enum TransportType { + DATAGRAM, + STREAMING, +} + +public enum Senders { + BOTH, + INITIATOR, + NONE, + RESPONDER; + + public string to_string() { + switch (this) { + case BOTH: return "both"; + case INITIATOR: return "initiator"; + case NONE: return "none"; + case RESPONDER: return "responder"; + } + assert_not_reached(); + } +} + +public interface Transport : Object { + public abstract string transport_ns_uri(); + public abstract bool is_transport_available(XmppStream stream, Jid full_jid); + public abstract TransportType transport_type(); + public abstract TransportParameters create_transport_parameters(); + public abstract TransportParameters parse_transport_parameters(StanzaNode transport) throws IqError; +} + +public interface TransportParameters : Object { + public abstract string transport_ns_uri(); + public abstract StanzaNode to_transport_stanza_node(); + public abstract void update_transport(StanzaNode transport) throws IqError; + public abstract IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Role role); +} + +public enum Role { + INITIATOR, + RESPONDER; + + public string to_string() { + switch (this) { + case INITIATOR: return "initiator"; + case RESPONDER: return "responder"; + } + assert_not_reached(); + } +} + +public interface ContentType : Object { + public abstract string content_type_ns_uri(); + public abstract TransportType content_type_transport_type(); + public abstract ContentParameters parse_content_parameters(StanzaNode description) throws IqError; +} + +public interface ContentParameters : Object { + public abstract void on_session_initiate(XmppStream stream, Session session); +} + + +public class Session { + // INITIATE_SENT -> ACTIVE -> ENDED + // INITIATE_RECEIVED -> ACTIVE -> ENDED + public enum State { + INITIATE_SENT, + INITIATE_RECEIVED, + ACTIVE, + ENDED, + } + + public State state { get; private set; } + + public string sid { get; private set; } + public Type type_ { get; private set; } + public Jid peer_full_jid { get; private set; } + public string content_name { get; private set; } + + // INITIATE_SENT | INITIATE_RECEIVED + TransportParameters? transport = null; + + // ACTIVE + public IOStream? conn { get; private set; } + + // Only interesting in INITIATE_SENT. + // Signals that the session has been accepted by the peer. + public signal void accepted(XmppStream stream); + + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) { + this.state = INITIATE_SENT; + this.sid = sid; + this.type_ = type; + this.peer_full_jid = peer_full_jid; + this.content_name = content_name; + this.transport = transport; + this.conn = null; + } + + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { + this.state = INITIATE_RECEIVED; + this.sid = sid; + this.type_ = type; + this.peer_full_jid = peer_full_jid; + this.content_name = content_name; + this.transport = transport; + this.conn = null; + } + + public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { + switch (action) { + case "session-accept": + if (state != INITIATE_SENT) { + throw new IqError.OUT_OF_ORDER("got session-accept while not waiting for one"); + } + handle_session_accept(stream, jingle, iq); + break; + case "session-terminate": + handle_session_terminate(stream, jingle, iq); + break; + case "content-accept": + case "content-add": + case "content-modify": + case "content-reject": + case "content-remove": + case "security-info": + case "transport-accept": + case "transport-info": + case "transport-reject": + case "transport-replace": + throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); + default: + throw new IqError.BAD_REQUEST("invalid action"); + } + } + void handle_session_accept(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + string? responder_str = jingle.get_attribute("responder"); + Jid responder; + if (responder_str != null) { + responder = Jid.parse(responder_str) ?? iq.from; + } else { + responder = iq.from; // TODO(hrxi): and above, can we assume iq.from != null + // TODO(hrxi): more sanity checking, perhaps replace who we're talking to + } + if (!responder.is_full()) { + throw new IqError.BAD_REQUEST("invalid responder JID"); + } + Gee.List contents = jingle.get_subnodes("content"); + if (contents.size == 0) { + // TODO(hrxi): here and below, should we terminate the session? + throw new IqError.BAD_REQUEST("missing content node"); + } + if (contents.size > 1) { + throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + } + StanzaNode content = contents[0]; + StanzaNode description = get_single_node_anyns(content, "description"); + StanzaNode transport_node = get_single_node_anyns(content, "transport"); + if (transport_node.ns_uri != transport.transport_ns_uri()) { + throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); + } + transport.update_transport(transport_node); + conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); + transport = null; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + state = ACTIVE; + accepted(stream); + } + void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + // TODO(hrxi): also handle presence type=unavailable + } + + public void accept(XmppStream stream, StanzaNode description) { + if (state != INITIATE_RECEIVED) { + return; // TODO(hrxi): what to do? + } + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-accept") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(description) + .put_node(transport.to_transport_stanza_node()) + ); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + + conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); + transport = null; + + state = ACTIVE; + } + + public void reject(XmppStream stream) { + if (state != INITIATE_RECEIVED) { + return; // TODO(hrxi): what to do? + } + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("decline", NS_URI)); + terminate(stream, reason); + } + + public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-application", NS_URI)); + if (application_reason != null) { + reason.put_node(application_reason); + } + terminate(stream, reason); + } + + public void close_connection(XmppStream stream) { + if (state != ACTIVE) { + return; // TODO(hrxi): what to do? + } + conn.close(); + } + + public void terminate(XmppStream stream, StanzaNode reason) { + if (state != INITIATE_SENT && state != INITIATE_RECEIVED && state != ACTIVE) { + // TODO(hrxi): what to do? + return; + } + if (state == ACTIVE) { + conn.close(); + } + + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-terminate") + .put_attribute("sid", sid) + .put_node(reason); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + + state = ENDED; + // Immediately remove the session from the open sessions as per the + // XEP, don't wait for confirmation. + stream.get_flag(Flag.IDENTITY).remove_session(sid); + } +} + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle"); + + private HashMap sessions = new HashMap(); + + public void add_session(Session session) { + sessions[session.sid] = session; + } + public Session? get_session(string sid) { + return sessions.has_key(sid) ? sessions[sid] : null; + } + public void remove_session(string sid) { + sessions.unset(sid); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +} diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala new file mode 100644 index 00000000..57222bae --- /dev/null +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -0,0 +1,130 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.JingleFileTransfer { + +private const string NS_URI = "urn:xmpp:jingle:apps:file-transfer:5"; + +public class Module : Jingle.ContentType, XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0234_jingle_file_transfer"); + + public override void attach(XmppStream stream) { + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + stream.get_module(Jingle.Module.IDENTITY).register_content_type(this); + } + public override void detach(XmppStream stream) { } + + public string content_type_ns_uri() { + return NS_URI; + } + public Jingle.TransportType content_type_transport_type() { + return Jingle.TransportType.STREAMING; + } + public Jingle.ContentParameters parse_content_parameters(StanzaNode description) throws Jingle.IqError { + return Parameters.parse(this, description); + } + + public signal void file_incoming(XmppStream stream, FileTransfer file_transfer); + + public bool is_available(XmppStream stream, Jid full_jid) { + bool? has_feature = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + if (has_feature == null || !(!)has_feature) { + return false; + } + return stream.get_module(Jingle.Module.IDENTITY).is_available(stream, Jingle.TransportType.STREAMING, full_jid); + } + + public async void offer_file_stream(XmppStream stream, Jid receiver_full_jid, InputStream input_stream, string basename, int64 size) throws IOError { + StanzaNode description = new StanzaNode.build("description", NS_URI) + .add_self_xmlns() + .put_node(new StanzaNode.build("file", NS_URI) + .put_node(new StanzaNode.build("name", NS_URI).put_node(new StanzaNode.text(basename))) + .put_node(new StanzaNode.build("size", NS_URI).put_node(new StanzaNode.text(size.to_string())))); + // TODO(hrxi): Add the mandatory hash field + + Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) + .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? + + SourceFunc callback = offer_file_stream.callback; + session.accepted.connect((stream) => { + session.conn.input_stream.close(); + Idle.add((owned) callback); + }); + yield; + + // TODO(hrxi): catch errors + yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public class Parameters : Jingle.ContentParameters, Object { + Module parent; + string? media_type; + public string? name { get; private set; } + public int64 size { get; private set; } + public StanzaNode original_description { get; private set; } + public Parameters(Module parent, StanzaNode original_description, string? media_type, string? name, int64? size) { + this.parent = parent; + this.original_description = original_description; + this.media_type = media_type; + this.name = name; + this.size = size; + } + public static Parameters parse(Module parent, StanzaNode description) throws Jingle.IqError { + Gee.List files = description.get_subnodes("file", NS_URI); + if (files.size != 1) { + throw new Jingle.IqError.BAD_REQUEST("there needs to be exactly one file node"); + } + StanzaNode file = files[0]; + StanzaNode? media_type_node = file.get_subnode("media-type", NS_URI); + StanzaNode? name_node = file.get_subnode("name", NS_URI); + StanzaNode? size_node = file.get_subnode("size", NS_URI); + string? media_type = media_type_node != null ? media_type_node.get_string_content() : null; + string? name = name_node != null ? name_node.get_string_content() : null; + string? size_raw = size_node != null ? size_node.get_string_content() : null; + // TODO(hrxi): For some reason, the ?:-expression does not work due to a type error. + //int64? size = size_raw != null ? int64.parse(size_raw) : null; // TODO(hrxi): this has no error handling + int64 size = -1; + if (size_raw != null) { + size = int64.parse(size_raw); + if (size < 0) { + throw new Jingle.IqError.BAD_REQUEST("negative file size is invalid"); + } + } + + return new Parameters(parent, description, media_type, name, size); + } + void on_session_initiate(XmppStream stream, Jingle.Session session) { + parent.file_incoming(stream, new FileTransfer(session, this)); + } +} + +public class FileTransfer : Object { + Jingle.Session session; + Parameters parameters; + + public Jid peer { get { return session.peer_full_jid; } } + public string? file_name { get { return parameters.name; } } + public int64 size { get { return parameters.size; } } + + public InputStream? stream { get { return session.conn != null ? session.conn.input_stream : null; } } + + public FileTransfer(Jingle.Session session, Parameters parameters) { + this.session = session; + this.parameters = parameters; + } + + public void accept(XmppStream stream) { + session.accept(stream, parameters.original_description); + session.conn.output_stream.close(); + } + public void reject(XmppStream stream) { + session.reject(stream); + } +} + +} diff --git a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala new file mode 100644 index 00000000..dc2e8d7c --- /dev/null +++ b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala @@ -0,0 +1,77 @@ +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.JingleInBandBytestreams { + +private const string NS_URI = "urn:xmpp:jingle:transports:ibb:1"; +private const int DEFAULT_BLOCKSIZE = 4096; +private const int MAX_BLOCKSIZE = 65535; + +public class Module : Jingle.Transport, XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0261_jingle_in_band_bytestreams"); + + public override void attach(XmppStream stream) { + stream.get_module(Jingle.Module.IDENTITY).register_transport(this); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + } + public override void detach(XmppStream stream) { } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } + + public bool is_transport_available(XmppStream stream, Jid full_jid) { + bool? result = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + return result != null && result; + } + + public string transport_ns_uri() { + return NS_URI; + } + public Jingle.TransportType transport_type() { + return Jingle.TransportType.STREAMING; + } + public Jingle.TransportParameters create_transport_parameters() { + return new Parameters(random_uuid(), DEFAULT_BLOCKSIZE); + } + public Jingle.TransportParameters parse_transport_parameters(StanzaNode transport) throws Jingle.IqError { + return Parameters.parse(transport); + } +} + +class Parameters : Jingle.TransportParameters, Object { + public string sid { get; private set; } + public int block_size { get; private set; } + public Parameters(string sid, int block_size) { + this.sid = sid; + this.block_size = block_size; + } + public static Parameters parse(StanzaNode transport) throws Jingle.IqError { + string? sid = transport.get_attribute("sid"); + int block_size = transport.get_attribute_int("block-size"); + if (sid == null || block_size <= 0 || block_size > MAX_BLOCKSIZE) { + throw new Jingle.IqError.BAD_REQUEST("missing or invalid sid or blocksize"); + } + return new Parameters(sid, block_size); + } + public string transport_ns_uri() { + return NS_URI; + } + public StanzaNode to_transport_stanza_node() { + return new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", block_size.to_string()) + .put_attribute("sid", sid); + } + public void update_transport(StanzaNode transport) throws Jingle.IqError { + Parameters other = Parameters.parse(transport); + if (other.sid != sid || other.block_size > block_size) { + throw new Jingle.IqError.NOT_ACCEPTABLE("invalid IBB sid or block_size"); + } + block_size = other.block_size; + } + public IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Jingle.Role role) { + return InBandBytestreams.Connection.create(stream, peer_full_jid, sid, block_size, role == Jingle.Role.INITIATOR); + } +} + +}