Implement message type filter for listener
This commit is contained in:
parent
469cae3b44
commit
b535766aa0
|
@ -96,9 +96,11 @@ func (client *Client) receiver() {
|
||||||
|
|
||||||
needGc := false
|
needGc := false
|
||||||
for _, listener := range client.listenerStore.Listeners() {
|
for _, listener := range client.listenerStore.Listeners() {
|
||||||
if listener.IsActive() {
|
if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter
|
||||||
listener.Updates <- typ
|
listener.Updates <- typ
|
||||||
} else {
|
} else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty
|
||||||
|
listener.RawUpdates <- typ
|
||||||
|
} else if !listener.IsActive() { // GC inactive listener
|
||||||
needGc = true
|
needGc = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -136,8 +138,19 @@ func (client *Client) Send(req Request) (*Response, error) {
|
||||||
|
|
||||||
func (client *Client) GetListener() *Listener {
|
func (client *Client) GetListener() *Listener {
|
||||||
listener := &Listener{
|
listener := &Listener{
|
||||||
isActive: true,
|
isActive: true,
|
||||||
Updates: make(chan Type, 1000),
|
RawUpdates: make(chan Type, 1000),
|
||||||
|
}
|
||||||
|
client.listenerStore.Add(listener)
|
||||||
|
|
||||||
|
return listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *Client) AddEventReceiver(msgType Type, channelCapacity int) *Listener {
|
||||||
|
listener := &Listener{
|
||||||
|
isActive: true,
|
||||||
|
Updates: make(chan Type, channelCapacity),
|
||||||
|
Filter: msgType,
|
||||||
}
|
}
|
||||||
client.listenerStore.Add(listener)
|
client.listenerStore.Add(listener)
|
||||||
|
|
||||||
|
|
|
@ -45,9 +45,11 @@ func (store *listenerStore) gc() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Listener struct {
|
type Listener struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
isActive bool
|
isActive bool
|
||||||
Updates chan Type
|
Updates chan Type
|
||||||
|
RawUpdates chan Type
|
||||||
|
Filter Type
|
||||||
}
|
}
|
||||||
|
|
||||||
func (listener *Listener) Close() {
|
func (listener *Listener) Close() {
|
||||||
|
@ -55,7 +57,12 @@ func (listener *Listener) Close() {
|
||||||
defer listener.mu.Unlock()
|
defer listener.mu.Unlock()
|
||||||
|
|
||||||
listener.isActive = false
|
listener.isActive = false
|
||||||
close(listener.Updates)
|
if listener.Updates != nil {
|
||||||
|
close(listener.Updates)
|
||||||
|
}
|
||||||
|
if listener.RawUpdates != nil {
|
||||||
|
close(listener.RawUpdates)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (listener *Listener) IsActive() bool {
|
func (listener *Listener) IsActive() bool {
|
||||||
|
|
Loading…
Reference in a new issue