package telegram import ( "fmt" "os" "path/filepath" "strconv" "strings" "sync" "dev.narayana.im/narayana/telegabber/telegram/formatter" "dev.narayana.im/narayana/telegabber/xmpp/gateway" log "github.com/sirupsen/logrus" "github.com/zelenin/go-tdlib/client" ) func uhOh() { log.Fatal("Update type mismatch") } func int64SliceToStringSlice(ints []int64) []string { strings := make([]string, len(ints)) wg := sync.WaitGroup{} for i, xi := range ints { wg.Add(1) go func(i int, xi int64) { strings[i] = strconv.FormatInt(xi, 10) wg.Done() }(i, xi) } wg.Wait() return strings } func (c *Client) getChatMessageLock(chatID int64) *sync.Mutex { lock, ok := c.locks.chatMessageLocks[chatID] if !ok { lock = &sync.Mutex{} c.locks.chatMessageLocks[chatID] = lock } return lock } func (c *Client) cleanTempFile(path string) { os.Remove(path) dir := filepath.Dir(path) dirName := filepath.Base(dir) if strings.HasPrefix(dirName, "telegabber-") { os.Remove(dir) } } func (c *Client) sendMarker(chatId, messageId int64, typ gateway.MarkerType) { if xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, chatId, messageId); err == nil { resource := c.getFromOutbox(xmppId) var stringType string if typ == gateway.MarkerTypeReceived { stringType = "received" } else if typ == gateway.MarkerTypeDisplayed { stringType = "displayed" } log.WithFields(log.Fields{ "xmppId": xmppId, "resource": resource, }).Debugf("marker: %s", stringType) if resource != "" { gateway.SendMessageMarker( c.jid+"/"+resource, strconv.FormatInt(chatId, 10), c.xmpp, typ, xmppId, ) } } } func (c *Client) updateHandler() { listener := c.client.GetListener() defer listener.Close() for update := range listener.Updates { if update.GetClass() == client.ClassUpdate { switch update.GetType() { case client.TypeUpdateUser: typedUpdate, ok := update.(*client.UpdateUser) if !ok { uhOh() } c.updateUser(typedUpdate) log.Debugf("%#v", typedUpdate.User) case client.TypeUpdateUserStatus: typedUpdate, ok := update.(*client.UpdateUserStatus) if !ok { uhOh() } c.updateUserStatus(typedUpdate) log.Debugf("%#v", typedUpdate.Status) case client.TypeUpdateNewChat: typedUpdate, ok := update.(*client.UpdateNewChat) if !ok { uhOh() } c.updateNewChat(typedUpdate) log.Debugf("%#v", typedUpdate.Chat) case client.TypeUpdateChatPosition: typedUpdate, ok := update.(*client.UpdateChatPosition) if !ok { uhOh() } c.updateChatPosition(typedUpdate) log.Debugf("%#v", typedUpdate) case client.TypeUpdateChatLastMessage: typedUpdate, ok := update.(*client.UpdateChatLastMessage) if !ok { uhOh() } c.updateChatLastMessage(typedUpdate) log.Debugf("%#v", typedUpdate) case client.TypeUpdateNewMessage: typedUpdate, ok := update.(*client.UpdateNewMessage) if !ok { uhOh() } c.updateNewMessage(typedUpdate) log.Debugf("%#v", typedUpdate.Message) case client.TypeUpdateMessageContent: typedUpdate, ok := update.(*client.UpdateMessageContent) if !ok { uhOh() } c.updateMessageContent(typedUpdate) log.Debugf("%#v", typedUpdate.NewContent) case client.TypeUpdateDeleteMessages: typedUpdate, ok := update.(*client.UpdateDeleteMessages) if !ok { uhOh() } c.updateDeleteMessages(typedUpdate) case client.TypeUpdateAuthorizationState: typedUpdate, ok := update.(*client.UpdateAuthorizationState) if !ok { uhOh() } c.updateAuthorizationState(typedUpdate) case client.TypeUpdateMessageSendSucceeded: typedUpdate, ok := update.(*client.UpdateMessageSendSucceeded) if !ok { uhOh() } c.updateMessageSendSucceeded(typedUpdate) case client.TypeUpdateMessageSendFailed: typedUpdate, ok := update.(*client.UpdateMessageSendFailed) if !ok { uhOh() } c.updateMessageSendFailed(typedUpdate) case client.TypeUpdateChatTitle: typedUpdate, ok := update.(*client.UpdateChatTitle) if !ok { uhOh() } c.updateChatTitle(typedUpdate) case client.TypeUpdateChatReadOutbox: typedUpdate, ok := update.(*client.UpdateChatReadOutbox) if !ok { uhOh() } c.updateChatReadOutbox(typedUpdate) default: // log only handled types continue } log.Debugf("%#v", update) } } } // new user discovered func (c *Client) updateUser(update *client.UpdateUser) { c.cache.SetUser(update.User.Id, update.User) show, status, presenceType := c.userStatusToText(update.User.Status, update.User.Id) go c.ProcessStatusUpdate(update.User.Id, status, show, gateway.SPType(presenceType)) } // user status changed func (c *Client) updateUserStatus(update *client.UpdateUserStatus) { show, status, presenceType := c.userStatusToText(update.Status, update.UserId) go c.ProcessStatusUpdate(update.UserId, status, show, gateway.SPImmed(false), gateway.SPType(presenceType)) } // new chat discovered func (c *Client) updateNewChat(update *client.UpdateNewChat) { go func() { if update.Chat != nil && update.Chat.Photo != nil && update.Chat.Photo.Small != nil { _, err := c.DownloadFile(update.Chat.Photo.Small.Id, 10, true) if err != nil { log.Error("Failed to download the chat photo") } } c.cache.SetChat(update.Chat.Id, update.Chat) if update.Chat.Positions != nil && len(update.Chat.Positions) > 0 { c.subscribeToID(update.Chat.Id, update.Chat) } if update.Chat.Id < 0 { c.ProcessStatusUpdate(update.Chat.Id, update.Chat.Title, "chat") } }() } // chat position is updated func (c *Client) updateChatPosition(update *client.UpdateChatPosition) { if update.Position != nil && update.Position.Order != 0 { go c.subscribeToID(update.ChatId, nil) } } // chat last message is updated func (c *Client) updateChatLastMessage(update *client.UpdateChatLastMessage) { if update.Positions != nil && len(update.Positions) > 0 { go c.subscribeToID(update.ChatId, nil) } } // message received func (c *Client) updateNewMessage(update *client.UpdateNewMessage) { chatId := update.Message.ChatId // guarantee sequential message delivering per chat lock := c.getChatMessageLock(chatId) go func() { lock.Lock() defer lock.Unlock() // ignore self outgoing messages if update.Message.IsOutgoing && update.Message.SendingState != nil && update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { return } log.WithFields(log.Fields{ "chat_id": chatId, }).Warn("New message from chat") c.ProcessIncomingMessage(chatId, update.Message) c.updateLastMessageHash(update.Message.ChatId, update.Message.Id, update.Message.Content) }() } // message content updated func (c *Client) updateMessageContent(update *client.UpdateMessageContent) { markupFunction := c.getFormatter() defer c.updateLastMessageHash(update.ChatId, update.MessageId, update.NewContent) c.SendMessageLock.Lock() c.SendMessageLock.Unlock() xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, update.ChatId, update.MessageId) var ignoredResource string if err == nil { ignoredResource = c.popFromEditOutbox(xmppId) } else { log.Infof("Couldn't retrieve XMPP message ids for %v, an echo may happen", update.MessageId) } log.Infof("ignoredResource: %v", ignoredResource) jids := c.getCarbonFullJids(true, ignoredResource) if len(jids) == 0 { log.Info("The only resource is ignored, aborting") return } if update.NewContent.MessageContentType() == client.TypeMessageText && c.hasLastMessageHashChanged(update.ChatId, update.MessageId, update.NewContent) { textContent := update.NewContent.(*client.MessageText) var editChar string if c.Session.AsciiArrows { editChar = "e " } else { editChar = "✎ " } text := editChar + fmt.Sprintf("%v | %s", update.MessageId, formatter.Format( textContent.Text.Text, textContent.Text.Entities, markupFunction, )) for _, jid := range jids { gateway.SendMessage(jid, strconv.FormatInt(update.ChatId, 10), text, "e"+strconv.FormatInt(update.MessageId, 10), c.xmpp, nil, false) } } } // message(s) deleted func (c *Client) updateDeleteMessages(update *client.UpdateDeleteMessages) { if update.IsPermanent { var deleteChar string if c.Session.AsciiArrows { deleteChar = "X " } else { deleteChar = "✗ " } text := deleteChar + strings.Join(int64SliceToStringSlice(update.MessageIds), ",") gateway.SendTextMessage(c.jid, strconv.FormatInt(update.ChatId, 10), text, c.xmpp) } } func (c *Client) updateAuthorizationState(update *client.UpdateAuthorizationState) { switch update.AuthorizationState.AuthorizationStateType() { case client.TypeAuthorizationStateClosing: log.Warn("Closing the updates listener") case client.TypeAuthorizationStateClosed: log.Warn("Closed the updates listener") c.forceClose() } } func (c *Client) updateMessageSendSucceeded(update *client.UpdateMessageSendSucceeded) { // replace message ID in local database log.Debugf("replace message %v with %v", update.OldMessageId, update.Message.Id) if err := gateway.IdsDB.ReplaceTgId(c.Session.Login, c.jid, update.Message.ChatId, update.OldMessageId, update.Message.Id); err != nil { log.Errorf("failed to replace %v with %v: %v", update.OldMessageId, update.Message.Id, err.Error()) } c.sendMarker(update.Message.ChatId, update.Message.Id, gateway.MarkerTypeReceived) // clean uploaded files file, _ := c.contentToFile(update.Message.Content) if file != nil && file.Local != nil { c.cleanTempFile(file.Local.Path) } } func (c *Client) updateMessageSendFailed(update *client.UpdateMessageSendFailed) { // clean uploaded files file, _ := c.contentToFile(update.Message.Content) if file != nil && file.Local != nil { c.cleanTempFile(file.Local.Path) } } // chat title changed func (c *Client) updateChatTitle(update *client.UpdateChatTitle) { gateway.SetNickname(c.jid, strconv.FormatInt(update.ChatId, 10), update.Title, c.xmpp) // set also the status (for group chats only) chat, user, _ := c.GetContactByID(update.ChatId, nil) if user == nil { c.ProcessStatusUpdate(update.ChatId, update.Title, "chat", gateway.SPImmed(true)) } // update chat title in the cache if chat != nil { chat.Title = update.Title } } func (c *Client) updateChatReadOutbox(update *client.UpdateChatReadOutbox) { c.sendMarker(update.ChatId, update.LastReadOutboxMessageId, gateway.MarkerTypeDisplayed) }