From e37c428c6764eff8e7b5ea286b1c7a0ba52be11a Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Fri, 26 Jan 2024 21:02:47 -0500 Subject: [PATCH] XEP-0333 read markers for outgoing messages --- telegram/client.go | 8 ++++++-- telegram/handlers.go | 45 +++++++++++++++++++++++++++++++++++++++-- telegram/utils.go | 27 +++++++++++++++++++++---- xmpp/gateway/gateway.go | 36 ++++++++++++++++++++++++++++----- xmpp/handlers.go | 7 +++++-- 5 files changed, 108 insertions(+), 15 deletions(-) diff --git a/telegram/client.go b/telegram/client.go index 49fc1ef..38dff4c 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -34,11 +34,13 @@ type Client struct { jid string Session *persistence.Session resources map[string]bool - outbox map[string]string content *config.TelegramContentConfig cache *cache.Cache online bool + outbox map[string]string + editOutbox map[string]string + DelayedStatuses map[int64]*DelayedStatus DelayedStatusesLock sync.Mutex @@ -54,6 +56,7 @@ type clientLocks struct { chatMessageLocks map[int64]*sync.Mutex resourcesLock sync.Mutex outboxLock sync.Mutex + editOutboxLock sync.Mutex lastMsgHashesLock sync.Mutex authorizerReadLock sync.Mutex @@ -109,9 +112,10 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component jid: jid, Session: session, resources: make(map[string]bool), - outbox: make(map[string]string), content: &conf.Content, cache: cache.NewCache(), + outbox: make(map[string]string), + editOutbox: make(map[string]string), options: options, DelayedStatuses: make(map[int64]*DelayedStatus), lastMsgHashes: make(map[int64]uint64), diff --git a/telegram/handlers.go b/telegram/handlers.go index 0d1cda9..6f6d339 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -55,6 +55,33 @@ func (c *Client) cleanTempFile(path string) { } } +func (c *Client) sendMarker(chatId, messageId int64, typ gateway.MarkerType) { + if xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, chatId, messageId); err == nil { + resource := c.getFromOutbox(xmppId) + + var stringType string + if typ == gateway.MarkerTypeReceived { + stringType = "received" + } else if typ == gateway.MarkerTypeDisplayed { + stringType = "displayed" + } + log.WithFields(log.Fields{ + "xmppId": xmppId, + "resource": resource, + }).Debugf("marker: %s", stringType) + + if resource != "" { + gateway.SendMessageMarker( + c.jid+"/"+resource, + strconv.FormatInt(chatId, 10), + c.xmpp, + typ, + xmppId, + ) + } + } +} + func (c *Client) updateHandler() { listener := c.client.GetListener() defer listener.Close() @@ -141,6 +168,12 @@ func (c *Client) updateHandler() { uhOh() } c.updateChatTitle(typedUpdate) + case client.TypeUpdateChatReadOutbox: + typedUpdate, ok := update.(*client.UpdateChatReadOutbox) + if !ok { + uhOh() + } + c.updateChatReadOutbox(typedUpdate) default: // log only handled types continue @@ -239,7 +272,7 @@ func (c *Client) updateMessageContent(update *client.UpdateMessageContent) { xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, update.ChatId, update.MessageId) var ignoredResource string if err == nil { - ignoredResource = c.popFromOutbox(xmppId) + ignoredResource = c.popFromEditOutbox(xmppId) } else { log.Infof("Couldn't retrieve XMPP message ids for %v, an echo may happen", update.MessageId) } @@ -294,19 +327,23 @@ func (c *Client) updateAuthorizationState(update *client.UpdateAuthorizationStat } } -// clean uploaded files func (c *Client) updateMessageSendSucceeded(update *client.UpdateMessageSendSucceeded) { + // replace message ID in local database log.Debugf("replace message %v with %v", update.OldMessageId, update.Message.Id) if err := gateway.IdsDB.ReplaceTgId(c.Session.Login, c.jid, update.Message.ChatId, update.OldMessageId, update.Message.Id); err != nil { log.Errorf("failed to replace %v with %v: %v", update.OldMessageId, update.Message.Id, err.Error()) } + c.sendMarker(update.Message.ChatId, update.Message.Id, gateway.MarkerTypeReceived) + + // clean uploaded files file, _ := c.contentToFile(update.Message.Content) if file != nil && file.Local != nil { c.cleanTempFile(file.Local.Path) } } func (c *Client) updateMessageSendFailed(update *client.UpdateMessageSendFailed) { + // clean uploaded files file, _ := c.contentToFile(update.Message.Content) if file != nil && file.Local != nil { c.cleanTempFile(file.Local.Path) @@ -328,3 +365,7 @@ func (c *Client) updateChatTitle(update *client.UpdateChatTitle) { chat.Title = update.Title } } + +func (c *Client) updateChatReadOutbox(update *client.UpdateChatReadOutbox) { + c.sendMarker(update.ChatId, update.LastReadOutboxMessageId, gateway.MarkerTypeDisplayed) +} diff --git a/telegram/utils.go b/telegram/utils.go index 3f15488..6aab03a 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -1472,6 +1472,27 @@ func (c *Client) UpdateChatNicknames() { } } +// AddToEditOutbox temporarily store the resource from which a replace message with given ID was sent +func (c *Client) AddToEditOutbox(xmppId, resource string) { + c.locks.editOutboxLock.Lock() + defer c.locks.editOutboxLock.Unlock() + + c.editOutbox[xmppId] = resource +} + +func (c *Client) popFromEditOutbox(xmppId string) string { + c.locks.editOutboxLock.Lock() + defer c.locks.editOutboxLock.Unlock() + + resource, ok := c.editOutbox[xmppId] + if ok { + delete(c.editOutbox, xmppId) + } else { + log.Warnf("No %v xmppId in edit outbox", xmppId) + } + return resource +} + // AddToOutbox remembers the resource from which a message with given ID was sent func (c *Client) AddToOutbox(xmppId, resource string) { c.locks.outboxLock.Lock() @@ -1480,14 +1501,12 @@ func (c *Client) AddToOutbox(xmppId, resource string) { c.outbox[xmppId] = resource } -func (c *Client) popFromOutbox(xmppId string) string { +func (c *Client) getFromOutbox(xmppId string) string { c.locks.outboxLock.Lock() defer c.locks.outboxLock.Unlock() resource, ok := c.outbox[xmppId] - if ok { - delete(c.outbox, xmppId) - } else { + if !ok { log.Warnf("No %v xmppId in outbox", xmppId) } return resource diff --git a/xmpp/gateway/gateway.go b/xmpp/gateway/gateway.go index 981858d..4b2a07f 100644 --- a/xmpp/gateway/gateway.go +++ b/xmpp/gateway/gateway.go @@ -23,6 +23,17 @@ type Reply struct { End uint64 } +type MarkerType byte +const ( + MarkerTypeReceived MarkerType = iota + MarkerTypeDisplayed +) + +type marker struct { + Type MarkerType + Id string +} + const NSNick string = "http://jabber.org/protocol/nick" // Queue stores presences to send later @@ -44,25 +55,33 @@ var MessageOutgoingPermissionVersion = 0 // SendMessage creates and sends a message stanza func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isCarbon bool) { - sendMessageWrapper(to, from, body, id, component, reply, "", isCarbon) + sendMessageWrapper(to, from, body, id, component, reply, nil, "", isCarbon) } // SendServiceMessage creates and sends a simple message stanza from transport func SendServiceMessage(to string, body string, component *xmpp.Component) { - sendMessageWrapper(to, "", body, "", component, nil, "", false) + sendMessageWrapper(to, "", body, "", component, nil, nil, "", false) } // SendTextMessage creates and sends a simple message stanza func SendTextMessage(to string, from string, body string, component *xmpp.Component) { - sendMessageWrapper(to, from, body, "", component, nil, "", false) + sendMessageWrapper(to, from, body, "", component, nil, nil, "", false) } // SendMessageWithOOB creates and sends a message stanza with OOB URL func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { - sendMessageWrapper(to, from, body, id, component, reply, oob, isCarbon) + sendMessageWrapper(to, from, body, id, component, reply, nil, oob, isCarbon) } -func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { +// SendMessageMarker creates and sends a message stanza with a XEP-0333 marker +func SendMessageMarker(to string, from string, component *xmpp.Component, markerType MarkerType, markerId string) { + sendMessageWrapper(to, from, "", "", component, nil, &marker{ + Type: markerType, + Id: markerId, + }, "", false) +} + +func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, marker *marker, oob string, isCarbon bool) { toJid, err := stanza.NewJid(to) if err != nil { log.WithFields(log.Fields{ @@ -120,6 +139,13 @@ func sendMessageWrapper(to string, from string, body string, id string, componen message.Extensions = append(message.Extensions, extensions.NewReplyFallback(reply.Start, reply.End)) } } + if marker != nil { + if marker.Type == MarkerTypeReceived { + message.Extensions = append(message.Extensions, stanza.MarkReceived{ID: marker.Id}) + } else if marker.Type == MarkerTypeDisplayed { + message.Extensions = append(message.Extensions, stanza.MarkDisplayed{ID: marker.Id}) + } + } if !isCarbon && toJid.Resource != "" { message.Extensions = append(message.Extensions, stanza.HintNoCopy{}) } diff --git a/xmpp/handlers.go b/xmpp/handlers.go index 4c27b3c..088cb21 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -199,10 +199,12 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { if err != nil { log.Errorf("Failed to replace id %v with %v %v", replace.Id, msg.Id, tgMessageId) } */ - session.AddToOutbox(replace.Id, resource) + session.AddToEditOutbox(replace.Id, resource) } else { err = gateway.IdsDB.Set(session.Session.Login, bare, toID, tgMessageId, msg.Id) - if err != nil { + if err == nil { + session.AddToOutbox(msg.Id, resource) + } else { log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id) } } @@ -458,6 +460,7 @@ func handleGetDiscoInfo(s xmpp.Sender, iq *stanza.IQ) { _, ok := toToID(iq.To) if ok { disco.AddIdentity("", "account", "registered") + disco.AddFeatures(stanza.NSMsgChatMarkers) } else { disco.AddIdentity("Telegram Gateway", "gateway", "telegram") disco.AddFeatures("jabber:iq:register")