import Combine
import Foundation
import GRDB
import Martin

private typealias ArchMsg = Martin.MessageArchiveManagementModule.ArchivedMessageReceived

/// Forwards archived messages published by the client's MAM module to an
/// `ArchiveMessageProcessor`, which batches them and writes them to the database.
final class ClientMartinMAM {
    /// Keeps the publisher subscription alive for the lifetime of this object.
    private var cancellables: Set<AnyCancellable> = []
    private let processor: ArchiveMessageProcessor

    /// - Parameter xmppConnection: Client whose `.mam` module publishes archived messages;
    ///   its `context` is handed to the processor for message mapping.
    init(_ xmppConnection: XMPPClient) {
        processor = ArchiveMessageProcessor(xmppConnection.context)

        // subscribe to archived messages
        xmppConnection.module(.mam).archivedMessagesPublisher
            .sink(receiveValue: { [weak self] archived in
                guard let self = self else { return }
                // `append` is actor-isolated, so hop into an async context to call it.
                Task {
                    await self.processor.append(archived)
                }
            })
            .store(in: &cancellables)
    }
}

/// Buffers archived messages and stores them in batches.
///
/// A batch is flushed either when the buffer reaches `Const.mamRequestPageSize`
/// or by a periodic timer started in `init`.
private actor ArchiveMessageProcessor {
    /// Delay between two timer-driven flushes.
    private static let flushInterval: UInt64 = 700 * NSEC_PER_MSEC

    private var accumulator: [ArchMsg] = []
    private let context: Context?

    /// - Parameter ctx: Context passed to `Message.map` when converting archived messages.
    init(_ ctx: Context?) {
        context = ctx

        // Capture `self` weakly: a strong capture inside an endless loop would keep
        // the actor (and this polling task) alive forever. Note that messages still
        // buffered at the moment the processor is released are not flushed.
        Task { [weak self] in
            while true {
                try? await Task.sleep(nanoseconds: ArchiveMessageProcessor.flushInterval)
                guard let processor = self else { return }
                await processor.process()
            }
        }
    }

    /// Adds a message to the buffer and flushes immediately once a full page is collected.
    func append(_ msg: ArchMsg) async {
        accumulator.append(msg)
        if accumulator.count >= Const.mamRequestPageSize {
            await process()
        }
    }

    /// Stores everything currently buffered; does nothing when the buffer is empty.
    func process() async {
        guard !accumulator.isEmpty else { return }

        // Take the batch and clear the buffer BEFORE suspending. Actors are reentrant:
        // `append` may run while the database write is awaited, and clearing afterwards
        // would silently drop whatever arrived in the meantime.
        let batch = accumulator
        accumulator.removeAll()
        await handleMessages(batch)
    }

    /// Writes `received` to the database in a single write block.
    ///
    /// Messages without an id, and messages whose id already exists in the database,
    /// are skipped. If the write throws, the error is logged and the batch is dropped.
    private func handleMessages(_ received: [ArchMsg]) async {
        guard !received.isEmpty else { return }

        do {
            try await Database.shared.dbQueue.write { [weak self] db in
                for recv in received {
                    let message = recv.message
                    let date = recv.timestamp

                    guard let msgId = message.id else { continue }

                    if try Message.fetchOne(db, key: msgId) != nil {
                        #if DEBUG
                            print("---")
                            print("Skipping archived message with id \(msgId) (message exists)")
                            print("---")
                        #endif
                        continue
                    }

                    #if DEBUG
                        print("---")
                        print("Archive message received: \(message)")
                        print("Date: \(date)")
                        print("---")
                    #endif

                    if var msg = Message.map(message, context: self?.context) {
                        // Use the archive timestamp rather than whatever date mapping produced.
                        msg.date = date
                        try msg.insert(db)
                    }
                }
            }
        } catch {
            // Previously swallowed with `try?`; surface the failure so lost batches are visible.
            print("Failed to store archived messages: \(error)")
        }
    }
}