import Combine
import Foundation
import GRDB
import Martin

/// Observable store for the conversation between `roster.bareJid` and
/// `roster.contactBareJid`.
///
/// Publishes the messages stored in the local database for that pair of JIDs,
/// sends new messages through `client`, and asks `ArchiveMessageFetcher` to
/// request archived messages from the server.
@MainActor
final class MessagesStore: ObservableObject {
    /// Messages of this conversation as last delivered by the database
    /// observation (ordered by `date`, descending).
    @Published private(set) var messages: [Message] = []
    @Published var replyText = ""

    private(set) var roster: Roster
    private let client: Client

    private var messagesCancellable: AnyCancellable?
    private let archiver = ArchiveMessageFetcher()

    /// Creates the store and immediately starts observing the database.
    init(roster: Roster, client: Client) {
        self.client = client
        self.roster = roster
        subscribe()
    }
}

// MARK: - Send message

extension MessagesStore {
    /// Saves `message` to the database, sends it through `client`, and then
    /// updates its status to `.sent`, or to `.error` if any step throws.
    ///
    /// The work runs in an unstructured task, so this method returns immediately.
    func sendMessage(_ message: String) {
        Task {
            var msg = Message.blank
            msg.from = roster.bareJid
            msg.to = roster.contactBareJid
            msg.body = message

            // store as pending on db, and send
            do {
                try await msg.save()
                try await client.sendMessage(msg)
                try await msg.setStatus(.sent)
            } catch {
                logIt(.error, "Error sending message: \(error)")
                // Best effort: if even the status update fails there is nothing more to do here.
                try? await msg.setStatus(.error)
            }
        }
    }

    /// Sends a contact reference as a `contact:<jid>` message body.
    func sendContact(_ jidStr: String) {
        sendMessage("contact:\(jidStr)")
    }

    /// Sends a location as a `geo:<lat>,<lon>` message body.
    func sendLocation(_ lat: Double, _ lon: Double) {
        sendMessage("geo:\(lat),\(lon)")
    }
}

// MARK: - Subscriptions

private extension MessagesStore {
    /// Observes the messages exchanged between the two JIDs of `roster` and
    /// mirrors every change into `messages`.
    ///
    /// Each delivered value also calls `archiver.initialFetch`, which ignores
    /// the call once an initial fetch has been started.
    func subscribe() {
        messagesCancellable = ValueObservation.tracking(Message
            .filter(
                (Column("to") == roster.bareJid && Column("from") == roster.contactBareJid) ||
                    (Column("from") == roster.bareJid && Column("to") == roster.contactBareJid)
            )
            .order(Column("date").desc)
            .fetchAll
        )
        .publisher(in: Database.shared.dbQueue, scheduling: .immediate)
        .receive(on: DispatchQueue.main)
        .sink { completion in
            // Surface observation failures instead of dropping them silently.
            if case .failure(let error) = completion {
                logIt(.error, "Error observing messages: \(error)")
            }
        } receiveValue: { [weak self] messages in
            guard let self else { return }
            self.messages = messages
            Task {
                await self.archiver.initialFetch(messages, self.roster, self.client)
            }
        }
    }
}

// MARK: - Fetch archived messages

extension MessagesStore {
    /// Requests the next page of archived messages in the forward direction, if any.
    func fetchForward() {
        Task {
            await archiver.fetchForward(roster, client)
        }
    }

    /// Requests the next page of archived messages in the backward direction, if any.
    func fetchBackward() {
        Task {
            await archiver.fetchBackward(roster, client)
        }
    }
}

