PhillHenry
@PhillHenry
And even that probably won't compile as there is no Decoder for Any. OK, scratch that, call .toString on everything and store it as a String. But, like I said, that way lies madness.
Git-Rocker
@Git-Rocker
Hi,
Git-Rocker
@Git-Rocker
I want to write a custom MongoStreamingSource for my application. For that I am planning to extend MicroBatchReader, but I am not able to understand the methods of the MicroBatchReader interface. I went through the official documents and got an outline of what is supposed to be done in each of the methods, and also went through some samples of how they are implemented, but I was not able to understand how the offsets work.
If someone can explain this it would be very helpful, or guide me to resources where I can find info on the same.
fuyichengccc
@fuyichengccc

I ran a Spark job which failed for the following reason:

The executor with id 2 exited with exit code 137.
The API gave the following brief reason: N/A
The API gave the following message: N/A
The API gave the following container statuses:
     container name: spark-kubernetes-executor
     container image: nydsjhub.com/spark/spark:v1.0
     container state: terminated
     container started at: 2019-08-20T06:45:15Z
     container finished at: 2019-08-20T09:34:34Z
     exit code: 137
     termination reason: OOMKilled

It implied that the OOM happened on executor 2, but when I checked the log of executor 2 it showed that this executor had lots of free memory, and I couldn't find any OOM exception related to my code. Later I increased executor-memory from 8G, and the job only finished successfully once executor-memory was increased to 16G. I don't want to do this because, in my experience, 8G should be sufficient for this job. Finally, I set spark.executor.memoryOverhead to 2G manually, and the job also finished successfully. I searched a lot but still can't understand the effect of the conf spark.executor.memoryOverhead, and I have never set it in my previous jobs. Can anyone help me understand it?

PhillHenry
@PhillHenry
@fuyichengccc Sounds like a Kubernetes issue rather than Spark (caveat: I don't know much about Kubernetes).
Are you sure that what's running on Kubernetes is not exceeding the limits that Kubernetes sets?
Adi Polak
@AdiPolak_twitter
@fuyichengccc can you find the logs of the K8s node itself? It might limit the memory there. You can check the yaml file to see what resources this app requested to begin with.
@fuyichengccc another way to think about it is: how much memory do the storage memory, the execution memory, and the stack itself take? You can change the split via spark.memory.fraction. I hope it helps. I had a similar issue and changing the fraction helped me solve it.
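A minimal sketch of adjusting that split (the values shown are just the documented defaults, for illustration):

    val spark = org.apache.spark.sql.SparkSession.builder()
      // fraction of (heap - 300MB) shared by execution and storage; the rest is for user data structures
      .config("spark.memory.fraction", "0.6")
      // portion of that unified region protected for cached (storage) data
      .config("spark.memory.storageFraction", "0.5")
      .getOrCreate()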
fuyichengccc
@fuyichengccc

hey @PhillHenry @AdiPolak_twitter thanks for your advice. I checked the description of the executor pod and it shows that the limit memory is the same as the request memory. I found some documents saying that OOMKilled happens when the memory actually used exceeds the limit memory. I don't know why the memory used would exceed the limit on a Spark executor, and I tried my best to find a way to set the limit memory higher than the request memory, but there seems to be no way.

Limits:
      memory:  36044Mi
Requests:
      cpu:     10
      memory:  36044Mi
Environment:
      SPARK_EXECUTOR_CORES:   10
      SPARK_EXECUTOR_MEMORY:  32G

Besides, I find that the request memory equals the sum of spark.executor.memory and memoryOverhead.
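For reference, a minimal sketch of where that number comes from and how to raise the overhead explicitly (assuming the default overhead of max(384 MiB, 0.10 * executor memory)):

    // 32 GiB = 32768 MiB; 0.10 * 32768 = 3276 MiB; 32768 + 3276 = 36044 MiB,
    // which matches the 36044Mi request/limit shown above.
    val spark = org.apache.spark.sql.SparkSession.builder()
      .config("spark.executor.memory", "32g")
      .config("spark.executor.memoryOverhead", "4g")  // extra off-heap headroom for the pod
      .getOrCreate()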

Ryan Murphy
@oatmealraisin_gitlab
Is there a way to zip two columns of the same length?
Ryan Murphy
@oatmealraisin_gitlab
Or rather, is there a way to create a dataframe when my columns are in different files?
Holden Karau
@holdenk
If the partitioning is the same, convert them into RDDs, zip them, and convert them back.
Ryan Murphy
@oatmealraisin_gitlab
@holdenk not sure about the partitioning. Is that dynamic?
If it matters, they're both input CSV files, as opposed to the results of maps
So ideally I would do this immediately after loading them
Git-Rocker
@Git-Rocker
How do I proceed in the catch block if the exception is a MongoTimeoutException?
PhillHenry
@PhillHenry

@oatmealraisin_gitlab Take a look at this SO answer (by the esteemed Sean Owen) https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method/29281548

The upshot is that the RDDs must be sorted if the zip is to be meaningful. And even then, there are gotchas (partial vs. total ordering).
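A minimal sketch of Holden's suggestion (file names and column handling are hypothetical; it assumes both CSVs were loaded the same way so the partitioning and per-partition counts line up):

    import spark.implicits._

    val dfA = spark.read.csv("col_a.csv")   // single column _c0
    val dfB = spark.read.csv("col_b.csv")   // single column _c0

    // zip requires the same number of partitions and the same number of rows per partition
    val zipped = dfA.rdd.zip(dfB.rdd)
      .map { case (a, b) => (a.getString(0), b.getString(0)) }
      .toDF("colA", "colB")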

Piyush
@piyushpatel2005_gitlab
Is there a way to dynamically change a parquet schema? I have a table which is loaded using Spark and saved as parquet. My source is an XML file that is not properly formatted, and they can add new fields at any time. Is there a way I can read the parquet and create a dynamic schema which will add a new column when one is added? I doubt it is possible, but maybe someone has an idea on this.
PhillHenry
@PhillHenry
How about oldParquetDF.withColumn("newColumnName", ...) and then write it to a new file?
@piyushpatel2005_gitlab
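A minimal sketch of that suggestion (the column name, default value, and paths are placeholders):

    import org.apache.spark.sql.functions.lit

    val oldParquetDF = spark.read.parquet("old_table.parquet")

    // add the new column with a null default, then write to a new location
    oldParquetDF
      .withColumn("newColumnName", lit(null).cast("string"))
      .write.parquet("new_table.parquet")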
Piyush
@piyushpatel2005_gitlab
Yes, that works, but what if they add the column in the middle of the file? The real problem is that the files are very unstructured. I'm hoping to make this automated: whenever the file is changed, write the new column.
@PhillHenry Your suggestion works, but doesn't it need a code revision?
PhillHenry
@PhillHenry
@piyushpatel2005_gitlab Spark will read the whole file in and infer what the schema is. That's why it can take some time when you first create a DF from a file without explicitly telling Spark what the schema is. So, if they add a new XML element half way through the file, no worries. Spark doesn't decide what the schema is until it has finished reading it.
Piyush
@piyushpatel2005_gitlab
@PhillHenry sorry but in my case it is even worse.
<?xml version='1.0' encoding='utf-8'?>
<Data File="20190524185705_20190524195703_002659.log">
  <FileInfo FileLength="62369" LastModified="2019-05-24T23:57:04Z" StartTime="43609.956307870336" Format="Huawei Data Result" AnalyzerVersion="5.5.393.457" ProcessingNodeVersion="8.4.0.192 x64 (2019-02-14 17:24:57 #389666)" OriginalPath="T3_06_NE_20190524185705_20190524195703_002659.log" NumStreams="1" FileID="127698257" HardCodedCount="2">
    <StreamInfo StreamFlagsCode="1080863910837354496" StreamFlags="Common | LTERRC | LTES1AP | LTECalltrace" DeviceIdxInFile="0" DeviceTypeString="Actix CT Adapter LTE Result" StreamID="77217797"/>
  </FileInfo>
  <Rowset StreamName="StreamName" QueryName="datatype" StartTimeUTC="1558738625" HardCoded="-1">
    <Fields>
      <Field Name="col1"/>
      <Field Name="col2"/>
      <Field Name="col3"/>
      <Field Name="col4"/>
    </Fields>
    <Row>
      <v>13165</v>
      <v>09.9979623611</v>
      <v>43609.9911</v>
      <v>43609</v>
    </Row>
  </Rowset>
</Data>
This is the raw data, so I cannot directly infer the schema: it will only give me Fields as an array of Field, but my column names are in attributes and the values are in the corresponding v elements.
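A minimal sketch of a custom parser for this layout, using scala.xml (assuming it is on the classpath), which takes the column names from the Field Name attributes and the values from the v elements; the path is hypothetical:

    import scala.xml.XML

    val raw = sc.wholeTextFiles("path/to/xml/*.log")

    // one Map(columnName -> value) per <Row>; a schema can then be built from the union of keys
    val rows = raw.flatMap { case (_, content) =>
      val doc = XML.loadString(content)
      (doc \\ "Rowset").flatMap { rowset =>
        val names = (rowset \ "Fields" \ "Field").map(f => (f \ "@Name").text)
        (rowset \ "Row").map { row =>
          names.zip((row \ "v").map(_.text)).toMap
        }
      }
    }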
matanster
@matanster
Is there any configuration knob that determines whether I'll be getting full stack traces vs. just the exception line, as it relates to Spark? I am running a project in IntelliJ in Spark local mode.
Piyush
@piyushpatel2005_gitlab
@matanster maybe you can wrap the code in a try-catch and, in the catch, print e.getMessage().
matanster
@matanster
Nothing built into Spark configuration that controls suppression?
PhillHenry
@PhillHenry
@piyushpatel2005_gitlab That's insane. Looks like you're going to have to write custom parsers.
@matanster I don't quite understand. Do you want more explicit or less explicit error reporting? You might be able to suppress some errors by a judicious change to your log4j.xml
fuyichengccc
@fuyichengccc

I have an RDD of Ints, and I want to map each element to a tuple of itself and its adjacent elements (filling with 0 where an adjacent element doesn't exist). For example, given
val rdd1: RDD[Int] = sc.parallelize(Array[Int](1,2,3,4,5))
I want to transform this RDD into

rdd2 = ( (0,1,2),
(1,2,3),
(2,3,4),
(3,4,5),
(4,5,0))

Is there any way to achieve this?

PhillHenry
@PhillHenry
@fuyichengccc Not used it myself but you might want to look at RDDFunctions.sliding
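A minimal sketch of what RDDFunctions.sliding gives you (it lives in spark-mllib; the zero-padded first and last windows from the example would still need to be added separately):

    import org.apache.spark.mllib.rdd.RDDFunctions._

    val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))

    // yields Array(1,2,3), Array(2,3,4), Array(3,4,5)
    val windows = rdd1.sliding(3)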
fuyichengccc
@fuyichengccc
hey @PhillHenry thanks, I am looking into this.
Gosha Emelyanov
@goshaQ
Can someone explain the best way to extract multiple dataframes from a single RDD of JSON documents? Should I specify several schemas and convert the RDD to a dataframe several times, or convert the whole RDD using an inferred schema and then create the dataframes I need with selects? Thanks
Piyush
@piyushpatel2005_gitlab
@goshaQ how many schemas do you have to define?
@goshaQ please note that inferring the schema may not give exactly what you want if the data is unstructured or unreliable.
Creating a schema is always the best option.
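For reference, a minimal sketch of reading JSON with an explicit schema instead of inference (the field names and path are hypothetical):

    import org.apache.spark.sql.types._

    // fields missing from a document come back as null and unexpected fields are ignored,
    // so one read can cover slightly different document variants
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("tags", ArrayType(StringType)),
      StructField("createdAt", TimestampType)
    ))

    val df = spark.read.schema(schema).json("documents.json")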
Smit Shah
@sshah90
Hello, I need to write some text at the top of a CSV file after writing a df to CSV from Spark. I am pretty new to Scala and not able to find a proper way to achieve that. Any suggestions?
Benjamin White
@benmwhite
@sshah90 Why not just have a separate readme text file for that in the directory?
zhangpujing
@zhangpujing
In Spark, which operator is better for partitioning first and then sorting?
That is, sorting each partition after partitioning.
PhillHenry
@PhillHenry
@zhangpujing Interesting question and I'd say: always try it with realistic data as the shape of your data may change performance.
My guess is that the partitioning is not necessary. My question is: why do you think partitioning is necessary in the first place?
If you do experiment, could you please post your results back to the forum so we can all learn?
Gosha Emelyanov
@goshaQ
@piyushpatel2005_gitlab Actually, a lot. The thing is that there are different formats of JSON documents and I need to unify them and map them to two dataframes (btw, is it possible in a schema to define the expected name for each column?). The inferred schema is not accurate, that's true (some fields are not processed properly, e.g. arrays), but it lets me avoid writing huge schemas for the different data, and for some JSON documents not all the fields are known in advance, so it lets me explore the whole collection and see what structure it has. But is there any benefit in terms of resource consumption in using a manually defined schema and making several dataframes from a single RDD, instead of using select on one huge dataframe?
Btw, what is the best way to store all these different schemas? Create a package and define an object with the schema definition for each type of document? Just storing almost a hundred lines of text in code looks strange to me.
zhangpujing
@zhangpujing
@PhillHenry input:
A online time time1
A online time time2
A LogoutTime time3
A online time time4
A LogoutTime time5
B LogoutTime time1
B online time time2
@PhillHenry output:
A online time1 LogoutTime3
A online time4 LogoutTime5
B online time2 LogoutTime null
@PhillHenry One is the raw data format and the other is the required result.
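If the goal is to keep each user's events together and time-ordered within a partition, one operator worth trying is repartitionAndSortWithinPartitions. A minimal sketch in secondary-sort style (the Event fields and partition count are hypothetical):

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    case class Event(user: String, kind: String, time: String)

    // partition by user only, but sort by the full (user, time) key within each partition
    class UserPartitioner(partitions: Int) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = key match {
        case (user: String, _) => (user.hashCode % partitions + partitions) % partitions
      }
    }

    val events: RDD[Event] = sc.parallelize(Seq(
      Event("A", "online", "time1"), Event("A", "logout", "time3"),
      Event("B", "online", "time2")))

    val sortedPerUser = events
      .map(e => ((e.user, e.time), e.kind))
      .repartitionAndSortWithinPartitions(new UserPartitioner(4))
    // pairing each online event with the following logout can then be done per partition, e.g. with mapPartitions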