Multiple resources handling

calls
Bohdan Horbeshko 2 years ago
parent 462a537021
commit f4e4692a94

@ -42,12 +42,13 @@ type Client struct {
options []client.Option options []client.Option
me *client.User me *client.User
xmpp *xmpp.Component xmpp *xmpp.Component
jid string jid string
Session *persistence.Session Session *persistence.Session
content *config.TelegramContentConfig resources map[string]bool
cache *cache.Cache content *config.TelegramContentConfig
online bool cache *cache.Cache
online bool
locks clientLocks locks clientLocks
} }
@ -55,6 +56,7 @@ type Client struct {
type clientLocks struct { type clientLocks struct {
authorizationReady sync.WaitGroup authorizationReady sync.WaitGroup
chatMessageLocks map[int64]*sync.Mutex chatMessageLocks map[int64]*sync.Mutex
resourcesLock sync.Mutex
} }
// NewClient instantiates a Telegram App // NewClient instantiates a Telegram App
@ -104,6 +106,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component
xmpp: component, xmpp: component,
jid: jid, jid: jid,
Session: session, Session: session,
resources: make(map[string]bool),
content: &conf.Content, content: &conf.Content,
cache: cache.NewCache(), cache: cache.NewCache(),
options: options, options: options,

@ -155,7 +155,7 @@ func (c *Client) usernameOrIDToID(username string) (int64, error) {
// ProcessTransportCommand executes a command sent directly to the component // ProcessTransportCommand executes a command sent directly to the component
// and returns a response // and returns a response
func (c *Client) ProcessTransportCommand(cmdline string) string { func (c *Client) ProcessTransportCommand(cmdline string, resource string) string {
cmd, args := parseCommand(cmdline) cmd, args := parseCommand(cmdline)
switch cmd { switch cmd {
case "login", "code", "password": case "login", "code", "password":
@ -173,7 +173,7 @@ func (c *Client) ProcessTransportCommand(cmdline string) string {
if wasSessionLoginEmpty && c.authorizer == nil { if wasSessionLoginEmpty && c.authorizer == nil {
go func() { go func() {
err := c.Connect() err := c.Connect(resource)
if err != nil { if err != nil {
log.Error(errors.Wrap(err, "TDlib connection failure")) log.Error(errors.Wrap(err, "TDlib connection failure"))
} }

@ -86,11 +86,12 @@ func (stateHandler *clientAuthorizer) Close() {
} }
// Connect starts TDlib connection // Connect starts TDlib connection
func (c *Client) Connect() error { func (c *Client) Connect(resource string) error {
// avoid conflict if another authorization is pending already // avoid conflict if another authorization is pending already
c.locks.authorizationReady.Wait() c.locks.authorizationReady.Wait()
if c.Online() { if c.Online() {
c.refresh(resource)
return nil return nil
} }
@ -116,7 +117,6 @@ func (c *Client) Connect() error {
} }
c.client = tdlibClient c.client = tdlibClient
c.locks.authorizationReady.Done()
// stage 3: if a client is succesfully created, AuthorizationStateReady is already reached // stage 3: if a client is succesfully created, AuthorizationStateReady is already reached
log.Warn("Authorization successful!") log.Warn("Authorization successful!")
@ -130,27 +130,41 @@ func (c *Client) Connect() error {
go c.updateHandler() go c.updateHandler()
c.online = true c.online = true
c.locks.authorizationReady.Done()
c.addResource(resource)
_, err = c.client.GetChats(&client.GetChatsRequest{ go func() {
OffsetOrder: client.JsonInt64(math.MaxInt64), _, err = c.client.GetChats(&client.GetChatsRequest{
Limit: chatsLimit, OffsetOrder: client.JsonInt64(math.MaxInt64),
}) Limit: chatsLimit,
if err != nil { })
log.Errorf("Could not retrieve chats: %v", err) if err != nil {
} log.Errorf("Could not retrieve chats: %v", err)
}
gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribe")) gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribe"))
gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribed")) gateway.SendPresence(c.xmpp, c.jid, gateway.SPType("subscribed"))
gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in as: "+c.Session.Login)) gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in as: "+c.Session.Login))
}()
return nil return nil
} }
// Disconnect drops TDlib connection // Disconnect drops TDlib connection and
func (c *Client) Disconnect() { // returns the flag indicating if disconnecting is permitted
func (c *Client) Disconnect(resource string, quit bool) bool {
if !quit {
c.deleteResource(resource)
}
// other resources are still active
if len(c.resources) > 0 && !quit {
log.Infof("Resource %v for account %v has disconnected, %v remaining", resource, c.Session.Login, len(c.resources))
log.Debugf("Resources: %#v", c.resources)
return false
}
// already disconnected // already disconnected
if !c.Online() { if !c.Online() {
return return false
} }
log.Warn("Disconnecting from Telegram network...") log.Warn("Disconnecting from Telegram network...")
@ -168,8 +182,10 @@ func (c *Client) Disconnect() {
_, err := c.client.Close() _, err := c.client.Close()
if err != nil { if err != nil {
log.Errorf("Couldn't close the Telegram instance: %v; %#v", err, c) log.Errorf("Couldn't close the Telegram instance: %v; %#v", err, c)
c.forceClose()
} }
c.forceClose()
return true
} }
func (c *Client) interactor() { func (c *Client) interactor() {

@ -580,3 +580,37 @@ func (c *Client) ProcessOutgoingMessage(chatID int64, text string, messageID int
func (c *Client) StatusesRange() chan *cache.Status { func (c *Client) StatusesRange() chan *cache.Status {
return c.cache.StatusesRange() return c.cache.StatusesRange()
} }
func (c *Client) addResource(resource string) {
if resource == "" {
return
}
c.locks.resourcesLock.Lock()
defer c.locks.resourcesLock.Unlock()
c.resources[resource] = true
}
func (c *Client) deleteResource(resource string) {
c.locks.resourcesLock.Lock()
defer c.locks.resourcesLock.Unlock()
if _, ok := c.resources[resource]; ok {
delete(c.resources, resource)
}
}
// refresh roster
func (c *Client) refresh(resource string) {
if _, ok := c.resources[resource]; ok {
return
}
log.Warnf("Refreshing roster for resource %v", resource)
for _, chat := range c.cache.ChatsKeys() {
c.ProcessStatusUpdate(chat, "", "")
}
c.addResource(resource)
}

@ -134,7 +134,7 @@ func Close(component *xmpp.Component) {
// close all sessions // close all sessions
for _, session := range sessions { for _, session := range sessions {
session.Disconnect() session.Disconnect("", true)
} }
// save sessions // save sessions

@ -85,7 +85,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) {
}).Error(errors.Wrap(err, "Invalid to JID!")) }).Error(errors.Wrap(err, "Invalid to JID!"))
} else if toID == gateway.Jid.Bare() { } else if toID == gateway.Jid.Bare() {
if strings.HasPrefix(msg.Body, "/") { if strings.HasPrefix(msg.Body, "/") {
response := session.ProcessTransportCommand(msg.Body) response := session.ProcessTransportCommand(msg.Body, fromJid.Resource)
if response != "" { if response != "" {
gateway.SendMessage(msg.From, "", response, component) gateway.SendMessage(msg.From, "", response, component)
} }
@ -169,17 +169,18 @@ func handlePresence(s xmpp.Sender, p stanza.Presence) {
switch p.Type { switch p.Type {
// destroy session // destroy session
case "unsubscribed", "unsubscribe": case "unsubscribed", "unsubscribe":
session.Disconnect() if session.Disconnect(fromJid.Resource, false) {
delete(sessions, bareFromJid) delete(sessions, bareFromJid)
}
// go offline // go offline
case "unavailable", "error": case "unavailable", "error":
session.Disconnect() session.Disconnect(fromJid.Resource, false)
// go online // go online
case "probe", "", "online": case "probe", "", "online":
// due to the weird implementation of go-tdlib wrapper, it won't // due to the weird implementation of go-tdlib wrapper, it won't
// return the client instance until successful authorization // return the client instance until successful authorization
go func() { go func() {
err = session.Connect() err = session.Connect(fromJid.Resource)
if err != nil { if err != nil {
log.Error(errors.Wrap(err, "TDlib connection failure")) log.Error(errors.Wrap(err, "TDlib connection failure"))
} else { } else {

Loading…
Cancel
Save