These are chat archives for nextflow-io/nextflow

15th
Aug 2017
Simone Baffelli
@baffelli
Aug 15 2017 06:29
@pditommaso it works!!
Brian Reichholf
@breichholf
Aug 15 2017 13:06
@idot you could just pass the NF variable baseDir to the R script, access it from within R through commandArgs, and then feed that into source.
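A minimal sketch of that approach (the script and file names here are illustrative, not from the chat):

process runAnalysis {
    script:
    """
    Rscript ${baseDir}/scripts/analysis.R ${baseDir}
    """
}

and on the R side:

# analysis.R
args <- commandArgs(trailingOnly = TRUE)
base.dir <- args[1]
source(file.path(base.dir, "scripts", "helpers.R"))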
Brian Reichholf
@breichholf
Aug 15 2017 14:02
Our cluster has a fair-use policy that rewards and prioritises requests from users who request a certain time and then stay (reasonably close) within it. I was thinking I could dynamically allocate a time slot for a given task, based on the number of reads in the FASTQ file. My thought was that I could feed the files through a process, but given my experience of getting the wrong channel 'type' (for lack of a better phrasing) I thought I'd look for help before, rather than after.
Here's what I was thinking:
process countReads {
  input:
  file read from reads

  output:
  stdout readcounts

  script:
  """
  lines=\$(gzip -dc $read | wc -l)
  echo \$lines $read
  """
}
Can I then do: readsWithCounts = readcounts.subscribe { it -> [it.tokenize(' ')[0], it.tokenize(' ')[1]] } to get a new channel readsWithCounts that can be accessed like so?
process map {
time { 1.h * counts }

input:
set counts, file(read) from readsWithCounts

script:
"""
blah
"""
}
Simone Baffelli
@baffelli
Aug 15 2017 14:12
that sounds sensible, did you try it in the console?
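For reference, a quick way to test the tokenizing in the Nextflow console, feeding in a made-up stdout line:

Channel.from('100 reads.fq.gz\n')
    .map { it.trim().tokenize(' ') }
    .subscribe { println it }   // prints [100, reads.fq.gz]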
Evan Floden
@evanfloden
Aug 15 2017 14:12

I think readsWithCounts would contain read as a string. You can check quickly using the console

I would have the initial reads channel contain a sampleID variable that you can pass into countReads and merge with reads if needed.

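A sketch of that suggestion, assuming the sample ID can be derived from the file name:

Channel
    .fromPath( params.reads )
    .map { f -> [ f.baseName, f ] }   // baseName strips the last extension
    .set { rawReads }

Downstream channels that emit the same ID as their first element can then be combined, e.g. with the phase operator.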
Simone Baffelli
@baffelli
Aug 15 2017 14:13
Has anybody had any problem with when and -resume?
Somehow a task that should be cached gets rerun all of the time
Evan Floden
@evanfloden
Aug 15 2017 14:14
No, but I am using when and resume a lot these days :computer:
Simone Baffelli
@baffelli
Aug 15 2017 14:17
I don't know whether it is the combination of when and each
or that the order of files in collate is not the same
Evan Floden
@evanfloden
Aug 15 2017 14:18
Can you post the code?
Simone Baffelli
@baffelli
Aug 15 2017 14:18
But in theory nf channels should be deterministic right?
in that the order is always the same
Brian Reichholf
@breichholf
Aug 15 2017 14:18
@baffelli haven't tried it yet, how would I tell if the channel is the right type?
Simone Baffelli
@baffelli
Aug 15 2017 14:19
@breichholf what do you mean by "right type"?
@skptic here's the monster
/*
 * Stack interferograms, testing different amounts of stacking
 */
process stack {

    errorStrategy 'ignore'

    validExitStatus 0, 255, 127 // ignore warning of reference outside

    publishDir "${params.results}/stacked/${stack_name}/${n_stack}/${used_method}"

    input:
        set file(unw_ls:"*.diff"), file(off_ls:"*.off"), val(master_id), val(slave_id), val(bl), val(method) from to_stack
        each ref_pt from ref_pix_stack // repeat it continuously because ref_pix_stack is only sent once
        each ref_mli from ref_mli_stack
        each n_stack from (logSpace(params.n_stack_min, params.n_stack_max, params.n_stack_samples) as Integer[])

    when:
        (unw_ls as List).size() > n_stack

    output:
        set file(off_par), file(rate_m), file(sig_rate_m), file(sig_ph),
            val(stack_id), val(used_method), val(n_stack), val(av_time) into stacked
        set file('rate.bmp'), file('rate_std.bmp'), file('ph_std.bmp') into rate_ras

    shell:
        println("Input list length ${(unw_ls as List).size()}, number to stack ${n_stack}")
        // send one off par together with the stack for geocoding
        off_par = off_ls[0]
        used_method = method[0]
        bl_subset = (bl as List)[0..n_stack]
        unw_subset = (unw_ls as List)[0..n_stack]
        master_id_subset = (master_id as List)[0..n_stack]
        slave_id_subset = (slave_id as List)[0..n_stack]
        bl_day = bl_subset.collect { item -> secondsToDay(item as long) }
        // construct the columns of the diff_tab file
        stack_text = createStackingTable(unw_subset as List, bl_day)
        // give the stack a unique id based on first master and last slave
        stack_id = [master_id_subset[0], slave_id_subset[-1]]
        // total time covered by the stack
        av_time = computeBaseline(master_id_subset[0], master_id_subset[-1])
        // the stack name corresponds to the first master id
        stack_name = "${master_id_subset[0]}"
        // get the radar frequency from the mli parameter file
        mli_par = ref_mli[1]
        freq = (mli_par.text =~ /radar_frequency:(.+)/)[0][1].tokenize()[0] as float
        // get the frequency from the parameters
        freq = params.radar_frequency
        // phase to distance conversion
        phase_to_meter = (3e8 / freq) / (4 * Math.PI)
        '''
        echo '!{stack_text}' > dt
        width=$(get_value !{off_ls[0]} interferogram_width)
        stacking dt ${width} rate sig_rate sig_ph !{ref_pt[0]} !{ref_pt[1]} - - - 0
        wd=$(get_value !{off_par} interferogram_width)
        float_math rate - rate_m ${wd} 2 - - - - !{phase_to_meter}
        float_math sig_rate - sig_rate_m ${wd} 2 - - - - !{phase_to_meter}
        float_math sig_ph - sig_ph_m ${wd} 2 - - - - !{phase_to_meter}
        visdt_pwr.py rate_m !{ref_mli[0]} ${wd} -2 2 -u rate.bmp -m spectral_r
        visdt_pwr.py sig_rate_m !{ref_mli[0]} ${wd} 0 2 -u rate_std.bmp -m cool
        visdt_pwr.py sig_ph_m !{ref_mli[0]} ${wd} 0 2 -u ph_std.bmp -m cool
        '''
}
Brian Reichholf
@breichholf
Aug 15 2017 14:25
I had some issues at some point where I didn't construct a Channel but just assigned a file(x) to a variable. That then got 'consumed' in a process and was not accessible to a later process.
Simone Baffelli
@baffelli
Aug 15 2017 14:27
Are you referring to the caching issue?
Evan Floden
@evanfloden
Aug 15 2017 14:39
@baffelli Wow, I see. I would compare two work directories of identical tasks for any differences. I also know the order in which when is evaluated was changed recently, so that could be the cause if this is a recent problem (last week or so).
Simone Baffelli
@baffelli
Aug 15 2017 14:39
Yes, I have quite a few checks to ensure that everything works properly
I am stretching nf to the max
Evan Floden
@evanfloden
Aug 15 2017 14:41

Also, if

// give the stack a unique id based on first master and last slave
stack_id = [master_id_subset[0], slave_id_subset[-1]]

is different every time, I think it will not cache.

Simone Baffelli
@baffelli
Aug 15 2017 14:43
But why should it be?
It is the same for each n_stack and for the same set of upstream inputs
Evan Floden
@evanfloden
Aug 15 2017 14:46
If it is the same on every resume then I am not sure. Do a diff on the files from the two work directories, i.e. the .command.sh, to check.
Simone Baffelli
@baffelli
Aug 15 2017 14:53
right! I will tag it using the parameters
to easily find it
and run it twice
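One way to do that tagging is the tag directive with the relevant inputs, e.g. (a sketch, using the inputs from the stack process above):

process stack {
    tag { "${method[0]}-${n_stack}" }   // label shown next to each task in the run log
    ...
}

This makes the two runs of the same task easy to spot, so their work directories can be diffed as Evan suggests.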
Brian Reichholf
@breichholf
Aug 15 2017 15:09
So, in a minimalistic example I have:
Channel
  .fromPath( params.reads )
  .ifEmpty { error "Cannot find any reads matching: ${params.reads}" }
  .set { rawReads }

process countReads {
    input:
    file reads from rawReads

    output:
    stdout readcounts

    script:
    """
    lines=`gzip -dc $reads | wc -l`
    echo 100 $reads
    """
}

readsWithCounts = readcounts.subscribe { it -> [it.tokenize(' ')[0], it.tokenize(' ')[1]] }

process map {
    publishDir path: './results', mode: "copy", pattern: '*.txt'

    input:
    set counts, file(read) from readsWithCounts

    output:
    file "counts.txt" into outputF

    script:
    """
    echo $counts >> counts.txt
    echo `gzip -dc $read | wc -l` $read >> counts.txt
    """
}
I then start it with: nextflow run test.nf --reads '*.fq.gz' and get:
[warm up] executor > local
ERROR ~ Channel `readcounts` has been used twice as an input by process `map` and another operator

 -- Check script 'foo.nf' at line: 22 or see '.nextflow.log' file for more details
line 22 is: process map {
Simone Baffelli
@baffelli
Aug 15 2017 15:11
that's interesting
Brian Reichholf
@breichholf
Aug 15 2017 15:13
FWIW, *.fq.gz expands to two files.
But it doesn't work if I do --reads 'File1.fq.gz' either
Evan Floden
@evanfloden
Aug 15 2017 15:26

readsWithCounts is consumed. Try:

readcounts.subscribe { it -> [it.tokenize(' ')[0], it.tokenize(' ')[1]] }
    .set {newReadCounts}

and use the new channel as your input in map

Paolo Di Tommaso
@pditommaso
Aug 15 2017 15:36
you cannot do this
readsWithCounts = readcounts.subscribe { it -> [it.tokenize(' ')[0], it.tokenize(' ')[1]] }
neither
readcounts.subscribe { it -> [it.tokenize(' ')[0], it.tokenize(' ')[1]] }
    .set {newReadCounts}
subscribe consumes the channel and has no output, therefore it cannot be assigned to a new channel; use map instead
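A sketch of the map-based fix, also carrying the file alongside its count (per Evan's earlier point that the tokenized name would only be a string); untested:

process countReads {
    input:
    file reads from rawReads

    output:
    set stdout, file(reads) into readcounts

    script:
    """
    gzip -dc $reads | wc -l
    """
}

// map returns a new channel, whereas subscribe only consumes
readcounts
    .map { count, f -> [ count.trim().toInteger(), f ] }
    .set { readsWithCounts }

The map process can then read set counts, file(read) from readsWithCounts as before, and use the count in a dynamic directive such as time { 1.h * counts }.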