Add queue and resending to stream management
This commit is contained in:
parent
8e3462b1b7
commit
74f7fa897f
|
@ -14,7 +14,8 @@ public class Message : Object {
|
||||||
READ,
|
READ,
|
||||||
ACKNOWLEDGED,
|
ACKNOWLEDGED,
|
||||||
UNSENT,
|
UNSENT,
|
||||||
WONTSEND
|
WONTSEND,
|
||||||
|
SENT
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum Type {
|
public enum Type {
|
||||||
|
|
|
@ -638,6 +638,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
|
||||||
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => {
|
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => {
|
||||||
try {
|
try {
|
||||||
stream.get_module(MessageModule.IDENTITY).send_message.end(res);
|
stream.get_module(MessageModule.IDENTITY).send_message.end(res);
|
||||||
|
if (message.marked == Message.Marked.NONE/* && (yield stream.get_module(Xep.ServiceDiscovery.Module.IDENTITY).has_entity_feature(stream, conversation.account.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI))*/) {
|
||||||
|
message.marked = Message.Marked.SENT;
|
||||||
|
}
|
||||||
|
|
||||||
// The server might not have given us the resource we asked for. In that case, store the actual resource the message was sent with. Relevant for deduplication.
|
// The server might not have given us the resource we asked for. In that case, store the actual resource the message was sent with. Relevant for deduplication.
|
||||||
Jid? current_own_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid;
|
Jid? current_own_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid;
|
||||||
|
|
|
@ -14,6 +14,22 @@ public class StanzaWriter {
|
||||||
yield write_data(node.to_xml().data);
|
yield write_data(node.to_xml().data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async void write_nodes(StanzaNode node1, StanzaNode node2) throws XmlError {
|
||||||
|
var data1 = node1.to_xml().data;
|
||||||
|
var data2 = node2.to_xml().data;
|
||||||
|
|
||||||
|
uint8[] concat = new uint8[data1.length + data2.length];
|
||||||
|
int i = 0;
|
||||||
|
foreach (var datum in data1) {
|
||||||
|
concat[i++] = datum;
|
||||||
|
}
|
||||||
|
foreach (var datum in data2) {
|
||||||
|
concat[i++] = datum;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield write_data(concat);
|
||||||
|
}
|
||||||
|
|
||||||
public async void write(string s) throws XmlError {
|
public async void write(string s) throws XmlError {
|
||||||
yield write_data(s.data);
|
yield write_data(s.data);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,14 @@ public class XmppStream {
|
||||||
public StanzaNode? features { get; private set; default = new StanzaNode.build("features", NS_URI); }
|
public StanzaNode? features { get; private set; default = new StanzaNode.build("features", NS_URI); }
|
||||||
|
|
||||||
private IOStream? stream;
|
private IOStream? stream;
|
||||||
private StanzaReader? reader;
|
internal StanzaReader? reader;
|
||||||
private StanzaWriter? writer;
|
internal StanzaWriter? writer;
|
||||||
|
|
||||||
public Gee.List<XmppStreamFlag> flags { get; private set; default=new ArrayList<XmppStreamFlag>(); }
|
public Gee.List<XmppStreamFlag> flags { get; private set; default=new ArrayList<XmppStreamFlag>(); }
|
||||||
public Gee.List<XmppStreamModule> modules { get; private set; default=new ArrayList<XmppStreamModule>(); }
|
public Gee.List<XmppStreamModule> modules { get; private set; default=new ArrayList<XmppStreamModule>(); }
|
||||||
private Gee.List<ConnectionProvider> connection_providers = new ArrayList<ConnectionProvider>();
|
private Gee.List<ConnectionProvider> connection_providers = new ArrayList<ConnectionProvider>();
|
||||||
|
|
||||||
|
internal WriteNodeFunc? write_obj = null;
|
||||||
public bool negotiation_complete { get; set; default=false; }
|
public bool negotiation_complete { get; set; default=false; }
|
||||||
private bool setup_needed = false;
|
private bool setup_needed = false;
|
||||||
private bool non_negotiation_modules_attached = false;
|
private bool non_negotiation_modules_attached = false;
|
||||||
|
@ -126,6 +128,9 @@ public class XmppStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
public async void write_async(StanzaNode node) throws IOStreamError {
|
public async void write_async(StanzaNode node) throws IOStreamError {
|
||||||
|
if (write_obj != null) {
|
||||||
|
yield write_obj.write_stanza(this, node);
|
||||||
|
} else {
|
||||||
StanzaWriter? writer = this.writer;
|
StanzaWriter? writer = this.writer;
|
||||||
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
|
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
|
||||||
try {
|
try {
|
||||||
|
@ -135,6 +140,7 @@ public class XmppStream {
|
||||||
throw new IOStreamError.WRITE(e.message);
|
throw new IOStreamError.WRITE(e.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
internal IOStream? get_stream() {
|
internal IOStream? get_stream() {
|
||||||
return stream;
|
return stream;
|
||||||
|
@ -403,4 +409,8 @@ public class StartTlsConnectionProvider : ConnectionProvider {
|
||||||
public override string get_id() { return "start_tls"; }
|
public override string get_id() { return "start_tls"; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public interface WriteNodeFunc : Object {
|
||||||
|
public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,71 @@
|
||||||
|
using Gee;
|
||||||
|
|
||||||
namespace Xmpp.Xep.StreamManagement {
|
namespace Xmpp.Xep.StreamManagement {
|
||||||
|
|
||||||
public const string NS_URI = "urn:xmpp:sm:3";
|
public const string NS_URI = "urn:xmpp:sm:3";
|
||||||
|
|
||||||
public class Module : XmppStreamNegotiationModule {
|
public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
|
||||||
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0198_stream_management");
|
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0198_stream_management");
|
||||||
|
|
||||||
public int h_inbound { get; private set; default=0; }
|
public int h_inbound = 0;
|
||||||
|
public int h_outbound = 0;
|
||||||
|
|
||||||
public string? session_id { get; set; default=null; }
|
public string? session_id { get; set; default=null; }
|
||||||
public Gee.List<XmppStreamFlag> flags = null;
|
public Gee.List<XmppStreamFlag> flags = null;
|
||||||
|
private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>();
|
||||||
|
private Gee.List<QueueItem> node_queue = new ArrayList<QueueItem>();
|
||||||
|
|
||||||
|
private class QueueItem {
|
||||||
|
public StanzaNode node;
|
||||||
|
public Promise<IOError?> promise;
|
||||||
|
|
||||||
|
public QueueItem(StanzaNode node, Promise<IOError?> promise) {
|
||||||
|
this.node = node;
|
||||||
|
this.promise = promise;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError {
|
||||||
|
if (stream.has_flag(Flag.IDENTITY)) {
|
||||||
|
var promise = new Promise<IOError?>();
|
||||||
|
|
||||||
|
node_queue.add(new QueueItem(node, promise));
|
||||||
|
check_queue(stream);
|
||||||
|
|
||||||
|
yield promise.future.wait_async();
|
||||||
|
} else {
|
||||||
|
yield write_node(stream, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal async void write_node(XmppStream stream, StanzaNode node) throws IOError {
|
||||||
|
StanzaWriter? writer = stream.writer;
|
||||||
|
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
|
||||||
|
try {
|
||||||
|
stream.log.node("OUT", node, stream);
|
||||||
|
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
|
||||||
|
var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns();
|
||||||
|
stream.log.node("OUT", r_node, stream);
|
||||||
|
yield ((!)writer).write_nodes(node, r_node);
|
||||||
|
} else {
|
||||||
|
yield ((!)writer).write_node(node);
|
||||||
|
}
|
||||||
|
} catch (XmlError e) {
|
||||||
|
throw new IOStreamError.WRITE(e.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void check_queue(XmppStream stream) throws IOError {
|
||||||
|
while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
|
||||||
|
QueueItem queue_item = node_queue.remove_at(0);
|
||||||
|
StanzaNode node = queue_item.node;
|
||||||
|
|
||||||
|
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
|
||||||
|
in_flight_stanzas[++h_outbound] = queue_item;
|
||||||
|
}
|
||||||
|
write_node.begin(stream, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public override void attach(XmppStream stream) {
|
public override void attach(XmppStream stream) {
|
||||||
stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable);
|
stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable);
|
||||||
|
@ -19,7 +77,15 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
stream.received_iq_stanza.connect(on_stanza_received);
|
stream.received_iq_stanza.connect(on_stanza_received);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void detach(XmppStream stream) { }
|
public override void detach(XmppStream stream) {
|
||||||
|
stream.get_module(Bind.Module.IDENTITY).bound_to_resource.disconnect(check_enable);
|
||||||
|
stream.received_features_node.disconnect(check_resume);
|
||||||
|
|
||||||
|
stream.received_nonza.disconnect(on_received_nonza);
|
||||||
|
stream.received_message_stanza.disconnect(on_stanza_received);
|
||||||
|
stream.received_presence_stanza.disconnect(on_stanza_received);
|
||||||
|
stream.received_iq_stanza.disconnect(on_stanza_received);
|
||||||
|
}
|
||||||
|
|
||||||
public static void require(XmppStream stream) {
|
public static void require(XmppStream stream) {
|
||||||
if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module());
|
if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module());
|
||||||
|
@ -35,7 +101,7 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
public override string get_id() { return IDENTITY.id; }
|
public override string get_id() { return IDENTITY.id; }
|
||||||
|
|
||||||
private void on_stanza_received(XmppStream stream, StanzaNode node) {
|
private void on_stanza_received(XmppStream stream, StanzaNode node) {
|
||||||
lock (h_inbound) h_inbound++;
|
h_inbound++;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void check_resume(XmppStream stream) {
|
private void check_resume(XmppStream stream) {
|
||||||
|
@ -45,7 +111,7 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns()
|
StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns()
|
||||||
.put_attribute("h", h_inbound.to_string())
|
.put_attribute("h", h_inbound.to_string())
|
||||||
.put_attribute("previd", session_id);
|
.put_attribute("previd", session_id);
|
||||||
stream.write(node);
|
write_node(stream, node);
|
||||||
stream.add_flag(new Flag());
|
stream.add_flag(new Flag());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,8 +120,9 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
private void check_enable(XmppStream stream) {
|
private void check_enable(XmppStream stream) {
|
||||||
if (stream_has_sm_feature(stream) && session_id == null) {
|
if (stream_has_sm_feature(stream) && session_id == null) {
|
||||||
StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true");
|
StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true");
|
||||||
stream.write(node);
|
write_node(stream, node);
|
||||||
stream.add_flag(new Flag());
|
stream.add_flag(new Flag());
|
||||||
|
h_outbound = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,17 +136,32 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
stream.get_flag(Flag.IDENTITY).finished = true;
|
stream.get_flag(Flag.IDENTITY).finished = true;
|
||||||
|
|
||||||
if (node.name == "enabled") {
|
if (node.name == "enabled") {
|
||||||
lock(h_inbound) h_inbound = 0;
|
h_inbound = 0;
|
||||||
session_id = node.get_attribute("id", NS_URI);
|
session_id = node.get_attribute("id", NS_URI);
|
||||||
flags = stream.flags;
|
flags = stream.flags;
|
||||||
|
stream.write_obj = this;
|
||||||
} else if (node.name == "resumed") {
|
} else if (node.name == "resumed") {
|
||||||
foreach (XmppStreamFlag flag in flags) {
|
foreach (XmppStreamFlag flag in flags) {
|
||||||
stream.add_flag(flag);
|
stream.add_flag(flag);
|
||||||
}
|
}
|
||||||
stream.negotiation_complete = true;
|
stream.negotiation_complete = true;
|
||||||
|
|
||||||
|
h_outbound = int.parse(node.get_attribute("h", NS_URI));
|
||||||
|
handle_incoming_h(stream, h_outbound);
|
||||||
|
foreach (var id in in_flight_stanzas.keys) {
|
||||||
|
node_queue.add(in_flight_stanzas[id]);
|
||||||
|
}
|
||||||
|
in_flight_stanzas.clear();
|
||||||
|
check_queue(stream);
|
||||||
|
stream.write_obj = this;
|
||||||
} else if (node.name == "failed") {
|
} else if (node.name == "failed") {
|
||||||
stream.received_features_node(stream);
|
stream.received_features_node(stream);
|
||||||
session_id = null;
|
session_id = null;
|
||||||
|
foreach (var id in in_flight_stanzas.keys) {
|
||||||
|
in_flight_stanzas[id].promise.set_exception(new IOError.FAILED("bla"));
|
||||||
|
}
|
||||||
|
in_flight_stanzas.clear();
|
||||||
|
check_queue(stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,11 +169,27 @@ public class Module : XmppStreamNegotiationModule {
|
||||||
|
|
||||||
private void send_ack(XmppStream stream) {
|
private void send_ack(XmppStream stream) {
|
||||||
StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string());
|
StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string());
|
||||||
stream.write(node);
|
write_node(stream, node);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handle_ack(XmppStream stream, StanzaNode node) {
|
private void handle_ack(XmppStream stream, StanzaNode node) {
|
||||||
|
string? h_acked = node.get_attribute("h", NS_URI);
|
||||||
|
int parsed_int = int.parse(h_acked);
|
||||||
|
handle_incoming_h(stream, parsed_int);
|
||||||
|
check_queue(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handle_incoming_h(XmppStream stream, int h) {
|
||||||
|
var remove_nrs = new ArrayList<int>();
|
||||||
|
foreach (int nr in in_flight_stanzas.keys) {
|
||||||
|
if (nr <= h) {
|
||||||
|
in_flight_stanzas[nr].promise.set_value(null);
|
||||||
|
remove_nrs.add(nr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
foreach (int nr in remove_nrs) {
|
||||||
|
in_flight_stanzas.unset(nr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private bool stream_has_sm_feature(XmppStream stream) {
|
private bool stream_has_sm_feature(XmppStream stream) {
|
||||||
|
|
Loading…
Reference in a new issue