Add jingle connection for better interfacing with jingle (terminate etc.)

This commit is contained in:
hrxi 2019-08-05 17:05:33 +02:00
parent 08a5088c16
commit 95596e25a5
3 changed files with 176 additions and 41 deletions

View file

@ -60,9 +60,8 @@ public class Module : XmppStreamModule, Iq.Handler {
} }
public class Connection : IOStream { public class Connection : IOStream {
// TODO(hrxi): Fix reference cycle
public class Input : InputStream { public class Input : InputStream {
private Connection connection; private weak Connection connection;
public Input(Connection connection) { public Input(Connection connection) {
this.connection = connection; this.connection = connection;
} }
@ -73,14 +72,14 @@ public class Connection : IOStream {
return yield connection.read_async(buffer, io_priority, cancellable); return yield connection.read_async(buffer, io_priority, cancellable);
} }
public override bool close(Cancellable? cancellable = null) throws IOError { public override bool close(Cancellable? cancellable = null) throws IOError {
return connection.close_read(cancellable); throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
} }
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_read_async(io_priority, cancellable); return yield connection.close_read_async(io_priority, cancellable);
} }
} }
public class Output : OutputStream { public class Output : OutputStream {
private Connection connection; private weak Connection connection;
public Output(Connection connection) { public Output(Connection connection) {
this.connection = connection; this.connection = connection;
} }
@ -91,7 +90,7 @@ public class Connection : IOStream {
return yield connection.write_async(buffer, io_priority, cancellable); return yield connection.write_async(buffer, io_priority, cancellable);
} }
public override bool close(Cancellable? cancellable = null) throws IOError { public override bool close(Cancellable? cancellable = null) throws IOError {
return connection.close_write(cancellable); throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
} }
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_write_async(io_priority, cancellable); return yield connection.close_write_async(io_priority, cancellable);
@ -263,13 +262,6 @@ public class Connection : IOStream {
return buffer.length; return buffer.length;
} }
public bool close_read(Cancellable? cancellable = null) {
input_closed = true;
if (!output_closed) {
return true;
}
return close_impl(cancellable);
}
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
input_closed = true; input_closed = true;
if (!output_closed) { if (!output_closed) {
@ -277,13 +269,6 @@ public class Connection : IOStream {
} }
return yield close_async_impl(io_priority, cancellable); return yield close_async_impl(io_priority, cancellable);
} }
public bool close_write(Cancellable? cancellable = null) {
output_closed = true;
if (!input_closed) {
return true;
}
return close_impl(cancellable);
}
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
output_closed = true; output_closed = true;
if (!input_closed) { if (!input_closed) {
@ -292,7 +277,7 @@ public class Connection : IOStream {
return yield close_async_impl(io_priority, cancellable); return yield close_async_impl(io_priority, cancellable);
} }
delegate void OnClose(bool success); delegate void OnClose(bool success);
private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) { private bool close_impl(Cancellable? cancellable, OnClose on_close) {
if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) { if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) {
on_close(true); on_close(true);
return true; return true;

View file

@ -184,7 +184,7 @@ public class Module : XmppStreamModule, Iq.Handler {
if (transport == null || transport.transport_type() != type) { if (transport == null || transport.transport_type() != type) {
StanzaNode reason = new StanzaNode.build("reason", NS_URI) StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("unsupported-transports", NS_URI)); .put_node(new StanzaNode.build("unsupported-transports", NS_URI));
session.terminate(stream, reason); session.terminate(stream, reason, "unsupported transports");
return; return;
} }
@ -310,7 +310,8 @@ public class Session {
TransportParameters? transport = null; TransportParameters? transport = null;
// ACTIVE // ACTIVE
public IOStream? conn { get; private set; } private Connection? connection;
public IOStream? conn { get { return connection; } }
// Only interesting in INITIATE_SENT. // Only interesting in INITIATE_SENT.
// Signals that the session has been accepted by the peer. // Signals that the session has been accepted by the peer.
@ -323,7 +324,7 @@ public class Session {
this.peer_full_jid = peer_full_jid; this.peer_full_jid = peer_full_jid;
this.content_name = content_name; this.content_name = content_name;
this.transport = transport; this.transport = transport;
this.conn = null; this.connection = new Connection(this);
} }
public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) {
@ -333,7 +334,7 @@ public class Session {
this.peer_full_jid = peer_full_jid; this.peer_full_jid = peer_full_jid;
this.content_name = content_name; this.content_name = content_name;
this.transport = transport; this.transport = transport;
this.conn = null; this.connection = new Connection(this);
} }
public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError {
@ -389,13 +390,17 @@ public class Session {
throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method");
} }
transport.update_transport(transport_node); transport.update_transport(transport_node);
conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR));
transport = null; transport = null;
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
state = State.ACTIVE; state = State.ACTIVE;
accepted(stream); accepted(stream);
} }
void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError {
connection.on_terminated_by_jingle("remote terminated jingle session");
state = ENDED;
stream.get_flag(Flag.IDENTITY).remove_session(sid);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
// TODO(hrxi): also handle presence type=unavailable // TODO(hrxi): also handle presence type=unavailable
} }
@ -417,7 +422,7 @@ public class Session {
Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq);
conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER));
transport = null; transport = null;
state = State.ACTIVE; state = State.ACTIVE;
@ -429,7 +434,7 @@ public class Session {
} }
StanzaNode reason = new StanzaNode.build("reason", NS_URI) StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("decline", NS_URI)); .put_node(new StanzaNode.build("decline", NS_URI));
terminate(stream, reason); terminate(stream, reason, "declined");
} }
public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) {
@ -438,23 +443,24 @@ public class Session {
if (application_reason != null) { if (application_reason != null) {
reason.put_node(application_reason); reason.put_node(application_reason);
} }
terminate(stream, reason); terminate(stream, reason, "application error");
} }
public void close_connection(XmppStream stream) { public void on_connection_error(IOError error) {
if (state != State.ACTIVE) { // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session
return; // TODO(hrxi): what to do?
}
conn.close();
} }
public void terminate(XmppStream stream, StanzaNode reason) { public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) {
if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) {
// TODO(hrxi): what to do? // TODO(hrxi): what to do?
return; return;
} }
if (state == State.ACTIVE) { if (state == State.ACTIVE) {
conn.close(); if (local_reason != null) {
connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)");
} else {
connection.on_terminated_by_jingle("local session-terminate");
}
} }
StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) StanzaNode jingle = new StanzaNode.build("jingle", NS_URI)
@ -472,6 +478,155 @@ public class Session {
} }
} }
public class Connection : IOStream {
public class Input : InputStream {
private weak Connection connection;
public Input(Connection connection) {
this.connection = connection;
}
public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections");
}
public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.read_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
return connection.close_read(cancellable);
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_read_async(io_priority, cancellable);
}
}
public class Output : OutputStream {
private weak Connection connection;
public Output(Connection connection) {
this.connection = connection;
}
public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections");
}
public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.write_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
return connection.close_write(cancellable);
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_write_async(io_priority, cancellable);
}
}
private Input input;
private Output output;
public override InputStream input_stream { get { return input; } }
public override OutputStream output_stream { get { return output; } }
private weak Session session;
private IOStream? inner = null;
private string? error = null;
private class OnSetInnerCallback {
public SourceFunc callback;
public int io_priority;
}
Gee.List<OnSetInnerCallback> callbacks = new ArrayList<OnSetInnerCallback>();
public Connection(Session session) {
this.input = new Input(this);
this.output = new Output(this);
this.session = session;
}
public void set_inner(IOStream inner) {
assert(this.inner == null);
this.inner = inner;
foreach (OnSetInnerCallback c in callbacks) {
Idle.add((owned) c.callback, c.io_priority);
}
callbacks = null;
}
public void on_terminated_by_jingle(string reason) {
if (error == null) {
close_async.begin();
error = reason;
}
}
private void check_for_errors() throws IOError {
if (error != null) {
throw new IOError.CLOSED(error);
}
}
private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError {
while (true) {
check_for_errors();
if (inner != null) {
return;
}
SourceFunc callback = wait_and_check_for_errors.callback;
ulong id = cancellable.connect(() => callback());
callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority});
yield;
cancellable.disconnect(id);
}
}
private void handle_connection_error(IOError error) {
Session? strong = session;
if (strong != null) {
strong.on_connection_error(error);
}
}
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.input_stream.read_async(buffer, io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
}
}
public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.output_stream.write_async(buffer, io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
}
}
public bool close_read(Cancellable? cancellable = null) throws IOError {
check_for_errors();
close_read_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.input_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
}
}
public bool close_write(Cancellable? cancellable = null) throws IOError {
check_for_errors();
close_write_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.output_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
}
}
}
public class Flag : XmppStreamFlag { public class Flag : XmppStreamFlag {
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle"); public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle");

View file

@ -46,12 +46,7 @@ public class Module : Jingle.ContentType, XmppStreamModule {
Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY)
.create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"?
SourceFunc callback = offer_file_stream.callback; yield session.conn.input_stream.close_async();
session.accepted.connect((stream) => {
session.conn.input_stream.close();
Idle.add((owned) callback);
});
yield;
// TODO(hrxi): catch errors // TODO(hrxi): catch errors
yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET);