diff --git a/ConversationsClassic/AppData/Client/Client+MartinMAM.swift b/ConversationsClassic/AppData/Client/Client+MartinMAM.swift index 984790d..b399a18 100644 --- a/ConversationsClassic/AppData/Client/Client+MartinMAM.swift +++ b/ConversationsClassic/AppData/Client/Client+MartinMAM.swift @@ -3,54 +3,75 @@ import Foundation import GRDB import Martin +private typealias ArchMsg = Martin.MessageArchiveManagementModule.ArchivedMessageReceived + final class ClientMartinMAM { private var cancellables: Set = [] - - private weak var module: MessageArchiveManagementModule? - private var afterAvailable = true - private var beforeAvailable = true + private var processor = ArchiveMessageProcessor() init(_ xmppConnection: XMPPClient) { - module = xmppConnection.module(.mam) - subscribe() - } - - private func subscribe() { // subscribe to archived messages - module?.archivedMessagesPublisher - .delay(for: 0.7, scheduler: DispatchQueue.main) + xmppConnection.module(.mam).archivedMessagesPublisher .sink(receiveValue: { [weak self] archived in guard let self = self else { return } Task { - await self.handleMessage(archived) + await self.processor.append(archived) } }) .store(in: &cancellables) } +} - private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) async { - let message = received.message - let date = received.timestamp - let msgId = received.messageId +private actor ArchiveMessageProcessor { + private var accumulator: [ArchMsg] = [] + init() { + Task { + while true { + try? await Task.sleep(nanoseconds: 700 * NSEC_PER_MSEC) + await process() + } + } + } + + func append(_ msg: ArchMsg) async { + accumulator.append(msg) + if accumulator.count >= Const.mamRequestLimit { + await process() + } + } + + func process() async { + if accumulator.isEmpty { return } + await handleMessages(accumulator) + accumulator.removeAll() + } + + private func handleMessages(_ received: [ArchMsg]) async { + if received.isEmpty { return } try? await Database.shared.dbQueue.write { db in - if try Message.fetchOne(db, key: msgId) != nil { - #if DEBUG - print("---") - print("Skipping archived message with id \(message.id ?? "???") (message exists)") - print("---") - #endif - return - } else { - #if DEBUG - print("---") - print("Archive message received: \(message)") - print("Date: \(date)") - print("---") - #endif - if var msg = Message.map(message) { - msg.date = received.timestamp - try msg.insert(db) + for recv in received { + let message = recv.message + let date = recv.timestamp + if let msgId = message.id { + if try Message.fetchOne(db, key: msgId) != nil { + #if DEBUG + print("---") + print("Skipping archived message with id \(msgId) (message exists)") + print("---") + #endif + } else { + #if DEBUG + print("---") + print("Archive message received: \(message)") + print("Date: \(date)") + print("---") + #endif + if var msg = Message.map(message) { + msg.date = date + try msg.insert(db) + } + } } } }