These are chat archives for nextflow-io/nextflow

13th Jun 2018
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:21

Good morning,
as my downstream process requires files produced by multiple upstream processes, I'm collecting and concatenating the output data:

b1 = file_channel_1a.collect()
b2 = file_channel_2a.collect()
file_channel_1_2 = b1.concat( b2 )

file_channel_1_2 is then used as input for the next process.
This stops working when the upstream process is running as multiple parallel jobs, as I get file name collisions, in particular for directories created, e.g.:

work-dir-x/config_file.1
work-dir-x/data/file1.seq

work-dir-y/config_file.1
work-dir-y/data/file2.seq

I think I can filter out some truly duplicated files with

file_channel_1a_mod = file_channel_1a.filter{name, files -> name =~ /^[config_file]/}

But can I use something like the combine operator to join files from the two channels that have a matching directory name like the "data" dir above?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:31
to combine you need a matching key, therefore you should, for example, associate the dir name with the files using a map
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:36
If you don't mind could you show how to apply map on a directory name, please?
Would I do that instead of the concat or just before that to collect all files and directories?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:36
file_channel_1a.map { file -> [file.name , file] }
it depends: do you need to process them all together
or a process execution for each dir name?
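As a rough sketch of the keyed approach (channel names reused from above; assuming the parent directory name, e.g. "data", is the matching key):

ch_a = file_channel_1a.flatten().map { f -> [ f.parent.name, f ] }
ch_b = file_channel_2a.flatten().map { f -> [ f.parent.name, f ] }
ch_a.combine( ch_b, by: 0 ).view()   // joins entries whose parent dir names match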
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:46
It should be processed all together.
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:49
in this case you need either to filter the duplicates out or guarantee a unique name when you are creating them
Felix Kokocinski
@fkokocinski
Jun 13 2018 09:56
Unfortunately I cannot change the directory name. The files *.seq above belong together, so I guess I would filter, concat and merge the "data" directories like this?!
b = file_channel_1a.collect()
c = b.filter{name, files -> name =~ /^[config_file]/}
b = file_channel_2a.collect()
file_channel_1_2 = c.concat( b )
file_channel_1_2_merged_dirs = file_channel_1_2.map { file -> [file.name , file] }
Paolo Di Tommaso
@pditommaso
Jun 13 2018 09:58
not sure the regex is working that way, first because files is a list of files, not a string
suggestion: isolate the problem and try it with nextflow console
until you get the right channel structure
Felix Kokocinski
@fkokocinski
Jun 13 2018 10:01
OK, thanks, Paolo.
I'll try and give the config files unique names, e.g. based on the process ID, to eliminate that issue.
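A minimal sketch of that idea (process and command names hypothetical; task.index provides a unique per-task number):

process make_config {
    output:
    file "config_file.${task.index}" into config_files_ch

    script:
    """
    generate_config.sh > config_file.${task.index}
    """
}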
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:02
maybe the simplest thing is
c = file_channel_1a.flatten().unique()
b = file_channel_2a.flatten().unique()
file_channel_1_2 = c.concat( b ).collect()
um no
c = file_channel_1a.flatten()
b = file_channel_2a.flatten()
file_channel_1_2 = c.concat( b ).unique().collect()
Felix Kokocinski
@fkokocinski
Jun 13 2018 10:05
Operator magic, I like it. :-)
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:05
:wink:
lucky
@Lucky-lai
Jun 13 2018 10:41
Hello, may I ask whether nextflow supports git platforms other than BitBucket, GitHub and GitLab?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:42
What other platforms exist? :smile:
lucky
@Lucky-lai
Jun 13 2018 10:45
private git repository?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:47
Local repos are already supported
lucky
@Lucky-lai
Jun 13 2018 10:47
how do I set that up?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 10:48
Here the guru is @emi80
Maxime Borry
@maxibor
Jun 13 2018 11:46
Hello all,
Is there a way to NOT launch a process in parallel when given multiple samples (each sample would be used by the process only after the previous one has been dealt with completely)?
There is a process in my pipeline that makes use of leveldb, which can only be accessed by one process at a time.
I could give the db files to NF in a channel, but duplicating the database seems a bit unnecessary...
elipapa
@elipapa
Jun 13 2018 11:49
hello, what's the status of google storage support? nextflow-io/nextflow#276
Felix Kokocinski
@fkokocinski
Jun 13 2018 11:52

@maxibor I would think if you process the samples with

each sample from sample_channel

and set process.cpus = 1 in the config, they should be processed one after the other.

Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:56
no, that's only used for resource allocation
Kevin Sayers
@KevinSayers
Jun 13 2018 11:56
Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:56
exactly maxForks 1 :+1:
@elipapa GCP support is in the hands of google folks
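For reference, a minimal sketch of the maxForks directive (process body and script are hypothetical):

process query_leveldb {
    // run at most one task of this process at a time
    maxForks 1

    input:
    file sample from samples_ch

    script:
    """
    query_db.py ${sample}
    """
}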
Maxime Borry
@maxibor
Jun 13 2018 11:57
Great, thanks @KevinSayers and @pditommaso
Paolo Di Tommaso
@pditommaso
Jun 13 2018 11:57
as far as I know they are working on it but I have no feedback on the current status
you may want to reply to that tweet and ask for updates, that's also a way to let them know people are interested in it
@Lucky-lai for the local repository you need to create a bare git repo
then NF can pull/run from there using git URL paths
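A rough sketch of that setup (paths hypothetical; assuming NF accepts file: URLs for local bare repos):

git init --bare /opt/git/my-pipeline.git
# push the pipeline project into it, then run it via its git URL path:
nextflow run file:/opt/git/my-pipeline.git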
elipapa
@elipapa
Jun 13 2018 12:10
they might still be hiring... :worried: back to snakemake for now. we are stuck with gs://
tweet sent
Paolo Di Tommaso
@pditommaso
Jun 13 2018 12:11
I was told a month ago that they are working on it
Alexander Peltzer
@apeltzer
Jun 13 2018 12:11
@sven1103 and I had contact with them, but no time unfortunately. They said they wanted to push much earlier in Spring
Yup
elipapa
@elipapa
Jun 13 2018 12:22
@pditommaso :+1:
Maxime Vallée
@valleem
Jun 13 2018 12:49

Hello! I saw in the docs about spread that

This operator is deprecated. See combine instead.

I am not succeeding in using combine as spread. One of my channels has a cardinality of 2, I think it matters

I have a channel with [[bamid],[bam]] and another with [intervals]
with combine I end up with:
[[bamid],[bam],[int000],[int001],...]
whereas with spread I have what I expect:
[[bamid],[bam],[int000]]
[[bamid],[bam],[int001]]
[[bamid],[bam],[int002]]
...
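For what it's worth, a console sketch with dummy values: combine pairs each emission of the two channels, so if the intervals arrive as a single list in one emission, applying flatten() to that channel first should restore the spread-like pairing:

ch_bam = Channel.from( [ [ ['bamid'], ['bam'] ] ] )      // one [[bamid],[bam]] emission
ch_int = Channel.from( 'int000', 'int001', 'int002' )    // one emission per interval
ch_bam.combine( ch_int ).view()                          // => [[bamid], [bam], int000], [[bamid], [bam], int001], ...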
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:39
hello, in the documentation it is not so easy to find how to set a process to run locally and not on the cluster
can someone help me?
Maxime Vallée
@valleem
Jun 13 2018 13:42
do you mean one of many processes should run locally? or the whole pipeline?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:42
just one
Maxime Vallée
@valleem
Jun 13 2018 13:43
changing the executor in the process config does not work?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:45
no
process {
    queue = 'mem_1tb,short-sl7'
    memory = '20G'
    cpus = '1'
    time = '6h'
    scratch = false

    $run_shotgun {
        memory = '10G'
    }
    $msiconvert {
        executor = 'local'
    }
}
this is ignored
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:50
ok I found there was a typo in the name of the process.
:)
Maxime Vallée
@valleem
Jun 13 2018 13:51
haha nice
Luca Cozzuto
@lucacozzuto
Jun 13 2018 13:54
well, it would be nice to have a warning when an unknown process is defined
for people like me :smile:
btw, another problem:
how do I concatenate variables in an output channel?
    output:
    set "${labsys}_${qcode}_${checksum}", file("${labsys}_${qcode}_${checksum}.mzML") into mzmlfiles_for_correction
this complains about
Caused by:
  Missing output file(s) `b2a401cd-ee09-4a2d-8799-765a237beffb_QC01_7c743164378f61aa48e70f1f3991c8f4` expected by process `msconvert (b2a401cd-ee09-4a2d-8799-765a237beffb_QC01_7c743164378f61aa48e70f1f3991c8f4)`
if I remove set "${labsys}_${qcode}_${checksum}" it works
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:00
output:
    set ("${labsys}_${qcode}_${checksum}", file("${labsys}_${qcode}_${checksum}.mzML") into mzmlfiles_for_correction
You forgot a (
if it doesn't work try:
output:
    set (val("${labsys}_${qcode}_${checksum}"), file("${labsys}_${qcode}_${checksum}.mzML")) into mzmlfiles_for_correction
(in an output set, a bare string is interpreted as an expected output file name; wrapping it in val() avoids the missing-file error)
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:02
the latter works
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:02
Nevermind my first comment, it's now missing a )
Perfect then
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:02
it is a bit confusing for me to know when a ( is needed or not
sometimes it triggers errors, sometimes not
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:03
Typically I try to put as many as possible, and then I remove them
Luca Cozzuto
@lucacozzuto
Jun 13 2018 14:03
:clap:
thanks!
Maxime Garcia
@MaxUlysse
Jun 13 2018 14:03
You're welcome
Luca Cozzuto
@lucacozzuto
Jun 13 2018 15:08
Hi again, is there a way to send a mail with watchPath when a new file is picked up by the pipeline?
Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:13
Channel.watchPath('/foo').tap { notification_ch }.continueAsBeforeEtc()
notification_ch.subscribe { sendMail(from: 'paolo', to: 'ammameta', subject: 'LOL') }
Felix Kokocinski
@fkokocinski
Jun 13 2018 15:33

Sorry to come back to this, Paolo, but when the process I mentioned is executed multiple times, the directory where output files are written still has a file name collision for me! (This is without trying to combine the output from other channels.) I tried various combinations of

file_channel_1.flatten().map { file -> [file.name , file] }.unique()
file_channel_1.flatten().unique()

Is there another way, please?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:33
it's not just me here, this is a community ;)
BTW I guess the problem is that each file has a unique full path, therefore the unique op is not working as expected
try
c = file_channel_1a.flatten()
b = file_channel_2a.flatten()
file_channel_1_2 = c.concat( b ).unique{ it.name }.collect()
that way the uniqueness is evaluated on the file name, not the complete path
Felix Kokocinski
@fkokocinski
Jun 13 2018 15:45

sorry, I know...
For this case (merging files from the same channel "file_channel_1" but multiple jobs), would it be:

file_channel_2 = file_channel_1.flatten()
file_channel_3 = file_channel_2.unique{ it.name }.collect()

It seems files are going missing now, maybe an issue with absolute path names?

Paolo Di Tommaso
@pditommaso
Jun 13 2018 15:50
no, this should work
Mike Smoot
@mes5k
Jun 13 2018 15:56
@fkokocinski I'd recommend copious use of .view() and possibly a test pipeline (i.e. with a small subset of data) to establish the state of your channels.
Felix Kokocinski
@fkokocinski
Jun 13 2018 16:03
OK, I'll sprinkle in some .view(), thanks.
Could it be that I'm doing this channel merging operation before all processes are finished?
I'm currently performing it between the two processes; the next process, however, has .last() as input. I might need to merge there?!
Mike Smoot
@mes5k
Jun 13 2018 16:09
Both unique and collect require the channel to be "full", which is to say the channel has received its last input. This means all upstream processes need to have completed before unique or collect will run.
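A quick console sketch of that behaviour with dummy values:

Channel.from( 1, 2, 2, 3 ).unique().collect().view()
// prints [1, 2, 3] once, only after the source channel has emitted everything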
Felix Kokocinski
@fkokocinski
Jun 13 2018 16:16
Nice, feels like we're getting closer!
Would you move the merging into a process or add a last() somewhere else to achieve this?
Or do you mean unique will wait for completion anyway?
Luca Cozzuto
@lucacozzuto
Jun 13 2018 16:21
Ammeta@gmail or Yahoo? You rascal ;)
Mike Smoot
@mes5k
Jun 13 2018 16:30
unique will wait for completion
Felix Kokocinski
@fkokocinski
Jun 13 2018 17:12
OK, thanks. Wouldn't I need to do the unique and collect before the flatten then?
Argh, so many questions...
Mike Smoot
@mes5k
Jun 13 2018 17:19
No, flatten applies to individual elements in a channel whereas unique and collect operate on the channel as a whole, so flatten can run before.
That being said, because flatten flattens recursively, I think it could probably run after and you'd get the same result.
Mike Smoot
@mes5k
Jun 13 2018 17:29

I'd also highly recommend writing a few test programs to experiment with how channel operators work:

Channel.from("a", "b", "c", "c").set{ a }
Channel.from(1,2,3,3,2).set{b}
a.concat(b).view("in a and b: ${it}").unique().collect().view("after ${id}")

Then nextflow run my_exp.nf and see what the view operators print. Just make dummy data structures that match what your data look like and then start looking at the output. I've got directories full of little nextflow programs I've written to experiment and understand how my data fit with nextflow.

Felix Kokocinski
@fkokocinski
Jun 13 2018 17:32
Yes, good advice, thanks Mike!