These are chat archives for nextflow-io/nextflow

28th
Dec 2017
Fredrik Boulund
@boulund
Dec 28 2017 11:58
Hi! Hope you had good holidays so far. Anyone in here working today? I have a question about the order in which things are processed in Nextflow channels

I'll simplify to the smallest example possible: I have three processes: assemble, predict_orfs, map_to_orfs. The assembly step takes reads and assembles them, then sends the assembled contigs to the orf prediction step, which in turn predicts orfs and sends the nucleotide sequences of the predicted genes to the final mapping process. The final mapping process also takes the reads from a channel and maps the reads back to the predicted orfs (from contigs assembled from the same reads). However, something is wrong, because the reads aren't matched up with the correct predicted orfs (i.e. orfs predicted from the contigs assembled from the same reads).

The two channels with reads are defined like this:
Channel.fromFilePairs(params.input_reads).into {input_reads_assemble; input_reads_map}

I kind of expected things to happen in lockstep, so that the first predicted orfs in the predicted_orfs_output channel are "related" (for lack of a better word) to the first read pair in the input_reads_map channel. Do you see what I mean?
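A minimal plain-Python model of why lockstep pairing fails (this is not Nextflow code; the sample ids and the upstream completion order are hypothetical). A process with two independent queue inputs takes the next item from each queue, so items are paired by arrival order. Upstream tasks such as assemble and predict_orfs finish in whatever order they finish, so their outputs need not arrive in the same order as the reads.

```python
# Plain-Python model of a process consuming two independent queue channels.
# Sample ids and the upstream completion order are hypothetical.


def positional_pairs(left, right):
    """Pair the i-th item of one queue with the i-th item of the other,
    i.e. pairing by arrival order rather than by sample id."""
    return list(zip(left, right))


# Reads are emitted in input order.
reads = [("s1", "s1_reads"), ("s2", "s2_reads"), ("s3", "s3_reads")]

# Upstream tasks finish in completion order, not input order.
orfs = [("s2", "s2_orfs.fasta"), ("s3", "s3_orfs.fasta"), ("s1", "s1_orfs.fasta")]

pairs = positional_pairs(reads, orfs)
mismatched = [(r[0], o[0]) for r, o in pairs if r[0] != o[0]]
print(mismatched)  # [('s1', 's2'), ('s2', 's3'), ('s3', 's1')]
```

Every pair here is mismatched even though both queues contain exactly the same sample ids, which matches the symptom described above.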

Evan Floden
@evanfloden
Dec 28 2017 12:29
Hi @boulund, I often find it best for each channel element to contain a sample id and then the data. That way you can split/combine channels on the sample id using your operators.
Fredrik Boulund
@boulund
Dec 28 2017 12:29
They do, I send tuples of pair_id, file() in each channel
Evan Floden
@evanfloden
Dec 28 2017 12:30
So it is simple 1:1 for each sample: assemble -> predict_orfs -> map_to_orfs?
Fredrik Boulund
@boulund
Dec 28 2017 12:30
However, the map_to_orfs process gets input from two separate channels: the output from predict_orfs and input_reads_map (i.e. the input reads)
Evan Floden
@evanfloden
Dec 28 2017 12:30
I see
Fredrik Boulund
@boulund
Dec 28 2017 12:31
The pair_ids from the output channel of predict_orfs and from input_reads_map don't match
Evan Floden
@evanfloden
Dec 28 2017 12:31
Best bet is to create a new channel that combines the output channel of predict_orfs with input_reads_map.
Fredrik Boulund
@boulund
Dec 28 2017 12:32
Hmm.... Not sure how to do that
Evan Floden
@evanfloden
Dec 28 2017 12:32
If you have the code I can walk you through it.
Fredrik Boulund
@boulund
Dec 28 2017 12:33
Here's a simplified example:
Channel.fromFilePairs(params.input_reads).into {input_reads_assemble; input_reads_map}

process assemble {
    input: 
    set pair_id, file(reads) from input_reads_assemble
    output:
    set pair_id, file("${pair_id}.contigs.fasta") into input_mgm
    // ... (remaining code not relevant)
}
process predict_orfs{
    input:
    set file_id, file(contigs) from input_mgm
    output:
    set file_id, file("${file_id}.predicted_nucleotides.fasta") into orf_nucleotides
    // ... (remaining code not relevant)
}

process map_to_orfs{
    input:
    set pair_id, file(reads) from input_reads_map
    set file_id, file(orfs) from orf_nucleotides

    output:
    file "${pair_id}.mapped.sam"
    // ... (remaining code not relevant)
}
Evan Floden
@evanfloden
Dec 28 2017 12:33
Great, give me a sec and I will mock up something with the nextflow console
Fredrik Boulund
@boulund
Dec 28 2017 12:33
wonderful! Thank you
I think I understand what you mean; that I should merge orf_nucleotides with input_reads_map into a new channel combining them via their pair_ids, and then unpack that new channel in map_to_orfs instead of reading two separate channels like I do now?
Evan Floden
@evanfloden
Dec 28 2017 12:36
exactly. Take a look at combine using “by”, in this case something like:
orf_nucleotides
  .combine(input_reads_map, by: 0)
  .set {new_channel}
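A plain-Python model of which tuples `combine(..., by: 0)` produces (a sketch, not Nextflow's implementation; the sample names and file names are hypothetical). For each key present on both sides it emits the key once, followed by the remaining elements of the left item and then of the right item. Items whose key has no partner are dropped, and a key that appears several times yields every left/right combination for that key.

```python
from collections import defaultdict


def combine_by_key(left, right):
    """Model of `left.combine(right, by: 0)`: for each shared key, emit
    (key, *left_rest, *right_rest) for every left/right combination.
    Items whose key never appears on the other side are dropped."""
    right_by_key = defaultdict(list)
    for key, *rest in right:
        right_by_key[key].append(rest)
    combined = []
    for key, *left_rest in left:
        for right_rest in right_by_key.get(key, []):
            combined.append((key, *left_rest, *right_rest))
    return combined


# Hypothetical samples: ORFs arrive in completion order, reads in input order.
orf_nucleotides = [("s2", "s2_orfs.fasta"), ("s3", "s3_orfs.fasta"), ("s1", "s1_orfs.fasta")]
input_reads_map = [
    ("s1", ["s1_1.fq", "s1_2.fq"]),
    ("s2", ["s2_1.fq", "s2_2.fq"]),
    ("s3", ["s3_1.fq", "s3_2.fq"]),
]

new_channel = combine_by_key(orf_nucleotides, input_reads_map)
for item in new_channel:
    print(item)  # e.g. ('s2', 's2_orfs.fasta', ['s2_1.fq', 's2_2.fq'])
```

This batch model only shows which tuples come out; the real operator emits them as matches become available, so the order in which they reach map_to_orfs can differ. What matters is that each tuple now carries the ORFs and the reads of the same sample.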
Fredrik Boulund
@boulund
Dec 28 2017 12:37
Won't such a solution hold up the entire pipeline until all files have been assembled and orf_predicted? Or is it like a Python generator that will yield pairs as soon as they are found?
Evan Floden
@evanfloden
Dec 28 2017 12:38
Nope, as soon as the input arrives it will execute for each pair.
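To connect this to the Python generator question, here is a generator-based model of keyed matching over two interleaved streams (a sketch only: Nextflow's operators run on GPars dataflow, not Python generators, and the arrival order below is hypothetical). It assumes at most one item per key on each side, which is the pair_id situation here; under that assumption the tuples match what `combine(..., by: 0)` or `join` would produce. A tuple is yielded as soon as both halves of a key have been seen, without waiting for either stream to finish.

```python
def streaming_join(events):
    """Model of keyed matching over two streams that arrive interleaved.

    `events` yields (side, key, value) in arrival order, with side being
    "orfs" or "reads". A (key, orfs_value, reads_value) tuple is yielded
    as soon as both halves for that key have arrived.
    Assumes at most one item per key on each side.
    """
    waiting = {"orfs": {}, "reads": {}}
    for side, key, value in events:
        other = "reads" if side == "orfs" else "orfs"
        if key in waiting[other]:
            partner = waiting[other].pop(key)
            if side == "orfs":
                yield (key, value, partner)
            else:
                yield (key, partner, value)
        else:
            # Park this half until its partner shows up.
            waiting[side][key] = value


# Hypothetical arrival order: all reads are available at once,
# ORF files trickle in as predict_orfs tasks finish.
events = [
    ("reads", "s1", "s1_reads"),
    ("reads", "s2", "s2_reads"),
    ("reads", "s3", "s3_reads"),
    ("orfs", "s2", "s2_orfs.fasta"),
    ("orfs", "s3", "s3_orfs.fasta"),
    ("orfs", "s1", "s1_orfs.fasta"),
]

arrivals = iter(events)
matches = streaming_join(arrivals)
first = next(matches)
print(first)                # ('s2', 's2_orfs.fasta', 's2_reads')
print(len(list(arrivals)))  # 2, i.e. two ORF files had not arrived yet
```

The first match is produced while two of the ORF files are still outstanding, which is the "no waiting for the whole pipeline" behaviour described above.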
Fredrik Boulund
@boulund
Dec 28 2017 12:39
Sweet, then that sounds like a good solution.
Am I correct in guessing that I could put that code after the process definition for predict_orfs?
Evan Floden
@evanfloden
Dec 28 2017 12:40
Yeah, that is best for readability so you can follow what is happening.
Fredrik Boulund
@boulund
Dec 28 2017 12:40
And then I guess I'll get a tuple with three elements when I unpack that in my process (it looks like that in the documentation: https://www.nextflow.io/docs/latest/operator.html?highlight=combine#combine)
Excellent! I'll try it right away, thanks a lot for the help
Evan Floden
@evanfloden
Dec 28 2017 12:42
exactly, so you can use as input something like
set sample_id, file(orf_file), file(read_files) from new_channel
Fredrik Boulund
@boulund
Dec 28 2017 12:43
ok, so it automatically groups the remaining parts of a tuple in the last file()?
Evan Floden
@evanfloden
Dec 28 2017 12:44
My pleasure, anytime!
Fredrik Boulund
@boulund
Dec 28 2017 12:44
Thanks so much
Evan Floden
@evanfloden
Dec 28 2017 12:44
so it automatically groups the remaining parts of a tuple
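On the grouping question: `Channel.fromFilePairs` already emits each item as `[pair_id, [read_1, read_2]]`, so the read pair is a single list element before any combining happens. The combined item therefore has exactly three elements, and `file(read_files)` receives that list. A plain-Python sketch of the tuple shapes (file names are hypothetical; `merge_on_key` is an illustrative helper, not a Nextflow function):

```python
def merge_on_key(orf_item, read_item):
    """Model of one combine(..., by: 0) output item: the key once,
    then the remaining elements of each input tuple, in order."""
    if orf_item[0] != read_item[0]:
        raise ValueError("keys differ")
    return (orf_item[0], *orf_item[1:], *read_item[1:])


# Hypothetical items; fromFilePairs emits (pair_id, [read_1, read_2]).
orf_item = ("s1", "s1.predicted_nucleotides.fasta")
read_item = ("s1", ["s1_1.fastq", "s1_2.fastq"])

combined = merge_on_key(orf_item, read_item)
# Mirrors: set sample_id, file(orf_file), file(read_files) from new_channel
sample_id, orf_file, read_files = combined
print(len(combined), read_files)  # 3 ['s1_1.fastq', 's1_2.fastq']
```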
Fredrik Boulund
@boulund
Dec 28 2017 12:44
I was afraid I was going to have to resort to some ugly hack solution pushing the read files along all intermediary processes
Evan Floden
@evanfloden
Dec 28 2017 12:46
Yeah, it takes a little bit of a mind shift to think in terms of channels and operators, but once I got it I found it amazingly powerful.
Fredrik Boulund
@boulund
Dec 28 2017 12:46
This would work equally well with the join operator then?
Evan Floden
@evanfloden
Dec 28 2017 12:47
Yeah, I think it is the same as combine when using by: 0
Fredrik Boulund
@boulund
Dec 28 2017 12:48
From the documentation it looks like it is.
Fredrik Boulund
@boulund
Dec 28 2017 12:53
... But I just tried it and it doesn't seem to work.
ERROR ~ No signature of method: groovyx.gpars.dataflow.DataflowQueue.join() is applicable for argument types:
Evan Floden
@evanfloden
Dec 28 2017 12:56
What's the rest of the error? Looks like you are trying to perform join on something other than a channel.
Fredrik Boulund
@boulund
Dec 28 2017 13:13
Sorry, got distracted by a package delivery.
ERROR ~ No signature of method: groovyx.gpars.dataflow.DataflowQueue.join() is applicable for argument types: (groovyx.gpars.dataflow.DataflowQueue) values: [DataflowQueue(queue=[DataflowVariable(value=[saliva_10000reads_1rep, nextflow.util.ArrayBag@6f70f32f]), DataflowVariable(value=[saliva_10000reads_2rep, nextflow.util.ArrayBag@548e76f1]), DataflowVariable(value=[saliva_10000reads_3rep, nextflow.util.ArrayBag@5aabbb29]), DataflowVariable(value=[biopsy_10000reads_1rep, nextflow.util.ArrayBag@72c927f1]), DataflowVariable(value=[biopsy_10000reads_2rep, nextflow.util.ArrayBag@1ac85b0c]), DataflowVariable(value=[biopsy_10000reads_3rep, nextflow.util.ArrayBag@3dd69f5a]), DataflowVariable(value=[faeces_10000reads_1rep, nextflow.util.ArrayBag@3aa3193a]), DataflowVariable(value=[faeces_10000reads_2rep, nextflow.util.ArrayBag@1ee4730]), DataflowVariable(value=[faeces_10000reads_3rep, nextflow.util.ArrayBag@59a67c3a]), DataflowVariable(value=[vag_10000reads_1rep, nextflow.util.ArrayBag@5003041b]), DataflowVariable(value=[vag_10000reads_2rep, nextflow.util.ArrayBag@724bade8]), DataflowVariable(value=[vag_10000reads_3rep, nextflow.util.ArrayBag@16fb356]), DataflowVariable(value=groovyx.gpars.dataflow.operator.PoisonPill@6bc248ed)])]
Possible solutions: min(), min(java.util.Comparator), min(groovy.lang.Closure), wait(), poll(), count()
Evan Floden
@evanfloden
Dec 28 2017 13:15
and the code?
Fredrik Boulund
@boulund
Dec 28 2017 13:16
oh, of course!
orf_nucleotides
    .join(input_reads_map)
    .set {input_orfs_reads}
The rest of the code is the same as previously.
Paolo Di Tommaso
@pditommaso
Dec 28 2017 13:17
what version of NF are you using?
Fredrik Boulund
@boulund
Dec 28 2017 13:17
I'm going to go with the combine solution since that worked fine. I'm just a bit curious why join didn't work
It's probably quite old: Version: 0.25.7 build 4531
Paolo Di Tommaso
@pditommaso
Dec 28 2017 13:18
that's the problem, join was introduced with version 0.26.x
Fredrik Boulund
@boulund
Dec 28 2017 13:18
Ah! :) Nice!
Evan Floden
@evanfloden
Dec 28 2017 13:18
Haha!
Glad we didn’t spend too much time trying to debug that one!
Fredrik Boulund
@boulund
Dec 28 2017 13:19
Yep! So I guess it's time to update nextflow on my dev computer ;)
It would be nice to add that kind of information (which version introduced an operator) to the docs @pditommaso! Let me know if I can help with that somehow
Paolo Di Tommaso
@pditommaso
Dec 28 2017 13:21
true, we need a massive reorganisation of the docs
Fredrik Boulund
@boulund
Dec 28 2017 13:22
Sounds like you have big plans for the docs already?
Paolo Di Tommaso
@pditommaso
Dec 28 2017 13:23
plans a lot, time little :)
Fredrik Boulund
@boulund
Dec 28 2017 13:26
:D
Anyway, thanks @skptic and @pditommaso for your help today! I'm sure I'll be back for more eventually
(some other day)
Paolo Di Tommaso
@pditommaso
Dec 28 2017 13:27
you are welcome