diff --git a/client/client.go b/client/client.go index 0d51f1c..635bcc5 100644 --- a/client/client.go +++ b/client/client.go @@ -96,9 +96,11 @@ func (client *Client) receiver() { needGc := false for _, listener := range client.listenerStore.Listeners() { - if listener.IsActive() { + if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter listener.Updates <- typ - } else { + } else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty + listener.RawUpdates <- typ + } else if !listener.IsActive() { // GC inactive listener needGc = true } } @@ -136,8 +138,19 @@ func (client *Client) Send(req Request) (*Response, error) { func (client *Client) GetListener() *Listener { listener := &Listener{ - isActive: true, - Updates: make(chan Type, 1000), + isActive: true, + RawUpdates: make(chan Type, 1000), + } + client.listenerStore.Add(listener) + + return listener +} + +func (client *Client) AddEventReceiver(msgType Type, channelCapacity int) *Listener { + listener := &Listener{ + isActive: true, + Updates: make(chan Type, channelCapacity), + Filter: msgType, } client.listenerStore.Add(listener) diff --git a/client/listener.go b/client/listener.go index 30c122e..acd0cbd 100644 --- a/client/listener.go +++ b/client/listener.go @@ -45,9 +45,11 @@ func (store *listenerStore) gc() { } type Listener struct { - mu sync.Mutex - isActive bool - Updates chan Type + mu sync.Mutex + isActive bool + Updates chan Type + RawUpdates chan Type + Filter Type } func (listener *Listener) Close() { @@ -55,7 +57,12 @@ func (listener *Listener) Close() { defer listener.mu.Unlock() listener.isActive = false - close(listener.Updates) + if listener.Updates != nil { + close(listener.Updates) + } + if listener.RawUpdates != nil { + close(listener.RawUpdates) + } } func (listener *Listener) IsActive() bool {