Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
serg-vinnie
@serg-vinnie
let me remind you how it was implemented
public func channel<Update, Success>(executor: Executor = .primary, cancellationToken: CancellationToken? = nil,
                                         bufferSize: Int = AsyncNinjaConstants.defaultChannelBufferSize,
                                         block: @escaping (_ update: @escaping (Update) -> Void, _ complete: @escaping (Fallible<Success>) -> Void) throws -> Void
        ) -> Channel<Update, Success> {
        // TEST: ChannelMakersTests.testInfiniteChannel

        let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
        cancellationToken?.add(cancellable: producer)
        executor.execute(
            from: nil
        ) { [weak producer] (originalExecutor) in

            do {
                try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
            } catch{
                producer?.complete(Fallible(failure: error), from: originalExecutor)
            }
        }
        return producer
    }
today I've tried to implement Realm Database subscription via AsyncNinja Channel
func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, NotificationToken> {

        var notificationToken : NotificationToken?

        return channel() { update, completion in

            notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
                update(changeset)
                print("changeset update")
            }
        }.onComplete { _ in _ = notificationToken } // keep reference for notificationToken
    }
serg-vinnie
@serg-vinnie
this method located in Service (which injects into ViewModel)
and there is a subscription in ViewController
myViewModel.repoList.itemsStream().onUpdate(context: self) { _, changeset in
            switch changeset {
            case .update(_, let deletions, let insertions, let modifications):
                print("deletions \(deletions), insertions \(insertions), modifications \(modifications)")
            default: break
            }
        }
So subscription works as expected and it releases on window close
serg-vinnie
@serg-vinnie
but notificationToken not releases
it looks like onComplete closure references somewhere
serg-vinnie
@serg-vinnie
of course this solution is ugly (it can be easily fixed with even uglier : make notificationToken class memeber)
serg-vinnie
@serg-vinnie
but it looks like memory leak in AsyncNinja so I decided to tell you about it
and yes, I'm completely sure everything was released: ViewController, ViewModel and Service
log:
[RepoListViewController]: deinit
[RepoListViewModel]: deinit
[RepoListService]: deinit
serg-vinnie
@serg-vinnie
But I can see "changeset update" in log if I change DB in Realm Studio
anyway
this is not critical
I wanted to consult with you about proper implementation
how it should be:
serg-vinnie
@serg-vinnie
return channel() { update, completion in

            let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
                update(changeset)
            }
            return { notificationToken?.invalidate } // invalidate token on subscription release
        }
and code in Convenience constructor should be changed from
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
to:
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            let onRelease = try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
            producer?.onSubsriptionRelease = onRelease  // <<- LOOK AT ME
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
serg-vinnie
@serg-vinnie
It is something like Rx's OnDispose.
so my question is: how to call Producer's onSubsriptionRelease in the right moment?
serg-vinnie
@serg-vinnie
scenario A: 1) you tell me where to call onSubsriptionRelease 2) I implement it 3) I do pull request
scenario B: 1) I do pull requests 2) you do all the magic
I'm really hope everything I wrote was clear :)
serg-vinnie
@serg-vinnie
or maybe it's time for revolution? what do you think about that?
serg-vinnie
@serg-vinnie
func realmChangset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel() { producer in
    producer.bufferSize = 666
    cancellationToken?.add(cancellable: producer)  // plan A

    let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
      producer.update(changeset)
    }

    producer.onRelease = { notificationToken.invalidate }

    // producer.complete(fallible: _)
    // producer.succeded(...)
    // producer.failed(...)
  }.cancelableBy(cancellationToken)  // plan B
}
serg-vinnie
@serg-vinnie
let's summarize:
  1. it looks like AsyncNinja doesn't release onComplete closure
  2. it would be amazing to have on[Subscription]Release handler
  3. channel() { producer in ... } Convenience construction
Anton Mironov
@antonvmironov
  1. Please make a pull request with unit test that exposes this issue. I will try to it figure out.
  2. Please try using internal methods _asyncNinja_retainUntilFinalization, _asyncNinja_notifyFinalization. They could help. Their names could change if their functionality would be proven useful on larger scale
  3. That should be rather easy to introduce. Please make a pull request for that. See similar functions here: https://github.com/AsyncNinja/AsyncNinja/blob/master/Sources/Channel_makers.swift
serg-vinnie
@serg-vinnie
I've made pull request
examples of what I'm aiming for:
    // Realm Changeset
    func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
        return channel(executor: Executor.main) { producer in

            let notificationToken = self.allObjects(ofType: RepoEntity.self).observe { changeset in
                producer.update(changeset)
                print("changeset update")
            }

            producer._asyncNinja_retainUntilFinalization(notificationToken)
        }
    }

    // Input stream
    func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
        return channel(executor: Executor.main) { producer in
            let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
                producer.update($0.modifierFlags)
                return $0
            }

            producer._asyncNinja_notifyFinalization {
                 NSEvent.removeMonitor(monitorFlags!)
            }
        }
    }
serg-vinnie
@serg-vinnie
code of factories(constructors):
public func channel<Update, Success>(executor: Executor = .primary, block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  executor.execute(from: nil) { [weak producer] _ in
    guard let producer = producer else { return }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }
  return producer
}

public func channel<C: ExecutionContext, Update, Success>(context: C, executor: Executor? = nil,
                                                                 block: @escaping (_ strongProducer: Producer<Update, Success>) throws -> Void) -> Channel<Update, Success> {
  let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
  context.addDependent(completable: producer)

  (executor ?? context.executor).execute(from: nil) { [weak context, weak producer] (originalExecutor) in
    guard let producer = producer else { return }
    guard let _ = context else {
      producer.cancelBecauseOfDeallocatedContext(from: originalExecutor)
      return
    }

    do {
      try block(producer)
    } catch {
      producer.fail(error)
    }
  }

  return producer
}
serg-vinnie
@serg-vinnie
These two can replace all previous Convenience Constructors. There are no signature collisions with old ones, so they can coexist peacefully.
But there is a problem with infinite streams
serg-vinnie
@serg-vinnie
As you can see in first code snipped the producer must be captured in event closure.
In current implementation of AsyncNinja releasing of producer invokes the Finalization
serg-vinnie
@serg-vinnie
But my intent implies opposite: Finalization should invoke releasing of producer
serg-vinnie
@serg-vinnie
Of course Finalization can be kept as it is. And we can introduce new event: Unsubscription
  producer.retainWhileSubscribed(notificationToken)
  producer.notifySubscriptionRelease { NSEvent.removeMonitor(monitorFlags!) }
serg-vinnie
@serg-vinnie
If you perceive some danger in exposing producer into construction closure there is an option to create some ChannelProducerProtocol
It should be able to:
  1. Update and Complete
  2. Set up Cancelation
  3. Set Buffer Size
  4. Handle Unsubscription
serg-vinnie
@serg-vinnie
Another use case:
public extension ReactiveProperties where Object: NSView {
    func trackingAreaWith(options: NSTrackingArea.Options) -> Channel<NSEvent, Void> {
        return channel() { producer in

            let receiver = TrackingMessageReceiver(producer: producer)

            let area = NSTrackingArea.init(rect: self.object.bounds, options: options, owner: /* weak */ receiver, userInfo: nil)
            self.object.addTrackingArea(area)

            producer._asyncNinja_retainUntilFinalization(receiver)
            producer._asyncNinja_notifyFinalization {
                self.base.removeTrackingArea(area)
            }
        }
    }

    var trackingArea : Channel<NSEvent, Void> {
        return trackingAreaWith(options: [ .mouseEnteredAndExited, .activeAlways ])
    }
}

fileprivate class TrackingMessageReceiver : NSResponder {

    var producer: Producer<NSEvent, Void>

    init(producer: Producer<NSEvent, Void>) {
        self.producer = producer
        super.init()
    }

    required init?(coder: NSCoder) {
        fatalError("init(coder:) has not been implemented")
    }

    override func mouseEntered(with theEvent: NSEvent) {
        producer.update(theEvent)
    }

    override func mouseExited(with theEvent: NSEvent) {
        producer.update(theEvent)
    }
}
BTW. this code will not compile:
'object' is inaccessible due to 'internal' protection level
serg-vinnie
@serg-vinnie
if you tell me how to fix it would be awesome