Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
serg-vinnie
@serg-vinnie
so my question is: how to call Producer's onSubsriptionRelease in the right moment?
serg-vinnie
@serg-vinnie
scenario A: 1) you tell me where to call onSubsriptionRelease 2) I implement it 3) I do pull request
scenario B: 1) I do pull requests 2) you do all the magic
I'm really hope everything I wrote was clear :)
serg-vinnie
@serg-vinnie
or maybe it's time for revolution? what do you think about that?
serg-vinnie
@serg-vinnie
func realmChangset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel() { producer in
    producer.bufferSize = 666
    cancellationToken?.add(cancellable: producer)  // plan A

    let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
      producer.update(changeset)
    }

    producer.onRelease = { notificationToken.invalidate }

    // producer.complete(fallible: _)
    // producer.succeded(...)
    // producer.failed(...)
  }.cancelableBy(cancellationToken)  // plan B
}
serg-vinnie
@serg-vinnie
let's summarize:
  1. it looks like AsyncNinja doesn't release onComplete closure
  2. it would be amazing to have on[Subscription]Release handler
  3. channel() { producer in ... } Convenience construction
Anton Mironov
@antonvmironov
  1. Please make a pull request with unit test that exposes this issue. I will try to it figure out.
  2. Please try using internal methods _asyncNinja_retainUntilFinalization, _asyncNinja_notifyFinalization. They could help. Their names could change if their functionality would be proven useful on larger scale
  3. That should be rather easy to introduce. Please make a pull request for that. See similar functions here: https://github.com/AsyncNinja/AsyncNinja/blob/master/Sources/Channel_makers.swift
serg-vinnie
@serg-vinnie
I've made pull request
examples of what I'm aiming for:
    // Realm Changeset
    func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
        return channel(executor: Executor.main) { producer in

            let notificationToken = self.allObjects(ofType: RepoEntity.self).observe { changeset in
                producer.update(changeset)
                print("changeset update")
            }

            producer._asyncNinja_retainUntilFinalization(notificationToken)
        }
    }

    // Input stream
    func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
        return channel(executor: Executor.main) { producer in
            let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
                producer.update($0.modifierFlags)
                return $0
            }

            producer._asyncNinja_notifyFinalization {
                 NSEvent.removeMonitor(monitorFlags!)
            }
        }
    }
serg-vinnie
@serg-vinnie
code of factories(constructors):
public func channel<Update, Success>(executor: Executor = .primary, block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  executor.execute(from: nil) { [weak producer] _ in
    guard let producer = producer else { return }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }
  return producer
}

public func channel<C: ExecutionContext, Update, Success>(context: C, executor: Executor? = nil,
                                                                 block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  context.addDependent(completable: producer)

  (executor ?? context.executor).execute(from: nil) { [weak context, weak producer] (originalExecutor) in
    guard let producer = producer else { return }
    guard let _ = context else {
      producer.cancelBecauseOfDeallocatedContext(from: originalExecutor)
      return
    }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }

  return producer
}
serg-vinnie
@serg-vinnie
These two can replace all previous Convenience Constructors. There are no signature collisions with old ones, so they can coexist peacefully.
But there is a problem with infinite streams
serg-vinnie
@serg-vinnie
As you can see in first code snipped the producer must be captured in event closure.
In current implementation of AsyncNinja releasing of producer invokes the Finalization
serg-vinnie
@serg-vinnie
But my intent implies opposite: Finalization should invoke releasing of producer
serg-vinnie
@serg-vinnie
Of course Finalization can be kept as it is. And we can introduce new event: Unsubscription
  producer.retainWhileSubscribed(notificationToken)
  producer.notifySubscriptionRelease { NSEvent.removeMonitor(monitorFlags!) }
serg-vinnie
@serg-vinnie
If you perceive some danger in exposing producer into construction closure there is an option to create some ChannelProducerProtocol
It should be able to:
  1. Update and Complete
  2. Set up Cancelation
  3. Set Buffer Size
  4. Handle Unsubscription
serg-vinnie
@serg-vinnie
Another use case:
public extension ReactiveProperties where Object: NSView {
    func trackingAreaWith(options: NSTrackingArea.Options) -> Channel<NSEvent, Void> {
        return channel() { producer in

            let receiver = TrackingMessageReceiver(producer: producer)

            let area = NSTrackingArea.init(rect: self.object.bounds, options: options, owner: /* weak */ receiver, userInfo: nil)
            self.object.addTrackingArea(area)

            producer._asyncNinja_retainUntilFinalization(receiver)
            producer._asyncNinja_notifyFinalization {
                self.base.removeTrackingArea(area)
            }
        }
    }

