antonvmironov on master
Initial implementation of conve… (compare)
antonvmironov on master
Fix bindings memory issue where… (compare)
antonvmironov on master
Reimplementing NSControl reacti… (compare)
antonvmironov on master
Debugging assistance added Extracting reactive testing ext… Making test fixtures shared acr… and 1 more (compare)
antonvmironov on master
Fixing swift version for cocoap… (compare)
antonvmironov on 1.3.2
Fixing swift version for cocoap… (compare)
executor.execute( from: nil) { [weak producer] (originalExecutor) in
do {
let onRelease = try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
producer?.onSubsriptionRelease = onRelease // <<- LOOK AT ME
} catch{
producer?.complete(Fallible(failure: error), from: originalExecutor)
}
}
func realmChangset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
return channel() { producer in
producer.bufferSize = 666
cancellationToken?.add(cancellable: producer) // plan A
let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
producer.update(changeset)
}
producer.onRelease = { notificationToken.invalidate }
// producer.complete(fallible: _)
// producer.succeded(...)
// producer.failed(...)
}.cancelableBy(cancellationToken) // plan B
}
_asyncNinja_retainUntilFinalization
, _asyncNinja_notifyFinalization
. They could help. Their names could change if their functionality would be proven useful on larger scale // Realm Changeset
func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
return channel(executor: Executor.main) { producer in
let notificationToken = self.allObjects(ofType: RepoEntity.self).observe { changeset in
producer.update(changeset)
print("changeset update")
}
producer._asyncNinja_retainUntilFinalization(notificationToken)
}
}
// Input stream
func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
return channel(executor: Executor.main) { producer in
let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
producer.update($0.modifierFlags)
return $0
}
producer._asyncNinja_notifyFinalization {
NSEvent.removeMonitor(monitorFlags!)
}
}
}
public func channel<Update, Success>(executor: Executor = .primary, block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
executor.execute(from: nil) { [weak producer] _ in
guard let producer = producer else { return }
do {
try block(producer)
} catch {
producer.fail(error)
}
}
return producer
}
public func channel<C: ExecutionContext, Update, Success>(context: C, executor: Executor? = nil,
block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
context.addDependent(completable: producer)
(executor ?? context.executor).execute(from: nil) { [weak context, weak producer] (originalExecutor) in
guard let producer = producer else { return }
guard let _ = context else {
producer.cancelBecauseOfDeallocatedContext(from: originalExecutor)
return
}
do {
try block(producer)
} catch {
producer.fail(error)
}
}
return producer
}
producer.retainWhileSubscribed(notificationToken)
producer.notifySubscriptionRelease { NSEvent.removeMonitor(monitorFlags!) }
public extension ReactiveProperties where Object: NSView {
func trackingAreaWith(options: NSTrackingArea.Options) -> Channel<NSEvent, Void> {
return channel() { producer in
let receiver = TrackingMessageReceiver(producer: producer)
let area = NSTrackingArea.init(rect: self.object.bounds, options: options, owner: /* weak */ receiver, userInfo: nil)
self.object.addTrackingArea(area)
producer._asyncNinja_retainUntilFinalization(receiver)
producer._asyncNinja_notifyFinalization {
self.base.removeTrackingArea(area)
}
}
}
var trackingArea : Channel<NSEvent, Void> {
return trackingAreaWith(options: [ .mouseEnteredAndExited, .activeAlways ])
}
}
fileprivate class TrackingMessageReceiver : NSResponder {
var producer: Producer<NSEvent, Void>
init(producer: Producer<NSEvent, Void>) {
self.producer = producer
super.init()
}
required init?(coder: NSCoder) {
fatalError("init(coder:) has not been implemented")
}
override func mouseEntered(with theEvent: NSEvent) {
producer.update(theEvent)
}
override func mouseExited(with theEvent: NSEvent) {
producer.update(theEvent)
}
}
'object' is inaccessible due to 'internal' protection level
ChannelProducerProtocol
. func subscribeToDataSource() -> Channel<DataItem, Void> {
return channel() { producer in
let streamHandler = DataStreamSubscription { nextDataItem in
/* STRONG reference to Producer */
producer.update(nextDataItem)
}
producer.onRelease { streamHandler.invalidate() } // Invalidate streamHandler to release strong reference to the Producer
}
}
_asyncNinja_notifyFinalization
does.
func merge<Update,Success>(channels: [Channel<Update,Success>]) -> Channel<Update,Void> {
return channel(executor: Executor.primary) { (producer : Producer<Update,Void>) -> Void in
for ch in channels {
ch.bind(producer)
}
}
}