Add keeponline option

This commit is contained in:
Bohdan Horbeshko 2022-01-05 16:04:22 -05:00
parent d48cb8b586
commit 077edae986
7 changed files with 97 additions and 22 deletions

View file

@ -36,6 +36,12 @@ type SessionsMap struct {
type Session struct { type Session struct {
Login string `yaml:":login"` Login string `yaml:":login"`
Timezone string `yaml:":timezone"` Timezone string `yaml:":timezone"`
KeepOnline bool `yaml:":keeponline"`
}
var configKeys = []string{
"timezone",
"keeponline",
} }
var sessionDB *SessionsYamlDB var sessionDB *SessionsYamlDB
@ -102,6 +108,8 @@ func (s *Session) Get(key string) (string, error) {
switch key { switch key {
case "timezone": case "timezone":
return s.Timezone, nil return s.Timezone, nil
case "keeponline":
return fromBool(s.KeepOnline), nil
} }
return "", errors.New("Unknown session property") return "", errors.New("Unknown session property")
@ -110,7 +118,7 @@ func (s *Session) Get(key string) (string, error) {
// ToMap converts the session to a map // ToMap converts the session to a map
func (s *Session) ToMap() map[string]string { func (s *Session) ToMap() map[string]string {
m := make(map[string]string) m := make(map[string]string)
for _, configKey := range []string{"timezone"} { for _, configKey := range configKeys {
value, _ := s.Get(configKey) value, _ := s.Get(configKey)
m[configKey] = value m[configKey] = value
} }
@ -124,6 +132,13 @@ func (s *Session) Set(key string, value string) (string, error) {
case "timezone": case "timezone":
s.Timezone = value s.Timezone = value
return value, nil return value, nil
case "keeponline":
b, err := toBool(value)
if err != nil {
return "", err
}
s.KeepOnline = b
return value, nil
} }
return "", errors.New("Unknown session property") return "", errors.New("Unknown session property")
@ -139,3 +154,22 @@ func (s *Session) TimezoneToLocation() *time.Location {
// default // default
return zeroLocation return zeroLocation
} }
func fromBool(b bool) string {
if b {
return "true"
} else {
return "false"
}
}
func toBool(s string) (bool, error) {
switch s {
case "true":
return true, nil
case "false":
return false, nil
}
return false, errors.New("Invalid boolean value")
}

View file

@ -53,7 +53,8 @@ var chatCommands = map[string]command{
} }
var transportConfigurationOptions = map[string]configurationOption{ var transportConfigurationOptions = map[string]configurationOption{
"timezone": configurationOption{"00:00", "adjust timezone for Telegram user statuses"}, "timezone": configurationOption{"<timezone>", "adjust timezone for Telegram user statuses (example: +02:00)"},
"keeponline": configurationOption{"<bool>", "always keep telegram session online and rely on jabber offline messages (example: true)"},
} }
type command struct { type command struct {
@ -293,6 +294,7 @@ func (c *Client) ProcessTransportCommand(cmdline string, resource string) string
if err != nil { if err != nil {
return err.Error() return err.Error()
} }
gateway.DirtySessions = true
return fmt.Sprintf("%s set to %s", args[0], value) return fmt.Sprintf("%s set to %s", args[0], value)
} else if len(args) > 0 { } else if len(args) > 0 {

View file

@ -91,7 +91,7 @@ func (c *Client) Connect(resource string) error {
c.locks.authorizationReady.Wait() c.locks.authorizationReady.Wait()
if c.Online() { if c.Online() {
c.refresh(resource) c.roster(resource)
return nil return nil
} }
@ -157,7 +157,7 @@ func (c *Client) Disconnect(resource string, quit bool) bool {
c.deleteResource(resource) c.deleteResource(resource)
} }
// other resources are still active // other resources are still active
if len(c.resources) > 0 && !quit { if (len(c.resources) > 0 || c.Session.KeepOnline) && !quit {
log.Infof("Resource %v for account %v has disconnected, %v remaining", resource, c.Session.Login, len(c.resources)) log.Infof("Resource %v for account %v has disconnected, %v remaining", resource, c.Session.Login, len(c.resources))
log.Debugf("Resources: %#v", c.resources) log.Debugf("Resources: %#v", c.resources)
return false return false

View file

@ -48,7 +48,7 @@ func (c *Client) GetContactByUsername(username string) (*client.Chat, *client.Us
// GetContactByID gets user and chat information from cache (or tries to retrieve it, if missing) // GetContactByID gets user and chat information from cache (or tries to retrieve it, if missing)
func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *client.User, error) { func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *client.User, error) {
if !c.Online() { if !c.Online() || id == 0 {
return nil, nil, errOffline return nil, nil, errOffline
} }
@ -600,17 +600,19 @@ func (c *Client) deleteResource(resource string) {
} }
} }
// refresh roster // resend statuses to (to another resource, for example)
func (c *Client) refresh(resource string) { func (c *Client) roster(resource string) {
if _, ok := c.resources[resource]; ok { if _, ok := c.resources[resource]; ok {
return return // we know it
} }
log.Warnf("Refreshing roster for resource %v", resource) log.Warnf("Sending roster for %v", resource)
for _, chat := range c.cache.ChatsKeys() { for _, chat := range c.cache.ChatsKeys() {
c.ProcessStatusUpdate(chat, "", "") c.ProcessStatusUpdate(chat, "", "")
} }
gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in as: "+c.Session.Login))
c.addResource(resource) c.addResource(resource)
} }

View file

@ -2,6 +2,7 @@ package xmpp
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"sync"
"time" "time"
"dev.narayana.im/narayana/telegabber/config" "dev.narayana.im/narayana/telegabber/config"
@ -17,6 +18,7 @@ import (
var tgConf config.TelegramConfig var tgConf config.TelegramConfig
var sessions map[string]*telegram.Client var sessions map[string]*telegram.Client
var db *persistence.SessionsYamlDB var db *persistence.SessionsYamlDB
var sessionLock sync.Mutex
// NewComponent starts a new component and wraps it in // NewComponent starts a new component and wraps it in
// a stream manager that you should start yourself // a stream manager that you should start yourself
@ -69,12 +71,14 @@ func heartbeat(component *xmpp.Component) {
var err error var err error
probeType := gateway.SPType("probe") probeType := gateway.SPType("probe")
sessionLock.Lock()
for jid := range sessions { for jid := range sessions {
err = gateway.SendPresence(component, jid, probeType) err = gateway.SendPresence(component, jid, probeType)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
} }
sessionLock.Unlock()
log.Info("Starting heartbeat queue") log.Info("Starting heartbeat queue")
@ -89,6 +93,13 @@ func heartbeat(component *xmpp.Component) {
delete(gateway.Queue, key) delete(gateway.Queue, key)
} }
} }
if gateway.DirtySessions {
gateway.DirtySessions = false
// no problem if a dirty flag gets set again here,
// it would be resolved on the next iteration
SaveSessions()
}
} }
} }
@ -104,7 +115,10 @@ func loadSessions(dbPath string, component *xmpp.Component) error {
db.Transaction(func() bool { db.Transaction(func() bool {
for jid, session := range db.Data.Sessions { for jid, session := range db.Data.Sessions {
getTelegramInstance(jid, &session, component) // copy the session struct, otherwise all of them would reference
// the same temporary range variable
currentSession := session
getTelegramInstance(jid, &currentSession, component)
} }
return false return false
@ -122,22 +136,24 @@ func getTelegramInstance(jid string, savedSession *persistence.Session, componen
log.Error(errors.Wrap(err, "TDlib initialization failure")) log.Error(errors.Wrap(err, "TDlib initialization failure"))
return session, false return session, false
} }
if savedSession.KeepOnline {
if err = session.Connect(""); err != nil {
log.Error(err)
return session, false
}
}
sessionLock.Lock()
sessions[jid] = session sessions[jid] = session
sessionLock.Unlock()
} }
return session, true return session, true
} }
// Close gracefully terminates the component and saves active sessions // SaveSessions dumps current sessions to the file
func Close(component *xmpp.Component) { func SaveSessions() {
log.Error("Disconnecting...") sessionLock.Lock()
defer sessionLock.Unlock()
// close all sessions
for _, session := range sessions {
session.Disconnect("", true)
}
// save sessions
db.Transaction(func() bool { db.Transaction(func() bool {
for jid, session := range sessions { for jid, session := range sessions {
db.Data.Sessions[jid] = *session.Session db.Data.Sessions[jid] = *session.Session
@ -145,6 +161,21 @@ func Close(component *xmpp.Component) {
return true return true
}, persistence.SessionMarshaller) }, persistence.SessionMarshaller)
}
// Close gracefully terminates the component and saves active sessions
func Close(component *xmpp.Component) {
log.Error("Disconnecting...")
sessionLock.Lock()
// close all sessions
for _, session := range sessions {
session.Disconnect("", true)
}
sessionLock.Unlock()
// save sessions
SaveSessions()
// close stream // close stream
component.Disconnect() component.Disconnect()

View file

@ -18,6 +18,10 @@ var Queue = make(map[string]*stanza.Presence)
// Jid stores the component's JID object // Jid stores the component's JID object
var Jid *stanza.Jid var Jid *stanza.Jid
// DirtySessions denotes that some Telegram session configurations
// were changed and need to be re-flushed to the YamlDB
var DirtySessions = false
// SendMessage creates and sends a message stanza // SendMessage creates and sends a message stanza
func SendMessage(to string, from string, body string, component *xmpp.Component) { func SendMessage(to string, from string, body string, component *xmpp.Component) {
componentJid := Jid.Full() componentJid := Jid.Full()

View file

@ -170,7 +170,9 @@ func handlePresence(s xmpp.Sender, p stanza.Presence) {
// destroy session // destroy session
case "unsubscribed", "unsubscribe": case "unsubscribed", "unsubscribe":
if session.Disconnect(fromJid.Resource, false) { if session.Disconnect(fromJid.Resource, false) {
sessionLock.Lock()
delete(sessions, bareFromJid) delete(sessions, bareFromJid)
sessionLock.Unlock()
} }
// go offline // go offline
case "unavailable", "error": case "unavailable", "error":