407 lines
14 KiB
Vala
407 lines
14 KiB
Vala
using Gee;
|
|
|
|
namespace Xmpp {
|
|
|
|
/**
 * Error domain used by {@link XmppStream} for stream-level failures.
 */
public errordomain IOStreamError {
    /** Reading a node from the stream failed, or no stream is open. */
    READ,
    /** Writing a node to the stream failed, or no stream is open. */
    WRITE,
    /** Establishing or negotiating the connection failed. */
    CONNECT,
    /** A disconnect was requested while no stream was open. */
    DISCONNECT,
    /** A TLS-related failure was reported while reading the root node. */
    TLS
}
|
|
|
|
public class XmppStream {
|
|
/** Namespace URI of the stream-level elements (stream root and features). */
public const string NS_URI = "http://etherx.jabber.org/streams";

/** Address of the remote end; assigned by connect() when a name is passed. */
public Jid remote_name;
/** Log sink that is handed every node read from or written to the stream. */
public XmppLog log = new XmppLog();
/** Most recent features node seen by the receive loop; starts as an empty features node. */
public StanzaNode? features { get; private set; default = new StanzaNode.build("features", NS_URI); }

// Transport and its XML reader/writer; all three are replaced together by reset_stream().
private IOStream? stream;
private StanzaReader? reader;
private StanzaWriter? writer;

/** Flags currently set on this stream. */
public Gee.List<XmppStreamFlag> flags { get; private set; default = new ArrayList<XmppStreamFlag>(); }
/** Modules added to this stream via add_module(). */
public Gee.List<XmppStreamModule> modules { get; private set; default = new ArrayList<XmppStreamModule>(); }
// Providers consulted by connect() to open the underlying connection.
private Gee.List<ConnectionProvider> connection_providers = new ArrayList<ConnectionProvider>();
/** Set to true by the receive loop right after stream_negotiated has been emitted. */
public bool negotiation_complete { get; set; default = false; }
// True when the stream header has to be (re)sent before the next read.
private bool setup_needed = false;
// True once the non-negotiation modules have been attached by the receive loop.
private bool non_negotiation_modules_attached = false;
// Set by disconnect(); makes the receive loop stop.
private bool disconnected = false;

/** Emitted by the receive loop for every node read, before any more specific signal. */
public signal void received_node(XmppStream stream, StanzaNode node);
/** Emitted after the remote stream root node has been read during setup. */
public signal void received_root_node(XmppStream stream, StanzaNode node);
/** Emitted after a features node was received and stored in {@link features}. */
public signal void received_features_node(XmppStream stream);
/** Emitted for a "message" node in the JABBER_URI namespace. */
public signal void received_message_stanza(XmppStream stream, StanzaNode node);
/** Emitted for a "presence" node in the JABBER_URI namespace. */
public signal void received_presence_stanza(XmppStream stream, StanzaNode node);
/** Emitted for an "iq" node in the JABBER_URI namespace. */
public signal void received_iq_stanza(XmppStream stream, StanzaNode node);
/** Emitted for any other node that is neither features nor a stream close. */
public signal void received_nonza(XmppStream stream, StanzaNode node);
/** Emitted once, when the receive loop finds negotiation finished for the first time. */
public signal void stream_negotiated(XmppStream stream);
/** Emitted after the non-negotiation modules have been attached. */
public signal void attached_modules(XmppStream stream);
|
|
|
|
/**
 * Creates a stream that has a {@link StartTlsConnectionProvider} registered
 * as its initial connection provider.
 */
public XmppStream() {
    ConnectionProvider default_provider = new StartTlsConnectionProvider();
    register_connection_provider(default_provider);
}
|
|
|
|
/**
 * Opens the connection to the server and then runs the receive loop.
 *
 * The negotiation modules are attached first. Every registered
 * {@link ConnectionProvider} is then asked for its priority and the one
 * reporting the lowest value is used to open the connection. If no provider
 * yields a stream, a plain TCP connection to the remote name on port 5222 is
 * attempted instead. The call only returns once the receive loop ends.
 *
 * @param remote_name address to connect to; when null the previously
 *                    assigned {@link remote_name} field is used
 * @throws IOStreamError CONNECT if the name is invalid or missing or no
 *                       connection could be opened; any error raised by
 *                       the receive loop is passed on unchanged
 */
public async void connect(string? remote_name = null) throws IOStreamError {
    if (remote_name != null) {
        try {
            this.remote_name = new Jid(remote_name);
        } catch (InvalidJidError e) {
            throw new IOStreamError.CONNECT(@"Invalid remote name \"$remote_name\": $(e.message)");
        }
    }
    // Without this guard a call with no argument and no preset field would
    // dereference a null remote name further down.
    if (this.remote_name == null) {
        throw new IOStreamError.CONNECT("no remote name to connect to");
    }

    attach_negotation_modules();
    try {
        // Track "nothing chosen yet" through best_provider itself rather than
        // a magic priority value, so any integer priority is handled correctly.
        ConnectionProvider? best_provider = null;
        int best_priority = 0;
        foreach (ConnectionProvider connection_provider in connection_providers) {
            int? priority = yield connection_provider.get_priority(this.remote_name);
            if (priority == null) {
                continue;
            }
            if (best_provider == null || priority < best_priority) {
                best_priority = priority;
                best_provider = connection_provider;
            }
        }

        IOStream? stream = null;
        if (best_provider != null) {
            stream = yield best_provider.connect(this);
        }
        if (stream == null) {
            // No provider produced a stream: connect directly to the host.
            debug("Connecting to %s, xmpp-client, tcp (fallback)", this.remote_name.to_string());
            stream = yield (new SocketClient()).connect_to_host_async(this.remote_name.to_string(), 5222);
        }
        if (stream == null) {
            throw new IOStreamError.CONNECT("client.connect() returned null");
        }
        reset_stream((!)stream);
    } catch (Error e) {
        debug("[%p] Could not connect to server: %s", this, e.message);
        throw new IOStreamError.CONNECT(e.message);
    }

    // Log the effective remote name; the parameter may legitimately be null.
    debug("Connected to %s", this.remote_name.to_string());
    yield loop();
}
|
|
|
|
/**
 * Closes the stream: writes the closing stream tag, cancels the reader and
 * closes the underlying IO stream.
 *
 * The disconnected flag is raised before anything else, so the receive loop
 * stops even when this call fails.
 *
 * @throws IOStreamError DISCONNECT when no stream is open
 * @throws XmlError as declared; see StanzaWriter.write
 * @throws IOError as declared; see IOStream.close_async
 */
public async void disconnect() throws IOStreamError, XmlError, IOError {
    disconnected = true;

    bool stream_open = writer != null && reader != null && stream != null;
    if (!stream_open) {
        throw new IOStreamError.DISCONNECT("trying to disconnect, but no stream open");
    }

    string closing_tag = "</stream:stream>";
    log.str("OUT", closing_tag, this);
    yield writer.write(closing_tag);
    reader.cancel();
    yield stream.close_async();
}
|
|
|
|
/**
 * Replaces the underlying IO stream, creates a fresh reader and writer on
 * top of it, and marks the stream as needing setup again.
 *
 * @param stream the IO stream to read from and write to from now on
 */
public void reset_stream(IOStream stream) {
    InputStream incoming = stream.input_stream;
    OutputStream outgoing = stream.output_stream;

    this.stream = stream;
    this.reader = new StanzaReader.for_stream(incoming);
    this.writer = new StanzaWriter.for_stream(outgoing);
    require_setup();
}
|
|
|
|
/**
 * Requests that the stream header is sent again before the receive loop
 * reads its next node.
 */
public void require_setup() {
    this.setup_needed = true;
}
|
|
|
|
/**
 * Tells whether a stream setup is pending.
 *
 * @return true if require_setup() was called and the receive loop has not
 *         yet finished the resulting setup
 */
public bool is_setup_needed() {
    return this.setup_needed;
}
|
|
|
|
/**
 * Reads the next node from the stream and logs it as "IN".
 *
 * @return the node that was read
 * @throws IOStreamError READ when no stream is open or the reader reports
 *                       an XmlError
 */
public async StanzaNode read() throws IOStreamError {
    if (this.reader == null) {
        throw new IOStreamError.READ("trying to read, but no stream open");
    }
    // Keep our own reference: the field may be replaced while we are suspended.
    StanzaReader active_reader = (!) this.reader;
    try {
        StanzaNode incoming = yield active_reader.read_node();
        log.node("IN", incoming, this);
        return incoming;
    } catch (XmlError e) {
        throw new IOStreamError.READ(e.message);
    }
}
|
|
|
|
/**
 * Fire-and-forget variant of write_async().
 *
 * The write is started but not awaited. A failure cannot be reported to the
 * caller, so it is logged as a warning instead of being left unhandled.
 *
 * @param node the node to send
 */
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
public void write(StanzaNode node) {
    write_async.begin(node, (obj, res) => {
        try {
            write_async.end(res);
        } catch (IOStreamError e) {
            warning("[%p] Could not write node: %s", this, e.message);
        }
    });
}
|
|
|
|
/**
 * Logs the node as "OUT" and writes it to the stream.
 *
 * @param node the node to send
 * @throws IOStreamError WRITE when no stream is open or the writer reports
 *                       an XmlError
 */
public async void write_async(StanzaNode node) throws IOStreamError {
    if (this.writer == null) {
        throw new IOStreamError.WRITE("trying to write, but no stream open");
    }
    // Keep our own reference: the field may be replaced while we are suspended.
    StanzaWriter active_writer = (!) this.writer;
    try {
        log.node("OUT", node, this);
        yield active_writer.write_node(node);
    } catch (XmlError e) {
        throw new IOStreamError.WRITE(e.message);
    }
}
|
|
|
|
/**
 * Gives access to the underlying IO stream.
 *
 * @return the current IO stream, or null if none has been set
 */
internal IOStream? get_stream() {
    return this.stream;
}
|
|
|
|
/**
 * Appends a flag to the list of flags on this stream.
 *
 * @param flag the flag to add
 */
public void add_flag(XmppStreamFlag flag) {
    this.flags.add(flag);
}
|
|
|
|
/**
 * Checks whether a flag matching the given identity is set on this stream.
 *
 * @param identity identity to look for; null never matches
 * @return true if get_flag() finds a flag for the identity
 */
public bool has_flag<T>(FlagIdentity<T>? identity) {
    bool present = get_flag(identity) != null;
    return present;
}
|
|
|
|
/**
 * Looks up the first flag on this stream that matches the given identity.
 *
 * @param identity identity to look for; null yields null
 * @return the result of casting the first matching flag through the
 *         identity, or null if no flag matches
 */
public T? get_flag<T>(FlagIdentity<T>? identity) {
    if (identity == null) {
        return null;
    }
    FlagIdentity<T> wanted = (!) identity;
    foreach (var candidate in flags) {
        if (!wanted.matches(candidate)) {
            continue;
        }
        return wanted.cast(candidate);
    }
    return null;
}
|
|
|
|
/**
 * Removes a flag from the list of flags on this stream.
 *
 * @param flag the flag to remove
 */
public void remove_flag(XmppStreamFlag flag) {
    this.flags.remove(flag);
}
|
|
|
|
/**
 * Adds a module to this stream unless one with the same namespace and id
 * is already present, in which case a warning is logged and nothing changes.
 *
 * If negotiation has already completed, the new module is attached
 * immediately.
 *
 * @param module the module to add
 * @return this stream, to allow chained calls
 */
public XmppStream add_module(XmppStreamModule module) {
    foreach (XmppStreamModule existing in modules) {
        if (existing.get_ns() != module.get_ns()) {
            continue;
        }
        if (existing.get_id() != module.get_id()) {
            continue;
        }
        warning("[%p] Adding already added module: %s\n", this, module.get_id());
        return this;
    }

    modules.add(module);
    if (negotiation_complete) {
        module.attach(this);
    }
    return this;
}
|
|
|
|
/**
 * Detaches modules from this stream.
 *
 * Negotiation modules are always detached. All other modules are detached
 * only when negotiation has completed.
 */
public void detach_modules() {
    foreach (XmppStreamModule candidate in modules) {
        if (candidate is XmppStreamNegotiationModule || negotiation_complete) {
            candidate.detach(this);
        }
    }
}
|
|
|
|
/**
 * Looks up the first module on this stream that matches the given identity.
 *
 * @param identity identity to look for; null yields null
 * @return the result of casting the first matching module through the
 *         identity, or null if no module matches
 */
public T? get_module<T>(ModuleIdentity<T>? identity) {
    if (identity == null) {
        return null;
    }
    ModuleIdentity<T> wanted = (!) identity;
    foreach (var candidate in modules) {
        if (!wanted.matches(candidate)) {
            continue;
        }
        return wanted.cast(candidate);
    }
    return null;
}
|
|
|
|
/**
 * Registers a provider that connect() may use to open the connection.
 *
 * @param connection_provider the provider to add
 */
public void register_connection_provider(ConnectionProvider connection_provider) {
    this.connection_providers.add(connection_provider);
}
|
|
|
|
/**
 * Checks whether any negotiation module reports an active negotiation.
 *
 * @return true as soon as one {@link XmppStreamNegotiationModule} in the
 *         module list returns true from negotiation_active(); false otherwise
 */
public bool is_negotiation_active() {
    foreach (XmppStreamModule candidate in modules) {
        var negotiator = candidate as XmppStreamNegotiationModule;
        if (negotiator == null) {
            continue;
        }
        if (negotiator.negotiation_active(this)) {
            return true;
        }
    }
    return false;
}
|
|
|
|
/**
 * Sends the opening stream header and reads the server's root node.
 *
 * The header is written with write_async() and awaited, so a failed write
 * is reported to the caller instead of being dropped. The received root node
 * is then announced through the received_root_node signal.
 *
 * @throws IOStreamError WRITE if the header cannot be written, or whatever
 *                       read_root() reports
 */
private async void setup() throws IOStreamError {
    StanzaNode outs = new StanzaNode.build("stream", NS_URI)
        .put_attribute("to", remote_name.to_string())
        .put_attribute("version", "1.0")
        .put_attribute("xmlns", "jabber:client")
        .put_attribute("stream", NS_URI, XMLNS_URI);
    // The stream element stays open: its children are the nodes sent later.
    outs.has_nodes = true;
    log.node("OUT ROOT", outs, this);
    yield write_async(outs);
    received_root_node(this, yield read_root());
}
|
|
|
|
/**
 * Receive loop: reads nodes one at a time and dispatches them via signals.
 *
 * Each iteration performs a pending stream setup, reads one node, and then
 * emits received_node followed by the matching specific signal. The loop
 * ends when disconnect() has been called or when the server closes the
 * stream. After each dispatched node it checks whether negotiation is
 * finished; the first time it is, the non-negotiation modules are attached
 * and stream_negotiated is emitted.
 *
 * @throws IOStreamError from setup(), read() or negotiation_modules_done()
 */
private async void loop() throws IOStreamError {
    while (true) {
        if (setup_needed) {
            yield setup();
            setup_needed = false;
        }

        StanzaNode node = yield read();

        // Suspend and resume from an idle callback, so the dispatch below
        // runs in a later main-loop iteration than the read.
        // NOTE(review): presumably to let other pending work run first - confirm.
        Idle.add(loop.callback);
        yield;

        if (disconnected) break;

        received_node(this, node);

        if (node.ns_uri == NS_URI && node.name == "features") {
            features = node;
            received_features_node(this);
        } else if (node.ns_uri == NS_URI && node.name == "stream" && node.pseudo) {
            debug("[%p] Server closed stream", this);
            try {
                yield disconnect();
            } catch (Error e) {
                // Best effort: the stream is ending either way, so a failed
                // close is recorded but does not turn into an error.
                debug("[%p] Error while closing stream: %s", this, e.message);
            }
            return;
        } else if (node.ns_uri == JABBER_URI) {
            if (node.name == "message") {
                received_message_stanza(this, node);
            } else if (node.name == "presence") {
                received_presence_stanza(this, node);
            } else if (node.name == "iq") {
                received_iq_stanza(this, node);
            } else {
                received_nonza(this, node);
            }
        } else {
            received_nonza(this, node);
        }

        if (!non_negotiation_modules_attached && negotiation_modules_done()) {
            attach_non_negotation_modules();
            non_negotiation_modules_attached = true;
            if (!negotiation_complete) {
                // The signal is emitted before the flag is set, so handlers
                // still see negotiation_complete == false.
                stream_negotiated(this);
                negotiation_complete = true;
            }
        }
    }
}
|
|
|
|
/**
 * Decides whether stream negotiation has finished.
 *
 * @return false while a setup is pending or any negotiation module is
 *         active; true otherwise
 * @throws IOStreamError CONNECT if negotiation is otherwise finished but a
 *                       negotiation module still reports a mandatory
 *                       feature as outstanding
 */
private bool negotiation_modules_done() throws IOStreamError {
    if (setup_needed || is_negotiation_active()) {
        return false;
    }

    foreach (XmppStreamModule candidate in modules) {
        var negotiator = candidate as XmppStreamNegotiationModule;
        if (negotiator == null) {
            continue;
        }
        if (negotiator.mandatory_outstanding(this)) {
            throw new IOStreamError.CONNECT("mandatory-to-negotiate feature not negotiated: " + negotiator.get_id());
        }
    }
    return true;
}
|
|
|
|
/**
 * Attaches every module that is not a negotiation module, then emits the
 * attached_modules signal.
 */
private void attach_non_negotation_modules() {
    foreach (XmppStreamModule candidate in modules) {
        if (candidate is XmppStreamNegotiationModule) {
            continue;
        }
        candidate.attach(this);
    }
    attached_modules(this);
}
|
|
|
|
/**
 * Attaches every negotiation module; all other modules are left alone.
 */
private void attach_negotation_modules() {
    foreach (XmppStreamModule candidate in modules) {
        if (!(candidate is XmppStreamNegotiationModule)) {
            continue;
        }
        candidate.attach(this);
    }
}
|
|
|
|
/**
 * Reads the stream's root node and logs it as "IN ROOT".
 *
 * @return the root node that was read
 * @throws IOStreamError TLS when the reader reports XmlError.TLS; READ when
 *                       no stream is open or for any other error
 */
private async StanzaNode read_root() throws IOStreamError {
    if (this.reader == null) {
        throw new IOStreamError.READ("trying to read, but no stream open");
    }
    // Keep our own reference: the field may be replaced while we are suspended.
    StanzaReader active_reader = (!) this.reader;
    try {
        StanzaNode root = yield active_reader.read_root_node();
        log.node("IN ROOT", root, this);
        return root;
    } catch (XmlError.TLS e) {
        throw new IOStreamError.TLS(e.message);
    } catch (Error e) {
        throw new IOStreamError.READ(e.message);
    }
}
|
|
}
|
|
|
|
/**
 * Typed key for finding a particular {@link XmppStreamFlag} on a stream.
 *
 * A flag is identified by its namespace and id; the type parameter is the
 * type the flag is handed back as.
 */
public class FlagIdentity<T> : Object {
    /** Namespace a flag must report to match this identity. */
    public string ns { get; private set; }
    /** Id a flag must report to match this identity. */
    public string id { get; private set; }