/// Serializes archive requests for one conversation and keeps the paging
/// cursors between requests.
///
/// A `nil` cursor means there is nothing (more) to fetch in that direction.
private actor ArchiveMessageFetcher {
    private var initFetchStarted = false
    private var isFetchingForward = false
    private var isFetchingBackward = false
    private var forwardRsm: RSM.Query?
    private var backwardRsm: RSM.Query?

    /// Performs the first archive request and seeds the paging cursors.
    ///
    /// Only the first call does any work. If the request fails, the error is
    /// logged and the started flag is reset so that a later call can retry.
    ///
    /// - Parameters:
    ///   - messages: Messages currently known locally; the id of the first one
    ///     is used as the `before` anchor. When empty, the last
    ///     `Const.mamRequestLimit` items are requested instead.
    ///   - roster: Conversation the archive is requested for.
    ///   - client: Client used to perform the request.
    func initialFetch(_ messages: [Message], _ roster: Roster, _ client: Client) async {
        if initFetchStarted {
            return
        }
        // Set before the first suspension point so re-entrant calls bail out above.
        initFetchStarted = true

        do {
            if let firstExistId = messages.first?.id {
                let result = try await client.fetchArchiveMessages(for: roster, query: .init(before: firstExistId, max: Const.mamRequestLimit))
                if result.complete {
                    forwardRsm = nil
                    backwardRsm = nil
                } else {
                    forwardRsm = .init(after: result.rsm?.last, max: Const.mamRequestLimit)
                    backwardRsm = .init(before: result.rsm?.first, max: Const.mamRequestLimit)
                }
            } else {
                let result = try await client.fetchArchiveMessages(for: roster, query: .init(lastItems: Const.mamRequestLimit))
                if result.complete {
                    backwardRsm = nil
                } else {
                    backwardRsm = .init(before: result.rsm?.first, max: Const.mamRequestLimit)
                }
            }
        } catch {
            logIt(.error, "Error requesting archived messages: \(error)")
            initFetchStarted = false
        }
    }

    /// Fetches the next page after the forward cursor and advances the cursor.
    ///
    /// Does nothing when there is no forward cursor or a forward request is
    /// already in flight. On failure the error is logged and the cursor is
    /// left unchanged.
    func fetchForward(_ roster: Roster, _ client: Client) async {
        guard !isFetchingForward, let rsm = forwardRsm else { return }
        // The actor is re-entrant across `await`; the flag prevents a second
        // call from requesting the same page while this one is suspended.
        isFetchingForward = true
        defer { isFetchingForward = false }

        do {
            let result = try await client.fetchArchiveMessages(for: roster, query: rsm)
            if result.complete {
                forwardRsm = nil
            } else {
                forwardRsm = .init(after: result.rsm?.last, max: Const.mamRequestLimit)
            }
        } catch {
            logIt(.error, "Error requesting archived messages: \(error)")
        }
    }

    /// Fetches the next page before the backward cursor and advances the cursor.
    ///
    /// Does nothing when there is no backward cursor or a backward request is
    /// already in flight. On failure the error is logged and the cursor is
    /// left unchanged.
    func fetchBackward(_ roster: Roster, _ client: Client) async {
        guard !isFetchingBackward, let rsm = backwardRsm else { return }
        // See `fetchForward` for why the in-flight flag is needed.
        isFetchingBackward = true
        defer { isFetchingBackward = false }

        do {
            let result = try await client.fetchArchiveMessages(for: roster, query: rsm)
            if result.complete {
                backwardRsm = nil
            } else {
                backwardRsm = .init(before: result.rsm?.first, max: Const.mamRequestLimit)
            }
        } catch {
            logIt(.error, "Error requesting archived messages: \(error)")
        }
    }

    //    func fetchBackward(_ roster: Roster, _ client: Client) {
    //        guard let rsm = backwardRsm else { return }
    //        Task {
    //            let result = try await client.fetchArchiveMessages(for: roster, query: rsm)
    //            result.complete ? (backwardRsm = nil) : (backwardRsm = .init(before: result.rsm?.first, max: Const.mamRequestLimit))
    //        }
    //    }
}

// MARK: - Archived messages

// extension MessagesStore {
//    func requestEarliestArchivedMessages() {
//        guard let beforeId = messages.first?.id else { return }
//        Task {
//            await archiveMessageFetcher.fetchAfterMessages(roster, client, afterId: beforeId)
//            // await archiveMessageFetcher.fetchBeforeMessages(roster, client, beforeId: beforeId)
//        }
//    }
//
//    func requestLatestArchivedMessages() {
//        guard let afterId = messages.last?.id else { return }
//        Task {
//            await archiveMessageFetcher.fetchBeforeMessages(roster, client, beforeId: afterId)
//            // await archiveMessageFetcher.fetchAfterMessages(roster, client, afterId: afterId)
//        }
//    }
//
//    private func requestLastArchivedMessages() {
//        Task {
//            await archiveMessageFetcher.fetchLastMessages(roster, client)
//        }
//    }
// }

// private actor ArchiveMessageFetcher {
//    private var afterAvailable = true
//    private var beforeAvailable = true
//    private var isFetching = false
//    private var fetchingIsPossinle = true
//
//    func fetchLastMessages(_ roster: Roster, _ client: Client) async {
//        if !fetchingIsPossinle { return }
//        while isFetching {
//            await Task.yield()
//        }
//        isFetching = true
//
//        let query: RSM.Query = .init(lastItems: Const.mamRequestLimit)
//        do {
//            _ = try await client.fetchArchiveMessages(for: roster, query: query)
//        } catch AppError.featureNotSupported {
//            fetchingIsPossinle = false
//        } catch {
//            logIt(.error, "Error requesting archived messages: \(error)")
//        }
//
//        isFetching = false
//    }
//
//    func fetchBeforeMessages(_ roster: Roster, _ client: Client, beforeId: String) async {
//        if !fetchingIsPossinle || !beforeAvailable { return }
//        while isFetching {
//            await Task.yield()
//        }
//        isFetching = true
//
//        let query: RSM.Query = .init(before: beforeId, max: Const.mamRequestLimit)
//        do {
//            let result = try await client.fetchArchiveMessages(for: roster, query: query)
//            if result.complete {
//                beforeAvailable = false
//            }
//        } catch AppError.featureNotSupported {
//            fetchingIsPossinle = false
//        } catch {
//            logIt(.error, "Error requesting archived messages: \(error)")
//        }
//
//        isFetching = false
//    }
//
//    func fetchAfterMessages(_ roster: Roster, _ client: Client, afterId: String) async {
//        if !fetchingIsPossinle || !afterAvailable { return }
//        while isFetching {
//            await Task.yield()
//        }
//        isFetching = true
//
//        let query: RSM.Query = .init(after: afterId, max: Const.mamRequestLimit)
//        do {
//            let result = try await client.fetchArchiveMessages(for: roster, query: query)
//            if result.complete {
//                afterAvailable = false
//            }
//        } catch AppError.featureNotSupported {
//            fetchingIsPossinle = false
//        } catch {
//            logIt(.error, "Error requesting archived messages: \(error)")
//        }
//
//        isFetching = false
//    }
// }