This commit is contained in:
fmodf 2023-09-24 02:42:32 +04:00
parent eb74ad27e3
commit 2fed518d81
24 changed files with 438 additions and 350 deletions

View file

@ -1,25 +1,25 @@
import Foundation
struct ClientState: Stateable {
var jid: JID
var srvResolverState: SRVResolverState
var socketState: SocketState
}
// MARK: Init
extension ClientState {
init(jid: JID) {
self.jid = jid
srvResolverState = .init()
socketState = .init()
}
}
// JSON
extension ClientState {
func json() -> String {
let encoder = JSONEncoder()
guard let data = try? encoder.encode(self) else { return "ClientState encoding error" }
return String(data: data, encoding: .utf8) ?? "ClientState encoding error"
}
}
//import Foundation
//
//struct ClientState: Stateable {
// var jid: JID
// var srvResolverState: SRVResolverState
// var socketState: SocketState
//}
//
//// MARK: Init
//extension ClientState {
// init(jid: JID) {
// self.jid = jid
// srvResolverState = .init()
// socketState = .init()
// }
//}
//
//// JSON
//extension ClientState {
// func json() -> String {
// let encoder = JSONEncoder()
// guard let data = try? encoder.encode(self) else { return "ClientState encoding error" }
// return String(data: data, encoding: .utf8) ?? "ClientState encoding error"
// }
//}

View file

@ -1,5 +0,0 @@
enum ClientAction: Codable {
case reconnect(_ jid: JID)
case resolverAction(_ action: SRVResolverAction)
case socketAction(_ action: SocketAction)
}

View file

@ -1,16 +0,0 @@
import Combine
final class ClientMiddleware {
static let shared = ClientMiddleware()
func middleware(state: ClientState, action: ClientAction) -> AnyPublisher<ClientAction, Never> {
switch action {
case .reconnect:
return Just(ClientAction.resolverAction(.startResolve(state.jid.domainPart)))
.eraseToAnyPublisher()
default:
return Empty().eraseToAnyPublisher()
}
}
}

View file

@ -1,16 +0,0 @@
import Foundation
extension ClientState {
static func reducer(state: inout ClientState, action: ClientAction) {
switch action {
case .reconnect(let jid):
state = .init(jid: jid)
case .resolverAction(let action):
SRVResolverState.reducer(state: &state.srvResolverState, action: action)
case .socketAction(let action):
SocketState.reducer(state: &state.socketState, action: action)
}
}
}

View file

@ -1,81 +0,0 @@
/*
In 99,99% of time YOU DON'T NEEDED TO CHANGE ANYTHING in this file!
This file declare main state object for whole XMPP client
and reducers/actions/middleware types. Core of XMPP client.
*/
import Foundation
import Combine
typealias Stateable = Codable & Equatable
typealias ClientStore = Store<ClientState, ClientAction>
typealias Reducer<State: Stateable, Action: Codable> = (inout State, Action) -> Void
typealias Middleware<State: Stateable, Action: Codable> = (State, Action) -> AnyPublisher<Action, Never>?
final class Store<State: Stateable, Action: Codable>: ObservableObject {
// Serial queue for performing any actions sequentially
private let serialQueue = DispatchQueue(label: "im.narayana.snikket.xmppclient.serial.queue", qos: .userInteractive)
private var verbose: Bool
@Published private(set) var state: State
private let reducer: Reducer<State, Action>
private let middlewares: [Middleware<State, Action>]
private var middlewareCancellables: Set<AnyCancellable> = []
// Init
init(
initialState: State,
reducer: @escaping Reducer<State, Action>,
middlewares: [Middleware<State, Action>] = [],
verbose: Bool) {
state = initialState
self.reducer = reducer
self.middlewares = middlewares
self.verbose = verbose
}
// Run reducers/middlewares
func dispatch(_ action: Action) {
serialQueue.sync { [weak self] in
guard let wSelf = self else { return }
let newState = wSelf.dispatch(wSelf.state, action)
wSelf.state = newState
}
}
private func dispatch(_ currentState: State, _ action: Action) -> State {
let startTime = CFAbsoluteTimeGetCurrent()
// Do reducing
var newState = currentState
reducer(&newState, action)
// Dispatch all middleware functions
for middleware in middlewares {
guard let middleware = middleware(newState, action) else {
break
}
middleware
.receive(on: DispatchQueue.main)
.sink(receiveValue: dispatch)
.store(in: &middlewareCancellables)
}
// Check performance
let timeElapsed = CFAbsoluteTimeGetCurrent() - startTime
if timeElapsed > 0.05 && verbose {
print(
"""
--
(Ignore this warning ONLY in case, when execution is paused by your breakpoint)
🕐Execution time: \(timeElapsed)
WARNING! Some reducers/middlewares work too long! It will lead to issues in production build!
Because of execution each action is synchronous the any stuck will reduce performance dramatically.
Probably you need check which part of reducer/middleware should be async (wrapped with Futures, as example)
--
"""
)
}
return newState
}
}

