These are chat archives for nextflow-io/nextflow

12th Oct 2018
Paolo Di Tommaso
@pditommaso
Oct 12 2018 07:23
@oschwengers I may have found the problem affecting your execution. I've uploaded a snapshot patching the issue: 0.33.0-SNAPSHOT
Oliver Schwengers
@oschwengers
Oct 12 2018 07:46
wow, that was fast! Thanks a lot, I will check it over the weekend.
where can I get the patch? Can't find a recent commit on github/master
Paolo Di Tommaso
@pditommaso
Oct 12 2018 07:49
NXF_VER=0.33.0-SNAPSHOT nextflow run .. etc
Oliver Schwengers
@oschwengers
Oct 12 2018 07:55
I'm a bit puzzled... does NF pull the test branch? As far as I can see, your patch is on the test branch, not on master yet. So is setting the env var as stated above enough?
Paolo Di Tommaso
@pditommaso
Oct 12 2018 07:57
the GitHub branch is unrelated to the compiled distribution
when specifying the var NXF_VER=0.33.0-SNAPSHOT it downloads the updated version the very first time
make sure it prints
NXF_VER=0.33.0-SNAPSHOT  nextflow info
  Version: 0.33.0-SNAPSHOT build 4898
  Modified: 12-10-2018 07:10 UTC (09:10 CEST)
Oliver Schwengers
@oschwengers
Oct 12 2018 07:59
ok, thanks
PhilPalmer
@PhilPalmer
Oct 12 2018 10:09

Hi, I'm working on this deepvariant pipeline

I have made these processes:

if( (false).equals(params.fastagz)){
    process preprocessFASTGZ {
        publishDir "$baseDir/sampleDerivatives"

        input:
        file fasta from fasta

        output:
        file("${fasta}.gz") into fasta_gz

        script:
        """
        bgzip -c ${fasta} > ${fasta}.gz
        """
    }
}

if( (false).equals(params.gzfai)){
    process preprocessGZFAI {
        publishDir "$baseDir/sampleDerivatives"

        input:
        file("${fasta}.gz") from fasta_gz

        output:
        file("${fasta}.gz.fai")

        script:
        """
        samtools faidx "${fasta}.gz"
        """
    }
}

However, on execution I get this error and I'm not sure why:

File `/Users/phil/Documents/GitHub/TMPDIR/deepvariant/testdata/chr20.fa.gz.fai` is out of the scope of process working dir: /Users/phil/Documents/GitHub/TMPDIR/deepvariant/work/2c/f9361c76f13bdaf195a2bc5bbf4aab

I'm sure it's basic but any ideas why?

micans
@micans
Oct 12 2018 10:11
I've never seen input specified like that .. did not know it was possible
Is it possible?
PhilPalmer
@PhilPalmer
Oct 12 2018 10:14
Ah , well that is probably it, I'll try now
micans
@micans
Oct 12 2018 10:18
If I need to track a basename, identifier, or sample name, I usually carry a set val(samplename), file(somefile) tuple along, so I do not have to re-parse the primary identifier in every process.
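A minimal DSL1-style sketch of that pattern (the channel name, params.reads and the process body are illustrative, not taken from the pipeline above):

Channel
    .fromPath(params.reads)
    .map { f -> tuple(f.simpleName, f) }
    .set { samples_ch }

process example {
    input:
    set val(samplename), file(somefile) from samples_ch

    output:
    set val(samplename), file("${samplename}.out") into results_ch

    script:
    """
    cp ${somefile} ${samplename}.out
    """
}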
Maxime Garcia
@MaxUlysse
Oct 12 2018 10:18
@PhilPalmer Instead of (false).equals(params.fastagz), does !params.fastagz work?
I notice that both processes have the same publishDir
Are you using copy or move for publishing the files in the publish dir?
PhilPalmer
@PhilPalmer
Oct 12 2018 10:27
thanks, @micans I think that was the problem
Also thanks @MaxUlysse, I just tested !params.fastagz and it does work. I didn't realise that before, so I'll use that instead.
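In the pipeline's guard this simply becomes a Groovy truthiness check; a tiny sketch (the default value is an assumption, not taken from the pipeline):

params.fastagz = false   // assumed default; a path can be supplied on the command line

if( !params.fastagz ) {
    println 'no pre-compressed fasta supplied, so the bgzip preprocessing step will run'
}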
PhilPalmer
@PhilPalmer
Oct 12 2018 10:32
I haven't specified a mode for the publish dir, so presumably it's whatever the default is
Maxime Garcia
@MaxUlysse
Oct 12 2018 10:33
the default is symlink I think
I believe we're using hardlink in Sarek
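For reference, the mode can be set explicitly on publishDir; a minimal sketch (process name and file are illustrative), where omitting mode falls back to the symlink default:

process publishExample {
    // supported modes include 'symlink' (default), 'copy', 'move' and 'link' (hard link)
    publishDir "$baseDir/sampleDerivatives", mode: 'copy'

    output:
    file('example.txt')

    script:
    """
    echo hello > example.txt
    """
}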
Oliver Schwengers
@oschwengers
Oct 12 2018 12:27
@pditommaso: so far your patch seems to work. NF has been running like a charm for 1.5h now ... I'll keep watching
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:29
This is a great patch if I got the problem correctly
micans
@micans
Oct 12 2018 12:35
Just curious, was there an epiphany @pditommaso? I tried to check the commits, but could not see anything for 0.33.0. How did I miss it?
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:36
Testing branch
micans
@micans
Oct 12 2018 12:37
thx
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:37
A scalability issue under pressure from a high number of jobs
micans
@micans
Oct 12 2018 12:40
Nice. The commits are beyond my expertise, but it's always interesting to read the code.
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:43
concurrent programming is where the real fun comes in ... ! :satisfied:
micans
@micans
Oct 12 2018 12:43
I've only done pthreads in C :smiley_cat:
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:45
it's that kind of mess where you have concurrent access, race conditions and nondeterministic dead/live-locks
micans
@micans
Oct 12 2018 12:46
Sounds like fun, no? My case was pretty simple linear algebra, and it has always worked without problems.
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:49
yes, fun ... generally speaking :) because you need to infer the problem with pure logic; you cannot use breakpoints or other debug utilities to find these kinds of issues
micans
@micans
Oct 12 2018 12:50
Yep. My hunch, with what little I know, is that it requires iron discipline, a good understanding of the call graph of the code, and of the concurrency primitives.
And the timing/dependencies of course
Paolo Di Tommaso
@pditommaso
Oct 12 2018 12:50
exactly
Oliver Schwengers
@oschwengers
Oct 12 2018 13:00
@pditommaso: It's still running. One thing I discovered (not a bug): emitted items seem to be randomly distributed to the subsequent process. Before your patch there was a not exact, but more FIFO-like, behavior.
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:01
this is a signal that parallelisation is working in a more effective manner
Oliver Schwengers
@oschwengers
Oct 12 2018 13:02
If it's possible, it would be nice if this were still the case after your patch, as it gives you a sort of online hint of how far your pipeline has progressed while it's running
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:03
not sure I understand?
Oliver Schwengers
@oschwengers
Oct 12 2018 13:05
NF automatically labels the tasks. Before your patch the submitted/completed task labels were roughly in order; not perfect, but within a sliding window. Now the task labels suggest a completely random task order
Example:
Before: 1, 2, 5, 3, 6, 4, 8, 7, 9, 10, ...
After: 9, 5, 2, 1, 8, 3, 7, ...
I hope that makes it a little bit clearer
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:08
ah, so you would like an indication of the overall progress of the computation
micans
@micans
Oct 12 2018 13:16
You can do lo-tech things like grep -i "\<star\>" results/trace.txt | grep -ic completed where star is the name of a process and results is your outdir. That's about my level of sophistication ...
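The trace.txt used above is only written when tracing is enabled; a sketch of the corresponding nextflow.config snippet (the results/ path just mirrors the outdir assumed in the command above):

trace {
    enabled = true
    file = 'results/trace.txt'
}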
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:25
lo-tech but effective, good tip though!
Oliver Schwengers
@oschwengers
Oct 12 2018 13:33
yeah sure... just curious whether this could be implemented somehow without messing up other things or introducing too much complexity.
I haven't inspected your code heavily, but shouldn't items emitted into a channel be processed FIFO-like? Although parallelism of course always introduces some sort of randomness, each free executor could grab the next item from a list, e.g. a LinkedList/FIFO. Maybe a little bit too simplified... Am I missing something here?
micans
@micans
Oct 12 2018 13:35
Processes may finish sooner depending on data load .. it's the beauty of parallelism that it is not FIFO
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:35
the main issue is that the dataflow model is logically like a stream; you don't know the total number of items (read: tasks) expected to be processed
Oliver Schwengers
@oschwengers
Oct 12 2018 13:39
sure, with different types of processes and data loads everything sooner or later becomes sort of randomly ordered... But when splitting a FASTA file, the first emitted items should be processed long before the last ones, especially for huge files. Therefore, the emitted items should at least "somewhat" follow the order of the FASTA file. If there are ~20,000 sequences in a file, it's rather odd when middle- and last-positioned sequences get processed mixed in with the first ones...
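To make the scenario concrete, a rough sketch of the split-then-process pattern being described (params.fasta and the per-sequence command are illustrative, not the actual workflow):

Channel
    .fromPath(params.fasta)
    .splitFasta(by: 1, file: true)
    .set { sequences_ch }

process perSequence {
    input:
    file seq from sequences_ch

    script:
    """
    grep -c '>' ${seq}
    """
}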
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:42
this is an extract of your log file
Oct-10 15:21:29.230 %% executor local > tasks in the submission queue: 43658 -- tasks to be submitted are shown below
Oct-10 15:26:29.422 %% executor local > tasks in the submission queue: 67487 -- tasks to be submitted are shown below
Oct-10 15:31:30.130 %% executor local > tasks in the submission queue: 82962 -- tasks to be submitted are shown below
Oct-10 15:36:31.414 %% executor local > tasks in the submission queue: 98727 -- tasks to be submitted are shown below
Oct-10 15:41:33.943 %% executor local > tasks in the submission queue: 112868 -- tasks to be submitted are shown below
what happens is that the file splitting is much faster than the task executions
therefore in a few minutes all tasks are submitted for execution and just wait for free resources
then the first one that manages to find a free slot is served
Oliver Schwengers
@oschwengers
Oct 12 2018 13:46
Ahhh ok, I see... so tasks are equal to each other and have no order of any kind?
I thought items get emitted to a channel (like a LinkedList, FIFO) and then a group of executors would fetch the items one after another...
Ok, now it's obvious
thanks for the clarification!
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:47
what you are suggesting would require the implementation of a backpressure mechanism, which would actually be very useful
but quite challenging to achieve in the current implementation
Oliver Schwengers
@oschwengers
Oct 12 2018 13:48
But wouldn't it be way less resource intensive if there were such a group of executors picking up new items just when they become free? That way thousands of items wouldn't struggle for resources, constantly requesting free CPU/mem
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:50
for memory-eager tasks yes, like in your case
but it requires a mechanism like the one I've linked
the downstream task has to prevent the execution of the upstream task; currently that's not possible
Oliver Schwengers
@oschwengers
Oct 12 2018 13:53
Given that there is enough memory to hold all items/data in memory, a channel, i.e. a FIFO list, could store all items. Then you wouldn't need such a back-pressure algorithm... although it would be nice in order to keep memory consumption small
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:54
yeah, it would enable much bigger computations
Oliver Schwengers
@oschwengers
Oct 12 2018 13:56
But as I said, it's not a bug but rather a kind of mini feature... so thank you very much for helping out and for your support!
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:57
thanks for reporting, and let me know if everything ends correctly so I can merge it into master
Oliver Schwengers
@oschwengers
Oct 12 2018 13:59
So far it's been running as expected for more than 3h. Way better than before! I'll let you know about the outcome.
Paolo Di Tommaso
@pditommaso
Oct 12 2018 13:59
:ok_hand:
however this discussion makes me realise that the current implementation could be replaced by the new Java Flow API, which should implement that concept
micans
@micans
Oct 12 2018 14:30
I'm mildly frustrated not understanding what is resource intensive about the current approach and how it will be improved! That back-pressure link was a bit opaque to me. Perhaps I'll ask you over a beer @pditommaso
Paolo Di Tommaso
@pditommaso
Oct 12 2018 14:32
sounds good, in the meanwhile take a look at this http://www.reactive-streams.org/
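A minimal Groovy sketch of the back-pressure idea behind that link, using the Java 9+ Flow API (purely illustrative, not how Nextflow is implemented):

import java.util.concurrent.Flow
import java.util.concurrent.SubmissionPublisher

// The subscriber requests one item at a time, so the downstream side controls
// how fast the upstream side may emit: that is back-pressure.
class OneAtATime implements Flow.Subscriber<String> {
    Flow.Subscription subscription
    void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1) }
    void onNext(String item) { println "processing $item"; subscription.request(1) }
    void onError(Throwable t) { t.printStackTrace() }
    void onComplete() { println 'done' }
}

def publisher = new SubmissionPublisher<String>()
publisher.subscribe(new OneAtATime())
(1..5).each { publisher.submit("task-$it".toString()) }
publisher.close()
Thread.sleep(500)   // give the asynchronous consumer time to drain (sketch only)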
Tobias "Tobi" Schraink
@tobsecret
Oct 12 2018 15:06

@rsuchecki it's from a groupTuple call. I was a little tired yesterday, sorry for only writing an MCVE now:

inputstream = Channel.from([
          ['id1', 'file1.bam'],
          ['id1', 'file2.bam'], 
          ['id2', 'file3.bam']])
process someprocess {
    memory {8.GB * bamfiles.size()}
    input:
    set sampleid, file(bamfiles) from inputstream.groupTuple()
    script:
    """
    echo $bamfiles
    """

I haven't tested this snippet yet because I am currently on a Windows laptop that doesn't have Nextflow installed, but it should give you an error for id2

micans
@micans
Oct 12 2018 15:36
very good link, thanks @pditommaso
Paolo Di Tommaso
@pditommaso
Oct 12 2018 15:37
:+1:
actually I couldn't resist and started to experiment with that stuff ...
Tobias "Tobi" Schraink
@tobsecret
Oct 12 2018 17:54
inputstream = Channel.from([
          ['id1', file('file1.bam')],
          ['id1', file('file2.bam')], 
          ['id2', file('file3.bam')]]).groupTuple()
/*
.subscribe {println(it)}

prints:
[id1, [/home/tobias/file1.bam, /home/tobias/file2.bam]]
[id2, [/home/tobias/file3.bam]]
*/
process someprocess {
    memory {8.GB * bamfiles.size()}
    input:
    set sampleid, file(bamfiles) from inputstream
    script:
    """
    echo $bamfiles
    """
    println([bamfiles.size(), bamfiles])
}
/*
someprocess will print:
[0, file3.bam]
[2, file1.bam file2.bam]
*/
That's a bit odd - why is the size of a list of length 1 == 0 but that of length 2 == 2?
Tobias "Tobi" Schraink
@tobsecret
Oct 12 2018 22:18
For now I can just use the slightly ugly workaround:
memory {8.GB * Math.max(bamfiles.size(), 1)}