antonvmironov on master
Initial implementation of conve… (compare)
antonvmironov on master
Fix bindings memory issue where… (compare)
antonvmironov on master
Reimplementing NSControl reacti… (compare)
antonvmironov on master
Debugging assistance added Extracting reactive testing ext… Making test fixtures shared acr… and 1 more (compare)
antonvmironov on master
Fixing swift version for cocoap… (compare)
antonvmironov on 1.3.2
Fixing swift version for cocoap… (compare)
// Publish `value` through `didChange`, naming Executor.main as the executor this call originates from.
didChange.update(value, from: Executor.main)
// Subscribe to ViewStateInfo values keyed by KKHelpViewController, keep only the
// updates whose state reports `isHidden`, and show the main window for each of them.
// Both the filter and the update handler are handed Executor.main.
states.subscribeFor(key: KKHelpViewController.self, type: ViewStateInfo.self)
.filter(executor: Executor.main) { $0.state.isHidden }
.onUpdate(executor: Executor.main) { _ in scenes.show(wnd: KKMainWindowController.self) }
from originalExecutor: Executor? = nil
. It basically says "I am sure this call is made from this executor". It enables yet another AsyncNinja optimization: omitting asynchronous dispatch. So if you update a channel from the main queue, subscribers (subscribed on the main executor) can be notified synchronously. This does not always happen: additional checks are in place, e.g. whether a synchronous call makes sense or whether there is a risk of stack overflow.
// Sketch of a chained configuration style: build the channel first, then attach
// a cancellation token and a buffer size through modifier calls.
// NOTE(review): this is not compilable as written (`...` placeholder, a type name passed as a value).
// NOTE(review): `CancelationToken` is presumably the `CancellationToken` type spelled with a single "l" — confirm.
channel() { update in ... }
.cancelBy(token: CancelationToken)
.bufferSize(10)
/// Makes a channel whose updates and completion are driven by `block`.
///
/// `block` is submitted to `executor` and receives two closures: one that sends
/// an update to the channel and one that completes it. If `block` throws, the
/// channel is completed with the thrown error as a failure.
///
/// The underlying producer is captured weakly by the scheduled work, so both
/// closures become no-ops once the producer has been deallocated.
///
/// - Parameters:
///   - executor: executor that `block` is submitted to.
///   - cancellationToken: optional token; when present, the producer is
///     registered with it as a cancellable.
///   - bufferSize: buffer size the underlying producer is created with.
///   - block: work that feeds the channel through the `update` and `complete` closures.
/// - Returns: the channel backed by the newly created producer.
public func channel<Update, Success>(executor: Executor = .primary, cancellationToken: CancellationToken? = nil,
  bufferSize: Int = AsyncNinjaConstants.defaultChannelBufferSize,
  block: @escaping (_ update: @escaping (Update) -> Void, _ complete: @escaping (Fallible<Success>) -> Void) throws -> Void
  ) -> Channel<Update, Success> {
  // TEST: ChannelMakersTests.testInfiniteChannel
  // Honor the caller-supplied buffer size; previously the argument was ignored
  // and the default constant was always used.
  let producer = Producer<Update, Success>(bufferSize: bufferSize)
  cancellationToken?.add(cancellable: producer)
  executor.execute(
    from: nil
  ) { [weak producer] (originalExecutor) in
    do {
      // `originalExecutor` is forwarded so the producer knows which executor
      // the update/complete calls are being made from.
      try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
    } catch {
      producer?.complete(Fallible(failure: error), from: originalExecutor)
    }
  }
  return producer
}
/// Returns a channel that forwards every change notification observed on the
/// `RepoEntity` collection as a channel update.
func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, NotificationToken> {
// Declared outside the channel block so the completion handler below can capture it.
var notificationToken : NotificationToken?
return channel() { update, completion in
// Start observing; each changeset is pushed into the channel as an update.
notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
update(changeset)
print("changeset update")
}
// NOTE(review): `completion` is never called in this block, so nothing visible here completes the channel — confirm this is intended.
}.onComplete { _ in _ = notificationToken } // keep reference for notificationToken
}
// Log the affected indices of every incremental `.update` changeset delivered by
// the repo list stream; any other kind of changeset is ignored.
myViewModel.repoList.itemsStream().onUpdate(context: self) { _, changeset in
  guard case .update(_, let deletions, let insertions, let modifications) = changeset else {
    return
  }
  print("deletions \(deletions), insertions \(insertions), modifications \(modifications)")
}
[RepoListViewController]: deinit
[RepoListViewModel]: deinit
[RepoListService]: deinit
// Submit the channel-making block to the executor. The producer is captured
// weakly, so the update/complete closures do nothing once it has been released.
executor.execute( from: nil) { [weak producer] (originalExecutor) in
do {
// Hand `block` one closure that sends updates and one that completes the producer;
// both forward `originalExecutor` as the executor the call is made from.
try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
} catch{
// An error thrown by `block` completes the producer with a failure.
producer?.complete(Fallible(failure: error), from: originalExecutor)
}
}
// Proposed variant: `block` returns a value that is stored on the producer as a release hook.
// The producer is still captured weakly, so nothing is stored if it is already gone.
executor.execute( from: nil) { [weak producer] (originalExecutor) in
do {
let onRelease = try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
// NOTE(review): `onSubsriptionRelease` is presumably meant to read `onSubscriptionRelease` — confirm before adopting the name.
producer?.onSubsriptionRelease = onRelease // <<- LOOK AT ME
} catch{
// An error thrown by `block` completes the producer with a failure.
producer?.complete(Fallible(failure: error), from: originalExecutor)
}
}
/// Design sketch: a channel of Realm collection changes for `RepoEntity` that
/// can be cancelled through `cancellationToken`.
///
/// Two alternative ways of wiring cancellation are shown side by side
/// ("plan A" inside the block, "plan B" as a chained modifier).
/// NOTE(review): `producer.bufferSize`, `producer.onRelease` and `cancelableBy`
/// are proposed API in this sketch — confirm they exist before relying on them.
///
/// - Parameter cancellationToken: token the producer is registered with.
/// - Returns: a channel that receives every observed changeset as an update.
func realmChangset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel() { producer in
    producer.bufferSize = 666
    // The parameter is non-optional, so it is called directly rather than through `?.`.
    cancellationToken.add(cancellable: producer) // plan A
    let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
      producer.update(changeset)
    }
    // Actually invoke `invalidate()`; the bare member reference only produced
    // the method value and never stopped the observation.
    producer.onRelease = { notificationToken.invalidate() }
    // producer.complete(fallible: _)
    // producer.succeded(...)
    // producer.failed(...)
  }.cancelableBy(cancellationToken) // plan B
}
_asyncNinja_retainUntilFinalization
, _asyncNinja_notifyFinalization
. They could help. Their names may change if their functionality proves useful on a larger scale. // Realm Changeset
/// Streams change notifications for the `RepoEntity` collection.
///
/// The observation is set up inside the channel block, which is submitted to the
/// main executor. The token returned by `observe` is handed to the producer via
/// `_asyncNinja_retainUntilFinalization`.
func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel(executor: .main) { stream in
    let entities = self.allObjects(ofType: RepoEntity.self)
    let token = entities.observe { change in
      stream.update(change)
      print("changeset update")
    }
    stream._asyncNinja_retainUntilFinalization(token)
  }
}
// Input stream
/// Streams the modifier flags carried by every local `.flagsChanged` event.
///
/// A local event monitor is installed inside the channel block (submitted to the
/// main executor). Each event's `modifierFlags` is sent as an update and the
/// event is returned unchanged. The monitor is removed in the producer's
/// finalization callback.
func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
  return channel(executor: Executor.main) { producer in
    let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
      producer.update($0.modifierFlags)
      return $0
    }
    producer._asyncNinja_notifyFinalization {
      // The monitor handle is optional; unwrap it safely instead of
      // force-unwrapping, which would crash if no monitor was installed.
      if let monitorFlags = monitorFlags {
        NSEvent.removeMonitor(monitorFlags)
      }
    }
  }
}
/// Makes a channel and hands its producer to `block` for manual driving.
///
/// `block` is submitted to `executor`. The producer is captured weakly by the
/// scheduled work, so `block` is skipped entirely if the producer has already
/// been released by the time the work runs. If `block` throws, the producer is
/// completed with the thrown error as a failure.
///
/// - Parameters:
///   - executor: executor that `block` is submitted to.
///   - block: work that receives a strong reference to the producer.
/// - Returns: the channel backed by the newly created producer.
public func channel<Update, Success>(executor: Executor = .primary, block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  executor.execute(from: nil) { [weak producer] originalExecutor in
    guard let producer = producer else { return }
    do {
      try block(producer)
    } catch {
      // Forward the executor this failure is reported from, matching how the
      // closure-based `channel` overload completes its producer on error.
      producer.complete(Fallible(failure: error), from: originalExecutor)
    }
  }
  return producer
}
/// Makes a channel bound to `context` and hands its producer to `block`.
///
/// The producer is added to `context` as a dependent completable. `block` is
/// submitted to `executor`, or to the context's own executor when none is given.
/// Both the context and the producer are captured weakly by the scheduled work:
/// - if the producer is gone, nothing happens;
/// - if the context is gone, the producer is cancelled because of the
///   deallocated context;
/// - if `block` throws, the producer is completed with the error as a failure.
///
/// - Parameters:
///   - context: execution context the channel's lifetime is tied to.
///   - executor: optional executor override; defaults to `context.executor`.
///   - block: work that receives a strong reference to the producer.
/// - Returns: the channel backed by the newly created producer.
public func channel<C: ExecutionContext, Update, Success>(context: C, executor: Executor? = nil,
  block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  context.addDependent(completable: producer)
  (executor ?? context.executor).execute(from: nil) { [weak context, weak producer] (originalExecutor) in
    guard let producer = producer else { return }
    // Only the presence of the context matters here, so no binding is needed.
    guard context != nil else {
      producer.cancelBecauseOfDeallocatedContext(from: originalExecutor)
      return
    }
    do {
      try block(producer)
    } catch {
      // Report the failure with the same executor hint used on the
      // cancellation path above.
      producer.complete(Fallible(failure: error), from: originalExecutor)
    }
  }
  return producer
}