Make message sending async and set unsent on error

This commit is contained in:
fiaxh 2020-03-05 12:21:43 +01:00
parent b8b3e1c6f5
commit 013b388896
6 changed files with 45 additions and 38 deletions

View file

@ -22,7 +22,6 @@ public class MessageProcessor : StreamInteractionModule, Object {
private StreamInteractor stream_interactor;
private Database db;
private Object lock_send_unsent;
private HashMap<Account, int> current_catchup_id = new HashMap<Account, int>(Account.hash_func, Account.equals_func);
private HashMap<Account, HashMap<string, DateTime>> mam_times = new HashMap<Account, HashMap<string, DateTime>>();
public HashMap<string, int> hitted_range = new HashMap<string, int>();
@ -565,39 +564,48 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
public void send_xmpp_message(Entities.Message message, Conversation conversation, bool delayed = false) {
lock (lock_send_unsent) {
XmppStream stream = stream_interactor.get_stream(conversation.account);
message.marked = Entities.Message.Marked.NONE;
if (stream != null) {
Xmpp.MessageStanza new_message = new Xmpp.MessageStanza(message.stanza_id);
new_message.to = message.counterpart;
new_message.body = message.body;
if (conversation.type_ == Conversation.Type.GROUPCHAT) {
new_message.type_ = Xmpp.MessageStanza.TYPE_GROUPCHAT;
} else {
new_message.type_ = Xmpp.MessageStanza.TYPE_CHAT;
}
build_message_stanza(message, new_message, conversation);
pre_message_send(message, new_message, conversation);
if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return;
if (delayed) {
Xmpp.Xep.DelayedDelivery.Module.set_message_delay(new_message, message.time);
}
XmppStream stream = stream_interactor.get_stream(conversation.account);
message.marked = Entities.Message.Marked.NONE;
// Set an origin ID if a MUC doen't guarantee to keep IDs
if (conversation.type_ == Conversation.Type.GROUPCHAT) {
Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY);
if (flag == null) return;
if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) {
Xep.UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id);
}
}
if (stream == null) {
message.marked = Entities.Message.Marked.UNSENT;
return;
}
stream.get_module(Xmpp.MessageModule.IDENTITY).send_message(stream, new_message);
} else {
MessageStanza new_message = new MessageStanza(message.stanza_id);
new_message.to = message.counterpart;
new_message.body = message.body;
if (conversation.type_ == Conversation.Type.GROUPCHAT) {
new_message.type_ = MessageStanza.TYPE_GROUPCHAT;
} else {
new_message.type_ = MessageStanza.TYPE_CHAT;
}
build_message_stanza(message, new_message, conversation);
pre_message_send(message, new_message, conversation);
if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return;
if (delayed) {
DelayedDelivery.Module.set_message_delay(new_message, message.time);
}
// Set an origin ID if a MUC doen't guarantee to keep IDs
if (conversation.type_ == Conversation.Type.GROUPCHAT) {
Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY);
if (flag == null) {
message.marked = Entities.Message.Marked.UNSENT;
return;
}
if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) {
UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id);
}
}
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => {
try {
stream.get_module(MessageModule.IDENTITY).send_message.end(res);
} catch (IOStreamError e) {
message.marked = Entities.Message.Marked.UNSENT;
}
});
}
}

View file

@ -14,10 +14,9 @@ namespace Xmpp {
public signal void received_message(XmppStream stream, MessageStanza message);
public signal void received_message_unprocessed(XmppStream stream, MessageStanza message);
public void send_message(XmppStream stream, MessageStanza message) {
send_pipeline.run.begin(stream, message, (obj, res) => {
stream.write(message.stanza);
});
public async void send_message(XmppStream stream, MessageStanza message) throws IOStreamError {
yield send_pipeline.run(stream, message);
yield stream.write_async(message.stanza);
}
public async void received_message_stanza_async(XmppStream stream, StanzaNode node) {

View file

@ -131,7 +131,7 @@ public class Module : XmppStreamModule {
message.to = jid;
message.type_ = MessageStanza.TYPE_GROUPCHAT;
message.stanza.put_node((new StanzaNode.build("subject")).put_node(new StanzaNode.text(subject)));
stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public void change_nick(XmppStream stream, Jid jid, string new_nick) {
@ -151,7 +151,7 @@ public class Module : XmppStreamModule {
StanzaNode invite_node = new StanzaNode.build("x", NS_URI_USER).add_self_xmlns()
.put_node(new StanzaNode.build("invite", NS_URI_USER).put_attribute("to", jid.to_string()));
message.stanza.put_node(invite_node);
stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public void kick(XmppStream stream, Jid jid, string nick) {

View file

@ -27,7 +27,7 @@ public class Module : XmppStreamModule {
MessageProcessingHints.set_message_hint(message, MessageProcessingHints.HINT_NO_STORE);
stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public override void attach(XmppStream stream) {

View file

@ -12,7 +12,7 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
MessageStanza received_message = new MessageStanza();
received_message.to = from;
received_message.stanza.put_node(new StanzaNode.build("received", NS_URI).add_self_xmlns().put_attribute("id", message_id));
stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message);
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message);
}
public static bool requests_receipt(MessageStanza message) {

View file

@ -21,7 +21,7 @@ public class Module : XmppStreamModule {
received_message.to = jid;
received_message.type_ = type_;
received_message.stanza.put_node(new StanzaNode.build(marker, NS_URI).add_self_xmlns().put_attribute("id", message_id));
stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message);
stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message);
}
public static bool requests_marking(MessageStanza message) {