Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
Anton Mironov
@antonvmironov
Sounds great!
serg-vinnie
@serg-vinnie
going to make a pull request
there is 1 bug and 1 feature
/// Merges the updates of several channels into a single channel.
///
/// A new channel is created on `Executor.primary`, and every element of
/// `channels` is bound to that channel's producer. The merged channel's
/// success type is `Void`.
///
/// - Parameter channels: The channels to bind to the merged producer.
/// - Returns: The channel backed by the producer all inputs were bound to.
func merge<Update, Success>(channels: [Channel<Update, Success>]) -> Channel<Update, Void> {
  return channel(executor: Executor.primary) { (merged: Producer<Update, Void>) -> Void in
    channels.forEach { source in
      source.bind(merged)
    }
  }
}
that merge function can merge updates from an arbitrary number of channels
for example:
        // Collect one action channel per checkbox, merge them, then debounce
        // the merged stream with `interval: 1`.
        let checkboxChannels = myView.checkboxes.map { checkbox in checkbox.actionChannel() }
        let selection = merge(channels: checkboxChannels).debounce(interval: 1)
reaction for any of 10 checkboxes
serg-vinnie
@serg-vinnie
Success values can be ignored in such case
serg-vinnie
@serg-vinnie
And there is a bug with flatMap
// Repro sketch for the reported flatMap bug: the `onUpdate` handler runs immediately
// instead of after the two-second work started inside `flatMap`.
// The text inside the `onUpdate` braces is a placeholder description, not Swift code.
event.flatMap { doSomethingForTwoSeccons() }.onUpdate { * update executes immediately instead of waiting for two seconds*\ }
I added failing test for flatMap
serg-vinnie
@serg-vinnie
hi
there are two methods
/// Downloads `from` to `to`. The returned channel's updates report progress,
/// and it completes with the destination URL.
/// NOTE(review): progress looks like a 0...1 fraction, judging by how callers scale it — confirm.
func download(from: String, to: String) -> Channel<Double,URL>
/// Unzips `from` into `to` using `password`. The returned channel's updates report
/// progress, and it completes with the destination URL.
/// NOTE(review): progress looks like a 0...1 fraction, judging by how callers scale it — confirm.
func unzip(from: String, to: String, password: String) -> Channel<Double,URL>
each channel updates its progress and completes with destination url
I'm trying to combine these methods sequentially
/// Downloads the archive and then unzips it, aiming to expose a single progress
/// channel: download scaled into the first 50%, unzip into the second 50%.
///
/// Known issue (as reported with this snippet): updates emitted before
/// `flatMapSuccess` are dropped, so only the unzip progress is observed.
///
/// - Returns: A channel whose updates are the scaled progress and which completes
///   with the URL produced by `unzip`.
func prepareSourceFiles() -> Channel<Double,URL> {
        return download(from: webURL, to: dstZip)
            .map { $0 / 2.0 }   // download stage: 100% maps to 50% overall — reported as IGNORED (dropped)
            .flatMapSuccess { unzip(from: $0.path, to: dstUnzip, password: "qwerty") }
            .map { 0.5 + $0 / 2.0 } // unzip stage: 0% maps to 50% overall
    }
so downloading progress will be first 50% and unzipping is next 50%
but
serg-vinnie
@serg-vinnie
updates emitted before flatMapping were dropped
as a result I get only the progress from unzipping, while the progress for downloading is skipped
serg-vinnie
@serg-vinnie
what's wrong with the code, and how can I achieve the expected behavior?
Anton Mironov
@antonvmironov
Hey. I see that unzip will not emit updates until the unzip starts.
It seems like updates and completions should be separated here.
I understand what you want to achieve. This is an interesting problem. Going to approach it and get back with results
serg-vinnie
@serg-vinnie
yes unzip starts only when download completes – it is the only possible way
serg-vinnie
@serg-vinnie
does AsyncNinja have an easy replacement for Rx CombineLatest? http://reactivex.io/documentation/operators/combinelatest.html
serg-vinnie
@serg-vinnie
Hi
I'm developing the CombineLatest
This message was deleted
import Foundation

/// Shorthand for `EventSource`, used to keep generic constraints compact.
typealias ES = EventSource