    /**
     * @param ns namespace to match
     * @param id id to match
     */
    public FlagIdentity(string ns, string id) {
        this.ns = ns;
        this.id = id;
    }

    /**
     * Converts a flag to the identity's type.
     *
     * @param flag the flag to convert
     * @return the flag as T, or null if its runtime type is not a T
     */
    public T? cast(XmppStreamFlag flag) {
        if (!flag.get_type().is_a(typeof(T))) {
            return null;
        }
        return (T?) flag;
    }

    /**
     * Tests whether a flag carries this identity's namespace and id.
     *
     * @param module the flag to test
     * @return true if both namespace and id are equal
     */
    public bool matches(XmppStreamFlag module) {
        if (module.get_ns() != ns) {
            return false;
        }
        return module.get_id() == id;
    }
}
|
|
|
|
/**
 * Base class for flags stored on an {@link XmppStream}.
 *
 * Namespace and id together are what a {@link FlagIdentity} compares
 * against when looking a flag up.
 */
public abstract class XmppStreamFlag : Object {
    /** @return the namespace this flag belongs to */
    public abstract string get_ns();

    /** @return the id of this flag */
    public abstract string get_id();
}
|
|
|
|
/**
 * Typed key for finding a particular {@link XmppStreamModule} on a stream.
 *
 * A module is identified by its namespace and id; the type parameter is the
 * type the module is handed back as.
 */
public class ModuleIdentity<T> : Object {
    /** Namespace a module must report to match this identity. */
    public string ns { get; private set; }
    /** Id a module must report to match this identity. */
    public string id { get; private set; }

