import Combine import Foundation import GRDB // swiftlint:disable:next type_body_length final class DatabaseMiddleware { static let shared = DatabaseMiddleware() private let database = Database.shared private var cancellables: Set = [] private var conversationCancellables: Set = [] private init() { // Database changes ValueObservation .tracking(Roster.fetchAll) .publisher(in: database._db, scheduling: .immediate) .sink { _ in // Handle completion } receiveValue: { rosters in DispatchQueue.main.async { store.dispatch(.databaseAction(.storedRostersLoaded(rosters: rosters))) } } .store(in: &cancellables) ValueObservation .tracking(Chat.fetchAll) .publisher(in: database._db, scheduling: .immediate) .sink { _ in // Handle completion } receiveValue: { chats in DispatchQueue.main.async { store.dispatch(.databaseAction(.storedChatsLoaded(chats: chats))) } } .store(in: &cancellables) } // swiftlint:disable:next function_body_length cyclomatic_complexity func middleware(state _: AppState, action: AppAction) -> AnyPublisher { switch action { // MARK: Accounts case .startAction(.loadStoredAccounts): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.loadingStoredAccountsFailed))) return } do { try database._db.read { db in let accounts = try Account.fetchAll(db) promise(.success(.databaseAction(.storedAccountsLoaded(accounts: accounts)))) } } catch { promise(.success(.databaseAction(.loadingStoredAccountsFailed))) } } } .eraseToAnyPublisher() case .accountsAction(.makeAccountPermanent(let account)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAccountFailed))) return } do { try database._db.write { db in // make permanent and store to database var acc = account acc.isTemp = false try acc.insert(db) // Re-Fetch all accounts let accounts = try Account.fetchAll(db) // Use the accounts promise(.success(.databaseAction(.storedAccountsLoaded(accounts: accounts)))) } } catch { promise(.success(.databaseAction(.updateAccountFailed))) } } } .eraseToAnyPublisher() // MARK: Rosters case .rostersAction(.markRosterAsLocallyDeleted(let ownerJID, let contactJID)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.rostersAction(.rosterDeletingFailed(reason: L10n.Global.Error.genericDbError)))) return } do { _ = try database._db.write { db in try Roster .filter(Column("bareJid") == ownerJID) .filter(Column("contactBareJid") == contactJID) .updateAll(db, Column("locallyDeleted").set(to: true)) } promise(.success(.empty("Roster marked as locally deleted"))) } catch { promise(.success(.rostersAction(.rosterDeletingFailed(reason: L10n.Global.Error.genericDbError)))) } } } .eraseToAnyPublisher() case .rostersAction(.unmarkRosterAsLocallyDeleted(let ownerJID, let contactJID)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.rostersAction(.rosterDeletingFailed(reason: L10n.Global.Error.genericDbError)))) return } do { _ = try database._db.write { db in try Roster .filter(Column("bareJid") == ownerJID) .filter(Column("contactBareJid") == contactJID) .updateAll(db, Column("locallyDeleted").set(to: false)) } promise(.success(.empty("Roster unmarked as locally deleted"))) } catch { promise(.success(.rostersAction(.rosterDeletingFailed(reason: L10n.Global.Error.genericDbError)))) } } } .eraseToAnyPublisher() // MARK: Chats case .chatsAction(.createNewChat(let accountJid, let participantJid)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.chatsAction(.chatCreationFailed(reason: L10n.Global.Error.genericDbError)))) return } do { try database._db.write { db in let chat = Chat( id: UUID().uuidString, account: accountJid, participant: participantJid, type: .chat ) try chat.insert(db) promise(.success(.chatsAction(.chatCreated(chat: chat)))) } } catch { promise(.success(.chatsAction(.chatCreationFailed(reason: L10n.Global.Error.genericDbError)))) } } } .eraseToAnyPublisher() // MARK: Conversation and messages case .conversationAction(.makeConversationActive(let chat, _)): subscribeToMessages(chat: chat) return Empty().eraseToAnyPublisher() case .xmppAction(.xmppMessageReceived(let message)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError)))) return } guard message.contentType != .typing, message.body != nil else { promise(.success(.empty("Typing message received or message body is nil"))) return } do { try database._db.write { db in try message.insert(db) } promise(.success(.empty("Message stored in db"))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription)))) } } } .eraseToAnyPublisher() case .conversationAction(.sendMessage(let from, let to, let body)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError)))) return } do { let message = Message( id: UUID().uuidString, type: .chat, contentType: .text, from: from, to: to, body: body, subject: nil, thread: nil, oobUrl: nil, date: Date(), pending: true, sentError: false ) try database._db.write { db in try message.insert(db) } promise(.success(.xmppAction(.xmppMessageSent(message)))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription)))) } } } .eraseToAnyPublisher() case .xmppAction(.xmppMessageSendSuccess(let msgId)): // mark message as pending false and sentError false return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError)))) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == msgId) .updateAll(db, Column("pending").set(to: false), Column("sentError").set(to: false)) } promise(.success(.empty("Message marked in db as sent"))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .xmppAction(.xmppMessageSendFailed(let msgId)): // mark message as pending false and sentError true return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError)))) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == msgId) .updateAll(db, Column("pending").set(to: false), Column("sentError").set(to: true)) } promise(.success(.empty("Message marked in db as failed to send"))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription)))) } } } .eraseToAnyPublisher() // MARK: Attachments case .fileAction(.downloadAttachmentFile(let id, _)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == id) .updateAll(db, Column("attachmentDownloadFailed").set(to: false)) } promise(.success(.empty("Message marked in db as starting downloading attachment"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .fileAction(.downloadingAttachmentFileFailed(let id, _)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == id) .updateAll(db, Column("attachmentDownloadFailed").set(to: true)) } promise(.success(.empty("Message marked in db as failed to download attachment"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .fileAction(.attachmentFileDownloaded(let id, let localName)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == id) .updateAll(db, Column("attachmentLocalName").set(to: localName), Column("attachmentDownloadFailed").set(to: false)) } promise(.success(.empty("Message marked in db as downloaded attachment"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .fileAction(.attachmentThumbnailCreated(let id, let thumbnailName)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == id) .updateAll(db, Column("attachmentThumbnailName").set(to: thumbnailName)) } promise(.success(.empty("Message marked in db as thumbnail created"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: id, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() // MARK: Sharing case .conversationAction(.sendMediaMessages(let from, let to, let messageIds, let localFilesNames)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError))) ) return } do { for (index, id) in messageIds.enumerated() { let message = Message( id: id, type: .chat, contentType: .attachment, from: from, to: to, body: nil, subject: nil, thread: nil, oobUrl: nil, date: Date(), pending: true, sentError: false, attachmentType: localFilesNames[index].attachmentType, attachmentLocalName: localFilesNames[index] ) try database._db.write { db in try message.insert(db) } } promise(.success(.empty("Messages with sharings stored in db"))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .sharingAction(.retrySharing(let id)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.storeMessageFailed(reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == id) .updateAll(db, Column("pending").set(to: true), Column("sentError").set(to: false)) } promise(.success(.empty("Message with shares marked in db as pending to send"))) } catch { promise(.success(.databaseAction(.storeMessageFailed(reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .xmppAction(.xmppSharingUploadSuccess(let messageId, let remotePath)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: messageId, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == messageId) .updateAll(db, Column("attachmentRemotePath").set(to: remotePath), Column("pending").set(to: false), Column("sentError").set(to: false)) } promise(.success(.empty("Shared file uploaded and message marked in db as sent"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: messageId, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() case .xmppAction(.xmppSharingUploadFailed(let messageId, _)): return Future { promise in Task(priority: .background) { [weak self] in guard let database = self?.database else { promise(.success(.databaseAction(.updateAttachmentFailed(id: messageId, reason: L10n.Global.Error.genericDbError))) ) return } do { _ = try database._db.write { db in try Message .filter(Column("id") == messageId) .updateAll(db, Column("pending").set(to: false), Column("sentError").set(to: true)) } promise(.success(.empty("Shared file upload failed and message marked in db as failed to send"))) } catch { promise(.success(.databaseAction(.updateAttachmentFailed(id: messageId, reason: error.localizedDescription))) ) } } } .eraseToAnyPublisher() default: return Empty().eraseToAnyPublisher() } } } private extension DatabaseMiddleware { func subscribeToMessages(chat: Chat) { conversationCancellables = [] ValueObservation .tracking( Message .filter( (Column("to") == chat.account && Column("from") == chat.participant) || (Column("from") == chat.account && Column("to") == chat.participant) ) .order(Column("date").desc) .fetchAll ) .publisher(in: database._db, scheduling: .immediate) .sink { _ in } receiveValue: { messages in // messages DispatchQueue.main.async { store.dispatch(.conversationAction(.messagesUpdated(messages: messages))) } } .store(in: &conversationCancellables) } }