These are chat archives for nextflow-io/nextflow

13th
Jun 2018
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:21 UTC

Good morning,
as my downstream process requires files produced by multiple upstream processes, I'm collecting and concatenating the output data:

ch1 = file_channel_1a.collect()
ch2 = file_channel_2a.collect()
file_channel_1_2 = ch1.concat( ch2 )

file_channel_1_2 is then used as input for the next process.
This stops working when the upstream process is running as multiple parallel jobs, as I get file name collisions, in particular for directories created, e.g.:

work-dir-x/config_file.1
work-dir-x/data/file1.seq

work-dir-y/config_file.1
work-dir-y/data/file2.seq

I think I can filter out some truly duplicated files with

file_channel_1a_mod = file_channel_1a.filter{name, files -> name =~ /^[config_file]/}

But can I use something like the combine operator to join files from the two channels that have a matching directory name like the "data" dir above?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:31 UTC
to combine you need a matching key, therefore you should, for example, associate the dir name with the files using a map
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:36 UTC
If you don't mind could you show how to apply map on a directory name, please?
Would I do that instead of the concat or just before that to collect all files and directories?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:36 UTC
file_channel_1a.map { file -> [file.name , file] }
it depends: do you need to process them all together
or a process execution for each dir name?
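(A minimal sketch of this keyed-combine pattern, with hypothetical channel contents, keying each file on its parent directory name as in the question:)

// tag every file with its parent directory name as the matching key
keyed_1 = file_channel_1a.flatten().map { f -> [ f.parent.name, f ] }
keyed_2 = file_channel_2a.flatten().map { f -> [ f.parent.name, f ] }

// by: 0 joins on the first element, emitting [ dirname, file_1, file_2 ]
// for every pair of files sharing a directory name
matched = keyed_1.combine( keyed_2, by: 0 )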
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:46 UTC
It should be processed all together.
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:49 UTC
in this case you need either to filter the duplicates out or to guarantee a unique name when you are creating them
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:56 UTC
Unfortunately I cannot change the directory name. The files *.seq above belong together, so I guess I would filter, concat and merge the "data" directories like this?!
b = file_channel_1a.collect()
c = b.filter{name, files -> name =~ /^[config_file]/}
b = file_channel_2a.collect()
file_channel_1_2 = c.concat( b )
file_channel_1_2_merged_dirs = file_channel_1_2.map { file -> [file.name , file] }
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:58 UTC
not sure the regex is working that way, first because files is a list of files, not a string
suggestion: isolate the problem and try it with nextflow console
until you get the right channel structure
Felix Kokocinski
@fkokocinski
Jun 13 2018 10:01 UTC
OK, thanks, Paolo.
I'll try and give the config files unique names e.g. based on the process ID to eliminate that issue.
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:02 UTC
maybe the simplest thing is
c = file_channel_1a.flatten().unique()
b = file_channel_2a.flatten().unique()
file_channel_1_2 = c.concat( b ).collect()
um no
c = file_channel_1a.flatten()
b = file_channel_2a.flatten()
file_channel_1_2 = c.concat( b ).unique().collect()
Felix Kokocinski
@fkokocinski
Jun 13 2018 10:05 UTC
Operator magic, I like it. :-)
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:05 UTC
:wink:
lucky
@Lucky-lai
Jun 13 2018 10:41 UTC
Hello, may I ask whether nextflow supports git platforms other than BitBucket, GitHub and GitLab?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:42 UTC
What other platform exists? :smile:
lucky
@Lucky-lai
Jun 13 2018 10:45 UTC
private git repository?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:47 UTC
Local repos are already supported
lucky
@Lucky-lai
Jun 13 2018 10:47 UTC
how do I set that up?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:48 UTC
Here the guru is @emi80
Maxime Borry
@maxibor
Jun 13 2018 11:46 UTC
Hello all,
Is there a way to NOT launch a process in parallel when given multiple samples (each sample would be used by the process only after the previous one has been dealt with completely)?
There is a process in my pipeline that makes use of leveldb, which can only be accessed by one process at a time.
I could give the db files to NF in a channel, but duplicating the database seems a bit unnecessary...
elipapa
@elipapa
Jun 13 2018 11:49 UTC
hello, what's the status of google storage support? nextflow-io/nextflow#276
Felix Kokocinski
@fkokocinski
Jun 13 2018 11:52 UTC

@maxibor I would think if you process the samples with

each sample from sample_channel

and set process.cpus = 1 in the config, they should be processed one after the other.

Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:56 UTC
no, this is only used for resource allocation
Kevin Sayers
@KevinSayers
Jun 13 2018 11:56 UTC
Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:56 UTC
exactly maxForks 1 :+1:
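(A minimal sketch of the directive, with hypothetical process, channel and script names:)

process leveldb_step {
    // run at most one task at a time, so the leveldb
    // is never opened by two jobs concurrently
    maxForks 1

    input:
    file sample from samples_ch

    script:
    """
    process_sample.sh ${sample}
    """
}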
@elipapa GCP support is in the hands of google folks
Maxime Borry
@maxibor
Jun 13 2018 11:57 UTC
Great, thanks @KevinSayers and @pditommaso
Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:57 UTC
as far as I know they are working on it but I have no feedback on the current status
you may want to reply to that tweet and ask for updates, that's also a way to let them know people are interested in it
@Lucky-lai for the local repository you need to create a bare git repo
then NF can pull/run from there using git url paths
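(Roughly like this; the paths are hypothetical and the exact URL form may vary, so treat it as a sketch:)

# create a bare repo and push the pipeline into it
git init --bare /opt/repos/my-pipeline.git
cd my-pipeline
git push /opt/repos/my-pipeline.git master

# then run it from the local git path (assumed form)
nextflow run file:/opt/repos/my-pipeline.git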
elipapa
@elipapa
Jun 13 2018 12:10 UTC
they might still be hiring... :worried: back to snakemake for now. we are stuck with gs://
tweet sent
Paolo Di Tommaso
@pditommaso
Jun 13 2018 12:11 UTC
I was told a month ago they are working on it
Alexander Peltzer
@apeltzer
Jun 13 2018 12:11 UTC
@sven1103 and I had contact with them, but no time unfortunately. They said they wanted to push much earlier in Spring
Yup
elipapa
@elipapa
Jun 13 2018 12:22 UTC
@pditommaso :+1:
Maxime Vallée
@valleem
Jun 13 2018 12:49 UTC

Hello! I saw in the docs about spread that

This operator is deprecated. See combine instead.

I am not succeeding in using combine as spread. One of my channels has a cardinality of 2, I think it matters

I have a channel with [[bamid],[bam]] and an other with [intervals]
with combine I end up with :
[[bamid],[bam],[int000],[int001],...]
whereas with spread I have what I expect :
[[bamid],[bam],[int000]]
[[bamid],[bam],[int001]]
[[bamid],[bam],[int002]]
...
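(A guess, using names from the example: if the intervals channel emits a single element holding the whole list, combine appends that list instead of pairing; flattening it first should reproduce the spread behaviour:)

// one interval per element instead of one list-of-intervals element
intervals_flat = intervals_ch.flatten()

// combine now emits the cartesian product, one tuple per interval:
// [ bamid, bam, int000 ], [ bamid, bam, int001 ], ...
bam_ch.combine( intervals_flat ).view()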
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:39 UTC
hello, in the documentation it is not so easy to find how to set a process to run locally and not on the cluster
can someone help me?
Maxime Vallée
@valleem
Jun 13 2018 13:42 UTC
do you mean one of many processes should run locally? or the whole pipeline?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:42 UTC
just one
Maxime Vallée
@valleem
Jun 13 2018 13:43 UTC
changing the executor in the process config does not work?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:45 UTC
no
process {
    queue = 'mem_1tb,short-sl7'
    memory = '20G'
    cpus = '1'
    time = '6h'
    scratch = false

    $run_shotgun {
        memory = '10G'
    }
    $msiconvert {
        executor = 'local'
    }
}
this is ignored
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:50 UTC
ok I found there was a typo in the name of the process.
:)
Maxime Vallée
@valleem
Jun 13 2018 13:51 UTC
haha nice
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:54 UTC
well, it would be nice to have an "unknown process defined" warning
for people like me :smile:
btw, another problem:
how do I concatenate variables in an output channel?
    output:
    set "${labsys}_${qcode}_${checksum}", file("${labsys}_${qcode}_${checksum}.mzML") into mzmlfiles_for_correction
this complains about
Caused by:
  Missing output file(s) `b2a401cd-ee09-4a2d-8799-765a237beffb_QC01_7c743164378f61aa48e70f1f3991c8f4` expected by process `msconvert (b2a401cd-ee09-4a2d-8799-765a237beffb_QC01_7c743164378f61aa48e70f1f3991c8f4)`
removing set "${labsys}_${qcode}_${checksum}" it works
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:00 UTC
output:
    set ("${labsys}_${qcode}_${checksum}", file("${labsys}_${qcode}_${checksum}.mzML") into mzmlfiles_for_correction
You forgot a (
if it doesn't work try:
output:
    set (val("${labsys}_${qcode}_${checksum}"), file("${labsys}_${qcode}_${checksum}.mzML")) into mzmlfiles_for_correction
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:02 UTC
the latter works
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:02 UTC
Nevermind my first comment, it's now missing a )
Perfect then
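(For context, a minimal sketch with illustrative names of why the val() matters: a bare string in an output set is interpreted as the name of a file the task must create, while val() marks it as a plain value:)

process demo {
    input:
    val id from ids_ch

    output:
    // without val(), "${id}" would be read as an expected output
    // file name, producing the "Missing output file(s)" error
    set val("${id}"), file("${id}.mzML") into out_ch

    script:
    """
    touch ${id}.mzML
    """
}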
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:02 UTC
it is a bit confusing for me to know when a ( is needed or not
sometimes it triggers errors, sometimes not
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:03 UTC
Typically I try to put as many as possible, and then I remove them
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:03 UTC
:clap:
thanks!
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:03 UTC
You're welcome
Luca Cozzuto
@lucacozzuto
Jun 13 2018 15:08 UTC
Hi again, is there a way to send a mail with WatchPath when a new file is read from the pipeline?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:13 UTC
Channel.watchPath('/foo').tap { notification_ch }.continueAsBeforeEtc()
notification_ch.subscribe { sendMail(from: 'paolo', to: 'ammameta', subject: 'LOL') }
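(Spelled out a bit, with a hypothetical watch path and placeholder addresses; tap forks a copy of the stream so the main flow continues while the side channel drives the notification:)

// fork every new file into notification_ch, keep the main stream for the pipeline
Channel
    .watchPath('/some/dir/*.fastq')
    .tap { notification_ch }
    .set { files_for_pipeline }

// send a mail for each new file that appears
notification_ch.subscribe {
    sendMail(from: 'pipeline@example.com', to: 'you@example.com',
             subject: "New file: ${it.name}")
}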
Felix Kokocinski
@fkokocinski
Jun 13 2018 15:33 UTC

Sorry to come back to this Paolo, but when the process I mentioned is executed multiple times, the directory where output files are written still has a file name collision for me! (This is without trying to combine the output from other channels.) I tried various combinations of

file_channel_1.flatten().map { file -> [file.name , file] }.unique()
file_channel_1.flatten().unique()

Is there another way, please?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:33 UTC
it's not just me here, this is a community ;)
BTW I guess the problem is that each file has a unique full path, therefore the unique op is not working as expected
try
c = file_channel_1a.flatten()
b = file_channel_2a.flatten()
file_channel_1_2 = c.concat( b ).unique{ it.name }.collect()
doing that, the uniqueness is evaluated on the file name, not the complete path
Felix Kokocinski
@fkokocinski
Jun 13 2018 15:45 UTC

sorry, I know...
For this case (merging files from the same channel "file_channel_1" but multiple jobs), would it be:

file_channel_2 = file_channel_1.flatten()
file_channel_3 = file_channel_2.unique{ it.name }.collect()

It seems files are going missing now, maybe an issue with absolute path names?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:50 UTC
no, this should work
Mike Smoot
@mes5k
Jun 13 2018 15:56 UTC
@fkokocinski I'd recommend copious use of .view() and possibly a test pipeline (i.e. with a small subset of data) to establish the state of your channels.
Felix Kokocinski
@fkokocinski
Jun 13 2018 16:03 UTC
OK, I'll sprinkle in some .view(), thanks.
Could it be that I'm doing this channel merging operation before all processes are finished?
I'm currently performing it in between the two processes; the next process however has .last() as input. I might need to merge there?!
Mike Smoot
@mes5k
Jun 13 2018 16:09 UTC
Both unique and collect require the channel to be "full", which is to say the channel has received its last input. This means all upstream processes need to have completed before unique or collect will run.
Felix Kokocinski
@fkokocinski
Jun 13 2018 16:16 UTC
Nice, feels like we're getting closer!
Would you move the merging into a process or add a last() somewhere else to achieve this?
Or do you mean unique will wait for completion anyway?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 16:21 UTC
Ammeta@gmail or Yahoo? You rascal ;)
Mike Smoot
@mes5k
Jun 13 2018 16:30 UTC
unique will wait for completion
Felix Kokocinski
@fkokocinski
Jun 13 2018 17:12 UTC
OK, thanks. Wouldn't I need to do the unique and collect before the flatten then?
Argh, so many questions...
Mike Smoot
@mes5k
Jun 13 2018 17:19 UTC
No, flatten applies to individual elements in a channel whereas unique and collect operate on the channel as a whole, so flatten can run before.
That being said, because flatten flattens recursively, I think it could probably run after and you'd get the same result.
Mike Smoot
@mes5k
Jun 13 2018 17:29 UTC

I'd also highly recommend writing a few test programs to experiment with how channel operators work:

Channel.from("a", "b", "c", "c").set{ a }
Channel.from(1,2,3,3,2).set{b}
a.concat(b).view("in a and b: ${it}").unique().collect().view("after ${id}")

Then nextflow run my_exp.nf and see what the view operators print. Just make dummy data structures that match what your data look like and then start looking at the output. I've got directories full of little nextflow programs I've written to experiment and understand how my data fit with nextflow.

Felix Kokocinski
@fkokocinski
Jun 13 2018 17:32 UTC
Yes, good advice, thanks Mike!