Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
serg-vinnie
@serg-vinnie
but it looks like memory leak in AsyncNinja so I decided to tell you about it
and yes, I'm completely sure everything was released: ViewController, ViewModel and Service
log:
[RepoListViewController]: deinit
[RepoListViewModel]: deinit
[RepoListService]: deinit
serg-vinnie
@serg-vinnie
But I can see "changeset update" in log if I change DB in Realm Studio
anyway
this is not critical
I wanted to consult with you about proper implementation
how it should be:
serg-vinnie
@serg-vinnie
return channel() { update, completion in

            let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
                update(changeset)
            }
            return { notificationToken?.invalidate } // invalidate token on subscription release
        }
and code in Convenience constructor should be changed from
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
to:
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            let onRelease = try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
            producer?.onSubsriptionRelease = onRelease  // <<- LOOK AT ME
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
serg-vinnie
@serg-vinnie
It is something like Rx's OnDispose.
so my question is: how to call Producer's onSubsriptionRelease in the right moment?
serg-vinnie
@serg-vinnie
scenario A: 1) you tell me where to call onSubsriptionRelease 2) I implement it 3) I do pull request
scenario B: 1) I do pull requests 2) you do all the magic
I'm really hope everything I wrote was clear :)
serg-vinnie
@serg-vinnie
or maybe it's time for revolution? what do you think about that?
serg-vinnie
@serg-vinnie
func realmChangset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel() { producer in
    producer.bufferSize = 666
    cancellationToken?.add(cancellable: producer)  // plan A

    let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
      producer.update(changeset)
    }

    producer.onRelease = { notificationToken.invalidate }

    // producer.complete(fallible: _)
    // producer.succeded(...)
    // producer.failed(...)
  }.cancelableBy(cancellationToken)  // plan B
}
serg-vinnie
@serg-vinnie
let's summarize:
  1. it looks like AsyncNinja doesn't release onComplete closure
  2. it would be amazing to have on[Subscription]Release handler
  3. channel() { producer in ... } Convenience construction
Anton Mironov
@antonvmironov
  1. Please make a pull request with unit test that exposes this issue. I will try to it figure out.
  2. Please try using internal methods _asyncNinja_retainUntilFinalization, _asyncNinja_notifyFinalization. They could help. Their names could change if their functionality would be proven useful on larger scale
  3. That should be rather easy to introduce. Please make a pull request for that. See similar functions here: https://github.com/AsyncNinja/AsyncNinja/blob/master/Sources/Channel_makers.swift
serg-vinnie
@serg-vinnie
I've made pull request
examples of what I'm aiming for:
    // Realm Changeset
    func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
        return channel(executor: Executor.main) { producer in

            let notificationToken = self.allObjects(ofType: RepoEntity.self).observe { changeset in
                producer.update(changeset)
                print("changeset update")
            }

            producer._asyncNinja_retainUntilFinalization(notificationToken)
        }
    }

    // Input stream
    func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
        return channel(executor: Executor.main) { producer in
            let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
                producer.update($0.modifierFlags)
                return $0
            }

            producer._asyncNinja_notifyFinalization {
                 NSEvent.removeMonitor(monitorFlags!)
            }
        }
    }
serg-vinnie
@serg-vinnie
code of factories(constructors):
public func channel<Update, Success>(executor: Executor = .primary, block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  executor.execute(from: nil) { [weak producer] _ in
    guard let producer = producer else { return }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }
  return producer
}

