@FloWi Ok, maybe there are a couple of things you could do....
1) You could have Spark write out fewer part files, e.g. restrict the number of partitions using `coalesce` (see the sketch after this list).
2) It's true that there will be `N` subscriber threads for the `cached` thread pool, but they won't all be active, as this is controlled via the `parallelism` param on the `SinkAction`, which uses a fixed thread pool for the number of partitions - I could be wrong, but this is how I read it? So only `N` threads, as set by `parallelism`, should ever be active.
3) We could change the `cached` thread pool at the subscription level to a `fixed` thread pool - a kind of belt and braces - but we would need to somehow pass down the `parallelism` param, e.g. `Source.toDataStream(parallelism)` - or something like that (see the second sketch after this list).
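For point 1, a minimal sketch of the Spark side; the paths and the partition count of 8 are placeholders, not values from this thread:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session and paths, purely for illustration.
val spark = SparkSession.builder().appName("fewer-part-files").getOrCreate()
val df = spark.read.parquet("/input/path")

df.coalesce(8)        // cap the number of partitions, and thus part files
  .write
  .parquet("/output/path")
```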
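For point 3, a rough sketch of what passing the parallelism down could look like; `toDataStream(parallelism)` is only the signature suggested above, not the current eel API, and the other names here are made up:

```scala
import java.util.concurrent.{ExecutorService, Executors}

object ParallelismSketch {
  // The real method would build and return the DataStream; this only
  // shows the executor wiring at the subscription level.
  def toDataStream(parallelism: Int): Unit = {
    // bounded pool: at most `parallelism` subscriber threads at a time
    val executor: ExecutorService = Executors.newFixedThreadPool(parallelism)
    try {
      // ... submit one subscription task per part to `executor` ...
    } finally {
      executor.shutdown() // release the threads once all parts are consumed
    }
  }
}
```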
Setting `parallelism=1` on the Sink-side didn't have an effect on the resource usage. During my quick debugging session I found out that the thread pool contained 187 threads and the queue had 13 left. I don't know if they were all running, though. Do you know how to find that out? Take a thread dump and see if they are RUNNING? Or can a thread be in the running state while internal eel logic makes it wait?
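One way to answer that question from inside the JVM (a small, eel-agnostic sketch): group all live threads by state; threads parked by internal logic show up as WAITING or BLOCKED rather than RUNNABLE.

```scala
import scala.collection.JavaConverters._

object ThreadStates extends App {
  // Same information as a jstack thread dump, but summarised by state.
  Thread.getAllStackTraces.keySet.asScala
    .groupBy(_.getState)
    .foreach { case (state, threads) => println(s"$state: ${threads.size}") }
}
```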
The threads stay alive until the `DataStreamSource` has finished its last subscription thread. I'm not entirely convinced that going to a `fixed` thread pool will completely eradicate your memory issue - it will only delay the memory consumption.
An explanation for the high memory consumption could be that I throttled the sink with `parallelism=1`, which means that many files are being processed slowly in parallel.
I'll try the approach of limiting the threads on the `DataStreamSource`.
I won't use `val executor = Executors.newFixedThreadPool(n)` since there wouldn't be a reasonable default value for `n`.
I'll take the code from `Executors.newCachedThreadPool()` --> `new ThreadPoolExecutor(0, n, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())` - that way I can use `Int.MaxValue` as the default for `n` without immediately creating loads of threads.
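A self-contained sketch of that executor; the object name and the default are only illustrative:

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object BoundedCachedPool {
  // Same shape as Executors.newCachedThreadPool() (core size 0, 60s keep-alive,
  // SynchronousQueue) but with a configurable upper bound instead of Int.MaxValue.
  def apply(maxThreads: Int = Int.MaxValue): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
      new SynchronousQueue[Runnable]())
}
```

Note that with a `SynchronousQueue` there is no task queue: once `maxThreads` threads are busy, further submissions are rejected, so a bound lower than `Int.MaxValue` would also need a rejection policy such as `CallerRunsPolicy`.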
`StructType.from[T]` fails with the message: `Can not map class scala.Option to FieldType`.