@FloWi Ok maybe there's a couple of things you could do....
1) You could have Spark write out fewer part files, e.g. restrict the number of partitions with `coalesce` or `repartition` before the write (see the first sketch after this list).
2) It's true that there will be N subscriber threads in the cached thread pool, but they won't all be active, as this is controlled via the `parallelism` param on the `SinkAction`, which uses a fixed thread pool for the number of partitions - I could be wrong but this is how I read it? So only `parallelism` of them should ever be active (see the second sketch after this list).
3) We could change the `cached` thread pool at the subscription level to a `fixed` thread pool - a kind of belt and braces - we would need to somehow pass the parallelism down, e.g. `Source.toDataStream(parallelism)` - or something like that.
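For point 1, a minimal sketch of capping the partition count before the write - the `SparkSession`, the paths, and the target of 8 partitions are all placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("fewer-part-files").getOrCreate()

val df = spark.read.parquet("/path/to/input") // placeholder input

// coalesce(8) caps the output at 8 part files without a full shuffle;
// use repartition(8) instead if the data should also be rebalanced.
df.coalesce(8).write.parquet("/path/to/output") // placeholder output
```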
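And for points 2 and 3, a toy illustration of the fixed-pool reading (the names here are illustrative, not eel's actual internals): one task per partition is submitted, all are accepted immediately, but only `parallelism` of them ever run at once.

```scala
import java.util.concurrent.{Executors, TimeUnit}

val numPartitions = 16 // e.g. one task per part file
val parallelism   = 2  // the SinkAction-style bound

val pool = Executors.newFixedThreadPool(parallelism)

(1 to numPartitions).foreach { i =>
  // The pool never grows beyond `parallelism` threads, so at most
  // two of these tasks are active at any moment; the rest wait.
  pool.execute(() => {
    println(s"partition $i on ${Thread.currentThread.getName}")
    Thread.sleep(500) // simulate work
  })
}

pool.shutdown()
pool.awaitTermination(1, TimeUnit.MINUTES)
```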
`parallelism = 1` on the Sink-side didn't have an effect on the resource usage. During my quick debugging session I found out that the thread pool contained 187 threads and the queue had 13 left. I don't know if they were all running though. Do you know how to find that out? Take a thread dump and see if they are `RUNNABLE`? Or can a thread be in the running state while internal eel-logic makes it wait?
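One way to check without an external profiler is to query thread states from inside the JVM - a small sketch using the standard `ThreadMXBean`, nothing eel-specific assumed:

```scala
import java.lang.management.ManagementFactory

val mx = ManagementFactory.getThreadMXBean

// Count live threads per state. RUNNABLE threads are actually on (or
// eligible for) a CPU; WAITING/TIMED_WAITING threads are parked, which
// is what blocking on an internal queue would look like.
mx.getThreadInfo(mx.getAllThreadIds)
  .filter(_ != null) // ids of threads that died in the meantime yield null
  .groupBy(_.getThreadState)
  .foreach { case (state, infos) => println(s"$state: ${infos.length}") }
```

A `jstack <pid>` dump gives the same states plus stack traces, which makes it obvious whether a `RUNNABLE` thread is doing real work and where a waiting one is parked.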
... until the `DataStreamSource` has finished its last subscription thread. I'm not entirely convinced that going to a `fixed` thread pool will completely eradicate your memory issue - it will only delay the memory consumption.
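One plausible mechanism behind "only delay" (my reading, not something stated above): `Executors.newFixedThreadPool` is backed by an unbounded `LinkedBlockingQueue`, so when work is submitted faster than the `n` workers can drain it, the pending tasks - and any data they capture - still accumulate on the heap:

```scala
import java.util.concurrent.{Executors, ThreadPoolExecutor}

// newFixedThreadPool really is a ThreadPoolExecutor over an unbounded queue.
val pool = Executors.newFixedThreadPool(2).asInstanceOf[ThreadPoolExecutor]

// Submit far more work than two threads can keep up with.
(1 to 10000).foreach { _ =>
  pool.execute(() => Thread.sleep(10)) // simulate a slow sink
}

// The backlog sits in the queue, holding memory until workers get to it.
println(s"queued tasks: ${pool.getQueue.size}")
pool.shutdown()
```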
An explanation for the high memory consumption could be that I throttled the sink with `parallelism = 1`, which means that many files are being processed in parallel, each one only slowly.
I'll try the approach of limiting the threads on the subscription side. I won't use `val executor = Executors.newFixedThreadPool(n)`, since there wouldn't be a reasonable default value for n. I'll take the code from `Executors.newCachedThreadPool` instead: `new ThreadPoolExecutor(0, n, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())`. That way I can use `Int.MaxValue` as a default without immediately creating loads of threads.
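That plan as a compilable sketch - `boundedCachedThreadPool` is a made-up name, and the construction mirrors what `Executors.newCachedThreadPool` does internally, only with the thread cap exposed:

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Identical to Executors.newCachedThreadPool except that the maximum
// thread count is configurable. With the Int.MaxValue default nothing is
// created up front: the pool starts at 0 threads, grows on demand, and
// reaps threads that have been idle for 60 seconds.
def boundedCachedThreadPool(n: Int = Int.MaxValue): ThreadPoolExecutor =
  new ThreadPoolExecutor(0, n, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
```

One caveat: a `SynchronousQueue` has no capacity, so once `n` threads are busy any further `execute` is rejected with a `RejectedExecutionException` rather than queued - with a finite `n` you'd likely also want a handler such as `ThreadPoolExecutor.CallerRunsPolicy`.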