serg-vinnie
@serg-vinnie

I miss slack channels :( This discussion would be way easier to follow

Gitter is OK for me. I get sound and vibration notifications from Gitter on my iPhone, and waiting a few days for a response is not a problem. But if you want to switch to Slack or something else, that's OK too.

Anton Mironov
@antonvmironov
Maybe I am missing something, but a "subscriber is gone" block is not going to solve the problem. AsyncNinja allows you to have any number of subscribers to your primitive. Invalidating streamHandler as soon as any single subscriber is gone would ruin the mechanism you are trying to provide. My point is that you need to invalidate streamHandler only when all subscribers are gone. This is what _asyncNinja_notifyFinalization does.
Also, releasing the producer within the finalisation block could lead to retain cycles.
Anton Mironov
@antonvmironov
However, I totally see your point here. I see the following options:
  • using a weak reference to the producer
  • using constructor described here
  • or constructor described here
  • creating a failing test case that exposes an issue
serg-vinnie
@serg-vinnie
not "subscriber is gone"
but "last subscriber is gone"
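To make the distinction concrete, here is a minimal sketch in plain Swift of tearing down only when the last subscriber leaves. It does not use AsyncNinja at all: `SharedSource`, `subscribe`, `unsubscribe`, and `onLastSubscriberGone` are hypothetical names made up for illustration, and the sketch is not thread-safe.

```swift
/// Hypothetical stand-in for a shared stream source; not an AsyncNinja type.
final class SharedSource {
  private var handlers: [Int: (Int) -> Void] = [:]
  private var nextID = 0
  private let onLastSubscriberGone: () -> Void

  init(onLastSubscriberGone: @escaping () -> Void) {
    self.onLastSubscriberGone = onLastSubscriberGone
  }

  /// Registers a handler and returns an id that can be used to unsubscribe.
  func subscribe(_ handler: @escaping (Int) -> Void) -> Int {
    let id = nextID
    nextID += 1
    handlers[id] = handler
    return id
  }

  /// Removes one subscriber; the teardown block runs only when none remain.
  func unsubscribe(_ id: Int) {
    guard handlers.removeValue(forKey: id) != nil else { return }
    if handlers.isEmpty { onLastSubscriberGone() }
  }

  /// Delivers a value to every current subscriber.
  func send(_ value: Int) {
    for handler in handlers.values { handler(value) }
  }
}
```

With two subscribers, removing the first leaves the source alive and the second still receives values; the teardown block fires only after the second one is removed.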
serg-vinnie
@serg-vinnie
I'm going to create a sandbox project with real-world examples
that should be more productive
Anton Mironov
@antonvmironov
Sounds great!
serg-vinnie
@serg-vinnie
going to make a pull request
there is 1 bug and 1 feature
func merge<Update,Success>(channels: [Channel<Update,Success>]) -> Channel<Update,Void> {
  return channel(executor: Executor.primary) { (producer : Producer<Update,Void>) -> Void in
    for ch in channels {
      ch.bind(producer)
    }
  }
}
this merge function can merge updates from an arbitrary number of channels
for example:
        let selection = merge(channels: myView.checkboxes.map { $0.actionChannel() } )
            .debounce(interval: 1)
it reacts to any of the 10 checkboxes
serg-vinnie
@serg-vinnie
Success values can be ignored in such a case
serg-vinnie
@serg-vinnie
And there is a bug with flatMap
event.flatMap { doSomethingForTwoSeconds() }.onUpdate { /* update executes immediately instead of waiting for two seconds */ }
I added a failing test for flatMap
serg-vinnie
@serg-vinnie
hi
there are two methods
func download(from: String, to: String) -> Channel<Double,URL>
func unzip(from: String, to: String, password: String) -> Channel<Double,URL>
each channel reports its progress as updates and completes with the destination URL
I'm trying to combine these methods sequentially
func prepareSourceFiles() -> Channel<Double,URL> {
  return download(from: webURL, to: dstZip)
    .map { $0 / 2.0 }       // 100% is 50%   <= IGNORED
    .flatMapSuccess { unzip(from: $0.path, to: dstUnzip, password: "qwerty") }
    .map { 0.5 + $0 / 2.0 } // 0% is 50%
}
so the download progress should be the first 50% and the unzip progress the next 50%
but
serg-vinnie
@serg-vinnie
the updates emitted before flatMapSuccess were dropped
as a result, I get only the progress from unzipping, while the download progress is skipped
serg-vinnie
@serg-vinnie
what's wrong with the code, and how can I achieve the expected behavior?
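For reference, the intended progress arithmetic can be written as a small standalone function. This is only a sketch of the scaling, assuming every stage gets an equal share of the overall range; `overallProgress`, `stageIndex`, and `stageCount` are hypothetical names, not AsyncNinja API, and the sketch says nothing about why the updates are dropped.

```swift
/// Maps the progress of one stage (0...1) onto that stage's slice of the overall 0...1 range.
/// Stages are assumed to have equal weight; `stageIndex` is zero-based.
func overallProgress(stageProgress: Double, stageIndex: Int, stageCount: Int) -> Double {
  // Clamp so that a misbehaving stage cannot push the result outside its slice.
  let clamped = min(max(stageProgress, 0.0), 1.0)
  return (Double(stageIndex) + clamped) / Double(stageCount)
}
```

With two stages this reduces to the two closures above: stage 0 gives `$0 / 2.0` and stage 1 gives `0.5 + $0 / 2.0`.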
Anton Mironov
@antonvmironov
Hey. I see that unzip will not emit updates until the unzip starts.
It seems like updates and completions should be separated here.
I understand what you want to achieve. This is an interesting problem. Going to approach it and get back with results
serg-vinnie
@serg-vinnie
yes unzip starts only when download completes – it is the only possible way
serg-vinnie
@serg-vinnie
does AsyncNinja have an easy replacement for Rx CombineLatest? http://reactivex.io/documentation/operators/combinelatest.html
serg-vinnie
@serg-vinnie
Hi
I'm developing the CombineLatest
import Foundation

typealias ES = EventSource

func combineLatestUpdates<ES1:ES, ES2:ES>(_ es1: ES1, _ es2: ES2, executor: Executor = Executor.main)
  -> Channel<(ES1.Update,ES2.Update),(ES1.Success, ES2.Success)> {

  return CombineLatest2(es1, es2, executor: executor).producer
}

fileprivate class CombineLatest2<ES1:ES, ES2:ES> : ExecutionContext, ReleasePoolOwner {
  typealias ResultUpdate  = (ES1.Update,  ES2.Update)
  typealias ResultSuccess = (ES1.Success, ES2.Success)

  public let releasePool = ReleasePool()
  public var executor: Executor

  var update1 : ES1.Update?
  var update2 : ES2.Update?

  var success1 : ES1.Success?
  var success2 : ES2.Success?

  let producer = Producer<ResultUpdate,ResultSuccess>()

  init(_ es1: ES1, _ es2: ES2, executor: Executor) {
    self.executor = executor

    producer._asyncNinja_retainUntilFinalization(self)

    es1.onUpdate(context: self) { ctx, upd in     ctx.update1 = upd;      ctx.trySendUpdate() }
      .onSuccess(context: self) { ctx, success in ctx.success1 = success; ctx.tryComplete()   }
      .onFailure(context: self) { ctx, error in   ctx.producer.fail(error) }

    es2.onUpdate(context: self) { ctx, upd in     ctx.update2 = upd;      ctx.trySendUpdate() }
      .onSuccess(context: self) { ctx, success in ctx.success2 = success; ctx.tryComplete()   }
      .onFailure(context: self) { ctx, error in   ctx.producer.fail(error) }
  }

  func trySendUpdate() {
    guard let upd1 = update1 else { return }
    guard let upd2 = update2 else { return }

    producer.update((upd1,upd2))
  }

  func tryComplete() {
    guard let suc1 = success1 else { return }
    guard let suc2 = success2 else { return }

    producer.succeed((suc1,suc2), from: executor)
  }
}
this solution works fine for combining elements
but there is a memory leak caused by this line:
producer._asyncNinja_retainUntilFinalization(self)
serg-vinnie
@serg-vinnie
I need to hold a reference to the instance of the CombineLatest2 class
serg-vinnie
@serg-vinnie
but I'm not sure how to release it
the feature I requested previously could help
it would be handy to release it when the last subscription is released
serg-vinnie
@serg-vinnie
the main idea is to create a Producer which exists in the context of a subscriber
serg-vinnie
@serg-vinnie
ha
this was exactly the case where a properly asked question contains its own answer
my solution to this problem is to extend ObjCInjectedRetainer and ReleasePoolOwner
extension Retainer {
  func combineLatestUpdates<ES1:ES, ES2:ES>(_ es1: ES1, _ es2: ES2, executor: Executor = Executor.main)
    -> Channel<(ES1.Update,ES2.Update),(ES1.Success, ES2.Success)> {

      return CombineLatest2(es1, es2, executor: executor)
        .retain(with: self)
        .producer
  }
}
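The lifetime idea behind this can be shown in plain Swift without any AsyncNinja types: the helper lives exactly as long as the object that owns it, so nothing has to be released by hand. `Helper` and `Owner` below are hypothetical stand-ins (roughly for CombineLatest2 and for the retaining context), and `Owner.retain` is an illustrative method, not the library's `retain(with:)`.

```swift
/// Hypothetical helper object, standing in for something like CombineLatest2.
final class Helper {
  private let onDeinit: () -> Void
  init(onDeinit: @escaping () -> Void) { self.onDeinit = onDeinit }
  deinit { onDeinit() }  // lets the caller observe when the helper is released
}

/// Hypothetical owner that keeps helpers alive for as long as it lives itself.
final class Owner {
  private var retained: [AnyObject] = []

  /// Stores a strong reference to `object` and returns it for chaining.
  func retain<T: AnyObject>(_ object: T) -> T {
    retained.append(object)
    return object
  }
}
```

When the last strong reference to the `Owner` goes away, its `retained` array is destroyed and every helper in it is released with it, provided nothing else holds the helper strongly.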