P. Oscar Boykin
@johnynek
Although I’d still like a unit test
Timur
@ttim
I'm writing a test, will share it a bit later
P. Oscar Boykin
@johnynek
:+1:
Good night!
Timur
@ttim
Good night!
Timur
@ttim
I made a repro for the infinite loop (with optimization) or the exception (without): ttim/scalding@627ccce. cc @dieu
Timur
@ttim

@johnynek We added a guard to the Fill function to fail on (at least some) mutations: https://github.com/twitter/scalding/pull/1894/files . Wdyt?

As follow-up items, we're thinking about:

  1. fix algebird's PriorityQueue monoid
  2. find out why fully disabling this optimization fails a job. I'd appreciate your thoughts on that, especially on how critical this optimization is for scalding to work properly on cascading.
Personally, my idea for (1) is to create an ImmutablePriorityQueue abstraction which just holds a sorted array under the hood. I'm pretty sure it's going to be even more performant in most of our use cases, wdyt?
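The proposed sorted-array-backed structure could be sketched roughly like this (a hypothetical illustration in plain Scala; `ImmutableTopK` and its methods are made-up names, not algebird's actual API, and a `Vector` stands in for the array):

```scala
// Hypothetical sketch: an immutable top-k container holding a sorted Vector,
// standing in for the proposed sorted-array-backed ImmutablePriorityQueue.
final case class ImmutableTopK[T](k: Int, items: Vector[T])(implicit ord: Ordering[T]) {
  // Insert one element, keeping items sorted and capped at the smallest k.
  def +(t: T): ImmutableTopK[T] = {
    val idx = items.indexWhere(i => ord.gt(i, t)) match {
      case -1 => items.length
      case i  => i
    }
    copy(items = ((items.take(idx) :+ t) ++ items.drop(idx)).take(k))
  }

  // Monoid-style combine: fold the other side in, preserving the k bound.
  def ++(other: ImmutableTopK[T]): ImmutableTopK[T] =
    other.items.foldLeft(this)(_ + _)
}

val empty = ImmutableTopK[Int](3, Vector.empty)
val a = empty + 5 + 1 + 9 + 2          // keeps the 3 smallest: 1, 2, 5
val b = (empty + 0 + 3) ++ a           // combine keeps 0, 1, 2
```

Because every operation returns a new value, nothing downstream can be broken by mutation, which is the property the PriorityQueue monoid currently lacks.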
P. Oscar Boykin
@johnynek
replied on the ticket...
using an immutable sorted heap would be faster...
P. Oscar Boykin
@johnynek
yeah, I mean, that is for an internal case where someone understands and takes responsibility
Timur
@ttim
@johnynek immutable topk looks exactly like what I proposed above, which is very cool =) I guess the only thing needed to make it properly efficient is to replace List with Array
P. Oscar Boykin
@johnynek
e.g. internal to sortedTake
in this case, many optimizations that cascading or scalding can do will break users since we assume values are immutable once in the TypedPipe.
Timur
@ttim
I believe this TopKMonoid should have very similar perf characteristics to a mutable PriorityQueue if we migrate it to Array
P. Oscar Boykin
@johnynek
(e.g. we use hashCode, etc...)
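The hashCode hazard being alluded to can be shown with a toy example (plain Scala, not scalding code): a value mutated after insertion into a hash-based structure is stranded in the bucket chosen by its old hashCode.

```scala
// Toy illustration of the immutability assumption: mutating a value after
// it enters a hash-based collection breaks lookups, because the element
// stays filed under the bucket for its original hashCode.
import scala.collection.mutable

final class MutableKey(var x: Int) {
  override def hashCode: Int = x
  override def equals(o: Any): Boolean = o match {
    case m: MutableKey => m.x == x
    case _             => false
  }
}

val set = mutable.HashSet.empty[MutableKey]
val key = new MutableKey(1)
set += key
key.x = 2                              // mutated after insertion
val lookupFails = !set.contains(key)   // the lookup hashes to a different bucket
val stillInside = set.exists(_ eq key) // yet iteration still finds the object
```

This is why an optimizer that deduplicates or caches values by hash cannot tolerate mutation after the value is in the TypedPipe.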
P. Oscar Boykin
@johnynek
BTW: here is an immutable Heap which should give even better perf: https://github.com/typelevel/cats-collections/blob/master/core/src/main/scala/cats/collections/Heap.scala
Anton Panasenko
@dieu
@johnynek we did a release of scalding yesterday
P. Oscar Boykin
@johnynek
what is the version?
oh, you mean, internally?
if so, congratulations!
Anton Panasenko
@dieu
Yes
Give us a week or two to see if we have any problems
Then we can make an official release
P. Oscar Boykin
@johnynek
niceeee! great work.
Anton Panasenko
@dieu
Thanks
P. Oscar Boykin
@johnynek
@dieu @ttim I saw this: https://cloud.google.com/twitter/ would be interesting to think about a scalding backend for beam, perhaps...
Timur
@ttim
@johnynek Reminds me of scio =), but it would be interesting to try. Sources still break everything =(
Ian O'Connell
@ianoc
Sources are a pain. But with some multiple-trait inheritance, the generated dalv2 stuff might be able to make it seamless? Since there are likely not that many totally custom sources at tw.
P. Oscar Boykin
@johnynek
the approach I took with memory/spark platform is building a source mapping table.
I think that can work for the basic types automatically (parquet, sequencefile, tsv/csv/textline... should cover most cases).
Timur
@ttim
@johnynek we found an issue with the new scalding - planning fails if two hashJoins are chained one after another: ttim/scalding@9532d85 . If you have an idea off the top of your head about what the issue is, that would be good. I'm currently debugging scalding to find out what's wrong
P. Oscar Boykin
@johnynek
Bummer. I’ll look at the stack output from Travis (if you have a PR)
We used to put more forces in place. We may have misported that code
Timur
@ttim
Yeah, checking this right now
Timur
@ttim
Here's the stacktrace:
could not build flow from assembly: [[_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']]
cascading.flow.planner.PlannerException: could not build flow from assembly: [[_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:578)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:108)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:40)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:95)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:210)
    at com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner$$anon$2.go$1(AsyncFlowDefRunner.scala:172)
    at com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner$$anon$2.run(AsyncFlowDefRunner.scala:201)
    at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.pipe.OperatorException: [_pipe_1-0*IterableSour...][com.twitter.scalding.typed.cascading_backend.CascadingBackend$.com$twitter$scalding$typed$cascading_backend$CascadingBackend$$planHashJoin(CascadingBackend.scala:662)] found duplicate field names in joined tuple stream: ['key', 'value', 'key1', 'value1']['key1', 'value1']
    at cascading.pipe.Splice.resolveDeclared(Splice.java:1299)
    at cascading.pipe.Splice.outgoingScopeFor(Splice.java:992)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:628)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:610)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:95)
    ... 7 more
Caused by: cascading.tuple.TupleException: field name already exists: key1
    at cascading.tuple.Fields.copyRetain(Fields.java:1397)
    at cascading.tuple.Fields.appendInternal(Fields.java:1266)
    at cascading.tuple.Fields.append(Fields.java:1215)
    at cascading.pipe.Splice.resolveDeclared(Splice.java:1290)
    ... 11 more
P. Oscar Boykin
@johnynek
huh, the issue is with fields with duplicate key names...
P. Oscar Boykin
@johnynek
@ttim I think I see the issue...
we use key1 to do the hashjoin, but we don't then do an identity transformation to remove the old key1...
so it is hanging out until there is another barrier (like groupby or join or something)... I think we need to explicitly put an identity transformation after the hashJoin
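The failing shape was roughly two hashJoins chained back to back. A rough sketch in scalding's TypedPipe API (method shapes from memory; assumes scalding on the classpath, so this is not a self-contained runnable example):

```scala
import com.twitter.scalding.typed.TypedPipe

val big   = TypedPipe.from(List((1, "a"), (2, "b")))
val small = TypedPipe.from(List((1, "x"), (2, "y")))

// Planning failed with "found duplicate field names in joined tuple stream"
// because the internal key1 field survived past the first hashJoin; the fix
// was to plan an identity transformation after the hashJoin to drop it.
val twice = big.hashJoin(small.group).hashJoin(small.group)
```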
Timur
@ttim
@johnynek It did work! I'm going to create a PR then
thanks!
Timur
@ttim
@johnynek here is PR twitter/scalding#1897
P. Oscar Boykin
@johnynek
nice!
P. Oscar Boykin
@johnynek
always nice to have a quick fix...