    var trackingArea : Channel<NSEvent, Void> {
        return trackingAreaWith(options: [ .mouseEnteredAndExited, .activeAlways ])
    }
}

fileprivate class TrackingMessageReceiver : NSResponder {

    var producer: Producer<NSEvent, Void>

    init(producer: Producer<NSEvent, Void>) {
        self.producer = producer
        super.init()
    }

    required init?(coder: NSCoder) {
        fatalError("init(coder:) has not been implemented")
    }

    override func mouseEntered(with theEvent: NSEvent) {
        producer.update(theEvent)
    }

    override func mouseExited(with theEvent: NSEvent) {
        producer.update(theEvent)
    }
}
BTW. this code will not compile:
'object' is inaccessible due to 'internal' protection level
serg-vinnie
@serg-vinnie
if you tell me how to fix it would be awesome
Anton Mironov
@antonvmironov
I miss slack channels :( This discussion would be way easier to follow
So I've pushed channel factory co-authored with @serg-vinnie . Also I've pushed publishing of ReactiveProperties methods and initializers, so the issue you've been seeing should be gone
I do not quite understand what unsubscription means. Does it mean that subscriber is gone but the channel is still up and running?
Anton Mironov
@antonvmironov
Also I see with your point on ChannelProducerProtocol.
Update and Complete are potentially doable.
However cancellation and buffer creation are not. That would create a chain of underlying channels that would pass events to one another.
Maybe something as simple as builder could work.
serg-vinnie
@serg-vinnie

Does it mean that subscriber is gone but the channel is still up and running?

Yes

like onDispose in Rx
serg-vinnie
@serg-vinnie
        let src = Observable<NSEvent>.create { observer in
            //...                        
            return Disposables.create {
                self.base.removeTrackingArea(area)
            }
        }
serg-vinnie
@serg-vinnie
But I don't want to tell you "let's copy some feature from Rx". I prefer to describe some generic problem and offer a solution to it.
serg-vinnie
@serg-vinnie
1.png
serg-vinnie
@serg-vinnie
generic description of what I need looks like this:
    func subscribeToDataSource() -> Channel<DataItem, Void> {
        return channel() { producer in

            let streamHandler = DataStreamSubscription { nextDataItem in
                /* STRONG reference to Producer */
                producer.update(nextDataItem) 
            }

            producer.onRelease { streamHandler.invalidate() } // Invalidate streamHandler to release strong reference to the Producer
        }
    }
Do you understand why it is impossible in current implementation of AsyncNinja?
serg-vinnie
@serg-vinnie

I miss slack channels :( This discussion would be way easier to follow

Gitter is ok for me. I have sound+vibration notifications on my iPhone from Gitter. And waiting few days for response is not a problem. But if you want to switch to slack or something else - it's ok too.

Anton Mironov
@antonvmironov
Maybe I am missing something but "subscriber is gone" block is not going to solve the problem. AsyncNinja is allows you to have any amount of subscribers to your primitive. Looks like invalidation of streamHandler if any single subscriber is gone is just going to ruin a mechanism you are trying to provide. My point is that you need to invalidate streamHandler if all subscribers are gone. This is what _asyncNinja_notifyFinalization does.
Also releasing producer within finalisation block could lead to retain cycles.
Anton Mironov
@antonvmironov
However I totally see your point here. I see following options:
  • using weak reference to producer
  • using constructor described here
  • or constructor described here
  • creating a failing test case that exposes an issue
serg-vinnie
@serg-vinnie
not subscriber is gone
but last subscriber is gone
serg-vinnie
@serg-vinnie
I'm going to create some sandbox project with real world examples
this should be more productive
Anton Mironov
@antonvmironov
Sounds great!
serg-vinnie
@serg-vinnie
going to make a pull request
there is 1 bug and 1 feature
func merge<Update,Success>(channels: [Channel<Update,Success>]) -> Channel<Update,Void> {
  return channel(executor: Executor.primary) { (producer : Producer<Update,Void>) -> Void in
    for ch in channels {
      ch.bind(producer)
    }
  }
}
that merge function can merge updates from arbitrary number of channels
for example:
        let selection = merge(channels: myView.checkboxes.map { $0.actionChannel() } )
            .debounce(interval: 1)