    /**
     * @param ns namespace to match
     * @param id id to match
     */
    public ModuleIdentity(string ns, string id) {
        this.ns = ns;
        this.id = id;
    }

    /**
     * Converts a module to the identity's type.
     *
     * @param module the module to convert
     * @return the module as T, or null if its runtime type is not a T
     */
    public T? cast(XmppStreamModule module) {
        if (!module.get_type().is_a(typeof(T))) {
            return null;
        }
        return (T?) module;
    }

    /**
     * Tests whether a module carries this identity's namespace and id.
     *
     * @param module the module to test
     * @return true if both namespace and id are equal
     */
    public bool matches(XmppStreamModule module) {
        if (module.get_ns() != ns) {
            return false;
        }
        return module.get_id() == id;
    }
}
|
|
|
|
/**
 * Base class for modules that can be added to an {@link XmppStream}.
 *
 * The stream calls attach() and detach() on its modules. Namespace and id
 * together identify a module, both for duplicate detection when adding and
 * for lookup through a {@link ModuleIdentity}.
 */
public abstract class XmppStreamModule : Object {
    /**
     * Called by the stream when the module is attached to it.
     *
     * @param stream the stream the module is attached to
     */
    public abstract void attach(XmppStream stream);

    /**
     * Called by the stream when the module is detached from it.
     *
     * @param stream the stream the module is detached from
     */
    public abstract void detach(XmppStream stream);

