These are chat archives for nextflow-io/nextflow

12th Jul 2017
Robert Syme
@robsyme
Jul 12 2017 01:41

Hi all. I've got a channel

output:
set condition, file('fwd.fastq.gz'), file('rev.fastq.gz') into reads

I'd like to use the splitFastq operator to divide up both the fwd and rev read files.
One option is two channels:

output:
set condition, 'fwd.fastq.gz' into readsFwd
set condition, 'rev.fastq.gz' into readsRev

and then

readsFwd.splitFastq( by: 5000000 )
.mix( readsRev.splitFastq( by: 5000000 ) )
.phase()
.set { splitReads }

Is this a sensible approach? I need to ensure that the order of the fastq chunks is maintained so that the left and right pairs don't become mixed up.

Robert Syme
@robsyme
Jul 12 2017 01:52
Sorry, that last block should be:
readsFwd.splitFastq( by: 5000000, file: true )
.phase( readsRev.splitFastq( by: 5000000, file: true ) )
.set { splitReads }
Paolo Di Tommaso
@pditommaso
Jul 12 2017 08:25
channels are ordered, thus you don't need phase (which wouldn't work anyway because there's no unique key for each chunk)
you can use merge
readsFwd.splitFastq( by: 5000000, file: true ).set { chunksFwd }
readsRev.splitFastq( by: 5000000, file: true ).set { chunksRev }
chunksFwd.merge(chunksRev) { fwd, rev -> tuple(fwd[0], fwd[1], rev[1]) }
provided the first entry is the condition and the second the file chunk
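downstream you could then consume the merged channel as usual, e.g. (just a sketch; the process name and command are placeholders):

chunksFwd.merge(chunksRev) { fwd, rev -> tuple(fwd[0], fwd[1], rev[1]) }
         .set { splitReads }

process alignChunk {
    input:
    set condition, file(fwd), file(rev) from splitReads

    """
    echo "aligning chunk pair for ${condition}: ${fwd} ${rev}"
    """
}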
Shellfishgene
@Shellfishgene
Jul 12 2017 13:12
In the RNA toy example, sometimes you use the variables with {} and sometimes without in the command. For example ${task.cpus}. What's the difference?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:36
the difference is that ${ .. } allows you to evaluate an expression instead of a single value
thus ${x} or $x are perfectly identical
also, you may need curly brackets when interpolating a variable in a string where a character could interfere with the variable evaluation, eg "mv transcripts.gtf transcript_${pair_id}.gtf"
in this case if you wrote $pair_id.gtf it would try to evaluate the property gtf on the object pair_id
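a tiny example to illustrate (a hypothetical process, only meant to show the interpolation difference):

process demo {
    cpus 2

    input:
    val pair_id from Channel.from('sampleA')

    """
    echo $pair_id                    # plain variable: braces are optional
    echo ${task.cpus}                # property access is an expression, so it needs ${ }
    echo transcript_${pair_id}.gtf   # braces keep '.gtf' from being read as part of the variable name
    """
}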
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:39
Hi all, everybody in my lab has switched to Snakemake. As I'm a rebel I'm going to try to switch from Make to nextflow :-) My aim is to run a basic read mapping/calling. First step is to download a reference with wget and that's already difficult for me :-)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:40
ahahah
I love this !
Evan Floden
@evanfloden
Jul 12 2017 13:41
Difficult in NF or Snakemake? :laughing:
Shellfishgene
@Shellfishgene
Jul 12 2017 13:42
@pditommaso Ok, thanks
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:42
Trying to download chr22 and chrM from UCSC, gunzip and concatenate
params.in = "ref.fa"

process downloadRef {
        """
        wget -O - "http://hgdownload.cse.ucsc.edu/goldenPath/hg19/chromosomes/chr22.fa.gz" | gunzip -c  > ${params.in}
        wget -O - "http://hgdownload.cse.ucsc.edu/goldenPath/hg19/chromosomes/chrM.fa.gz" | gunzip -c  >>  ${params.in}
        """
    }
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:43
first problem: the gitter formatting :D
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:43
(how do i format this stuff ?) :-)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:43
triple ` then newline :)
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:44
OK anyway: this is wrong because it's creating a ref.fa under work each time I invoke it.
and I suppose I must specify an output file in the process downloadRef.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:45
it depends on whether you always want them named ref.fa or not
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:46
yes
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:46
this is wrong because it's creating a ref.fa under work each time I invoke it.
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:47
@pditommaso suppose I always want a ref.fa under a folder REF/ref.fa
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:47
why is it wrong ?
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:47
@pditommaso with make it would be mkdir -p $(dir $@) && wget -O - ....
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:48
you don't have to care about the target directory, just work in the current dir, eg
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:48
I imagine it's wrong because if my process fails later, it will download the ref again from scratch (?)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:48
process downloadRef {
    output:
    file 'ref.fa' into ref_ch

    """
    wget -O - "http://hgdownload.cse.ucsc.edu/goldenPath/hg19/chromosomes/chr22.fa.gz" | gunzip -c > ref.fa
    wget -O - "http://hgdownload.cse.ucsc.edu/goldenPath/hg19/chromosomes/chrM.fa.gz" | gunzip -c >> ref.fa
    """
}
I imagine it's wrong because if my process fails later, it will download the ref again from scratch (?)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:49
yes if the download itself failed, no if a downstream step fails
(provided you re-execute it with the -resume option)
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:50
very useful, I'll try this and index with bwa and samtools-faidx
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:50
does it make sense ?
have you had a look at this simple pipeline ?
you may also give this quick tutorial a try
Pierre Lindenbaum
@lindenb
Jul 12 2017 13:52
thanks for the pointers
Paolo Di Tommaso
@pditommaso
Jul 12 2017 13:54
you are welcome, I really want to support your rebellion because it's exactly the spirit of the project ;)
Pierre Lindenbaum
@lindenb
Jul 12 2017 14:05
when using '-resume' and there is more than one 'ref.fa', technically how does nextflow know which is the latest/best ref.fa (metadata in the output files ?)
asking because there were two versions of ref.fa in my work dir. I indexed with bwa and then with faidx: two distinct versions were processed:
work/62/7f211366105c4d31a6e37275ddbf8b
work/62/7f211366105c4d31a6e37275ddbf8b/.command.log
work/62/7f211366105c4d31a6e37275ddbf8b/.command.err
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa.bwt
work/62/7f211366105c4d31a6e37275ddbf8b/.command.begin
work/62/7f211366105c4d31a6e37275ddbf8b/.exitcode
work/62/7f211366105c4d31a6e37275ddbf8b/.command.out
work/62/7f211366105c4d31a6e37275ddbf8b/.command.sh
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa.sa
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa.ann
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa.amb
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa
work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa.pac
work/62/7f211366105c4d31a6e37275ddbf8b/.command.run
work/06
work/06/77196cab1590131d19aecdff428930
work/06/77196cab1590131d19aecdff428930/.command.log
work/06/77196cab1590131d19aecdff428930/.command.err
work/06/77196cab1590131d19aecdff428930/.command.begin
work/06/77196cab1590131d19aecdff428930/.exitcode
work/06/77196cab1590131d19aecdff428930/.command.out
work/06/77196cab1590131d19aecdff428930/.command.sh
work/06/77196cab1590131d19aecdff428930/ref.fa.fai
work/06/77196cab1590131d19aecdff428930/ref.fa
work/06/77196cab1590131d19aecdff428930/.command.run
Evan Floden
@evanfloden
Jul 12 2017 14:12
Here you have two work dirs and there is one ref.fa per work directory. Between processes, files are symlinked.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 14:15
files are tracked by generating a unique hash key for each task given the inputs and the command script
Pierre Lindenbaum
@lindenb
Jul 12 2017 14:15
@skptic ok , I see
lrwxrwxrwx 1 lindenb lindenb 72 juil. 12 16:04 work/06/77196cab1590131d19aecdff428930/ref.fa -> /home/lindenb/tmp/NEXTFLOW/work/db/1d4452d239ad543cdd88e871d90623/ref.fa
lrwxrwxrwx 1 lindenb lindenb 72 juil. 12 15:58 work/62/7f211366105c4d31a6e37275ddbf8b/ref.fa -> /home/lindenb/tmp/NEXTFLOW/work/db/1d4452d239ad543cdd88e871d90623/ref.fa
Paolo Di Tommaso
@pditommaso
Jul 12 2017 14:16
hence for a given pair (inputs, command) only one set of outputs can exist
for this reason NF processes are supposed to be idempotent: they only modify the task working directory
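e.g. the two indexing steps in the listing above could be wired like this (a sketch only; the tool commands are illustrative and ref_ch is the output channel of downloadRef):

ref_ch.into { ref_bwa_ch; ref_faidx_ch }

process bwaIndex {
    input:
    file 'ref.fa' from ref_bwa_ch
    output:
    file 'ref.fa.*' into bwa_index_ch

    """
    bwa index ref.fa
    """
}

process faidx {
    input:
    file 'ref.fa' from ref_faidx_ch
    output:
    file 'ref.fa.fai' into fai_ch

    """
    samtools faidx ref.fa
    """
}

each process gets its own work dir with ref.fa symlinked into it, which is exactly what the listing shows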
Pierre Lindenbaum
@lindenb
Jul 12 2017 14:21
:+1:

We're working with a bunch of JSON-based files to describe our sample+fastqs like:

"samples":[
    {"name":"SAMPLE1","Center":"Paris","fastqs":[["path/to//13D1082_1.fastq.gz","path/to//13D1082_2.fastq.gz"]]},
    {"name":"SAMPLE2","Center":"Lyon","fastqs":[["path/to//15D0590_1.fastq.gz","path/to//15D0590_2.fastq.gz"],["path/to//15D0591_1.fastq.gz","path/to//15D0591_2.fastq.gz"]]},
    {"name":"NAME3","Center":"Paris","fastqs":[["path/to//15D0685_1.fastq.gz","path/to//15D0685_2.fastq.gz"]]}
    ]

do I need to convert them, or is there a smooth way to import+use them ?

Paolo Di Tommaso
@pditommaso
Jul 12 2017 14:24
if they are like that, it should be possible to map them as input parameters
launch your script with nextflow run <your-script> -params-file <your.json>
then you should be able to access those values as params.samples
for custom handling you can use Groovy JsonSlurper
(don't forget that in a NF script you can use any Groovy/Java code)
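for example, a minimal sketch with JsonSlurper (params.samples_json here is a hypothetical parameter pointing at the JSON file, assumed to be wrapped in an outer { "samples": [...] } object):

import groovy.json.JsonSlurper

// parse the JSON file into plain maps and lists
def json = new JsonSlurper().parse(new File(params.samples_json))

// iterate over the samples
json.samples.each { sample ->
    println "${sample.name} (${sample.Center}): ${sample.fastqs.size()} fastq pair(s)"
}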
Pierre Lindenbaum
@lindenb
Jul 12 2017 14:47

Not sure if it's the most elegant syntax, but the following seems to work (just touching a file with the sample's name):

#
#  ./nextflow  run -resume -params-file  src/test.json src/test.nf 
#
process testJson {
    input:
        each  sample from params.samples
    output:
        file "${sample.get("name")}.txt" into sample_txt

    """
        touch '${sample.get("name")}.txt'
    """

    }

got

$ find . -name "*.txt"
./work/9c/3ef23010832ab248d6b1099b810c76/NAME3.txt
./work/bd/86cb05b6d754b50288b528330e13cc/SAMPLE2.txt
./work/f6/a0243dc8ddeb0b281f1933382d40dc/SAMPLE1.txt
enough for today, thanks again :clap:
Paolo Di Tommaso
@pditommaso
Jul 12 2017 14:57
first note, it can be simplified as shown below
process testJson {
    input:
        each  sample from params.samples
    output:
        file "${sample.name}.txt" into sample_txt

    """
        touch '${sample.name}.txt'
    """

    }
i.e. you can access any map value by simply using the key name
Paolo Di Tommaso
@pditommaso
Jul 12 2017 15:10
second note, I think what you are trying to do is the following:
params.samples
    .channel()
    .flatMap { entry -> entry.fastqs.collect { files -> tuple(entry.name, files.collect { file(it)} ) } }
    .set { samples_ch }

 process testJson {
    input:
        set name, file(read_pair) from samples_ch 
    output:
        file "${name}.txt" into sample_txt

    """
        touch '${name}.txt'
    """
    }
now you are in the NF world :smile:
Pierre Lindenbaum
@lindenb
Jul 12 2017 16:21
:+1: but the downside of this is it gives the feeling that I have to learn, not one, but two new languages :-/
you said Java statements will work; can Java 8 streams be used in place of 'channel()', or is that NF-specific?
furthermore, I'll need the parents of a node (e.g. for a pair of fastqs, I'll need the ../center). But I'm away from my test files now, I'll explore this tomorrow.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 17:32
well, there's no magic language that fits every use case perfectly
NF implements a DSL that simplifies writing many common bioinformatic use cases
Félix C. Morency
@fmorency
Jul 12 2017 17:37
not just bioinformatic :P
Paolo Di Tommaso
@pditommaso
Jul 12 2017 17:38
the advantage compared to a purely declarative workflow language is that it also lets you use a general-purpose programming language to handle the corner cases that always exist in real-world applications
for example here there's a variant calling pipeline where almost all the code is just a composition of processes, except these 10 lines
in your case you need that code to adapt your json structure to the channel model of Nextflow; I dare you to handle it in a simpler way with Make or any other workflow framework :)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 17:46
regarding Java streams: they resemble NF channels but they are not the same thing. The main difference is that NF channels are inherently asynchronous and are designed to work with the dataflow paradigm used by NF
@fmorency exactly !
Mike Smoot
@mes5k
Jul 12 2017 18:26
Hi @pditommaso I'm having difficulty imagining a solution to a problem and I'm wondering if you can see an elegant way around this in nextflow. I start with a large multi fasta file and split it into 10 (for example) smaller multi fasta files. The intent is to process each of these 10 multi fasta files (which I'm calling "groups") independently. The next step is to further split each group into single fasta files, so I end up with a channel of tuples: [groupId, file(fasta)] which I then run various tools on resulting in another tuple: [groupId, file(result)]. The problem is that for each group I need to aggregate all result files and then process the list of results for that group all at once. Right now I do this aggregation with groupTuple which works, but forces me to wait for ALL 10 groups to complete before proceeding. In an ideal world, once a group finishes, I'd like that group to be able to proceed with the rest of the pipeline and not be forced to wait for all 10 groups to complete. So, groupTuple works, but blocks until all intermediate processing is done. One possible alternative is that I could simply do the initial split creating the 10 groups outside of nextflow and run each group as a standalone pipeline, but I feel like nextflow should be able to handle this - I just can't figure out how.
Félix C. Morency
@fmorency
Jul 12 2017 18:29
@mes5k does it work if you specify a size: to groupTuple?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 18:29
I was suggesting that
Mike Smoot
@mes5k
Jul 12 2017 18:33
size: might help, although I'm not sure I can guarantee that all groups will have the same size.
Félix C. Morency
@fmorency
Jul 12 2017 18:33
how do you know when to emit then?
Mike Smoot
@mes5k
Jul 12 2017 18:34
Exactly, which is why I've been stuck on this problem!
Félix C. Morency
@fmorency
Jul 12 2017 18:35
Maybe you could specify a size: and use remainder: true
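e.g. a rough sketch (results_ch and the size of 100 are placeholders; results_ch is assumed to emit [groupId, result_file] tuples):

results_ch
    .groupTuple(by: 0, size: 100, remainder: true)   // emits a group as soon as 100 items arrive; remainder flushes the leftovers at the end
    .set { grouped_ch }

process aggregate {
    input:
    set groupId, file(results) from grouped_ch

    """
    cat $results > ${groupId}.merged
    """
}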
Mike Smoot
@mes5k
Jul 12 2017 18:36
Yup. I'll experiment with that and see what I can come up with. Thanks for pointing me in the right direction!
Félix C. Morency
@fmorency
Jul 12 2017 18:36
It should emit groups of equal size except for the last one that can be smaller
Np
Mike Smoot
@mes5k
Jul 12 2017 19:59
I have a question about how nextflow handles jobs that are allocated to different queues when using slurm. If I have 5000 jobs ready for queue1 and 100 jobs ready for queue2, does nextflow make any effort to ensure that jobs get submitted to both queues? Basically I want to ensure that both queue1 and queue2 are full (i.e. worker nodes busy) even if there are a bunch more jobs available for queue1.
Félix C. Morency
@fmorency
Jul 12 2017 20:05
Well, if you specify the queue using the queue directive, it should submit to the right SLURM partition. The rest of the scheduling is done by SLURM
In your example, queue1 and queue2 are both SLURM partitions?
Mike Smoot
@mes5k
Jul 12 2017 20:09
Right, queue1 and queue2 are different slurm partitions. Nextflow submits jobs to the proper partition, but since I have 5000 jobs ready for queue1 nothing is getting submitted to queue2, which means the machines in that partition are sitting idle, even though I know there is work to be done there.
Félix C. Morency
@fmorency
Jul 12 2017 20:10
Nothing is being submitted to queue2 because of what? NF limits on job submission?
Mike Smoot
@mes5k
Jul 12 2017 20:11
Yup, I'm thinking that I could expand executor.queueSize, but even then I'm not sure if there's any guarantee that jobs would get distributed to both partitions, it would just make it more likely.
It's a global parameter on the total jobs being submitted by NF
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:13
if there's data available to trigger a queue2 task, NF will submit it
Félix C. Morency
@fmorency
Jul 12 2017 20:14
But if there's already queueSize jobs submitted to queue1 it won't be submitted until one job finishes
Mike Smoot
@mes5k
Jul 12 2017 20:14
Is that limited by queueSize?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:14
yes, by default it's 100
Félix C. Morency
@fmorency
Jul 12 2017 20:16
Would it be possible/feasible to have a per-queue limit instead of a global one? (thinking in real-time)
Mike Smoot
@mes5k
Jul 12 2017 20:18
Or make queueSize apply to each individual queue, not the combination.
Félix C. Morency
@fmorency
Jul 12 2017 20:18
You could keep the default queueSize at 100, but if a user configures their pipeline with queue1Size: 50 and queue2Size: 50 it would change how NF submits things to the executor
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:18
it won't help much; you can just use a big number as queueSize, then the SLURM queue will manage things properly
the default is 100 to avoid newbie users submitting too many jobs in a shared cluster
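e.g. in nextflow.config (500 is just an example value):

process.executor = 'slurm'

executor {
    // max number of jobs NF keeps queued/running at any one time (default: 100)
    queueSize = 500
}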
Félix C. Morency
@fmorency
Jul 12 2017 20:20
It depends on said big number and the number of SLURM partitions you have. Another problem with said big number is multi-user cluster usage. I like "smaller" numbers because user1 and user2 queues overlap sooner
Mike Smoot
@mes5k
Jul 12 2017 20:20
Sure, I'll increase queueSize for now. I'm just afraid that even setting queueSize to 10000 could be problematic given some of the large datasets on my horizon.
@pditommaso is something like what @fmorency suggests a reasonable feature request?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:22
well, in principle it's up to the batch scheduler to allocate the job requests properly
Mike Smoot
@mes5k
Jul 12 2017 20:22
The problem is that the batch scheduler isn't getting the requests.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:23
what's your queueSize at the moment ?
Mike Smoot
@mes5k
Jul 12 2017 20:24
the default
I can completely understand that this would be a headache to implement.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:25
among the many :)
Félix C. Morency
@fmorency
Jul 12 2017 20:26
Would it be hard to make queueSize apply to each queue and not the combination like @mes5k suggested?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:27
the main problem is that it's a concept only available on certain executors
Félix C. Morency
@fmorency
Jul 12 2017 20:27
Right.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:28
actually NF has a job queue for each executor, and allows you to use many executors in the same execution
Mike Smoot
@mes5k
Jul 12 2017 20:28
how do you run with multiple executors?
Félix C. Morency
@fmorency
Jul 12 2017 20:29
You can set those per process iirc
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:29
you can define executor at the process level like any other directive
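e.g. (a sketch; process names, queue names and commands are placeholders):

process heavyAlignment {
    executor 'slurm'
    queue 'queue1'

    """
    echo "submitted to the SLURM partition queue1"
    """
}

process quickSummary {
    executor 'local'

    """
    echo "runs on the local/head node"
    """
}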
Mike Smoot
@mes5k
Jul 12 2017 20:30
interesting. Does anyone do that? I'm trying to think of how this would be useful.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:30
anyhow at this moment, this won't solve the queueSize problem
in some cases you may want to run some jobs locally and others in the cluster
Mike Smoot
@mes5k
Jul 12 2017 20:32
That makes sense
Mike Smoot
@mes5k
Jul 12 2017 20:44
Could we create a new (or extend) TaskPollingMonitor that is just used by AbstractGridExecutor?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:46
the relevant code is this
it creates a TaskPollingMonitor instance for each executor type
Mike Smoot
@mes5k
Jul 12 2017 20:50
Right. For the executors that support queues we could override Executor.createTaskMonitor() returning a "queue aware" TaskPollingMonitor
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:51
anyhow I would like to avoid adding unnecessary complexity, at least until it's clear what the actual problem is
I would suggest verifying whether increasing the queueSize solves your problem
Mike Smoot
@mes5k
Jul 12 2017 20:54
Sure, I'll wait before submitting a feature request until I experiment with increasing queueSize.
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:55
:+1:
Mike Smoot
@mes5k
Jul 12 2017 20:55
The good news is that my cluster does shut itself down quite efficiently when there's no work available! :)
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:56
nice, I want to read a post about that!
Mike Smoot
@mes5k
Jul 12 2017 20:57
Any thoughts on an upper limit for queueSize? Don't go above 5000? 10000?
Paolo Di Tommaso
@pditommaso
Jul 12 2017 20:58
on the NF side it's fine; with the latest change it should be able to handle billions of tasks :)
Mike Smoot
@mes5k
Jul 12 2017 21:00
Ok, then I won't be afraid to go big!
modulo the memory on my head node