From b535766aa0f4cd9ce228e9b2b0e9609561df9dfe Mon Sep 17 00:00:00 2001 From: c0re100 Date: Sat, 29 Jan 2022 09:20:54 +0800 Subject: [PATCH] Implement message type filter for listener --- client/client.go | 21 +++++++++++++++++---- client/listener.go | 15 +++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index 0d51f1c..635bcc5 100644 --- a/client/client.go +++ b/client/client.go @@ -96,9 +96,11 @@ func (client *Client) receiver() { needGc := false for _, listener := range client.listenerStore.Listeners() { - if listener.IsActive() { + if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter listener.Updates <- typ - } else { + } else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty + listener.RawUpdates <- typ + } else if !listener.IsActive() { // GC inactive listener needGc = true } } @@ -136,8 +138,19 @@ func (client *Client) Send(req Request) (*Response, error) { func (client *Client) GetListener() *Listener { listener := &Listener{ - isActive: true, - Updates: make(chan Type, 1000), + isActive: true, + RawUpdates: make(chan Type, 1000), + } + client.listenerStore.Add(listener) + + return listener +} + +func (client *Client) AddEventReceiver(msgType Type, channelCapacity int) *Listener { + listener := &Listener{ + isActive: true, + Updates: make(chan Type, channelCapacity), + Filter: msgType, } client.listenerStore.Add(listener) diff --git a/client/listener.go b/client/listener.go index 30c122e..acd0cbd 100644 --- a/client/listener.go +++ b/client/listener.go @@ -45,9 +45,11 @@ func (store *listenerStore) gc() { } type Listener struct { - mu sync.Mutex - isActive bool - Updates chan Type + mu sync.Mutex + isActive bool + Updates chan Type + RawUpdates chan Type + Filter Type } func (listener *Listener) Close() { @@ -55,7 +57,12 @@ func (listener *Listener) Close() { defer listener.mu.Unlock() listener.isActive = false - close(listener.Updates) + if listener.Updates != nil { + close(listener.Updates) + } + if listener.RawUpdates != nil { + close(listener.RawUpdates) + } } func (listener *Listener) IsActive() bool {