package telegram

import (
	"crypto/sha256"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"

	"dev.narayana.im/narayana/telegabber/telegram/formatter"
	"dev.narayana.im/narayana/telegabber/xmpp/gateway"
	log "github.com/sirupsen/logrus"
	"github.com/zelenin/go-tdlib/client"
)

// uhOh reports, at fatal log level, that an update's concrete Go type did
// not match the type announced by its type tag.
func uhOh() {
	log.Fatal("Update type mismatch")
}

// int64SliceToStringSlice returns the base-10 string form of every element
// of ints, in the same order. The result always has len(ints) elements.
func int64SliceToStringSlice(ints []int64) []string {
	result := make([]string, len(ints))
	for i, xi := range ints {
		result[i] = strconv.FormatInt(xi, 10)
	}
	return result
}

// getChatMessageLock returns the mutex associated with chatID, creating and
// storing a new one on first use.
//
// The lookup-then-insert on c.locks.chatMessageLocks is not synchronized, so
// this method must not be called from several goroutines at once for the
// same Client.
func (c *Client) getChatMessageLock(chatID int64) *sync.Mutex {
	lock, ok := c.locks.chatMessageLocks[chatID]
	if !ok {
		lock = &sync.Mutex{}
		c.locks.chatMessageLocks[chatID] = lock
	}

	return lock
}

// updateHandler consumes listener.Updates and dispatches every update of
// class client.ClassUpdate to the matching handler method. The listener is
// closed when the function returns. Updates of unhandled types are skipped
// without logging.
func (c *Client) updateHandler() {
	listener := c.client.GetListener()
	defer listener.Close()

	for update := range listener.Updates {
		if update.GetClass() != client.ClassUpdate {
			continue
		}

		switch update.GetType() {
		case client.TypeUpdateUser:
			typedUpdate, ok := update.(*client.UpdateUser)
			if !ok {
				uhOh()
			}
			c.updateUser(typedUpdate)
			log.Debugf("%#v", typedUpdate.User)

		case client.TypeUpdateUserStatus:
			typedUpdate, ok := update.(*client.UpdateUserStatus)
			if !ok {
				uhOh()
			}
			c.updateUserStatus(typedUpdate)
			log.Debugf("%#v", typedUpdate.Status)

		case client.TypeUpdateNewChat:
			typedUpdate, ok := update.(*client.UpdateNewChat)
			if !ok {
				uhOh()
			}
			c.updateNewChat(typedUpdate)
			log.Debugf("%#v", typedUpdate.Chat)

		case client.TypeUpdateNewMessage:
			typedUpdate, ok := update.(*client.UpdateNewMessage)
			if !ok {
				uhOh()
			}
			c.updateNewMessage(typedUpdate)
			log.Debugf("%#v", typedUpdate.Message)

		case client.TypeUpdateMessageContent:
			typedUpdate, ok := update.(*client.UpdateMessageContent)
			if !ok {
				uhOh()
			}
			c.updateMessageContent(typedUpdate)
			log.Debugf("%#v", typedUpdate.NewContent)

		case client.TypeUpdateDeleteMessages:
			typedUpdate, ok := update.(*client.UpdateDeleteMessages)
			if !ok {
				uhOh()
			}
			c.updateDeleteMessages(typedUpdate)

		case client.TypeUpdateFile:
			typedUpdate, ok := update.(*client.UpdateFile)
			if !ok {
				uhOh()
			}
			c.updateFile(typedUpdate)
			log.Debugf("%#v", typedUpdate.File)

		case client.TypeUpdateAuthorizationState:
			typedUpdate, ok := update.(*client.UpdateAuthorizationState)
			if !ok {
				uhOh()
			}
			c.updateAuthorizationState(typedUpdate)

		default:
			// log only handled types
			continue
		}

		log.Debugf("%#v", update)
	}
}

