import Collections
import Combine
import Foundation
import GRDB

/// Observable store for the messages exchanged between `roster.bareJid` and
/// `roster.contactBareJid`.
///
/// Messages are read through a GRDB `ValueObservation`; every delivered block is
/// merged into `messages` by `processFetched(_:_:)`.
@MainActor
final class ConversationStore: ObservableObject {
    /// Messages loaded so far. `loadMoreBackward()` treats the last element as the
    /// earliest one and the queries sort by date descending, so the deque is
    /// expected to be newest-first.
    @Published private(set) var messages: Deque<Message> = []
    private(set) var roster: Roster

    // Stored for later use; nothing in this file reads it yet.
    private let client: Client
    /// Maximum number of rows requested per observation.
    private let blockSize = Const.messagesPageSize
    // Only referenced by the commented-out trimming logic in `loadMoreBackward()`.
    private let messagesMax = Const.messagesMaxSize
    /// The active database observation. Assigning a new value releases the
    /// previous subscription.
    private var messagesCancellable: AnyCancellable?

    init(roster: Roster, client: Client) {
        self.client = client
        self.roster = roster
    }
}

extension ConversationStore {
    /// Starts observing the block of messages older than the earliest loaded one.
    ///
    /// When nothing is loaded yet, the current date is used as the upper bound.
    func loadMoreBackward() async {
        let earliestDate = messages.last?.date ?? Date()
        resubscribe(.before(earliestDate))
        // let fetchedMessages = await fetchBlock(earliestDate, nil)
        // messages.append(contentsOf: fetchedMessages)
        // if messages.count > messagesMax {
        //     messages.removeFirst(messages.count - messagesMax)
        // }
        // guard let lastMessage = messages.last else { return }
        // let messages = await fetchBlock(lastMessage.date, nil)
        // self.messages.append(contentsOf: messages)
    }

    /// Placeholder for forward paging; currently does nothing.
    func loadMoreForward() async {
        // guard let firstMessage = messages.first else { return }
        // let messages = await fetchBlock(nil, firstMessage.date)
        // self.messages.insert(contentsOf: messages, at: 0)
    }
}

private extension ConversationStore {
    /// Which side of a reference date a block of messages is requested from.
    enum FetchDirection {
        /// Messages strictly older than the associated date.
        case before(Date)
        /// Messages strictly newer than the associated date.
        case after(Date)
    }

    /// Replaces the current observation with one that tracks a block of at most
    /// `blockSize` conversation messages on the given side of a date.
    ///
    /// - Parameter side: The reference date and the direction to fetch from.
    func resubscribe(_ side: FetchDirection) {
        // Rows of this conversation, regardless of which party sent them.
        let conversation = Message.filter(
            (Column("to") == roster.bareJid && Column("from") == roster.contactBareJid) ||
            (Column("from") == roster.bareJid && Column("to") == roster.contactBareJid)
        )

        // The two directions differ only in the date comparison.
        let window: QueryInterfaceRequest<Message>
        switch side {
        case .before(let date):
            window = conversation.filter(Column("date") < date)
        case .after(let date):
            // NOTE(review): with descending order plus a limit this returns the
            // newest rows after `date`, not the ones adjacent to it — confirm this
            // is intended before enabling forward paging.
            window = conversation.filter(Column("date") > date)
        }

        let request = window
            .limit(blockSize)
            .order(Column("date").desc)

        // NOTE(review): a ValueObservation delivers a fresh value after relevant
        // database changes, and every delivery is merged into `messages` again —
        // verify that this cannot duplicate rows.
        messagesCancellable = ValueObservation
            .tracking(request.fetchAll)
            .publisher(in: Database.shared.dbQueue, scheduling: .immediate)
            .sink { completion in
                // Surface observation failures instead of dropping them silently.
                if case .failure(let error) = completion {
                    print("ConversationStore: message observation failed: \(error)")
                }
            } receiveValue: { [weak self] messages in
                self?.processFetched(messages, side)
            }
    }

    /// Merges a delivered block into `messages` and kicks off attachment processing.
    ///
    /// - Parameters:
    ///   - messages: The block delivered by the observation.
    ///   - side: `.before` appends the block at the end, `.after` inserts it at the front.
    func processFetched(_ messages: [Message], _ side: FetchDirection) {
        switch side {
        case .before:
            self.messages.append(contentsOf: messages)
        case .after:
            self.messages.insert(contentsOf: messages, at: 0)
        }
        Task {
            await processAttachments(messages)
        }
    }

    /// Attachment loading hook; for now it only prints the number of messages.
    func processAttachments(_ messages: [Message]) async {
        // load attachment here
        print(messages.count)
    }
}