From 17afd3f8c7a016d5103be949990efb695de865b5 Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Thu, 31 Mar 2022 21:42:12 -0400 Subject: [PATCH] Limit the file storage by an optional quota --- Makefile | 2 +- config.yml.example | 1 + config/config.go | 1 + config_schema.json | 3 ++ telegram/utils.go | 26 ++++++++++++ xmpp/component.go | 78 ++++++++++++++++++++++++++++++++++++ xmpp/component_test.go | 47 ++++++++++++++++++++++ xmpp/gateway/storage.go | 89 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 xmpp/component_test.go create mode 100644 xmpp/gateway/storage.go diff --git a/Makefile b/Makefile index 837be39..1f75f76 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ all: go build -o telegabber test: - go test -v ./config ./ ./telegram ./xmpp/gateway ./persistence ./telegram/formatter + go test -v ./config ./ ./telegram ./xmpp ./xmpp/gateway ./persistence ./telegram/formatter lint: $(GOPATH)/bin/golint ./... diff --git a/config.yml.example b/config.yml.example index dae2350..b8de1dd 100644 --- a/config.yml.example +++ b/config.yml.example @@ -5,6 +5,7 @@ :link: 'http://tlgrm.localhost/content' # webserver public address :upload: 'https:///xmppfiles.localhost' # xmpp http upload address :user: 'www-data' # owner of content files + :quota: '256MB' # maximum storage size :tdlib_verbosity: 1 :tdlib: :datadir: './sessions/' diff --git a/config/config.go b/config/config.go index e88142f..7c685fb 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,7 @@ type TelegramContentConfig struct { Link string `yaml:":link"` Upload string `yaml:":upload"` User string `yaml:":user"` + Quota string `yaml:":quota"` } // TelegramTdlibConfig is for :tdlib: subtree diff --git a/config_schema.json b/config_schema.json index e77b8c3..ab25307 100644 --- a/config_schema.json +++ b/config_schema.json @@ -24,6 +24,9 @@ }, ":user": { "type": "string" + }, + ":quota": { + "type": "string" } } }, diff --git a/telegram/utils.go b/telegram/utils.go index f60f35f..b41423a 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -361,6 +361,9 @@ func (c *Client) formatFile(file *client.File, compact bool) string { return "" } + gateway.StorageLock.Lock() + defer gateway.StorageLock.Unlock() + var link string var src string @@ -372,6 +375,9 @@ func (c *Client) formatFile(file *client.File, compact bool) string { return "" } + size64:= uint64(file.Size) + c.prepareDiskSpace(size64) + basename := file.Remote.UniqueId + filepath.Ext(src) dest := c.content.Path + "/" + basename // destination path link = c.content.Link + "/" + basename // download link @@ -387,6 +393,8 @@ func (c *Client) formatFile(file *client.File, compact bool) string { return "" } } + gateway.CachedStorageSize += size64 + // chown if c.content.User != "" { user, err := osUser.Lookup(c.content.User) @@ -729,7 +737,12 @@ func (c *Client) messageToPrefix(message *client.Message, previewString string, } func (c *Client) ensureDownloadFile(file *client.File) *client.File { + gateway.StorageLock.Lock() + defer gateway.StorageLock.Unlock() + if file != nil { + c.prepareDiskSpace(uint64(file.Size)) + newFile, err := c.DownloadFile(file.Id, 1, true) if err == nil { return newFile @@ -952,3 +965,16 @@ func (c *Client) subscribeToID(id int64, chat *client.Chat) { args..., ) } + +func (c *Client) prepareDiskSpace(size uint64) { + if gateway.StorageQuota > 0 && c.content.Path != "" { + var loweredQuota uint64 + if gateway.StorageQuota >= size { + loweredQuota = gateway.StorageQuota - size + } + if gateway.CachedStorageSize >= loweredQuota { + log.Warn("Storage is rapidly clogged") + gateway.CleanOldFiles(c.content.Path, loweredQuota) + } + } +} diff --git a/xmpp/component.go b/xmpp/component.go index 961761b..1749100 100644 --- a/xmpp/component.go +++ b/xmpp/component.go @@ -2,6 +2,8 @@ package xmpp import ( "github.com/pkg/errors" + "regexp" + "strconv" "sync" "time" @@ -20,6 +22,19 @@ var sessions map[string]*telegram.Client var db *persistence.SessionsYamlDB var sessionLock sync.Mutex +const ( + B uint64 = 1 + KB = B << 10 + MB = KB << 10 + GB = MB << 10 + TB = GB << 10 + PB = TB << 10 + EB = PB << 10 + + maxUint64 uint64 = (1 << 64) - 1 +) +var sizeRegex = regexp.MustCompile("\\A([0-9]+) ?([KMGTPE]?B?)\\z") + // NewComponent starts a new component and wraps it in // a stream manager that you should start yourself func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) { @@ -32,6 +47,13 @@ func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.Strea tgConf = tc + if tc.Content.Quota != "" { + gateway.StorageQuota, err = parseSize(tc.Content.Quota) + if err != nil { + log.Warnf("Error parsing the storage quota: %v; the cleaner is disabled", err) + } + } + options := xmpp.ComponentOptions{ TransportConfiguration: xmpp.TransportConfiguration{ Address: conf.Host + ":" + conf.Port, @@ -80,10 +102,22 @@ func heartbeat(component *xmpp.Component) { } sessionLock.Unlock() + quotaLowThreshold := gateway.StorageQuota / 10 * 9 + log.Info("Starting heartbeat queue") // status updater thread for { + gateway.StorageLock.Lock() + if quotaLowThreshold > 0 && tgConf.Content.Path != "" { + gateway.MeasureStorageSize(tgConf.Content.Path) + + if gateway.CachedStorageSize > quotaLowThreshold { + gateway.CleanOldFiles(tgConf.Content.Path, quotaLowThreshold) + } + } + gateway.StorageLock.Unlock() + time.Sleep(60e9) now := time.Now().Unix() @@ -201,3 +235,47 @@ func Close(component *xmpp.Component) { // close stream component.Disconnect() } + +// based on https://github.com/c2h5oh/datasize/blob/master/datasize.go +func parseSize(sSize string) (uint64, error) { + sizeParts := sizeRegex.FindStringSubmatch(sSize) + + if len(sizeParts) > 2 { + numPart, err := strconv.ParseInt(sizeParts[1], 10, 64) + if err != nil { + return 0, err + } + + var divisor uint64 + val := uint64(numPart) + + if len(sizeParts[2]) > 0 { + switch sizeParts[2][0] { + case 'B': + divisor = 1 + case 'K': + divisor = KB + case 'M': + divisor = MB + case 'G': + divisor = GB + case 'T': + divisor = TB + case 'P': + divisor = PB + case 'E': + divisor = EB + } + } + + if divisor == 0 { + return 0, &strconv.NumError{"Wrong suffix", sSize, strconv.ErrSyntax} + } + if val > maxUint64/divisor { + return 0, &strconv.NumError{"Overflow", sSize, strconv.ErrRange} + } + return val * divisor, nil + } + + return 0, &strconv.NumError{"Not enough parts", sSize, strconv.ErrSyntax} +} diff --git a/xmpp/component_test.go b/xmpp/component_test.go new file mode 100644 index 0000000..7e7e5e7 --- /dev/null +++ b/xmpp/component_test.go @@ -0,0 +1,47 @@ +package xmpp + +import ( + "testing" +) + +func TestParseSizeGarbage(t *testing.T) { + _, err := parseSize("abc") + if err == nil { + t.Error("abc should not be accepted") + } +} + +func TestParseSizeAsphalt(t *testing.T) { + size, err := parseSize("2B") + if size != 2 { + t.Errorf("Error parsing two bytes: %v %v", size, err) + } +} + +func TestParseSize9K(t *testing.T) { + size, err := parseSize("9 KB") + if size != 9216 { + t.Errorf("Error parsing 9K: %v %v", size, err) + } +} + +func TestParseSizeBits(t *testing.T) { + size, err := parseSize("9 Kb") + if err == nil { + t.Errorf("Error parsing kilobits: %v %v", size, err) + } +} + +func TestParseSizeEB(t *testing.T) { + size, err := parseSize("3EB") + if size != 3458764513820540928 { + t.Errorf("Error parsing exabytes: %v %v", size, err) + } +} + +func TestParseSizeOverflow(t *testing.T) { + size, err := parseSize("314EB") + if err == nil { + t.Errorf("Overflow is not overflowing: %v %v", size, err) + } +} diff --git a/xmpp/gateway/storage.go b/xmpp/gateway/storage.go new file mode 100644 index 0000000..a3fad79 --- /dev/null +++ b/xmpp/gateway/storage.go @@ -0,0 +1,89 @@ +package gateway + +import ( + "io/ioutil" + "os" + "sort" + "sync" + + log "github.com/sirupsen/logrus" +) + +// StorageQuota is a value from config parsed to bytes number +var StorageQuota uint64 +// CachedStorageSize estimates the storage size between full rescans +var CachedStorageSize uint64 +var StorageLock = sync.Mutex{} + +// MeasureStorageSize replaces the estimated storage size with relevant data from the filesystem +func MeasureStorageSize(path string) { + dents, err := ioutil.ReadDir(path) + if err != nil { + return + } + + var total uint64 + for _, fi := range dents { + if !fi.IsDir() { + total += uint64(fi.Size()) + } + } + + if total != CachedStorageSize { + if CachedStorageSize > 0 { + log.Warnf("Correcting cached storage size: was %v, actually %v", CachedStorageSize, total) + } + CachedStorageSize = total + } +} + +// CleanOldFiles purges the oldest files in a directory that exceed the limit +func CleanOldFiles(path string, limit uint64) { + dents, err := ioutil.ReadDir(path) + if err != nil { + return + } + + var total uint64 + for _, fi := range dents { + if !fi.IsDir() { + total += uint64(fi.Size()) + } + } + + // sort by time + sort.Slice(dents, func(i int, j int) bool { + return dents[i].ModTime().Before(dents[j].ModTime()) + }) + + // purge + if total > limit { + toPurge := total - limit + + var purgedAmount uint64 + var purgedCount uint64 + + for _, fi := range dents { + if !fi.IsDir() { + err = os.Remove(path + string(os.PathSeparator) + fi.Name()) + if err != nil { + log.Errorf("Couldn't remove %v: %v", fi.Name(), err) + continue + } + + purgedAmount += uint64(fi.Size()) + purgedCount += 1 + if purgedAmount >= toPurge { + break + } + } + } + + log.Infof("Cleaned %v bytes of %v old files", purgedAmount, purgedCount) + if CachedStorageSize > purgedAmount { + CachedStorageSize -= purgedAmount + } else { + CachedStorageSize = 0 + } + } +}