    /** @return the namespace this module belongs to */
    public abstract string get_ns();

    /** @return the id of this module */
    public abstract string get_id();
}
|
|
|
|
/**
 * A module that takes part in stream negotiation.
 *
 * The stream attaches these modules before connecting and polls them to
 * decide when negotiation has finished.
 */
public abstract class XmppStreamNegotiationModule : XmppStreamModule {
    /**
     * Reports whether this module still has a mandatory feature to negotiate.
     * The stream raises a CONNECT error if this is true once no module is
     * actively negotiating any more.
     *
     * @param stream the stream being negotiated
     * @return true if a mandatory feature is still outstanding
     */
    public abstract bool mandatory_outstanding(XmppStream stream);

    /**
     * Reports whether this module is negotiating right now. The stream does
     * not treat negotiation as finished while any module returns true.
     *
     * @param stream the stream being negotiated
     * @return true while negotiation by this module is in progress
     */
    public abstract bool negotiation_active(XmppStream stream);
}
|
|
|
|
/**
 * Strategy for opening the connection that an {@link XmppStream} runs on.
 *
 * When connecting, the stream asks every registered provider for its
 * priority and uses the one that reports the lowest value.
 */
public abstract class ConnectionProvider {
    /**
     * Reports how preferable this provider is for the given remote name.
     *
     * @param remote_name address the stream wants to connect to
     * @return a priority where lower values win, or null if this provider
     *         should not be considered
     */
    public async abstract int? get_priority(Jid remote_name);

