    Gary Frost
    @garyfrost
    @hannesmiller has merged 51zero/eel-sdk#375
    we'll get a build out in the next couple of days
    Hannes Miller
    @hannesmiller
    @FloWi Great work - I have merged it and will try to get an ALPHA release out in the next week or so. Also, if you don't mind, I would like to add your code snippet above to our documentation - how to write to S3. Thanks again
    This also highlights other areas where we need to apply this fix, e.g. OrcSink and CsvSink.
    Florian Witteler
    @FloWi
    You're welcome. Of course you can add my snippet to your documentation (which is very good, btw.)
    Florian Witteler
    @FloWi
    Hi! I've encountered a bug in the jdbc-layer of eel and created an issue for that.
    51zero/eel-sdk#376
    Would you please take a look and comment on my open questions? I think I'll have some time tomorrow to take a look at it.
    Hannes Miller
    @hannesmiller
    @FloWi I would prefer this to be done in an implementation-agnostic way - we have a JdbcDialect trait which could be extended to support something like what you found in Spark. You can use the companion object to return an instance of it.
    @FloWi it simply looks like this: object JdbcDialect { def apply(url: String): JdbcDialect = { if (url.toLowerCase.startsWith("jdbc:oracle")) new OracleJdbcDialect() else new GenericJdbcDialect() } }
    @FloWi I must say you are the first user that's using Array support for JDBC - so that's great, and thanks again :+1:
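Formatted out, and with the trait and dialect classes stubbed for illustration (eel's real JdbcDialect is where the actual type-mapping logic lives), the companion-object dispatch described above looks like this:

```scala
// Sketch of the companion-object dispatch described above.
// The trait and the two dialect classes are stubs for illustration only;
// eel's real JdbcDialect carries the actual type-mapping logic.
trait JdbcDialect {
  def name: String
}

class GenericJdbcDialect extends JdbcDialect {
  val name = "generic"
}

class OracleJdbcDialect extends JdbcDialect {
  val name = "oracle"
}

object JdbcDialect {
  // Choose a dialect from the JDBC url, falling back to the generic one.
  def apply(url: String): JdbcDialect =
    if (url.toLowerCase.startsWith("jdbc:oracle")) new OracleJdbcDialect()
    else new GenericJdbcDialect()
}
```

A new dialect (e.g. one with special handling for array columns) then only needs another branch in apply, with no changes at the call sites.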
    Florian Witteler
    @FloWi
    Ah, yes. The JdbcDialect looks like the right place to put it. Sometimes using array columns is way faster overall than third normal form. I found out that inserting 9M rows with an average of 30 items in the array is way faster than inserting 270M rows.
    Hannes Miller
    @hannesmiller
    Interesting that still holds - that was the case back in my OCI/C++ days :-)
    Florian Witteler
    @FloWi
    Got it. Please take a look at 51zero/eel-sdk#377. It works on the in-memory H2 and also on Postgres. Please extend the test case if you can think of corner cases that won't work.
    Hannes Miller
    @hannesmiller
    R
    Hannes Miller
    @hannesmiller
    @FloWi It's merged - thanks once again.
    Sameroom
    @sameroom-bot
    [Gary Frost, 51zero Ltd] I'll prepare and release v1.3.0-a12 today. I think we're close to v1.3.0 final; I'd like to get that out this month
    Florian Witteler
    @FloWi
    Hi guys! Would you mind taking a look at 51zero/eel-sdk#382 to point me in the right direction?
    Hannes Miller
    @hannesmiller

    @FloWi Ok maybe there's a couple of things you could do....

    1) You could have Spark write out fewer part files, e.g. restrict the number of partitions using coalesce
    2) It's true that there will be N subscriber threads in the cached thread pool, but they won't all be active, as this is controlled via the parallelism param on the SinkAction, which uses a fixed thread pool sized to the number of partitions - I could be wrong but this is how I read it?

    So in theory, if you had 100 files, it would create 100 subscription threads, but only N (the parallelism) should ever be active.
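That bound is easy to demonstrate in isolation. A minimal sketch, assuming nothing about eel's internals (`parallelism` and `tasks` are made-up names): submit many tasks to a fixed pool and record the peak number running at once.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Submit far more tasks than threads; the fixed pool caps concurrency,
// so the peak number of simultaneously active tasks never exceeds its size.
val parallelism = 4
val tasks = 100
val pool = Executors.newFixedThreadPool(parallelism)
val active = new AtomicInteger(0)     // tasks currently running
val maxActive = new AtomicInteger(0)  // observed peak concurrency

(1 to tasks).foreach { _ =>
  pool.submit(new Runnable {
    def run(): Unit = {
      val now = active.incrementAndGet()
      maxActive.updateAndGet(m => math.max(m, now))
      Thread.sleep(10) // simulate a slow write
      active.decrementAndGet()
    }
  })
}
pool.shutdown()
pool.awaitTermination(30, TimeUnit.SECONDS)
// maxActive.get() is at most `parallelism`, however many tasks were submitted
```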
    Hannes Miller
    @hannesmiller
    However, if this is indeed the issue, we could easily change the cached thread pool at the subscription level to a fixed thread pool - a kind of belt and braces - we would need to somehow pass down the parallelism param, e.g. Source.toDataStream(parallelism) - or something like that.
    Florian Witteler
    @FloWi
    Sure, I already repartitioned the dataframe. Maybe that wasn't clear in my issue. That's how I found out that parallelism=1 on the Sink side didn't have an effect on resource usage. During my quick debugging session I found out that the thread pool contained 187 threads and the queue had 13 left. I don't know if they were all running, though. Do you know how to find that out? Take a thread dump and see if they are RUNNING? Or can a thread be in the running state while internal eel logic makes it wait?
    I like your suggestion with a fixed threadpool that can be explicitly controlled by the parallelism on the Source-side.
    Hannes Miller
    @hannesmiller
    Yes, I think you are right that memory is not freed until the shutdown on DataStreamSource has finished its last subscription thread. I'm not entirely convinced that going to a fixed thread pool will completely eradicate your memory issue - it will only delay the memory consumption.
    Sameroom
    @sameroom-bot
    <Sameroom> Whoops! You've exceeded your daily message limit on this Sameroom account (it will reset in 24 hours and 0 minutes). If you have too many Tubes for your budget, pause or delete some Tubes on the Manage page. If you don't have a subscription, visit pricing to upgrade to unlimited messaging.
    Hannes Miller
    @hannesmiller
    @FloWi It's worth a try though... the ParquetPublisher should close the parquet file when it reaches the end
    Hannes Miller
    @hannesmiller
    @FloWi feel free to create a fork
    Florian Witteler
    @FloWi
    > @FloWi Its worth a try though... the ParquetPublisher should close parquet file when it reaches the end
    If I read the code correctly, that's already the case, since the using method closes the file.
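For reference, the `using` pattern mentioned here is the classic loan pattern. A minimal version (eel's actual helper may differ in detail) guarantees close() even when the block throws:

```scala
// Loan pattern: lend a closeable resource to a function and always close it.
// Minimal sketch - eel's own `using` helper may differ in detail.
def using[R <: AutoCloseable, A](resource: R)(f: R => A): A =
  try f(resource)
  finally resource.close()
```

If the ParquetPublisher's reader goes through a helper like this, the file handle is released when the subscription finishes, whether normally or via an exception.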
    Hannes Miller
    @hannesmiller
    @FloWi Correct!
    Florian Witteler
    @FloWi

    An explanation for the high memory consumption could be that I throttled the sink with parallelism=1, which means that many files are slowly being processed in parallel.

    I'll try the approach of limiting the threads on the DataStreamSource.
    I won't use val executor = Executors.newFixedThreadPool(n), since there wouldn't be a reasonable default value for n.
    I'll take the code from Executors.newCachedThreadPool() --> new ThreadPoolExecutor(0, n, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]()). That way I can use Int.MaxValue as a default without immediately creating loads of threads.
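Spelled out, the variant described above is just the body of Executors.newCachedThreadPool() with the Integer.MAX_VALUE upper bound made configurable (a sketch, not eel's code):

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// newCachedThreadPool() with a configurable ceiling: threads are created
// on demand up to `n` and reclaimed after 60 seconds idle. With
// n = Int.MaxValue this is exactly the stock cached pool.
def boundedCachedThreadPool(n: Int): ThreadPoolExecutor =
  new ThreadPoolExecutor(
    0,                               // core size: keep no idle threads alive
    n,                               // ceiling (Int.MaxValue = cached-pool default)
    60L, TimeUnit.SECONDS,           // idle threads die after 60 seconds
    new SynchronousQueue[Runnable]() // hand-off queue: no task buffering
  )
```

One caveat: a SynchronousQueue buffers nothing, so submitting a task while all n threads are busy raises RejectedExecutionException instead of queueing - which matches the limitation reported later in this conversation.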

    Florian Witteler
    @FloWi
    @hannesmiller How do you build the Scala 2.12 version?
    Hannes Miller
    @hannesmiller
    @FloWi Sounds like a plan
    Hannes Miller
    @hannesmiller
    @FloWi 2.11.x
    Hannes Miller
    @hannesmiller
    @FloWi btw, what sink are you using?
    Florian Witteler
    @FloWi
    JDBCSink
    Hannes Miller
    @hannesmiller
    @FloWi Ok thanks - as you said, you may get better performance increasing parallelism > 1 - maybe the writes are seriously lagging behind the reads, which would spike the memory due to the number of rows being temporarily held on the internal queues
    Florian Witteler
    @FloWi
    For sure. But, like a real pro, I was testing on the production database and didn't want to blow it up ;-)
    Florian Witteler
    @FloWi
    Seems like it isn't a quick fix. A fixed thread pool doesn't allow adding more runnables than the core pool size. Unfortunately I don't have more time to dig deeper, so I'll use option 1) and repartition the dataframe :)
    Ayoub
    @I_am_ayoub_twitter
    Hi, I'm trying to create a DataStream from an iterator using DataStream.fromIterator,
    but for that I first need to create a schema for my case class, which I do using StructType.from[T].
    Unfortunately I get an error:
    Message: Can not map class scala.Option to FieldType
    Is eel incapable of handling case classes with Option fields, or should I create the field differently?