public func channel<C: ExecutionContext, Update, Success>(context: C, executor: Executor? = nil,
                                                                 block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  context.addDependent(completable: producer)

  (executor ?? context.executor).execute(from: nil) { [weak context, weak producer] (originalExecutor) in
    guard let producer = producer else { return }
    guard let _ = context else {
      producer.cancelBecauseOfDeallocatedContext(from: originalExecutor)
      return
    }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }

  return producer
}
serg-vinnie
@serg-vinnie
These two can replace all previous Convenience Constructors. There are no signature collisions with old ones, so they can coexist peacefully.
But there is a problem with infinite streams
serg-vinnie
@serg-vinnie
As you can see in first code snipped the producer must be captured in event closure.
In current implementation of AsyncNinja releasing of producer invokes the Finalization
serg-vinnie
@serg-vinnie
But my intent implies opposite: Finalization should invoke releasing of producer
serg-vinnie
@serg-vinnie
Of course Finalization can be kept as it is. And we can introduce new event: Unsubscription
  producer.retainWhileSubscribed(notificationToken)
  producer.notifySubscriptionRelease { NSEvent.removeMonitor(monitorFlags!) }
serg-vinnie
@serg-vinnie
If you perceive some danger in exposing producer into construction closure there is an option to create some ChannelProducerProtocol
It should be able to:
  1. Update and Complete
  2. Set up Cancelation
  3. Set Buffer Size
  4. Handle Unsubscription
serg-vinnie
@serg-vinnie
Another use case:
public extension ReactiveProperties where Object: NSView {
    func trackingAreaWith(options: NSTrackingArea.Options) -> Channel<NSEvent, Void> {
        return channel() { producer in

            let receiver = TrackingMessageReceiver(producer: producer)

            let area = NSTrackingArea.init(rect: self.object.bounds, options: options, owner: /* weak */ receiver, userInfo: nil)
            self.object.addTrackingArea(area)

            producer._asyncNinja_retainUntilFinalization(receiver)
            producer._asyncNinja_notifyFinalization {
                self.base.removeTrackingArea(area)
            }
        }
    }

    var trackingArea : Channel<NSEvent, Void> {
        return trackingAreaWith(options: [ .mouseEnteredAndExited, .activeAlways ])
    }
}

fileprivate class TrackingMessageReceiver : NSResponder {

    var producer: Producer<NSEvent, Void>

    init(producer: Producer<NSEvent, Void>) {
        self.producer = producer
        super.init()
    }

    required init?(coder: NSCoder) {
        fatalError("init(coder:) has not been implemented")
    }

    override func mouseEntered(with theEvent: NSEvent) {
        producer.update(theEvent)
    }

    override func mouseExited(with theEvent: NSEvent) {
        producer.update(theEvent)
    }
}
BTW. this code will not compile:
'object' is inaccessible due to 'internal' protection level
serg-vinnie
@serg-vinnie
if you tell me how to fix it would be awesome
Anton Mironov
@antonvmironov
I miss slack channels :( This discussion would be way easier to follow
So I've pushed channel factory co-authored with @serg-vinnie . Also I've pushed publishing of ReactiveProperties methods and initializers, so the issue you've been seeing should be gone
I do not quite understand what unsubscription means. Does it mean that subscriber is gone but the channel is still up and running?
Anton Mironov
@antonvmironov
Also I see with your point on ChannelProducerProtocol.
Update and Complete are potentially doable.
However cancellation and buffer creation are not. That would create a chain of underlying channels that would pass events to one another.
Maybe something as simple as builder could work.
serg-vinnie
@serg-vinnie

Does it mean that subscriber is gone but the channel is still up and running?

Yes

like onDispose in Rx
serg-vinnie
@serg-vinnie
        let src = Observable<NSEvent>.create { observer in
            //...                        
            return Disposables.create {
                self.base.removeTrackingArea(area)
            }
        }
serg-vinnie
@serg-vinnie
But I don't want to tell you "let's copy some feature from Rx". I prefer to describe some generic problem and offer a solution to it.
serg-vinnie
@serg-vinnie
1.png
serg-vinnie
@serg-vinnie
generic description of what I need looks like this:
    func subscribeToDataSource() -> Channel<DataItem, Void> {
        return channel() { producer in

            let streamHandler = DataStreamSubscription { nextDataItem in
                /* STRONG reference to Producer */
                producer.update(nextDataItem) 
            }

            producer.onRelease { streamHandler.invalidate() } // Invalidate streamHandler to release strong reference to the Producer
        }
    }
Do you understand why it is impossible in current implementation of AsyncNinja?