diff --git a/client/client.go b/client/client.go index 635bcc5..062eb52 100644 --- a/client/client.go +++ b/client/client.go @@ -1,20 +1,23 @@ package client import ( + "bytes" "context" "errors" + "strconv" "sync" "time" ) type Client struct { - jsonClient *JsonClient - extraGenerator ExtraGenerator - responses chan *Response - listenerStore *listenerStore - catchersStore *sync.Map - updatesTimeout time.Duration - catchTimeout time.Duration + jsonClient *JsonClient + extraGenerator ExtraGenerator + responses chan *Response + listenerStore *listenerStore + catchersStore *sync.Map + successMsgStore *sync.Map + updatesTimeout time.Duration + catchTimeout time.Duration } type Option func(*Client) @@ -55,10 +58,11 @@ func SetFilePath(path string) { func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) { client := &Client{ - jsonClient: NewJsonClient(), - responses: make(chan *Response, 1000), - listenerStore: newListenerStore(), - catchersStore: &sync.Map{}, + jsonClient: NewJsonClient(), + responses: make(chan *Response, 1000), + listenerStore: newListenerStore(), + catchersStore: &sync.Map{}, + successMsgStore: &sync.Map{}, } client.extraGenerator = UuidV4Generator() @@ -94,6 +98,13 @@ func (client *Client) receiver() { continue } + if typ.GetType() == (&UpdateMessageSendSucceeded{}).GetType() { + value, ok := client.successMsgStore.Load(typ.(*UpdateMessageSendSucceeded).OldMessageId) + if ok { + value.(chan *Response) <- response + } + } + needGc := false for _, listener := range client.listenerStore.Listeners() { if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter @@ -129,8 +140,37 @@ func (client *Client) Send(req Request) (*Response, error) { select { case response := <-catcher: - return response, nil + if response.Type != "error" && req.Type == "sendMessage" { + m, err := UnmarshalMessage(response.Data) + if err != nil { + return nil, err + } + if m.Content.MessageContentType() == "messageText" || m.Content.MessageContentType() == "messageDice" { + successCatcher := make(chan *Response, 1) + client.successMsgStore.Store(m.Id, successCatcher) + + defer (func() { + client.successMsgStore.Delete(m.Id) + close(successCatcher) + })() + + select { + case modResponse := <-successCatcher: + m2, err2 := UnmarshalUpdateMessageSendSucceeded(modResponse.Data) + if err2 != nil { + return response, nil + } + response.Data = bytes.Replace(response.Data, []byte("{\"@type\":\"messageSendingStatePending\"}"), []byte("{\"@type\":\"updateMessageSendSucceeded\"}"), 1) + response.Data = bytes.Replace(response.Data, []byte(strconv.FormatInt(m.Id, 10)), []byte(strconv.FormatInt(m2.Message.Id, 10)), 1) + return response, nil + case <-time.After(1 * time.Second): + client.successMsgStore.Delete(m.Id) + close(successCatcher) + } + } + } + return response, nil case <-ctx.Done(): return nil, errors.New("response catching timeout") } @@ -148,9 +188,9 @@ func (client *Client) GetListener() *Listener { func (client *Client) AddEventReceiver(msgType Type, channelCapacity int) *Listener { listener := &Listener{ - isActive: true, - Updates: make(chan Type, channelCapacity), - Filter: msgType, + isActive: true, + Updates: make(chan Type, channelCapacity), + Filter: msgType, } client.listenerStore.Add(listener)