// updateUser handles a newly discovered user: the user is stored in the
// cache and its status is forwarded asynchronously through
// ProcessStatusUpdate.
func (c *Client) updateUser(update *client.UpdateUser) {
	c.cache.SetUser(update.User.Id, update.User)
	show, status := c.userStatusToText(update.User.Status)
	go c.ProcessStatusUpdate(update.User.Id, status, show)
}

// updateUserStatus handles a user status change by forwarding the textual
// status asynchronously through ProcessStatusUpdate, passing
// gateway.SPImmed(false) as an extra option.
func (c *Client) updateUserStatus(update *client.UpdateUserStatus) {
	show, status := c.userStatusToText(update.Status)
	go c.ProcessStatusUpdate(update.UserId, status, show, gateway.SPImmed(false))
}

// updateNewChat handles a newly discovered chat. In a separate goroutine it
// requests an asynchronous download of the small chat photo (when present),
// stores the chat in the cache, sends a "subscribe" presence unless the
// chat is a channel whose LastReadInboxMessageId is 0, and publishes a
// "chat" status for chats with a negative ID.
//
// An update that carries no chat is logged and ignored.
func (c *Client) updateNewChat(update *client.UpdateNewChat) {
	chat := update.Chat
	if chat == nil {
		// Everything below dereferences the chat, so there is nothing to do.
		log.Warn("Ignoring a new chat update without a chat")
		return
	}

	go func() {
		if chat.Photo != nil && chat.Photo.Small != nil {
			_, err := c.client.DownloadFile(&client.DownloadFileRequest{
				FileId:      chat.Photo.Small.Id,
				Priority:    32,
				Synchronous: false,
			})
			if err != nil {
				log.Errorf("Failed to download the chat photo: %v", err)
			}
		}

		c.cache.SetChat(chat.Id, chat)

		var isChannel = false
		if chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup {
			typeSupergroup, ok := chat.Type.(*client.ChatTypeSupergroup)
			if !ok {
				uhOh()
			}
			isChannel = typeSupergroup.IsChannel
		}

		if !(isChannel && chat.LastReadInboxMessageId == 0) {
			gateway.SendPresence(
				c.xmpp,
				c.jid,
				gateway.SPFrom(strconv.FormatInt(chat.Id, 10)),
				gateway.SPType("subscribe"),
				gateway.SPNickname(chat.Title),
			)
		}

		if chat.Id < 0 {
			c.ProcessStatusUpdate(chat.Id, chat.Title, "chat")
		}
	}()
}

// updateNewMessage handles a received message. In a separate goroutine,
// under the per-chat lock, it skips outgoing messages that are still
// pending, requests download of an attached file that is not yet complete,
// builds the text (prefix plus body, unless the body starts with "?OTR"),
// marks the message as read, and forwards the text to XMPP.
func (c *Client) updateNewMessage(update *client.UpdateNewMessage) {
	// Resolve the per-chat lock on the caller's goroutine: the lock map is
	// not safe for concurrent access, and one goroutine is spawned per
	// message below.
	lock := c.getChatMessageLock(update.Message.ChatId)

	go func() {
		// guarantee sequential message delivering per chat
		lock.Lock()
		defer lock.Unlock()

		// ignore self outgoing messages
		if update.Message.IsOutgoing &&
			update.Message.SendingState != nil &&
			update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending {
			return
		}

		log.WithFields(log.Fields{
			"chat_id": update.Message.ChatId,
		}).Warn("New message from chat")

		text := c.messageToText(update.Message)
		file, filename := c.contentToFilename(update.Message.Content)

		// download file(s)
		if file != nil && !file.Local.IsDownloadingCompleted {
			_, err := c.client.DownloadFile(&client.DownloadFileRequest{
				FileId:      file.Id,
				Priority:    10,
				Synchronous: false,
			})
			if err != nil {
				log.Errorf("Failed to download the message file: %v", err)
			}
		}

		// OTR support (I do not know why would you need it, seriously)
		if !strings.HasPrefix(text, "?OTR") {
			var prefix strings.Builder
			prefix.WriteString(c.messageToPrefix(update.Message, c.formatContent(file, filename)))
			if text != "" {
				// \n if it is groupchat and message is not empty
				if update.Message.ChatId < 0 {
					prefix.WriteString("\n")
				} else if update.Message.ChatId > 0 {
					prefix.WriteString(" | ")
				}

				prefix.WriteString(text)
			}

			text = prefix.String()
		}

		// mark message as read
		_, err := c.client.ViewMessages(&client.ViewMessagesRequest{
			ChatId:     update.Message.ChatId,
			MessageIds: []int64{update.Message.Id},
			ForceRead:  true,
		})
		if err != nil {
			log.Errorf("Failed to mark the message as read: %v", err)
		}

		// forward message to XMPP
		gateway.SendMessage(c.jid, strconv.FormatInt(update.Message.ChatId, 10), text, c.xmpp)
	}()
}