/// Combines the latest updates of two event sources into a channel of pairs.
///
/// - Parameters:
///   - es1: The first event source.
///   - es2: The second event source.
///   - executor: The executor handed to the combining context. Defaults to `Executor.main`.
/// - Returns: The producer owned by a freshly created `CombineLatest2`.
func combineLatestUpdates<ES1: ES, ES2: ES>(
  _ es1: ES1,
  _ es2: ES2,
  executor: Executor = Executor.main
) -> Channel<(ES1.Update, ES2.Update), (ES1.Success, ES2.Success)> {
  let combiner = CombineLatest2(es1, es2, executor: executor)
  return combiner.producer
}

/// Combines two event sources into one producer.
///
/// - An update `(u1, u2)` is sent every time either source updates, once both
///   sources have delivered at least one update.
/// - The producer succeeds with `(s1, s2)` once both sources have succeeded.
/// - The producer fails when either source reports a failure.
///
/// All handlers are registered with `self` as their context.
fileprivate final class CombineLatest2<ES1: ES, ES2: ES>: ExecutionContext, ReleasePoolOwner {
  typealias ResultUpdate  = (ES1.Update,  ES2.Update)
  typealias ResultSuccess = (ES1.Success, ES2.Success)

  let releasePool = ReleasePool()
  let executor: Executor

  // Most recent update seen from each source; `nil` until the first one arrives.
  private var update1: ES1.Update?
  private var update2: ES2.Update?

  // Success value of each source; `nil` until that source succeeds.
  private var success1: ES1.Success?
  private var success2: ES2.Success?

  /// Receives the combined updates and the combined completion.
  let producer = Producer<ResultUpdate, ResultSuccess>()

  /// Subscribes to both sources and starts forwarding combined events to `producer`.
  ///
  /// - Parameters:
  ///   - es1: The first event source.
  ///   - es2: The second event source.
  ///   - executor: Stored as this context's executor and passed along when succeeding.
  init(_ es1: ES1, _ es2: ES2, executor: Executor) {
    self.executor = executor

    // NOTE(review): `producer` is stored on `self` and is asked here to retain `self`
    // until finalization, so the pair presumably stays alive until the producer is
    // finalized. This was reported as a memory leak — confirm the intended lifetime.
    producer._asyncNinja_retainUntilFinalization(self)

    es1
      .onUpdate(context: self) { ctx, update in
        ctx.update1 = update
        ctx.trySendUpdate()
      }
      .onSuccess(context: self) { ctx, success in
        ctx.success1 = success
        ctx.tryComplete()
      }
      .onFailure(context: self) { ctx, error in
        ctx.producer.fail(error)
      }

    es2
      .onUpdate(context: self) { ctx, update in
        ctx.update2 = update
        ctx.trySendUpdate()
      }
      .onSuccess(context: self) { ctx, success in
        ctx.success2 = success
        ctx.tryComplete()
      }
      .onFailure(context: self) { ctx, error in
        ctx.producer.fail(error)
      }
  }

  /// Sends the pair of latest updates, but only once both sources have updated.
  private func trySendUpdate() {
    guard let latest1 = update1, let latest2 = update2 else { return }
    producer.update((latest1, latest2))
  }

  /// Completes the producer with both success values once both sources have succeeded.
  private func tryComplete() {
    guard let result1 = success1, let result2 = success2 else { return }
    producer.succeed((result1, result2), from: executor)
  }
}
this solution works fine for combining elements
but there is a memory leak
producer._asyncNinja_retainUntilFinalization(self)
serg-vinnie
@serg-vinnie
I need to hold reference to the instance of CombineLatest2 class
serg-vinnie
@serg-vinnie
but I'm not sure how to release it
feature I requested previously could help
it would be handy to release it when last subscription released
serg-vinnie
@serg-vinnie
the main idea is to create Producer which exists in context of subscriber
serg-vinnie
@serg-vinnie
ha
it was exactly the case where a properly asked question contains the answer
my solution to this problem is to extend ObjCInjectedRetainer and ReleasePoolOwner
extension Retainer {
  /// Combines the latest updates of two event sources into a channel of pairs,
  /// passing `self` to the combiner's `retain(with:)`.
  ///
  /// - Parameters:
  ///   - es1: The first event source.
  ///   - es2: The second event source.
  ///   - executor: The executor handed to the combining context. Defaults to `Executor.main`.
  /// - Returns: The producer of the `CombineLatest2` created for this call.
  func combineLatestUpdates<ES1: ES, ES2: ES>(
    _ es1: ES1,
    _ es2: ES2,
    executor: Executor = Executor.main
  ) -> Channel<(ES1.Update, ES2.Update), (ES1.Success, ES2.Success)> {
    let combiner = CombineLatest2(es1, es2, executor: executor)
    return combiner.retain(with: self).producer
  }
}