These are chat archives for nextflow-io/nextflow

26th
Jul 2016
Mokok
@Mokok
Jul 26 2016 08:44

Hi,

I wrote a humble script to test parallelism and execution with Torque.

I'd like to know if there is a better way to optimise it a bit.
Plus, Nextflow is currently telling me the workflow can't be executed on Torque.

Here comes the code ;)

#!/usr/bin/env nextflow

textFile = Channel.fromPath('/home/proactiveuser/nextFlow/data/dataTest1')


process getTheFileAndSplitLines {
        input:
        val line from textFile.splitText()

        output:
        val line into linesC_channel
        val line into linesH_channel

        exec:
        println "line = $line"
}

process extractContent {
        input:
        val line from linesC_channel

        output:
        val content into content_channel

        exec:
        content = line.split("\\s:\\s")[1].trim()
}

process extractHeader {
        input:
        val line from linesH_channel

        output:
        val header into header_channel

        exec:
        header = line.split("\\s:\\s")[0].trim()
}

process mergeContent {
        input:
        val content from content_channel
        val header from header_channel

        output:
        val reformattedLine into result_channel

        exec:
        reformattedLine = "$header -> $content"
}

result_channel.subscribe { println it }

the command line:

./nextflow run ./test/testWorkflow.nf -profile pbs -c ./nextflow.config -with-dag dag.html -with-timeline

and the nextflow.config file:

profiles {
        loc {
                process.executor = 'local'
        }
        pbs {
                process.executor = 'pbs'
                process.queue = 'batch'
        }
}
qnodes is reporting that my nodes are free and ready to go
Paolo Di Tommaso
@pditommaso
Jul 26 2016 08:49
native processes cannot execute on PBS in this way
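For context, processes with an exec: block are native Groovy code that runs inside the Nextflow JVM, so they cannot be submitted as PBS jobs. A minimal sketch of one way to make such a process cluster-friendly, by giving it a shell script block instead (the awk command and field separator here are my own illustration, not from the thread):

```nextflow
process extractContent {
        input:
        val line from linesC_channel

        output:
        stdout into content_channel

        // a shell script block, unlike exec:, can be dispatched as a PBS/Torque job
        """
        echo '$line' | awk -F' : ' '{ print \$2 }'
        """
}
```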
Mokok
@Mokok
Jul 26 2016 08:49
oh, that's sad
Paolo Di Tommaso
@pditommaso
Jul 26 2016 08:49
if your cluster supports MPI interface you should be able to run it as described here
Mokok
@Mokok
Jul 26 2016 08:50
ok, I'm gonna check this, thanks
Mokok
@Mokok
Jul 26 2016 14:14
is there a way to do conditional task branching? (if ... then "go to task A" else "go to task B")
or should I run both task A and B and put a condition (with a special channel value for example) on whether the inner script should be run or not?
(I'd avoid the second choice as it runs a "ghost" task)
Chris Fields
@cjfields
Jul 26 2016 14:16
@Mokok I have seen a few workflows use 'when', and Phil Ewels's workflows have conditionals in the process : https://github.com/SciLifeLab/NGI-MethylSeq/blob/master/main.nf#L268
e.g. in the link it is a switch from single-end read analysis to PE analysis in the script section of the process
BTW, we use 'pbs' for the executor and it works fine (we have PBS/Torque)
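For reference, the when: block mentioned above declaratively skips a task when its condition evaluates to false. A minimal hypothetical sketch (the params.mode value and channel name are my own, not taken from the linked workflow):

```nextflow
process taskA {
        input:
        val sample from samples_channel

        // the task executes only when this condition is true
        when:
        params.mode == 'A'

        script:
        """
        echo "running task A on $sample"
        """
}
```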
Chris Fields
@cjfields
Jul 26 2016 14:21
@pditommaso I'll have a go at your suggestions today, not a huge blocker. Essentially what I am trying to do is run MultiQC on all the results from a process (e.g. not just one FASTQC run but all of them). I tried the file ('fastqc/*') from fastqc_results.toList() version and had the same issue, but I can try debugging the channel in a separate workflow to see if that helps.
Paolo Di Tommaso
@pditommaso
Jul 26 2016 14:24
Hi, if fastqc_results is created by an upstream process, make sure that it is declared as a file output
if this still does not solve it, please share your script or a snippet that reproduces your problem
Mokok
@Mokok
Jul 26 2016 14:29
@cjfields
thanks for the 'when', it's what I was looking for
plus, I hadn't yet realized the power of mixing in a scripting language (such as the 'if' statements in your example), and it opened my mind to the range of possibilities :)
Chris Fields
@cjfields
Jul 26 2016 14:29
@Mokok no problem.
@pditommaso I'll give that a try. The MultiQC step is at the end of a longer workflow so I'll test with a shorter one and report back
Paolo Di Tommaso
@pditommaso
Jul 26 2016 14:32
a template for what you want to do is this
my_data_files = Channel.fromPath('/your/path/N10_GAGTGGAT_L00M_R1_001_fastqc.*')

process foo {
  input: 
  file x from my_data_files.toList()

  """
  echo $x
  """
}
or
process foo {
  input:
  each x from 'a','b','c'

  output: 
  file '*.txt' into my_data_files

  """
  touch ${x}.txt
  """
}

process bar {
  input: 
  file x from my_data_files.toList()

  """
  echo $x
  """
}
the latter collects the outputs from an upstream process
@cjfields give a try to the above examples
Chris Fields
@cjfields
Jul 26 2016 14:48
@pditommaso thx. BTW nice to finally meet in person at BOSC, apologies that we couldn't talk longer
Paolo Di Tommaso
@pditommaso
Jul 26 2016 14:50
@cjfields my pleasure! anyway, these days I'm implementing something related to your request for an execution report
Feel free to open a thread in the discussion group or on GitHub to provide the requirements of your use case
Chris Fields
@cjfields
Jul 26 2016 14:58
pditommaso: thanks, will do
heh, switched to limechat for gitter, it apparently doesn't autofill in the @ for pinging
Jason Byars
@jbyars
Jul 26 2016 18:55
What is the proper way to phase more than 2 channels back together?
Paolo Di Tommaso
@pditommaso
Jul 26 2016 18:56
good question ..
chaining them together works, though I don't remember well
Jason Byars
@jbyars
Jul 26 2016 18:57
trying ch1.phase(ch2).phase(ch3), at the moment, but it keeps hanging.
Paolo Di Tommaso
@pditommaso
Jul 26 2016 18:58
um, let me see
I've tried this
ch1 = Channel.from( 1,2,3 )
ch2 = Channel.from( 1,0,0,2,7,8,9,3 )
ch3 = Channel.from( 3,2,1,4 )
ch1 .phase(ch2) .phase(ch3) .println()
it returns
[[2, 2], 2]
[[3, 3], 3]
[[1, 1], 1]
which I guess is not exactly what you want, but it is not hanging
if it hangs, there's a problem with how your channel is created
Jason Byars
@jbyars
Jul 26 2016 19:01
no, but it's a good hint. I'll simplify my script to see what is causing trouble
Paolo Di Tommaso
@pditommaso
Jul 26 2016 19:02
ok
Jason Byars
@jbyars
Jul 26 2016 19:16
ch1 = Channel.from(['sample1', 1], ['sample2', 2], ['sample3', 3])
ch2 = Channel.from(['sample1', 4], ['sample3', 6], ['sample2', 5])
ch3 = Channel.from(['sample1', 'A'], ['sample2', 'B'], ['sample3', 'C'])

ch1.phase(ch2).phase(ch3).println()
This doesn't hang, but doesn't output anything either. I'm trying to group outputs by sample name.
Paolo Di Tommaso
@pditommaso
Jul 26 2016 19:19
um, something like that
ch1 = Channel.from(['sample1', 1], ['sample2', 2], ['sample3', 3])
ch2 = Channel.from(['sample1', 4], ['sample3', 6], ['sample2', 5])
ch3 = Channel.from(['sample1', 'A'], ['sample2', 'B'], ['sample3', 'C'])

ch1
  .phase(ch2)
  .map { pair1, pair2 -> [pair1[0], pair1[1], pair2[0]] }
  .phase(ch3)
  .println()
[[sample1, 1, sample1], [sample1, A]]
[[sample2, 2, sample2], [sample2, B]]
[[sample3, 3, sample3], [sample3, C]]
I know, it's a bit ugly; I think the operator should have a flat parameter
that would expand the nested lists into a flat one
Jason Byars
@jbyars
Jul 26 2016 19:21
ok, I can work something out with that.
phase is the only key based channel grouping operator right?
Paolo Di Tommaso
@pditommaso
Jul 26 2016 19:22
give a try to this

ch1
  .phase(ch2)
  .flatten()
  .phase(ch3)
  .println()
[sample1, [sample1, A]]
[sample3, [sample3, C]]
[sample2, [sample2, B]]
phase is the only key based channel grouping operator right?
no, you can specify a mapping rule
look here
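For reference, the mapping rule is a closure passed to phase that maps each item to its matching key. A sketch reusing the sample channels from the thread (the closure form follows the operator docs; treat the exact syntax as an assumption for this DSL1-era version):

```nextflow
ch1 = Channel.from(['sample1', 1], ['sample2', 2], ['sample3', 3])
ch2 = Channel.from(['sample1', 4], ['sample3', 6], ['sample2', 5])

// the closure extracts the matching key (here the sample name) from each item
ch1.phase(ch2) { item -> item[0] }
   .println()
```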
Jason Byars
@jbyars
Jul 26 2016 19:38
right, I haven't had much luck with mapping rules when the items are complex. I'll keep looking through the examples.
Paolo Di Tommaso
@pditommaso
Jul 26 2016 19:39
well, you need to provide a closure that takes the entry and returns the matching key you want
Jason Byars
@jbyars
Jul 26 2016 19:55
ch1 = Channel.from(['sample1', 1], ['sample2', 2], ['sample3', 3])
ch2 = Channel.from(['sample1', 4], ['sample3', 6], ['sample2', 5])
ch3 = Channel.from(['sample1', 'A'], ['sample2', 'B'], ['sample3', 'C'])

ch1.phase(ch2)
   .map { stat1, stat2 -> [stat1[0], stat1[1], stat2[1]] }
   .phase(ch3)
   .map { stat1, stat2 -> [stat1[0], stat1[1], stat1[2], stat2[1]] }
   .println()
This produces what I want. It looks like I have to map after every phase, but that's not too bad.
[sample1, 1, 4, A]
[sample2, 2, 5, B]
[sample3, 3, 6, C]
Paolo Di Tommaso
@pditommaso
Jul 26 2016 19:57
ok, good workaround
I will try to add a flat option
Jason Byars
@jbyars
Jul 26 2016 20:07
I'm sure you're right. There is some closure I could specify for the second phase operation and only do a single map operation.
Jason Byars
@jbyars
Jul 26 2016 20:15
ch1 = Channel.from(['sample1', 1], ['sample2', 2], ['sample3', 3])
ch2 = Channel.from(['sample1', 4], ['sample3', 6], ['sample2', 5])
ch3 = Channel.from(['sample1', 'A'], ['sample2', 'B'], ['sample3', 'C'])

ch1.concat(ch2, ch3).groupTuple().println()
I'm making this harder than I need to...
[sample1, [1, 4, A]]
[sample2, [2, 5, B]]
[sample3, [3, 6, C]]
Paolo Di Tommaso
@pditommaso
Jul 26 2016 20:15
:)
makes sense
Jason Byars
@jbyars
Jul 26 2016 20:15
yep, thanks for the clarification