View file

@ -1,4 +1,4 @@
public struct JID: CustomStringConvertible, Stateable {
public struct JID: Codable {
public let localPart: String
public let domainPart: String
public let resourcePart: String?
@ -23,6 +23,7 @@ public enum JIDError: String, Error {
case wrongJid = "Can't parse or operate with JID"
}
// swiftlint:disable large_tuple
extension JID {
static func parse(_ str: String) throws -> (String, String, String?) {
let parts = str.components(separatedBy: "@")

View file

@ -0,0 +1,29 @@
import Foundation
import Combine
public typealias ModuleName = String
public protocol ModuleIdentifiable {
var moduleName: ModuleName { get }
}
public typealias ModuleAction = ModuleIdentifiable & Codable
public typealias ModuleState = Any & ModuleIdentifiable & Codable & Equatable
public typealias GlobalState = [ModuleName: any ModuleState]
public typealias OptionalState = (any ModuleState)?
public protocol Module: ModuleIdentifiable {
func getInitState() -> OptionalState
func reduce(state: inout any ModuleState, action: any ModuleAction)
func middleware(state: GlobalState, action: any ModuleAction) -> AnyPublisher<any ModuleAction, Never>
}
// Some modules reacts on outside events, this variable related to Store dispatch (some kind of callback)
protocol ModuleContinuousActing {
var asyncDispatch: ((_ action: any ModuleAction) -> Void)? { get set }
}

View file

@ -1,6 +1,6 @@
import Foundation
struct SRVRecord: CustomStringConvertible, Stateable {
struct SRVRecord: Codable & Equatable {
let priority: Int
let weight: Int
let port: Int

View file

@ -0,0 +1,7 @@
import Foundation
// Top level actions
enum ClientAction: ModuleAction {
var moduleName: ModuleName { "client" }
case reconnect(_ jid: JID)
}

View file

@ -1,8 +1,19 @@
import Foundation
import Combine
func loggerMiddleware(verbose: Bool) -> Middleware<ClientState, ClientAction> {
{ state, action in
#if DEBUG
let verbose = true
#else
let verbose = false
#endif
struct Logger: Module {
var moduleName: ModuleName { "Logger" }
func getInitState() -> OptionalState { nil }
func reduce(state: inout any ModuleState, action: any ModuleAction) {}
func middleware(state: GlobalState, action: any ModuleAction) -> AnyPublisher<any ModuleAction, Never> {
if verbose {
let timeStr = dateFormatter.string(from: Date())
var actionStr = "\(action)"

View file

@ -2,7 +2,7 @@ import Foundation
import Combine
import dnssd
public enum SRVResolverError: Error, Stateable {
enum SRVResolverError: Error, Codable, Equatable {
case unableToComplete
case referenceError
case socketError

View file

@ -1,5 +0,0 @@
enum SRVResolverAction: Codable {
case startResolve(_ forDomain: String)
case recordsFound(_ records: [SRVRecord])
case resolvingError(_ error: SRVResolverError?)
}

View file

@ -1,25 +0,0 @@
import Combine
final class SRVResolverMiddleware {
static let shared = SRVResolverMiddleware()
func middleware(state: ClientState, action: ClientAction) -> AnyPublisher<ClientAction, Never> {
switch action {
case .resolverAction(.startResolve(let domain)):
return Future<ClientAction, Never> { promise in
Task {
do {
let records = try await SRVResolver.resolve(domain: domain)
promise(.success(.resolverAction(.recordsFound(records))))
} catch {
promise(.success(.resolverAction(.resolvingError(error as? SRVResolverError))))
}
}
}
.eraseToAnyPublisher()
default:
return Empty().eraseToAnyPublisher()
}
}
}

View file

@ -0,0 +1,70 @@
import Foundation
import Combine
enum SRVResolverAction: ModuleAction {
var moduleName: ModuleName { "Resolver" }
case startResolve(_ forDomain: String)
case recordsFound(_ records: [SRVRecord])
case resolvingError(_ error: SRVResolverError?)
}
struct SRVResolverState: ModuleState {
var moduleName: ModuleName { "Resolver" }
var foundedRecords: [SRVRecord]
}
struct SRVResolverModule: Module {
var moduleName: ModuleName { "Resolver" }
func getInitState() -> OptionalState {
SRVResolverState.init(foundedRecords: [])
}
func reduce(state: inout any ModuleState, action: any ModuleAction) {
guard let action = action as? SRVResolverAction, var newState = state as? SRVResolverState else {
return
}
switch action {
case .recordsFound(let records):
newState.foundedRecords = records
default:
break
}
state = newState
}
func middleware(state: GlobalState, action: any ModuleAction) -> AnyPublisher<any ModuleAction, Never> {
switch action {
case let act as ClientAction:
switch act {
case .reconnect(let jid):
return Just(SRVResolverAction.startResolve(jid.domainPart))
.eraseToAnyPublisher()
}
case let act as SRVResolverAction:
switch act {
case .startResolve(let domain):
return Future<any ModuleAction, Never> { promise in
Task {
do {
let records = try await SRVResolver.resolve(domain: domain)
promise(.success(SRVResolverAction.recordsFound(records)))
} catch {
promise(.success(SRVResolverAction.resolvingError(error as? SRVResolverError)))
}
}
}
.eraseToAnyPublisher()
default:
return Empty().eraseToAnyPublisher()
}
default:
return Empty().eraseToAnyPublisher()
}
}
}

View file

@ -1,11 +0,0 @@
extension SRVResolverState {
static func reducer(state: inout SRVResolverState, action: SRVResolverAction) {
switch action {
case .recordsFound(let records):
state.foundedRecords = records
default:
break
}
}
}

View file

@ -1,10 +0,0 @@
struct SRVResolverState: Stateable {
var foundedRecords: [SRVRecord]
}
// MARK: Init
extension SRVResolverState {
init() {
foundedRecords = []
}
}

View file

@ -1,4 +0,0 @@
enum SocketAction: Codable {
case connect(_ host: String, _ port: Int)
case connectionStateUpdated(_ state: SocketConnectionState)
}

View file

@ -1,96 +0,0 @@
import Foundation
import Combine
import Network
enum SocketError: Error {
case connectionInitError
}
final class SocketMiddleware {
static let shared = SocketMiddleware()
private let queue = DispatchQueue.init(label: "socket.queue.\(UUID().uuidString)")
private var connection: NWConnection?
func middleware(state: ClientState, action: ClientAction) -> AnyPublisher<ClientAction, Never> {
switch action {
case .resolverAction(.recordsFound):
return Future<ClientAction, Never> { promise in
Task {
do {
try self.startConnection(
host: state.srvResolverState.foundedRecords[0].target,
port: state.srvResolverState.foundedRecords[0].port
)
promise(.success(.socketAction(.connectionStateUpdated(.connected))))
} catch {
promise(.success(.socketAction(.connectionStateUpdated(.disconnected))))
}
}
}
.eraseToAnyPublisher()
default:
return Empty().eraseToAnyPublisher()
}
}
private func startConnection(host: String, port: Int) throws {
// options and params
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.noDelay = true
tcpOptions.connectionTimeout = 5
tcpOptions.enableFastOpen = true
tcpOptions.disableAckStretching = true
let params = NWParameters(tls: nil, tcp: tcpOptions)
params.serviceClass = .responsiveData
// connection
connection = NWConnection(host: .name(host, nil), port: .init(integerLiteral: UInt16(port)), using: params)
guard let connection = connection else { throw SocketError.connectionInitError }
// subscribe to state
connection.stateUpdateHandler = { [weak self] state in
switch state {
case .cancelled, .setup:
break
case .preparing:
break
case .ready:
self?.read()
case .failed(let error):
break
case .waiting(let error):
break
default:
break
}
}
//
connection.start(queue: queue)
}
func write(data: Data) {
connection?.send(content: data, completion: .contentProcessed({err in
if let err = err {
print(err)
}
}))
}
private func read() {
connection?.receive(minimumIncompleteLength: 1, maximumLength: 4096 * 2, completion: { [weak self] data, _, complete, error in
if let data = data, complete == true {
print(String(bytes: data, encoding: .utf8) ?? "???")
self?.read()
} else if let err = error {
print(err)
}
})
}
}

View file

@ -1,11 +0,0 @@
extension SocketState {
static func reducer(state: inout SocketState, action: SocketAction) {
switch action {
case .connectionStateUpdated(let connection):
state.connection = connection
default:
break
}
}
}

View file

@ -1,21 +0,0 @@
import Foundation
enum SocketConnectionState: Stateable {
case connecting
case connected
case disconnecting
case disconnected
}
struct SocketState: Stateable {
var connection: SocketConnectionState
var lastReadData: Data?
var lastWroteData: Data?
}
// MARK: Init
extension SocketState {
init() {
connection = .disconnected
}
}

View file

@ -0,0 +1,156 @@
import Foundation
import Combine
import Network
enum SocketStatus: Codable {
case disconnected
case connected
}
enum SocketError: Error, Codable {
case connectionInitError
}
enum SocketAction: ModuleAction {
var moduleName: ModuleName {
"Socket"
}
case tryConnect(host: String, port: Int)
case connectionUpdated(_ status: SocketStatus)
case connectionError(_ err: SocketError)
}
struct SocketState: ModuleState {
var moduleName: ModuleName { "Socket" }
var socketStatus: SocketStatus = .disconnected
}
class SocketModule: Module, ModuleContinuousActing {
var moduleName: ModuleName { "Socket" }
var asyncDispatch: ((any ModuleAction) -> ())?
private let socketQueue = DispatchQueue.init(label: "socket.queue.\(UUID().uuidString)")
private var connection: NWConnection?
func getInitState() -> OptionalState {
SocketState.init()
}
func reduce(state: inout any ModuleState, action: any ModuleAction) {
guard let action = action as? SocketAction, var newState = state as? SocketState else {
return
}
switch action {
case .connectionUpdated(let status):
if newState.socketStatus != status {
newState.socketStatus = status
}
default:
break
}
state = newState
}
func middleware(state: GlobalState, action: any ModuleAction) -> AnyPublisher<any ModuleAction, Never> {
switch action {
case let act as SRVResolverAction:
switch act {
case .recordsFound:
if let founded = (state["Resolver"] as? SRVResolverState)?.foundedRecords, let record = founded.first {
return Just(SocketAction.tryConnect(host: record.target, port: record.port))
.eraseToAnyPublisher()
} else {
return Empty().eraseToAnyPublisher()
}
default:
return Empty().eraseToAnyPublisher()
}
case let act as SocketAction:
switch act {
case .tryConnect(let host, let port):
do {
try startConnection(host: host, port: port)
return Empty().eraseToAnyPublisher()
} catch {
return Just(SocketAction.connectionError(.connectionInitError))
.eraseToAnyPublisher()
}
default:
return Empty().eraseToAnyPublisher()
}
default:
return Empty().eraseToAnyPublisher()
}
}
}
private extension SocketModule {
private func startConnection(host: String, port: Int) throws {
// options and params
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.noDelay = true
tcpOptions.connectionTimeout = 5
tcpOptions.enableFastOpen = true
tcpOptions.disableAckStretching = true
let params = NWParameters(tls: nil, tcp: tcpOptions)
params.serviceClass = .responsiveData
// connection
connection = NWConnection(host: .name(host, nil), port: .init(integerLiteral: UInt16(port)), using: params)
guard let connection = connection else {
throw SocketError.connectionInitError
}
// subscribe to state
connection.stateUpdateHandler = { [weak self] state in
switch state {
case .cancelled, .setup:
self?.asyncDispatch?(SocketAction.connectionUpdated(.disconnected))
case .preparing:
self?.asyncDispatch?(SocketAction.connectionUpdated(.disconnected))
case .failed(let error):
self?.asyncDispatch?(SocketAction.connectionUpdated(.disconnected))
case .waiting(let error):
self?.asyncDispatch?(SocketAction.connectionUpdated(.disconnected))
case .ready:
self?.asyncDispatch?(SocketAction.connectionUpdated(.connected))
self?.read()
default:
break
}
}
//
connection.start(queue: socketQueue)
}
func write(data: Data) {
connection?.send(content: data, completion: .contentProcessed({ err in
if let err = err {
print(err)
}
}))
}
private func read() {
connection?.receive(minimumIncompleteLength: 1, maximumLength: 4096 * 2, completion: { [weak self] data, _, complete, error in
if let data = data, complete == true {
print(String(bytes: data, encoding: .utf8) ?? "???")
self?.read()
} else if let err = error {
print(err)
}
})
}
}

131
XMPPSwift/Store.swift Normal file
View file

@ -0,0 +1,131 @@
/*
In 99,99% of time YOU DON'T NEEDED TO CHANGE ANYTHING in this file!
This file declare main state object for whole XMPP client
and reducers/actions/middleware types. Core of XMPP client.
*/
import Foundation
import Combine
enum StoreErrors: String, Error {
case moduleWithSameNameExists = "Module with same name already exist. Try to use 'replaceModule(_ module: any Module)' instead"
case moduleDoesntExists = "Trying replace module which not exist. Try 'addModule(_ module: any Module)' instead"
case onlyStructStateAllowed = "Some of modules defines ModuleState as a class. Only struct here allowed"
}
public final class Store: ObservableObject {
private let serialQueue = DispatchQueue(label: "im.narayana.snikket.xmppclient.serial.queue", qos: .userInteractive)
private var modules: [any Module] = []
private var middlewareCancellables: Set<AnyCancellable> = []
private(set) var state: GlobalState = [:]
init() {
registerDefaultModules()
}
// Run reducers/middlewares
func dispatch(_ action: any ModuleAction) {
serialQueue.sync { [weak self] in
guard let wSelf = self else {
return
}
let newState = wSelf.dispatch(wSelf.state, action)
wSelf.state = newState
}
}
private func dispatch(_ currentState: GlobalState, _ action: any ModuleAction) -> GlobalState {
let startTime = CFAbsoluteTimeGetCurrent()
var newState = currentState
// Do reducing
// Module can reduce only related state
let name = action.moduleName
if let module = modules.first(where: { $0.moduleName == name }), var moduleState = newState[name] {
// make changes
module.reduce(state: &moduleState, action: action)
// Append changes to global state
newState[moduleState.moduleName] = moduleState
}
// Dispatch all middleware functions
// Middleware of any module can handle any action
for module in modules {
module.middleware(state: newState, action: action)
.receive(on: DispatchQueue.main)
.sink(receiveValue: dispatch)
.store(in: &middlewareCancellables)
}
// Check performance
let timeElapsed = CFAbsoluteTimeGetCurrent() - startTime
if timeElapsed > 0.05 {
print(
"""
--
(Ignore this warning ONLY in case, when execution is paused by your breakpoint)
🕐Execution time: \(timeElapsed)
WARNING! Some reducers/middlewares work too long! It will lead to issues in production build!
Because of execution each action is synchronous the any stuck will reduce performance dramatically.
Probably you need check which part of reducer/middleware should be async (wrapped with Futures, as example)
--
"""
)
}
//
return newState
}
}
public extension Store {
func addModule(_ module: any Module) throws {
// check that module is unique
if modules.map({ $0.moduleName }).contains(module.moduleName) {
throw StoreErrors.moduleWithSameNameExists
}
// check that module state is struct
if let moduleState = module.getInitState() {
if Mirror(reflecting: moduleState).displayStyle == .class {
throw StoreErrors.onlyStructStateAllowed
}
state[module.moduleName] = moduleState
}
// apply async dispatch for modules which should react on some outside events
// force casting requires here
// swiftlint:disable force_cast
if var module = module as? ModuleContinuousActing {
module.asyncDispatch = self.dispatch
modules.append(module as! any Module)
} else {
modules.append(module)
}
}
func replaceModule(_ module: any Module) throws {
if !modules.map({ $0.moduleName }).contains(module.moduleName) {
throw StoreErrors.moduleDoesntExists
}
modules.removeAll(where: { $0.moduleName == module.moduleName })
state.removeValue(forKey: module.moduleName)
try addModule(module)
}
}
private extension Store {
func registerDefaultModules() {
do {
try addModule(Logger())
try addModule(SRVResolverModule())
try addModule(SocketModule())
} catch let err as StoreErrors {
fatalError(err.rawValue)
} catch {
fatalError("\(error)")
}
}
}

View file

@ -4,33 +4,17 @@ import Combine
final public class XMPPClient: ObservableObject {
public let jid: JID
public let options: XMPPClientOptions
private let store: ClientStore
private let store: Store
@Published public var state: String = ""
public init(forJid: JID, options: XMPPClientOptions = XMPPClientOptions.defaults) {
jid = forJid
self.options = options
store = XMPPClient.initStore(jid, options)
store = Store()
}
public func start() {
store.dispatch(.reconnect(jid))
}
}
private extension XMPPClient {
static func initStore(_ jid: JID, _ options: XMPPClientOptions) -> ClientStore {
ClientStore(
initialState: ClientState(jid: jid),
reducer: ClientState.reducer,
middlewares: [
loggerMiddleware(verbose: options.verbose),
ClientMiddleware.shared.middleware,
SRVResolverMiddleware.shared.middleware,
SocketMiddleware.shared.middleware
],
verbose: options.verbose
)
store.dispatch(ClientAction.reconnect(jid))
}
}