From 6400f59b1b19073f61fd429c808c9c7b415d41c3 Mon Sep 17 00:00:00 2001 From: c0re100 Date: Sun, 6 Feb 2022 11:22:20 +0800 Subject: [PATCH] Support pending updates --- client/client.go | 100 +++++++++++++++++++++-------- example/pending/PendingUpdate.go | 107 +++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 28 deletions(-) create mode 100644 example/pending/PendingUpdate.go diff --git a/client/client.go b/client/client.go index ec0a53f..34fcfc5 100644 --- a/client/client.go +++ b/client/client.go @@ -9,10 +9,13 @@ import ( "time" ) +var pendingUpdateType []Type + type Client struct { jsonClient *JsonClient extraGenerator ExtraGenerator responses chan *Response + pendingResp chan *Response listenerStore *listenerStore catchersStore *sync.Map successMsgStore *sync.Map @@ -56,10 +59,18 @@ func SetFilePath(path string) { }) } +// Keep specific update type in memory when listener is not ready. +func SetPendingUpdateType(update ...Type) { + for _, v := range update { + pendingUpdateType = append(pendingUpdateType, v) + } +} + func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) { client := &Client{ jsonClient: NewJsonClient(), responses: make(chan *Response, 1000), + pendingResp: make(chan *Response, 1000), listenerStore: newListenerStore(), catchersStore: &sync.Map{}, successMsgStore: &sync.Map{}, @@ -74,6 +85,7 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O tdlibInstance.addClient(client) + go client.processPendingResponse() go client.receiver() err := Authorize(client, authorizationStateHandler) @@ -84,40 +96,72 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O return client, nil } +func (client *Client) processResponse(response *Response) { + if response.Extra != "" { + value, ok := client.catchersStore.Load(response.Extra) + if ok { + value.(chan *Response) <- response + } + } + + typ, err := UnmarshalType(response.Data) + if err != nil { + return + } + + if typ.GetType() == (&UpdateMessageSendSucceeded{}).GetType() { + value, ok := client.successMsgStore.Load(typ.(*UpdateMessageSendSucceeded).OldMessageId) + if ok { + value.(chan *Response) <- response + } + } + + if len(client.listenerStore.Listeners()) == 0 { + for _, p := range pendingUpdateType { + if typ.GetType() == p.GetType() { + client.pendingResp <- response + } + } + } + + needGc := false + for _, listener := range client.listenerStore.Listeners() { + if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter + listener.Updates <- typ + } else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty + listener.RawUpdates <- typ + } else if !listener.IsActive() { // GC inactive listener + needGc = true + } + } + if needGc { + client.listenerStore.gc() + } +} + func (client *Client) receiver() { for response := range client.responses { - if response.Extra != "" { - value, ok := client.catchersStore.Load(response.Extra) - if ok { - value.(chan *Response) <- response - } - } + client.processResponse(response) + } +} - typ, err := UnmarshalType(response.Data) - if err != nil { - continue - } +func (client *Client) processPendingResponse() { + // No need to process pending response if no pending list. + if len(pendingUpdateType) == 0 { + return + } - if typ.GetType() == (&UpdateMessageSendSucceeded{}).GetType() { - value, ok := client.successMsgStore.Load(typ.(*UpdateMessageSendSucceeded).OldMessageId) - if ok { - value.(chan *Response) <- response - } + // Wait for listener to be ready. + for { + if len(client.listenerStore.Listeners()) > 0 { + break } + time.Sleep(1 * time.Second) + } - needGc := false - for _, listener := range client.listenerStore.Listeners() { - if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter - listener.Updates <- typ - } else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty - listener.RawUpdates <- typ - } else if !listener.IsActive() { // GC inactive listener - needGc = true - } - } - if needGc { - client.listenerStore.gc() - } + // Start processing pending response + for response := range client.pendingResp { + client.processResponse(response) } } diff --git a/example/pending/PendingUpdate.go b/example/pending/PendingUpdate.go new file mode 100644 index 0000000..86d9ef8 --- /dev/null +++ b/example/pending/PendingUpdate.go @@ -0,0 +1,107 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + tdlib "github.com/c0re100/gotdlib/client" +) + +func GetTdParameters() *tdlib.TdlibParameters { + return &tdlib.TdlibParameters{ + UseTestDc: false, + DatabaseDirectory: "./tdlib-db", + FilesDirectory: "./tdlib-files", + UseFileDatabase: true, + UseChatInfoDatabase: true, + UseMessageDatabase: true, + UseSecretChats: false, + ApiId: 132712, + ApiHash: "e82c07ad653399a37baca8d1e498e472", + SystemLanguageCode: "en", + DeviceModel: "HuskyNG", + SystemVersion: "3.0", + ApplicationVersion: "3.0", + EnableStorageOptimizer: true, + IgnoreFileNames: false, + } +} + +func main() { + tdlib.SetLogLevel(0) + tdlib.SetFilePath("./errors.txt") + + // Set pending update list + tdlib.SetPendingUpdateType(&tdlib.UpdateNewMessage{}) + // Of coz, you can set more than one type, depending on your needs + //tdlib.SetPendingUpdateType(&tdlib.UpdateNewMessage{}, &tdlib.UpdateMessageEdited{}, &tdlib.UpdateDeleteMessages{}) + + botToken := "your_bot_token" + authorizer := tdlib.BotAuthorizer(botToken) + + authorizer.TdlibParameters <- GetTdParameters() + + client, err := tdlib.NewClient(authorizer) + if err != nil { + log.Fatalf("NewClient error: %s", err) + } + + // Handle SIGINT + ch := make(chan os.Signal, 2) + signal.Notify(ch, os.Interrupt, syscall.SIGINT) + signal.Notify(ch, os.Interrupt, syscall.SIGKILL) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + signal.Notify(ch, os.Interrupt, syscall.SIGQUIT) + signal.Notify(ch, os.Interrupt, syscall.SIGSEGV) + go func() { + <-ch + client.Close() + }() + + me, err := client.GetMe() + if err != nil { + log.Fatalf("GetMe error: %s", err) + } + + log.Printf("%s connected", me.Username) + + listener := client.AddEventReceiver(&tdlib.UpdateNewMessage{}, 1000) + + defer listener.Close() + for update := range listener.Updates { + updateMsg := update.(*tdlib.UpdateNewMessage) + chatId := updateMsg.Message.ChatId + msgId := updateMsg.Message.Id + + var msgText string + var msgEnt []*tdlib.TextEntity + + switch updateMsg.Message.Content.MessageContentType() { + case "messageText": + msgText = updateMsg.Message.Content.(*tdlib.MessageText).Text.Text + msgEnt = updateMsg.Message.Content.(*tdlib.MessageText).Text.Entities + + cmd := tdlib.CheckCommand(msgText, msgEnt) + switch cmd { + case "/ping": + text, _ := tdlib.ParseTextEntities(&tdlib.ParseTextEntitiesRequest{ + Text: "pong!", + ParseMode: &tdlib.TextParseModeHTML{}, + }) + m, err := client.SendMessage(&tdlib.SendMessageRequest{ + ChatId: chatId, + ReplyToMessageId: msgId, + InputMessageContent: &tdlib.InputMessageText{ + Text: text, + }, + }) + if err != nil { + continue + } + log.Printf("Message sent, ID: %d", m.Id) + } + } + } +}