diff --git a/telegram/client.go b/telegram/client.go index dde15f2..7e7cdc8 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -55,6 +55,7 @@ type Client struct { type clientLocks struct { authorizationReady sync.WaitGroup + chatMessageLocks map[int64]*sync.Mutex } // NewClient instantiates a Telegram App @@ -107,6 +108,8 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component content: &conf.Content, cache: cache.NewCache(), options: options, - locks: clientLocks{}, + locks: clientLocks{ + chatMessageLocks: make(map[int64]*sync.Mutex), + }, }, nil } diff --git a/telegram/handlers.go b/telegram/handlers.go index c79f765..847bac8 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -35,6 +35,16 @@ func int64SliceToStringSlice(ints []int64) []string { return strings } +func (c *Client) getChatMessageLock(chatID int64) *sync.Mutex { + lock, ok := c.locks.chatMessageLocks[chatID] + if !ok { + lock = &sync.Mutex{} + c.locks.chatMessageLocks[chatID] = lock + } + + return lock +} + func (c *Client) updateHandler() { listener := c.client.GetListener() defer listener.Close() @@ -121,94 +131,103 @@ func (c *Client) updateUserStatus(update *client.UpdateUserStatus) { // new chat discovered func (c *Client) updateNewChat(update *client.UpdateNewChat) { - if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil { - _, err := c.client.DownloadFile(&client.DownloadFileRequest{ - FileId: update.Chat.Photo.Small.Id, - Priority: 32, - Synchronous: true, - }) + go func() { + if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil { + _, err := c.client.DownloadFile(&client.DownloadFileRequest{ + FileId: update.Chat.Photo.Small.Id, + Priority: 32, + Synchronous: true, + }) - if err != nil { - log.Error("Failed to download the chat photo") + if err != nil { + log.Error("Failed to download the chat photo") + } } - } - c.cache.SetChat(update.Chat.Id, update.Chat) + c.cache.SetChat(update.Chat.Id, update.Chat) - var isChannel = false - if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup { - typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup) - if !ok { - uhOh() + var isChannel = false + if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup { + typeSupergroup, ok := update.Chat.Type.(*client.ChatTypeSupergroup) + if !ok { + uhOh() + } + isChannel = typeSupergroup.IsChannel } - isChannel = typeSupergroup.IsChannel - } - if !(isChannel && update.Chat.LastReadInboxMessageId == 0) { - gateway.SendPresence( - c.xmpp, - c.jid, - gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)), - gateway.SPType("subscribe"), - gateway.SPNickname(update.Chat.Title), - ) - } + if !(isChannel && update.Chat.LastReadInboxMessageId == 0) { + gateway.SendPresence( + c.xmpp, + c.jid, + gateway.SPFrom(strconv.FormatInt(update.Chat.Id, 10)), + gateway.SPType("subscribe"), + gateway.SPNickname(update.Chat.Title), + ) + } - if update.Chat.Id < 0 { - go c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat") - } + if update.Chat.Id < 0 { + c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat") + } + }() } // message received func (c *Client) updateNewMessage(update *client.UpdateNewMessage) { - // ignore self outgoing messages - if update.Message.IsOutgoing && - update.Message.SendingState != nil && - update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { - return - } + go func() { + // guarantee sequential message delivering per chat + lock := c.getChatMessageLock(update.Message.ChatId) + lock.Lock() + defer lock.Unlock() - log.WithFields(log.Fields{ - "chat_id": update.Message.ChatId, - }).Warn("New message from chat") - - text := c.messageToText(update.Message) - file, filename := c.contentToFilename(update.Message.Content) - - // download file(s) - if file != nil && !file.Local.IsDownloadingCompleted { - c.client.DownloadFile(&client.DownloadFileRequest{ - FileId: file.Id, - Priority: 32, - Synchronous: true, - }) - } - // OTR support (I do not know why would you need it, seriously) - if !strings.HasPrefix(text, "?OTR") { - var prefix strings.Builder - prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename))) - if text != "" { - // \n if it is groupchat and message is not empty - if update.Message.ChatId < 0 { - prefix.WriteString("\n") - } else if update.Message.ChatId > 0 { - prefix.WriteString(" | ") - } - - prefix.WriteString(text) + // ignore self outgoing messages + if update.Message.IsOutgoing && + update.Message.SendingState != nil && + update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { + return } - text = prefix.String() - } + log.WithFields(log.Fields{ + "chat_id": update.Message.ChatId, + }).Warn("New message from chat") - // mark message as read - c.client.ViewMessages(&client.ViewMessagesRequest{ - ChatId: update.Message.ChatId, - MessageIds: []int64{update.Message.Id}, - ForceRead: true, - }) - // forward message to XMPP - gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp) + text := c.messageToText(update.Message) + file, filename := c.contentToFilename(update.Message.Content) + + // download file(s) + if file != nil && !file.Local.IsDownloadingCompleted { + c.client.DownloadFile(&client.DownloadFileRequest{ + FileId: file.Id, + Priority: 32, + Synchronous: true, + }) + } + // OTR support (I do not know why would you need it, seriously) + if !strings.HasPrefix(text, "?OTR") { + var prefix strings.Builder + prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename))) + if text != "" { + // \n if it is groupchat and message is not empty + if update.Message.ChatId < 0 { + prefix.WriteString("\n") + } else if update.Message.ChatId > 0 { + prefix.WriteString(" | ") + } + + prefix.WriteString(text) + } + + text = prefix.String() + } + + // mark message as read + c.client.ViewMessages(&client.ViewMessagesRequest{ + ChatId: update.Message.ChatId, + MessageIds: []int64{update.Message.Id}, + ForceRead: true, + }) + // forward message to XMPP + gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp) + }() } // message content updated