// updateMessageContent handles an edited message. Only text content is
// forwarded: the new text is formatted with formatter.EntityToXEP0393
// markup and sent to XMPP prefixed with "✎ <message id> | ". Other content
// types are ignored.
func (c *Client) updateMessageContent(update *client.UpdateMessageContent) {
	markupFunction := formatter.EntityToXEP0393

	if update.NewContent.MessageContentType() == client.TypeMessageText {
		// Checked assertion, matching how every other update type in this
		// file is handled.
		textContent, ok := update.NewContent.(*client.MessageText)
		if !ok {
			uhOh()
		}

		text := fmt.Sprintf("✎ %v | %s", update.MessageId, formatter.Format(
			textContent.Text.Text,
			formatter.SortEntities(textContent.Text.Entities),
			markupFunction,
		))
		gateway.SendMessage(c.jid, strconv.FormatInt(update.ChatId, 10), text, c.xmpp)
	}
}

// updateDeleteMessages handles deleted messages. For permanent deletions it
// sends "✗ " followed by the comma-separated message IDs to XMPP;
// non-permanent deletions are ignored.
func (c *Client) updateDeleteMessages(update *client.UpdateDeleteMessages) {
	if update.IsPermanent {
		text := "✗ " + strings.Join(int64SliceToStringSlice(update.MessageIds), ",")
		gateway.SendMessage(c.jid, strconv.FormatInt(update.ChatId, 10), text, c.xmpp)
	}
}

// updateFile handles a file update. Once the local download is complete it
// creates a symlink to the downloaded file inside c.content.Path, named
// after the hex SHA-256 of the remote file ID plus the local file's
// extension. An already existing link is logged as a warning; any other
// failure is logged as an error.
func (c *Client) updateFile(update *client.UpdateFile) {
	// not really
	if !update.File.Local.IsDownloadingCompleted {
		return
	}

	linkName := fmt.Sprintf("%x", sha256.Sum256([]byte(update.File.Remote.Id))) +
		filepath.Ext(update.File.Local.Path)
	linkPath := filepath.Join(c.content.Path, linkName)

	if err := os.Symlink(update.File.Local.Path, linkPath); err != nil {
		// os.IsExist inspects the wrapped error instead of matching the
		// platform-specific error text.
		if os.IsExist(err) {
			log.Warn(err.Error())
		} else {
			log.Errorf("Error creating symlink: %v", err)
		}
	}
}

// updateAuthorizationState handles authorization state changes: the closing
// state is only logged, and the closed state is logged and followed by
// c.forceClose(). All other states are ignored.
func (c *Client) updateAuthorizationState(update *client.UpdateAuthorizationState) {
	switch update.AuthorizationState.AuthorizationStateType() {
	case client.TypeAuthorizationStateClosing:
		log.Warn("Closing the updates listener")
	case client.TypeAuthorizationStateClosed:
		log.Warn("Closed the updates listener")
		c.forceClose()
	}
}