serg-vinnie
@serg-vinnie
These two can replace all previous Convenience Constructors. There are no signature collisions with old ones, so they can coexist peacefully.
But there is a problem with infinite streams
serg-vinnie
@serg-vinnie
As you can see in the first code snippet, the producer must be captured in the event closure.
In the current implementation of AsyncNinja, releasing the producer invokes the Finalization.
serg-vinnie
@serg-vinnie
But my intent implies the opposite: Finalization should invoke the release of the producer.
serg-vinnie
@serg-vinnie
Of course, Finalization can be kept as it is, and we can introduce a new event: Unsubscription.
  producer.retainWhileSubscribed(notificationToken)
  producer.notifySubscriptionRelease { NSEvent.removeMonitor(monitorFlags!) }
serg-vinnie
@serg-vinnie
If you perceive some danger in exposing the producer to the construction closure, there is an option to create some ChannelProducerProtocol.
It should be able to:
  1. Update and Complete
  2. Set up Cancellation
  3. Set Buffer Size
  4. Handle Unsubscription
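
The four capabilities listed above could be sketched as a protocol. This is only an illustration: `ChannelProducerProtocol` does not exist in AsyncNinja, and every method name below (`complete`, `onCancel`, `onUnsubscribe`, `lastSubscriberLeft`), as well as the `RecordingProducer` toy type, is a hypothetical stand-in rather than library API.

```swift
// Hypothetical sketch: none of these names are AsyncNinja API.
protocol ChannelProducerProtocol: AnyObject {
    associatedtype Update
    associatedtype Success

    // 1. Update and Complete
    func update(_ update: Update)
    func complete(_ success: Success)

    // 2. Set up Cancellation
    func onCancel(_ handler: @escaping () -> Void)

    // 3. Set Buffer Size
    var bufferSize: Int { get set }

    // 4. Handle Unsubscription
    func onUnsubscribe(_ handler: @escaping () -> Void)
}

// Toy conformance that only records what happens. Not thread-safe.
final class RecordingProducer<U, S>: ChannelProducerProtocol {
    typealias Update = U
    typealias Success = S

    private(set) var buffered: [U] = []
    private(set) var success: S?
    var bufferSize = 1
    private var cancelHandlers: [() -> Void] = []
    private var unsubscribeHandlers: [() -> Void] = []

    func update(_ update: U) {
        guard success == nil else { return } // ignore updates after completion
        buffered.append(update)
        // Keep only the most recent `bufferSize` updates.
        if buffered.count > bufferSize {
            buffered.removeFirst(buffered.count - max(bufferSize, 0))
        }
    }

    func complete(_ success: S) {
        guard self.success == nil else { return } // first completion wins
        self.success = success
    }

    func onCancel(_ handler: @escaping () -> Void) {
        cancelHandlers.append(handler)
    }

    func onUnsubscribe(_ handler: @escaping () -> Void) {
        unsubscribeHandlers.append(handler)
    }

    // Called by the (hypothetical) owner when the consumer cancels.
    func cancel() {
        let handlers = cancelHandlers
        cancelHandlers.removeAll()
        handlers.forEach { $0() }
    }

    // Called by the (hypothetical) owner when the last subscriber is gone.
    func lastSubscriberLeft() {
        let handlers = unsubscribeHandlers
        unsubscribeHandlers.removeAll()
        handlers.forEach { $0() }
    }
}
```

A construction closure would then receive some `ChannelProducerProtocol` value instead of the concrete producer, which limits what the closure can do with it.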
serg-vinnie
@serg-vinnie
Another use case:
public extension ReactiveProperties where Object: NSView {
    func trackingAreaWith(options: NSTrackingArea.Options) -> Channel<NSEvent, Void> {
        return channel() { producer in

            let receiver = TrackingMessageReceiver(producer: producer)

            let area = NSTrackingArea.init(rect: self.object.bounds, options: options, owner: /* weak */ receiver, userInfo: nil)
            self.object.addTrackingArea(area)

            producer._asyncNinja_retainUntilFinalization(receiver)
            producer._asyncNinja_notifyFinalization {
                self.object.removeTrackingArea(area)
            }
        }
    }

    var trackingArea : Channel<NSEvent, Void> {
        return trackingAreaWith(options: [ .mouseEnteredAndExited, .activeAlways ])
    }
}

fileprivate class TrackingMessageReceiver : NSResponder {

    var producer: Producer<NSEvent, Void>

    init(producer: Producer<NSEvent, Void>) {
        self.producer = producer
        super.init()
    }

    required init?(coder: NSCoder) {
        fatalError("init(coder:) has not been implemented")
    }

    override func mouseEntered(with theEvent: NSEvent) {
        producer.update(theEvent)
    }

    override func mouseExited(with theEvent: NSEvent) {
        producer.update(theEvent)
    }
}
BTW, this code will not compile:
'object' is inaccessible due to 'internal' protection level
serg-vinnie
@serg-vinnie
If you could tell me how to fix it, that would be awesome.
Anton Mironov
@antonvmironov
I miss slack channels :( This discussion would be way easier to follow
So I've pushed the channel factory co-authored with @serg-vinnie. I've also pushed a change that makes the ReactiveProperties methods and initializers public, so the issue you've been seeing should be gone.
I do not quite understand what unsubscription means. Does it mean that subscriber is gone but the channel is still up and running?
Anton Mironov
@antonvmironov
Also, I see your point on ChannelProducerProtocol.
Update and Complete are potentially doable.
However cancellation and buffer creation are not. That would create a chain of underlying channels that would pass events to one another.
Maybe something as simple as a builder could work.
serg-vinnie
@serg-vinnie

Does it mean that subscriber is gone but the channel is still up and running?

Yes

like onDispose in Rx
serg-vinnie
@serg-vinnie
        let src = Observable<NSEvent>.create { observer in
            //...                        
            return Disposables.create {
                self.base.removeTrackingArea(area)
            }
        }
serg-vinnie
@serg-vinnie
But I don't want to tell you "let's copy some feature from Rx". I prefer to describe some generic problem and offer a solution to it.
serg-vinnie
@serg-vinnie
[Image attachment: 1.png]
serg-vinnie
@serg-vinnie
A generic description of what I need looks like this:
    func subscribeToDataSource() -> Channel<DataItem, Void> {
        return channel() { producer in

            let streamHandler = DataStreamSubscription { nextDataItem in
                /* STRONG reference to Producer */
                producer.update(nextDataItem) 
            }

            producer.onRelease { streamHandler.invalidate() } // Invalidate streamHandler to release strong reference to the Producer
        }
    }
Do you understand why it is impossible in the current implementation of AsyncNinja?
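
The underlying ownership problem can be reproduced without AsyncNinja. The sketch below uses hypothetical toy types (`ToyProducer`, `ToyDataStream`) and assumes only standard Swift ARC behaviour: as long as a long-lived stream handler strongly captures the producer, the producer's `deinit` (standing in for finalization) cannot run, so cleanup that is triggered by finalization never happens until something else breaks the reference.

```swift
// Hypothetical toy types; they only model the ownership relations discussed above.
final class ToyProducer {
    private let onFinalize: () -> Void

    init(onFinalize: @escaping () -> Void) {
        self.onFinalize = onFinalize
    }

    func update(_ value: Int) {
        // A real producer would forward the value to its subscribers.
    }

    // Stands in for "finalization happens when the producer is released".
    deinit {
        onFinalize()
    }
}

final class ToyDataStream {
    var handler: ((Int) -> Void)?

    // Dropping the handler releases everything the closure captured.
    func invalidate() {
        handler = nil
    }
}

func strongCaptureDemo() -> (beforeInvalidate: Bool, afterInvalidate: Bool) {
    var finalized = false
    let stream = ToyDataStream() // long-lived external data source

    do {
        let producer = ToyProducer { finalized = true }
        // STRONG reference to the producer, as in the snippet above.
        stream.handler = { value in producer.update(value) }
    }

    // No local reference is left, but the handler still retains the producer.
    let before = finalized

    // Only an explicit invalidation releases the producer and lets it finalize.
    stream.invalidate()
    return (before, finalized)
}
```

Calling `strongCaptureDemo()` yields `(false, true)`: finalization cannot be the trigger for invalidating the handler, because the handler is what keeps the producer alive.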
serg-vinnie
@serg-vinnie

I miss slack channels :( This discussion would be way easier to follow

Gitter is OK for me. I have sound and vibration notifications from Gitter on my iPhone, and waiting a few days for a response is not a problem. But if you want to switch to Slack or something else, that's OK too.

Anton Mironov
@antonvmironov
Maybe I am missing something, but a "subscriber is gone" block is not going to solve the problem. AsyncNinja allows you to have any number of subscribers to your primitive. Invalidating streamHandler when any single subscriber is gone would just ruin the mechanism you are trying to provide. My point is that you need to invalidate streamHandler when all subscribers are gone. This is what _asyncNinja_notifyFinalization does.
Also, releasing the producer within a finalization block could lead to retain cycles.
Anton Mironov
@antonvmironov
However, I totally see your point here. I see the following options:
  • using weak reference to producer
  • using constructor described here
  • or constructor described here
  • creating a failing test case that exposes an issue
serg-vinnie
@serg-vinnie
Not "a subscriber is gone",
but "the last subscriber is gone".
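
The "last subscriber is gone" semantics can be sketched independently of AsyncNinja. `SharedSource` and its methods are hypothetical names used only for illustration; the sketch is not thread-safe and is not how the library implements subscriptions.

```swift
// Hypothetical type: teardown runs only when the LAST subscriber leaves.
final class SharedSource<Value> {
    private var handlers: [Int: (Value) -> Void] = [:]
    private var nextToken = 0
    private let onLastUnsubscribe: () -> Void

    init(onLastUnsubscribe: @escaping () -> Void) {
        self.onLastUnsubscribe = onLastUnsubscribe
    }

    // Registers a handler and returns a token used to unsubscribe later.
    func subscribe(_ handler: @escaping (Value) -> Void) -> Int {
        let token = nextToken
        nextToken += 1
        handlers[token] = handler
        return token
    }

    func unsubscribe(_ token: Int) {
        // Unknown or already removed tokens are ignored.
        guard handlers.removeValue(forKey: token) != nil else { return }
        if handlers.isEmpty {
            onLastUnsubscribe()
        }
    }

    // Delivers a value to every current subscriber.
    func send(_ value: Value) {
        for handler in handlers.values {
            handler(value)
        }
    }
}
```

With two subscribers, removing the first leaves the source running; only removing the second triggers the teardown closure, which is where something like `streamHandler.invalidate()` would go.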
serg-vinnie
@serg-vinnie
I'm going to create some sandbox project with real world examples
this should be more productive
Anton Mironov
@antonvmironov
Sounds great!
serg-vinnie
@serg-vinnie
going to make a pull request
there is 1 bug and 1 feature
func merge<Update,Success>(channels: [Channel<Update,Success>]) -> Channel<Update,Void> {
  return channel(executor: Executor.primary) { (producer : Producer<Update,Void>) -> Void in
    for ch in channels {
      ch.bind(producer)
    }
  }
}
That merge function can merge updates from an arbitrary number of channels.
for example:
        let selection = merge(channels: myView.checkboxes.map { $0.actionChannel() } )
            .debounce(interval: 1)
This reacts to an update from any of the 10 checkboxes.
serg-vinnie
@serg-vinnie
Success values can be ignored in such case
serg-vinnie
@serg-vinnie
And there is a bug with flatMap
event.flatMap { doSomethingForTwoSeconds() }.onUpdate { /* update executes immediately instead of waiting for two seconds */ }
I added a failing test for flatMap.
serg-vinnie
@serg-vinnie
hi
there are two methods
func download(from: String, to: String) -> Channel<Double,URL>
func unzip(from: String, to: String, password: String) -> Channel<Double,URL>
each channel updates its progress and completes with destination url
I'm trying to combine these methods sequentially:
func prepareSourceFiles() -> Channel<Double,URL> {
        return download(from: webURL, to: dstZip)
            .map { $0 / 2.0 }   // 100% is 50%                                  <= IGNORED
            .flatMapSuccess { unzip(from: $0.path, to: dstUnzip, password: "qwerty") }
            .map { 0.5 + $0 / 2.0 } // 0% is 50%
    }
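
The intended progress arithmetic can be checked on its own, without any channels. `overallProgress` is a hypothetical helper, not part of AsyncNinja; it assumes equally weighted phases, which matches the two maps above (download covers 0% to 50%, unzip covers 50% to 100%).

```swift
// Hypothetical helper: maps the local progress of one phase (0.0...1.0)
// onto the overall progress of a multi-phase task with equal weights.
func overallProgress(phase: Int, of phaseCount: Int, local: Double) -> Double {
    precondition(phaseCount > 0 && phase >= 0 && phase < phaseCount)
    let clamped = min(max(local, 0.0), 1.0)
    return (Double(phase) + clamped) / Double(phaseCount)
}

// Phase 0 (download): same as `$0 / 2.0`.
let downloadHalfway = overallProgress(phase: 0, of: 2, local: 0.5) // 0.25
let downloadDone = overallProgress(phase: 0, of: 2, local: 1.0)    // 0.5

// Phase 1 (unzip): same as `0.5 + $0 / 2.0`.
let unzipStart = overallProgress(phase: 1, of: 2, local: 0.0)      // 0.5
let unzipDone = overallProgress(phase: 1, of: 2, local: 1.0)       // 1.0
```

The end of the first phase and the start of the second both map to 0.5, so the combined progress is continuous across the hand-off.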