Resend chat statuses on probe presence
This commit is contained in:
parent
a435a0a556
commit
b8fcac6ae2
63
telegram/cache/cache.go
vendored
63
telegram/cache/cache.go
vendored
|
@ -6,20 +6,30 @@ import (
|
||||||
"github.com/zelenin/go-tdlib/client"
|
"github.com/zelenin/go-tdlib/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Status stores formatted data for XMPP presence
|
||||||
|
type Status struct {
|
||||||
|
ID int64
|
||||||
|
XMPP string
|
||||||
|
Description string
|
||||||
|
}
|
||||||
|
|
||||||
// Cache allows operating the chats and users cache in
|
// Cache allows operating the chats and users cache in
|
||||||
// a thread-safe manner
|
// a thread-safe manner
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
chats map[int64]*client.Chat
|
chats map[int64]*client.Chat
|
||||||
users map[int32]*client.User
|
users map[int32]*client.User
|
||||||
chatsLock sync.Mutex
|
statuses map[int64]*Status
|
||||||
usersLock sync.Mutex
|
chatsLock sync.Mutex
|
||||||
|
usersLock sync.Mutex
|
||||||
|
statusesLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCache initializes a cache
|
// NewCache initializes a cache
|
||||||
func NewCache() *Cache {
|
func NewCache() *Cache {
|
||||||
return &Cache{
|
return &Cache{
|
||||||
chats: map[int64]*client.Chat{},
|
chats: map[int64]*client.Chat{},
|
||||||
users: map[int32]*client.User{},
|
users: map[int32]*client.User{},
|
||||||
|
statuses: map[int64]*Status{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +59,26 @@ func (cache *Cache) UsersKeys() []int32 {
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusesRange loops through the map in a thread-safe manner
|
||||||
|
func (cache *Cache) StatusesRange() chan *Status {
|
||||||
|
cache.statusesLock.Lock()
|
||||||
|
|
||||||
|
statusChan := make(chan *Status, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
cache.statusesLock.Unlock()
|
||||||
|
close(statusChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, status := range cache.statuses {
|
||||||
|
statusChan <- status
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return statusChan
|
||||||
|
}
|
||||||
|
|
||||||
// GetChat retrieves chat by id if it's present in the cache
|
// GetChat retrieves chat by id if it's present in the cache
|
||||||
func (cache *Cache) GetChat(id int64) (*client.Chat, bool) {
|
func (cache *Cache) GetChat(id int64) (*client.Chat, bool) {
|
||||||
cache.chatsLock.Lock()
|
cache.chatsLock.Lock()
|
||||||
|
@ -67,6 +97,15 @@ func (cache *Cache) GetUser(id int32) (*client.User, bool) {
|
||||||
return user, ok
|
return user, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStatus retrieves status by id if it's present in the cache
|
||||||
|
func (cache *Cache) GetStatus(id int64) (*Status, bool) {
|
||||||
|
cache.statusesLock.Lock()
|
||||||
|
defer cache.statusesLock.Unlock()
|
||||||
|
|
||||||
|
status, ok := cache.statuses[id]
|
||||||
|
return status, ok
|
||||||
|
}
|
||||||
|
|
||||||
// SetChat stores a chat in the cache
|
// SetChat stores a chat in the cache
|
||||||
func (cache *Cache) SetChat(id int64, chat *client.Chat) {
|
func (cache *Cache) SetChat(id int64, chat *client.Chat) {
|
||||||
cache.chatsLock.Lock()
|
cache.chatsLock.Lock()
|
||||||
|
@ -82,3 +121,15 @@ func (cache *Cache) SetUser(id int32, user *client.User) {
|
||||||
|
|
||||||
cache.users[id] = user
|
cache.users[id] = user
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetStatus stores a status in the cache
|
||||||
|
func (cache *Cache) SetStatus(id int64, show string, status string) {
|
||||||
|
cache.statusesLock.Lock()
|
||||||
|
defer cache.statusesLock.Unlock()
|
||||||
|
|
||||||
|
cache.statuses[id] = &Status{
|
||||||
|
ID: id,
|
||||||
|
XMPP: show,
|
||||||
|
Description: status,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -120,13 +120,13 @@ func (c *Client) updateHandler() {
|
||||||
func (c *Client) updateUser(update *client.UpdateUser) {
|
func (c *Client) updateUser(update *client.UpdateUser) {
|
||||||
c.cache.SetUser(update.User.Id, update.User)
|
c.cache.SetUser(update.User.Id, update.User)
|
||||||
show, status := c.userStatusToText(update.User.Status)
|
show, status := c.userStatusToText(update.User.Status)
|
||||||
go c.processStatusUpdate(int64(update.User.Id), status, show)
|
go c.ProcessStatusUpdate(int64(update.User.Id), status, show)
|
||||||
}
|
}
|
||||||
|
|
||||||
// user status changed
|
// user status changed
|
||||||
func (c *Client) updateUserStatus(update *client.UpdateUserStatus) {
|
func (c *Client) updateUserStatus(update *client.UpdateUserStatus) {
|
||||||
show, status := c.userStatusToText(update.Status)
|
show, status := c.userStatusToText(update.Status)
|
||||||
go c.processStatusUpdate(int64(update.UserId), status, show, gateway.SPImmed(false))
|
go c.ProcessStatusUpdate(int64(update.UserId), status, show, gateway.SPImmed(false))
|
||||||
}
|
}
|
||||||
|
|
||||||
// new chat discovered
|
// new chat discovered
|
||||||
|
@ -166,7 +166,7 @@ func (c *Client) updateNewChat(update *client.UpdateNewChat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if update.Chat.Id < 0 {
|
if update.Chat.Id < 0 {
|
||||||
c.processStatusUpdate(update.Chat.Id, update.Chat.Title, "chat")
|
c.ProcessStatusUpdate(update.Chat.Id, update.Chat.Title, "chat")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"dev.narayana.im/narayana/telegabber/telegram/cache"
|
||||||
"dev.narayana.im/narayana/telegabber/xmpp/gateway"
|
"dev.narayana.im/narayana/telegabber/xmpp/gateway"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -129,8 +130,8 @@ func (c *Client) userStatusToText(status client.UserStatus) (string, string) {
|
||||||
return show, textStatus
|
return show, textStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// set contact status
|
// ProcessStatusUpdate sets contact status
|
||||||
func (c *Client) processStatusUpdate(chatID int64, status string, show string, args ...args.V) error {
|
func (c *Client) ProcessStatusUpdate(chatID int64, status string, show string, args ...args.V) error {
|
||||||
if !c.Online() {
|
if !c.Online() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -171,6 +172,8 @@ func (c *Client) processStatusUpdate(chatID int64, status string, show string, a
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.cache.SetStatus(chatID, show, status)
|
||||||
|
|
||||||
gateway.SendPresence(
|
gateway.SendPresence(
|
||||||
c.xmpp,
|
c.xmpp,
|
||||||
c.jid,
|
c.jid,
|
||||||
|
@ -512,3 +515,8 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, messageID int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusesRange proxies the following function from unexported cache
|
||||||
|
func (c *Client) StatusesRange() chan *cache.Status {
|
||||||
|
return c.cache.StatusesRange()
|
||||||
|
}
|
||||||
|
|
|
@ -176,6 +176,15 @@ func handlePresence(s xmpp.Sender, p stanza.Presence) {
|
||||||
err = session.Connect()
|
err = session.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(errors.Wrap(err, "TDlib connection failure"))
|
log.Error(errors.Wrap(err, "TDlib connection failure"))
|
||||||
|
} else {
|
||||||
|
for status := range session.StatusesRange() {
|
||||||
|
go session.ProcessStatusUpdate(
|
||||||
|
status.ID,
|
||||||
|
status.XMPP,
|
||||||
|
status.Description,
|
||||||
|
gateway.SPImmed(false),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue