serg-vinnie
@serg-vinnie
        // Rx subscription works as expected
        states.subscribeRxFor(key: KKHelpViewController.self, type: ViewStateInfo.self)
            .filter { $0.state.isHidden }
            .observeOn(MainScheduler.instance)
            .subscribe(onNext: { _ in scenes.show(wnd: KKMainWindowController.self) })
            .disposed(by: bag)

        // AsyncNinja subscription executes too late
        states.subscribeFor(key: KKHelpViewController.self, type: ViewStateInfo.self)
            .filter { $0.state.isHidden }
            .onUpdate(executor: Executor.main) { _ in scenes.show(wnd: KKMainWindowController.self) }
And there is a problem with the subscription to the didHide event
serg-vinnie
@serg-vinnie
I need to show MainWindow before HelpWindow gets destroyed
serg-vinnie
@serg-vinnie
Is there a way to force synchronous execution of the update?
serg-vinnie
@serg-vinnie
Updating from Main Executor didn't work
didChange.update(value, from: Executor.main)
        states.subscribeFor(key: KKHelpViewController.self, type: ViewStateInfo.self)
            .filter(executor: Executor.main) { $0.state.isHidden }
            .onUpdate(executor: Executor.main) { _ in scenes.show(wnd: KKMainWindowController.self) }
serg-vinnie
@serg-vinnie
the update goes from the main thread through com.apple.root.default-qos anyway
Anton Mironov
@antonvmironov
Executor.immediate is available for immediate execution. Use it for both the filter and the onUpdate call to skip the dispatch hops that you do not want
serg-vinnie
@serg-vinnie
works!
serg-vinnie
@serg-vinnie
what does it mean to make an update from an executor? e.g. update(value, from: Executor.main)
Anton Mironov
@antonvmironov
Many methods take an optional argument from originalExecutor: Executor? = nil. It basically says "I am sure this call is made from this executor". It enables yet another AsyncNinja optimization: omitting asynchronous dispatch. So if you update a channel from the main queue, subscribers that subscribed on the main executor can be notified synchronously. This does not always happen; additional checks are in place, e.g. whether a synchronous call makes sense or whether there is a risk of stack overflow.
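The dispatch decision described above can be illustrated with a small toy model. This is only a sketch: ToyExecutor, ToyChannel and drainPending are hypothetical names made up for this example, the "asynchronous" hop is simulated with a pending queue, and none of it is AsyncNinja's actual implementation (which, as noted, performs additional checks).

```swift
// Toy model of the dispatch decision; NOT AsyncNinja's real code.
// ToyExecutor and ToyChannel are hypothetical names used for illustration.
enum ToyExecutor: Equatable {
    case main, background, immediate
}

final class ToyChannel<Value> {
    private struct Subscriber {
        let executor: ToyExecutor
        let body: (Value) -> Void
    }

    private var subscribers: [Subscriber] = []
    // Stands in for work that a real executor would dispatch asynchronously.
    private var pending: [() -> Void] = []

    func onUpdate(executor: ToyExecutor, _ body: @escaping (Value) -> Void) {
        subscribers.append(Subscriber(executor: executor, body: body))
    }

    // `original` plays the role of the `from originalExecutor` hint.
    func update(_ value: Value, from original: ToyExecutor? = nil) {
        for subscriber in subscribers {
            let body = subscriber.body
            if subscriber.executor == .immediate || subscriber.executor == original {
                body(value)                      // same call stack, no hop
            } else {
                pending.append { body(value) }   // delivered later
            }
        }
    }

    // Simulates the deferred (asynchronous) deliveries finally running.
    func drainPending() {
        let work = pending
        pending.removeAll()
        work.forEach { $0() }
    }
}

let channel = ToyChannel<Int>()
var log: [String] = []
channel.onUpdate(executor: .main) { log.append("main \($0)") }
channel.onUpdate(executor: .immediate) { log.append("immediate \($0)") }

channel.update(1)                 // no hint: only the immediate subscriber runs now
let afterUnhinted = log
channel.drainPending()            // the main subscriber runs "later"
channel.update(2, from: .main)    // hint matches: both subscribers run synchronously
```

In this model a subscriber on the immediate executor never waits for a hop, while a subscriber on main is called synchronously only when the caller passes the matching hint.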
serg-vinnie
@serg-vinnie
got it
serg-vinnie
@serg-vinnie
have you ever thought of moving the cancellationToken and bufferSize arguments from the constructor to operators?
  channel() { update in ... }
    .cancelBy(token: CancellationToken)
    .bufferSize(10)
serg-vinnie
@serg-vinnie
there is a problem with the infinite channel
let me remind you how it is implemented:
public func channel<Update, Success>(executor: Executor = .primary, cancellationToken: CancellationToken? = nil,
                                         bufferSize: Int = AsyncNinjaConstants.defaultChannelBufferSize,
                                         block: @escaping (_ update: @escaping (Update) -> Void, _ complete: @escaping (Fallible<Success>) -> Void) throws -> Void
        ) -> Channel<Update, Success> {
        // TEST: ChannelMakersTests.testInfiniteChannel

        let producer = Producer<Update, Success>(bufferSize: AsyncNinjaConstants.defaultChannelBufferSize)
        cancellationToken?.add(cancellable: producer)
        executor.execute(
            from: nil
        ) { [weak producer] (originalExecutor) in

            do {
                try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
            } catch{
                producer?.complete(Fallible(failure: error), from: originalExecutor)
            }
        }
        return producer
    }
today I tried to implement a Realm Database subscription via an AsyncNinja Channel
func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, NotificationToken> {

        var notificationToken : NotificationToken?

        return channel() { update, completion in

            notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
                update(changeset)
                print("changeset update")
            }
        }.onComplete { _ in _ = notificationToken } // keep reference for notificationToken
    }
serg-vinnie
@serg-vinnie
this method is located in the Service (which is injected into the ViewModel)
and there is a subscription in the ViewController
myViewModel.repoList.itemsStream().onUpdate(context: self) { _, changeset in
            switch changeset {
            case .update(_, let deletions, let insertions, let modifications):
                print("deletions \(deletions), insertions \(insertions), modifications \(modifications)")
            default: break
            }
        }
So the subscription works as expected and it is released on window close
serg-vinnie
@serg-vinnie
but notificationToken is not released
it looks like the onComplete closure is still referenced somewhere
serg-vinnie
@serg-vinnie
of course this solution is ugly (it can easily be fixed with an even uglier one: make notificationToken a class member)
serg-vinnie
@serg-vinnie
but it looks like a memory leak in AsyncNinja, so I decided to tell you about it
and yes, I'm completely sure everything else was released: ViewController, ViewModel and Service
log:
[RepoListViewController]: deinit
[RepoListViewModel]: deinit
[RepoListService]: deinit
serg-vinnie
@serg-vinnie
But I can still see "changeset update" in the log if I change the DB in Realm Studio
anyway
this is not critical
I wanted to consult with you about proper implementation
how it should be:
serg-vinnie
@serg-vinnie
return channel() { update, completion in

            let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
                update(changeset)
            }
            return { notificationToken.invalidate() } // invalidate token on subscription release
        }
and the code in the convenience constructor should be changed from
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
to:
executor.execute( from: nil) { [weak producer] (originalExecutor) in
        do {
            let onRelease = try block({ producer?.update($0, from: originalExecutor) }, { producer?.complete($0, from: originalExecutor) })
            producer?.onSubscriptionRelease = onRelease  // <<- LOOK AT ME
        } catch{
            producer?.complete(Fallible(failure: error), from: originalExecutor)
        }
    }
serg-vinnie
@serg-vinnie
It is something like Rx's OnDispose.
so my question is: how do I call the Producer's onSubscriptionRelease at the right moment?
serg-vinnie
@serg-vinnie
scenario A: 1) you tell me where to call onSubscriptionRelease 2) I implement it 3) I make a pull request
scenario B: 1) I make a pull request 2) you do all the magic
I really hope everything I wrote was clear :)
serg-vinnie
@serg-vinnie
or maybe it's time for a revolution? what do you think about this?
serg-vinnie
@serg-vinnie
func realmChangeset(cancellationToken: CancellationToken) -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
  return channel() { producer in
    producer.bufferSize = 666
    cancellationToken.add(cancellable: producer)  // plan A

    let notificationToken = allObjects(ofType: RepoEntity.self).observe { changeset in
      producer.update(changeset)
    }

    producer.onRelease = { notificationToken.invalidate() }

    // producer.complete(fallible: _)
    // producer.succeeded(...)
    // producer.failed(...)
  }.cancelableBy(cancellationToken)  // plan B
}
serg-vinnie
@serg-vinnie
let's summarize:
  1. it looks like AsyncNinja doesn't release the onComplete closure
  2. it would be amazing to have an on[Subscription]Release handler
  3. a channel() { producer in ... } convenience constructor
Anton Mironov
@antonvmironov
  1. Please make a pull request with a unit test that exposes this issue. I will try to figure it out.
  2. Please try using the internal methods _asyncNinja_retainUntilFinalization and _asyncNinja_notifyFinalization. They could help. Their names could change if their functionality proves useful on a larger scale
  3. That should be rather easy to introduce. Please make a pull request for that. See similar functions here: https://github.com/AsyncNinja/AsyncNinja/blob/master/Sources/Channel_makers.swift
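As a rough illustration of what "retain until finalization" and "notify on finalization" hooks mean, here is a toy model. It is only a sketch under assumptions: ToyProducer, ToyToken, EventLog and the un-prefixed method names are hypothetical and are not the real _asyncNinja_* internals, whose behavior may differ.

```swift
// Toy model of finalization hooks; NOT the real AsyncNinja internals.
// All type and method names here are hypothetical.
final class ToyProducer<Value> {
    private var retained: [AnyObject] = []
    private var finalizers: [() -> Void] = []

    // Keep `object` alive for as long as this producer is alive.
    func retainUntilFinalization(_ object: AnyObject) {
        retained.append(object)
    }

    // Run `block` when this producer is deallocated.
    func notifyFinalization(_ block: @escaping () -> Void) {
        finalizers.append(block)
    }

    deinit {
        finalizers.forEach { $0() }
        // Stored properties (including `retained`) are released after deinit.
    }
}

final class ToyToken {}                       // stands in for a notification token
final class EventLog { var entries: [String] = [] }

let events = EventLog()
weak var weakToken: ToyToken?                 // observes the token without owning it

var producer: ToyProducer<Int>? = ToyProducer<Int>()
do {
    let token = ToyToken()
    weakToken = token
    producer?.retainUntilFinalization(token)  // producer is now the only owner
}
producer?.notifyFinalization { events.entries.append("finalized") }

let aliveWhileProducerLives = (weakToken != nil)
producer = nil                                // last strong reference goes away
let aliveAfterRelease = (weakToken != nil)
```

In this model the token lives exactly as long as the producer, and the finalization block is the place where cleanup such as invalidating a token or removing an event monitor would go.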
serg-vinnie
@serg-vinnie
I've made a pull request
examples of what I'm aiming for:
    // Realm Changeset
    func itemsStream() -> Channel<RealmCollectionChange<Results<RepoEntity>>, Void> {
        return channel(executor: Executor.main) { producer in

            let notificationToken = self.allObjects(ofType: RepoEntity.self).observe { changeset in
                producer.update(changeset)
                print("changeset update")
            }

            producer._asyncNinja_retainUntilFinalization(notificationToken)
        }
    }

    // Input stream
    func modifierFlagsStream() -> Channel<NSEvent.ModifierFlags, Void> {
        return channel(executor: Executor.main) { producer in
            let monitorFlags = NSEvent.addLocalMonitorForEvents(matching: .flagsChanged) {
                producer.update($0.modifierFlags)
                return $0
            }

            producer._asyncNinja_notifyFinalization {
                 NSEvent.removeMonitor(monitorFlags!)
            }
        }
    }
serg-vinnie
@serg-vinnie
code of the factories (constructors):