Async service lookup, connect and write
This commit is contained in:
parent
de133218da
commit
9165c4db27
|
@ -40,7 +40,10 @@ public interface Dino.Application : GLib.Application {
|
||||||
|
|
||||||
activate.connect(() => {
|
activate.connect(() => {
|
||||||
stream_interactor.connection_manager.log_options = print_xmpp;
|
stream_interactor.connection_manager.log_options = print_xmpp;
|
||||||
|
Idle.add(() => {
|
||||||
restore();
|
restore();
|
||||||
|
return false;
|
||||||
|
});
|
||||||
});
|
});
|
||||||
shutdown.connect(() => {
|
shutdown.connect(() => {
|
||||||
stream_interactor.connection_manager.make_offline_all();
|
stream_interactor.connection_manager.make_offline_all();
|
||||||
|
|
|
@ -8,6 +8,8 @@ find_packages(ENGINE_PACKAGES REQUIRED
|
||||||
|
|
||||||
vala_precompile(ENGINE_VALA_C
|
vala_precompile(ENGINE_VALA_C
|
||||||
SOURCES
|
SOURCES
|
||||||
|
"src/glib_fixes.vapi"
|
||||||
|
|
||||||
"src/core/namespace_state.vala"
|
"src/core/namespace_state.vala"
|
||||||
"src/core/stanza_attribute.vala"
|
"src/core/stanza_attribute.vala"
|
||||||
"src/core/stanza_node.vala"
|
"src/core/stanza_node.vala"
|
||||||
|
|
|
@ -2,26 +2,48 @@ namespace Xmpp.Core {
|
||||||
public class StanzaWriter {
|
public class StanzaWriter {
|
||||||
private OutputStream output;
|
private OutputStream output;
|
||||||
|
|
||||||
|
private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>();
|
||||||
|
private bool running = false;
|
||||||
|
|
||||||
public StanzaWriter.for_stream(OutputStream output) {
|
public StanzaWriter.for_stream(OutputStream output) {
|
||||||
this.output = output;
|
this.output = output;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write_node(StanzaNode node) throws XmlError {
|
public async void write_node(StanzaNode node) throws XmlError {
|
||||||
try {
|
yield write_data(node.to_xml().data);
|
||||||
lock(output) {
|
|
||||||
output.write_all(node.to_xml().data, null);
|
|
||||||
}
|
|
||||||
} catch (GLib.IOError e) {
|
|
||||||
throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async void write(string s) throws XmlError {
|
public async void write(string s) throws XmlError {
|
||||||
|
yield write_data(s.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async void write_data(uint8[] data) throws XmlError {
|
||||||
|
if (running) {
|
||||||
|
queue.push_tail(new SourceFuncWrapper(write_data.callback));
|
||||||
|
yield;
|
||||||
|
}
|
||||||
|
running = true;
|
||||||
try {
|
try {
|
||||||
output.write_all(s.data, null);
|
yield output.write_all_async(data, 0, null, null);
|
||||||
} catch (GLib.IOError e) {
|
SourceFuncWrapper? sfw = queue.pop_head();
|
||||||
|
if (sfw != null) {
|
||||||
|
sfw.sfun();
|
||||||
|
}
|
||||||
|
} catch (GLib.Error e) {
|
||||||
throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)");
|
throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)");
|
||||||
|
} finally {
|
||||||
|
running = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class SourceFuncWrapper : Object {
|
||||||
|
|
||||||
|
public SourceFunc sfun;
|
||||||
|
|
||||||
|
public SourceFuncWrapper(owned SourceFunc sfun) {
|
||||||
|
this.sfun = (owned)sfun;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class XmppStream {
|
||||||
int min_priority = -1;
|
int min_priority = -1;
|
||||||
ConnectionProvider? best_provider = null;
|
ConnectionProvider? best_provider = null;
|
||||||
foreach (ConnectionProvider connection_provider in connection_providers) {
|
foreach (ConnectionProvider connection_provider in connection_providers) {
|
||||||
int? priority = connection_provider.get_priority(remote_name);
|
int? priority = yield connection_provider.get_priority(remote_name);
|
||||||
if (priority != null && (priority < min_priority || min_priority == -1)) {
|
if (priority != null && (priority < min_priority || min_priority == -1)) {
|
||||||
min_priority = priority;
|
min_priority = priority;
|
||||||
best_provider = connection_provider;
|
best_provider = connection_provider;
|
||||||
|
@ -57,9 +57,9 @@ public class XmppStream {
|
||||||
}
|
}
|
||||||
IOStream? stream = null;
|
IOStream? stream = null;
|
||||||
if (best_provider != null) {
|
if (best_provider != null) {
|
||||||
stream = best_provider.connect(this);
|
stream = yield best_provider.connect(this);
|
||||||
} else {
|
} else {
|
||||||
stream = (new SocketClient()).connect(new NetworkService("xmpp-client", "tcp", this.remote_name));
|
stream = yield (new SocketClient()).connect_async(new NetworkService("xmpp-client", "tcp", this.remote_name));
|
||||||
}
|
}
|
||||||
if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null");
|
if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null");
|
||||||
reset_stream((!)stream);
|
reset_stream((!)stream);
|
||||||
|
@ -108,12 +108,16 @@ public class XmppStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write(StanzaNode node) throws IOStreamError {
|
public void write(StanzaNode node) {
|
||||||
|
write_async.begin(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async void write_async(StanzaNode node) throws IOStreamError {
|
||||||
StanzaWriter? writer = this.writer;
|
StanzaWriter? writer = this.writer;
|
||||||
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
|
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
|
||||||
try {
|
try {
|
||||||
log.node("OUT", node);
|
log.node("OUT", node);
|
||||||
((!)writer).write_node(node);
|
yield ((!)writer).write_node(node);
|
||||||
} catch (XmlError e) {
|
} catch (XmlError e) {
|
||||||
throw new IOStreamError.WRITE(e.message);
|
throw new IOStreamError.WRITE(e.message);
|
||||||
}
|
}
|
||||||
|
@ -342,19 +346,19 @@ public abstract class XmppStreamNegotiationModule : XmppStreamModule {
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class ConnectionProvider {
|
public abstract class ConnectionProvider {
|
||||||
public abstract int? get_priority(string remote_name);
|
public async abstract int? get_priority(string remote_name);
|
||||||
public abstract IOStream? connect(XmppStream stream);
|
public async abstract IOStream? connect(XmppStream stream);
|
||||||
public abstract string get_id();
|
public abstract string get_id();
|
||||||
}
|
}
|
||||||
|
|
||||||
public class StartTlsConnectionProvider : ConnectionProvider {
|
public class StartTlsConnectionProvider : ConnectionProvider {
|
||||||
private SrvTarget? srv_target;
|
private SrvTarget? srv_target;
|
||||||
|
|
||||||
public override int? get_priority(string remote_name) {
|
public async override int? get_priority(string remote_name) {
|
||||||
GLib.List<SrvTarget>? xmpp_target = null;
|
GLib.List<SrvTarget>? xmpp_target = null;
|
||||||
try {
|
try {
|
||||||
Resolver resolver = Resolver.get_default();
|
GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default();
|
||||||
xmpp_target = resolver.lookup_service("xmpp-client", "tcp", remote_name, null);
|
xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name, null);
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -363,10 +367,10 @@ public class StartTlsConnectionProvider : ConnectionProvider {
|
||||||
return xmpp_target.nth(0).data.get_priority();
|
return xmpp_target.nth(0).data.get_priority();
|
||||||
}
|
}
|
||||||
|
|
||||||
public override IOStream? connect(XmppStream stream) {
|
public async override IOStream? connect(XmppStream stream) {
|
||||||
try {
|
try {
|
||||||
SocketClient client = new SocketClient();
|
SocketClient client = new SocketClient();
|
||||||
return client.connect_to_host(srv_target.get_hostname(), srv_target.get_port());
|
return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port());
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
42
xmpp-vala/src/glib_fixes.vapi
Normal file
42
xmpp-vala/src/glib_fixes.vapi
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
[CCode (cprefix = "G", gir_namespace = "Gio", gir_version = "2.0", lower_case_cprefix = "g_")]
|
||||||
|
namespace GLibFixes {
|
||||||
|
|
||||||
|
[CCode (cheader_filename = "gio/gio.h", type_id = "g_resolver_get_type ()")]
|
||||||
|
public class Resolver : GLib.Object {
|
||||||
|
[CCode (has_construct_function = false)]
|
||||||
|
protected Resolver();
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public static Resolver get_default();
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public virtual string lookup_by_address(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public virtual async string lookup_by_address_async(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public virtual GLib.List<GLib.InetAddress> lookup_by_name(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public virtual async GLib.List<GLib.InetAddress> lookup_by_name_async(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.34")]
|
||||||
|
public virtual GLib.List<GLib.Variant> lookup_records(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.34")]
|
||||||
|
public virtual async GLib.List<GLib.Variant> lookup_records_async(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public virtual GLib.List<GLib.SrvTarget> lookup_service(string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error ;
|
||||||
|
|
||||||
|
[CCode (finish_vfunc_name = "lookup_service_finish", vfunc_name = "lookup_service_async")]
|
||||||
|
public async GLib.List<GLib.SrvTarget> lookup_service_async (string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error;
|
||||||
|
|
||||||
|
[Version (since = "2.22")]
|
||||||
|
public void set_default();
|
||||||
|
|
||||||
|
public virtual signal void reload ();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -13,11 +13,7 @@ namespace Xmpp.Iq {
|
||||||
|
|
||||||
public delegate void OnResult(XmppStream stream, Iq.Stanza iq);
|
public delegate void OnResult(XmppStream stream, Iq.Stanza iq);
|
||||||
public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) {
|
public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) {
|
||||||
try {
|
|
||||||
stream.write(iq.stanza);
|
stream.write(iq.stanza);
|
||||||
} catch (IOStreamError e) {
|
|
||||||
print(@"$(e.message)\n");
|
|
||||||
}
|
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
responseListeners[iq.id] = new ResponseListener((owned) listener);
|
responseListeners[iq.id] = new ResponseListener((owned) listener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,11 +53,7 @@ namespace Xmpp.Tls {
|
||||||
server_requires_tls = true;
|
server_requires_tls = true;
|
||||||
}
|
}
|
||||||
if (server_requires_tls || require) {
|
if (server_requires_tls || require) {
|
||||||
try {
|
|
||||||
stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns());
|
stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns());
|
||||||
} catch (IOStreamError e) {
|
|
||||||
stderr.printf("Failed to request TLS: %s\n", e.message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (identity == null) {
|
if (identity == null) {
|
||||||
identity = new NetworkService("xmpp-client", "tcp", stream.remote_name);
|
identity = new NetworkService("xmpp-client", "tcp", stream.remote_name);
|
||||||
|
|
|
@ -48,16 +48,6 @@ public class StanzaListenerHolder<T> : Object {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Gee.List<StanzaListener<T>> set_minus(Gee.List<StanzaListener<T>> main_set, Gee.List<StanzaListener<T>> minus) {
|
|
||||||
Gee.List<StanzaListener<T>> res = new ArrayList<StanzaListener<T>>();
|
|
||||||
foreach (StanzaListener<T> l in main_set) {
|
|
||||||
if (!minus.contains(l)) {
|
|
||||||
res.add(l);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) {
|
private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) {
|
||||||
foreach(StanzaListener<T> l in s) {
|
foreach(StanzaListener<T> l in s) {
|
||||||
if (l.action_group in actions) {
|
if (l.action_group in actions) {
|
||||||
|
@ -69,16 +59,23 @@ public class StanzaListenerHolder<T> : Object {
|
||||||
|
|
||||||
private void resort_list() {
|
private void resort_list() {
|
||||||
ArrayList<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>();
|
ArrayList<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>();
|
||||||
while (listeners.size > new_list.size) {
|
ArrayList<StanzaListener<T>> remaining = new ArrayList<StanzaListener<T>>();
|
||||||
|
remaining.add_all(listeners);
|
||||||
|
while (remaining.size > 0) {
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
foreach (StanzaListener<T> l in listeners) {
|
Gee.Iterator<StanzaListener<T>> iter = remaining.iterator();
|
||||||
Gee.List<StanzaListener<T>> remaining = set_minus(listeners, new_list);
|
while (iter.has_next()) {
|
||||||
|
if (!iter.valid) {
|
||||||
|
iter.next();
|
||||||
|
}
|
||||||
|
StanzaListener<T> l = iter.get();
|
||||||
if (!set_contains_action(remaining, l.after_actions)) {
|
if (!set_contains_action(remaining, l.after_actions)) {
|
||||||
new_list.add(l);
|
new_list.add(l);
|
||||||
|
iter.remove();
|
||||||
changed = true;
|
changed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!changed) warning("Can't sort listeners");
|
if (!changed) error("Can't sort listeners");
|
||||||
}
|
}
|
||||||
listeners = new_list;
|
listeners = new_list;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,11 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
public class TlsConnectionProvider : ConnectionProvider {
|
public class TlsConnectionProvider : ConnectionProvider {
|
||||||
private SrvTarget? srv_target;
|
private SrvTarget? srv_target;
|
||||||
|
|
||||||
public override int? get_priority(string remote_name) {
|
public async override int? get_priority(string remote_name) {
|
||||||
GLib.List<SrvTarget>? xmpp_target = null;
|
GLib.List<SrvTarget>? xmpp_target = null;
|
||||||
try {
|
try {
|
||||||
Resolver resolver = Resolver.get_default();
|
GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default();
|
||||||
xmpp_target = resolver.lookup_service("xmpps-client", "tcp", remote_name, null);
|
xmpp_target = yield resolver.lookup_service_async("xmpps-client", "tcp", remote_name, null);
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -35,10 +35,10 @@ public class TlsConnectionProvider : ConnectionProvider {
|
||||||
return xmpp_target.nth(0).data.get_priority();
|
return xmpp_target.nth(0).data.get_priority();
|
||||||
}
|
}
|
||||||
|
|
||||||
public override IOStream? connect(XmppStream stream) {
|
public async override IOStream? connect(XmppStream stream) {
|
||||||
SocketClient client = new SocketClient();
|
SocketClient client = new SocketClient();
|
||||||
try {
|
try {
|
||||||
IOStream? io_stream = client.connect_to_host(srv_target.get_hostname(), srv_target.get_port());
|
IOStream? io_stream = yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port());
|
||||||
io_stream = TlsClientConnection.new(io_stream, new NetworkAddress(srv_target.get_hostname(), srv_target.get_port()));
|
io_stream = TlsClientConnection.new(io_stream, new NetworkAddress(srv_target.get_hostname(), srv_target.get_port()));
|
||||||
stream.add_flag(new Tls.Flag() { finished=true });
|
stream.add_flag(new Tls.Flag() { finished=true });
|
||||||
return io_stream;
|
return io_stream;
|
||||||
|
|
Loading…
Reference in a new issue