    /**
     * Opens the connection.
     *
     * @param stream the stream requesting the connection
     * @return the opened IO stream, or null if connecting failed
     */
    public async abstract IOStream? connect(XmppStream stream);

    /** @return an identifier for this provider */
    public abstract string get_id();
}
|
|
|
|
/**
 * Connection provider that looks up the "xmpp-client" / "tcp" service record
 * of the remote name and opens a plain socket connection to the resulting
 * target.
 */
public class StartTlsConnectionProvider : ConnectionProvider {
    // Target selected by the most recent successful get_priority() call.
    private SrvTarget? srv_target;

    /**
     * Resolves the service record for the remote name and remembers the
     * target with the lowest priority value for a later connect().
     *
     * @param remote_name address whose service record is looked up
     * @return the priority of the selected target, or null if the lookup
     *         failed or produced no targets
     */
    public async override int? get_priority(Jid remote_name) {
        // Forget the result of any earlier lookup, so a failed lookup cannot
        // leave a stale target behind for connect().
        srv_target = null;

        GLib.List<SrvTarget>? xmpp_target = null;
        try {
            GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default();
            xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name.to_string(), null);
        } catch (Error e) {
            return null;
        }

        // An empty GLib.List is a null pointer; nth(0) would be null as well.
        if (xmpp_target == null) {
            return null;
        }

        xmpp_target.sort((a, b) => { return a.get_priority() - b.get_priority(); });
        srv_target = xmpp_target.nth(0).data;
        return xmpp_target.nth(0).data.get_priority();
    }

    /**
     * Connects to the target selected by the last get_priority() call.
     *
     * @param stream the stream requesting the connection (not used here)
     * @return the opened connection, or null if no target is known or the
     *         connection attempt failed
     */
    public async override IOStream? connect(XmppStream stream) {
        // Nothing to connect to without a successful lookup; returning null
        // lets the caller try its fallback.
        if (srv_target == null) {
            return null;
        }
        try {
            SocketClient client = new SocketClient();
            debug("Connecting to %s %i (starttls)", srv_target.get_hostname(), srv_target.get_port());
            return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port());
        } catch (Error e) {
            return null;
        }
    }

    /** @return the fixed identifier "start_tls" */
    public override string get_id() { return "start_tls"; }
}
|
|
|
|
}
|