    David Morin
    @morind
    Hi @Galsor! Sorry, no single answer for now. It could be caused by different things: a memory issue with OOM (GC), a coalesce onto 1 partition with a shuffle that takes too long (timeout), ... Is it related to the amount of data? Could you provide us with an example to reproduce this error with Pandas UDFs? Thanks
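    To illustrate the coalesce point, here is a minimal sketch of the two write strategies (df and the bucket paths are placeholders, not your actual job code):
        # coalesce(1) funnels every row through a single task: you get one output
        # file, but also more GC/OOM pressure and one long-running task that can
        # hit heartbeat/network timeouts.
        df.coalesce(1).write.csv("s3a://my-bucket/thresholds_single", header=True)

        # Without coalesce, Spark writes one part file per partition and spreads
        # the work across executors.
        df.write.csv("s3a://my-bucket/thresholds_parts", header=True)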
    Antiez
    @Galsor
    My output is around 7k rows for 21 columns.
    I can show() the output of the agg method that uses the pandas_udf, so I think the problem is confined to write.csv().
    I increased spark.network.timeout to 300000, as it was mentioned as a potential cause of the error with HeartBeatReceiver.
    I am running tests and will see if it helps.
    David Morin
    @morind
    What configuration do you use for your job in terms of memory and cores for the driver and executors, and how many executors do you use? What parameters are defined in your code (timeout, ...)?
    Antiez
    @Galsor
        spark = SparkSession\
            .builder\
            .appName("OutliersThresholdApp") \
            .config("maxResultSize", "2g")\
            .config("spark.hadoop.fs.s3a.access.key", cfg.OVH_ACCESS_KEY_ID)\
            .config("spark.hadoop.fs.s3a.secret.key", cfg.OVH_SECRET_KEY)\
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true")\
            .config("spark.hadoop.fs.s3a.endpoint", "s3.gra.cloud.ovh.net")\
            .config("spark.hadoop.fs.s3a.connection.maximum", 100)\
            .config("spark.hadoop.fs.s3a.connection.establish.timeout", 20000)\
            .config("spark.hadoop.fs.s3a.connection.timeout", 500000)\
            .config("spark.network.timeout", 300000)\
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
            .config("spark.driver.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true")\
            .config("spark.executor.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true")\
            .config("spark.executor.memory", "15g")\
            .config("spark.driver.memory", "15g")\
            .config("spark.cores.max", "4")\
            .getOrCreate()
    Digging into the Spark UI logs, I found this:
    ExecutorLostFailure (executor 1 exited unrelated to the running tasks) Reason: Executor killed by driver.
    Antiez
    @Galsor
    I tried to increase the size of the job via the console and I got this error:
    [attached screenshot: image.png]
    David Morin
    @morind
    Normally, the minimum overhead memory is 384 MB. Do not hesitate to increase it, because memory issues are sometimes related to this space. Some libs use off-heap memory.
    Thanks for the configuration. FYI, spark.executor.memory and spark.driver.memory are runtime parameters and they will be overridden by the values provided at submit time (in the Manager or with OVHcloud Spark Submit). Same for spark.cores.max.
    Antiez
    @Galsor
    Ok good to know. Thanks.
    Regarding the overhead memory, are you referring to spark.executor.memoryOverhead?
    David Morin
    @morind
    Yes, exactly. In fact, the overhead memory parameters provided in the Manager or the OVHcloud Data Processing CLI are translated into spark.executor.memoryOverhead or spark.driver.memoryOverhead.
    Concerning the timeout, do not hesitate to increase spark.executor.heartbeatInterval, keeping spark.executor.heartbeatInterval < spark.network.timeout, especially in case of GC.
    David Morin
    @morind
    For example, spark.executor.heartbeatInterval = 200000 and spark.network.timeout >= 300000: "spark.executor.heartbeatInterval should be significantly less than spark.network.timeout".
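    Putting the overhead and timeout suggestions together, a hedged sketch of the corresponding builder options (values are illustrative only, with an explicit ms unit to avoid ambiguity):
        from pyspark.sql import SparkSession

        # Illustrative values: extra overhead for off-heap usage, plus a heartbeat
        # interval kept well below the network timeout, as discussed above.
        spark = SparkSession.builder \
            .appName("OutliersThresholdApp") \
            .config("spark.executor.memoryOverhead", "2g") \
            .config("spark.driver.memoryOverhead", "2g") \
            .config("spark.executor.heartbeatInterval", "200000ms") \
            .config("spark.network.timeout", "300000ms") \
            .getOrCreate()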
    Antiez
    @Galsor
    Unfortunately it didn't fix the problem... The driver keeps killing the executor...
    Antiez
    @Galsor
    Ok, my mistake. Increasing the memory overhead to 2 GB enabled the script to write the file.
    However, even though the file seems properly generated, the previous error kept occurring and ended with:
      File "/opt/spark/workdir/outliers_threshold_job.py", line 54, in <module>
        .csv(filepath, header="true")
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1027, in csv
      File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
      File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o357.csv.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
        at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:953)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
    David Morin
    @morind
    Ok @Galsor, good news. At least one file is generated now. So you have one part file that contains some data?
    When you said "the previous error kept occurring", is it this one, "ExecutorLostFailure (executor 1 exited unrelated to the running tasks) Reason: Executor killed by driver.", or the one concerning the HeartbeatReceiver?
    What are the lines of code for reading and writing the CSV files, including the filepath (at least the scheme: s3a, ...)?
    Do you have any other errors/exceptions in your logs ?
    Antiez
    @Galsor
    • Yes, all the data is written in a single part-000 file.
    • Both errors are the same one: ExecutorLostFailure is mentioned in the Spark UI logs, while the HeartbeatReceiver warning is raised in the OVH logs.
    sdf = spark.read.format('csv')\
            .options(nanValue="\\N")\
            .schema(cfg.SPARK_SCHEMA)\
            .load('s3a://'+cfg.DATA_BUCKET_NAME+'/chunks/')
    
    sdf_thresholds = sdf.select(*[cfg.DIM_ANALIZED+cfg.NUMERICAL_METRICS])\
            .groupby(cfg.DIM_ANALIZED)\
            .agg(*[pandas_udf(col) for col in cfg.NUMERICAL_METRICS])
    
    filepath = "s3a://"+cfg.BUCKET_NAME+"/"+filename
    sdf_thresholds.coalesce(numPartitions=1).write.csv(filepath, header="true")
    David Morin
    @morind
    Ok. So the job is still killed because the timeout is triggered. When you set "spark.executor.heartbeatInterval", is the parameter ignored? Do you still have the same timeout value in the log, "..heartbeats: 155668 ms exceeds timeout 120000 ms"?
    Just for test, could you try this:
    .set("spark.executor.heartbeatInterval", "300s") \
    .set("spark.network.timeout", "600s")
    Antiez
    @Galsor
    Error is solved
    Thanks a lot for your help.
    David Morin
    @morind
    Good news! Just in case someone else has the same error, how did you solve it?
    Antiez
    @Galsor
    Mainly with these configs:
        import config as cfg
        from custom_module import compute_thresholds
    
        spark = SparkSession\
            .builder\
            .appName("OutliersThresholdApp") \
            .config("maxResultSize", "2g")\
            .config("spark.hadoop.fs.s3a.access.key", cfg.OVH_ACCESS_KEY_ID)\
            .config("spark.hadoop.fs.s3a.secret.key", cfg.OVH_SECRET_KEY)\
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true")\
            .config("spark.hadoop.fs.s3a.endpoint", "s3.gra.cloud.ovh.net")\
            .config("spark.hadoop.fs.s3a.connection.maximum", 100)\
            .config("spark.hadoop.fs.s3a.connection.establish.timeout", 20000)\
            .config("spark.hadoop.fs.s3a.connection.timeout", 500000)\
            .config("spark.network.timeout", 300000)\
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
            .config("spark.driver.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true")\
            .config("spark.executor.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true")\
            .config("spark.executor.memoryOverhead", "2g")\
            .getOrCreate()
    
        sdf = spark.read.format('csv').schema(cfg.SPARK_SCHEMA).load(cfg.INPUT_PATH)
    
        # compute_thresholds is pandas_udf
        sdf_thresholds = sdf.agg(*[compute_thresholds(col) for col in cfg.NUMERICAL_METRICS])
    
        sdf_thresholds.coalesce(numPartitions=1).write.csv(cfg.OUTPUT_PATH, header="true")
        spark.stop()
    Btw, I have tried to log information with the logging module from my pandas_udf function (imported from another Python file) and nothing appeared in the logs reported in the Data Processing console.
    Any idea how to configure logging in order to report these logs as well?
    David Morin
    @morind
    Thanks @Galsor for your feedback. Concerning the logs, at the moment only logs from the Spark driver are pushed to the Data Processing console and uploaded to Swift at the end of the job. Logs on the executor side are not collected yet, but this feature is in development. We're currently working on it, so it's coming soon. I will keep you informed here on Gitter as soon as it is available.
    Antiez
    @Galsor
    Great
    From my research it seems logging inside a pandas_udf is slightly more complex than generic logging.
    I can't manage to get these logs to show, even locally.
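    For what it's worth, here is a minimal local sketch of wiring the logging module inside a pandas UDF; compute_thresholds and the 99th-percentile logic are hypothetical stand-ins, and on a cluster these records go to the executor's stderr rather than the driver log:
        import logging

        import pandas as pd
        from pyspark.sql.functions import pandas_udf

        # Configure a logger at module import time, so every Python worker that
        # loads this module gets a handler writing to its own stderr.
        logger = logging.getLogger("thresholds_udf")
        if not logger.handlers:
            handler = logging.StreamHandler()  # stderr by default
            handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)

        # Hypothetical grouped-aggregate pandas UDF (Series -> scalar), not the actual job code.
        @pandas_udf("double")
        def compute_thresholds(values: pd.Series) -> float:
            logger.info("computing threshold over %d values", len(values))
            return float(values.quantile(0.99))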
    Antiez
    @Galsor
    May I suggest that the kind of use case I dealt with deserves documentation on the Data Processing home page? Especially regarding resources and timeout management for file writing. It has been quite complex to navigate through solutions...
    David Morin
    @morind
    Ok, even with a custom log4j.properties it does not work? Concerning the documentation, we've planned to add some docs, especially documentation to facilitate onboarding with OVHcloud Data Processing, and new code samples on GitHub with custom configurations like the ones you had to deal with. Thanks @Galsor for your feedback.
    EricB
    @Pooouf

    Hi,
    I'm trying to use the API to fire up a Python job
    https://api.ovh.com/console/#/cloud/project/%7BserviceName%7D/dataProcessing/jobs#POST

    I have a hard time figuring out what values are required or possible (e.g. region is expected to be "GRA", not "ovh-eu", right?)

    Does anyone know where I can find an example ?

    David Morin
    @morind
    Hello! Yes, the region is GRA for Data Processing through the OVH API. Here is a sample with the list of parameters to run a Spark example in Python with Data Processing through the OVH API.
    { "containerName": "test", "engine": "spark", "engineParameters": [ { "name": "main_application_code", "value": "test.py" }, { "name": "driver_cores", "value": "1" }, { "name": "driver_memory", "value": "2048" }, { "name": "executor_cores", "value": "1" }, { "name": "executor_num", "value": "1" }, { "name": "executor_memory", "value": "2048" }, { "name": "job_type", "value": "python" } ], "engineVersion": "3.0.1", "name": "testspark", "region": "GRA" }
    EricB
    @Pooouf
    Thanks!
    EricB
    @Pooouf

    @morind I filled the form https://api.ovh.com/console/#/cloud/project/%7BserviceName%7D/dataProcessing/jobs#POST and could run a job.
    Now, I'm trying to use the Python code that the form generates. But it does not work.

    The generated code states :
    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', ='{"containerName":"spark-files-01","engine":"spark","engineParameters":[{"name":"driver_cores","value":"1"},{"name":"driver_memory","value":"2048"},{"name":"executor_cores","value":"1"},{"name":"executor_memory","value":"2048"},{"name":"executor_num","value":"1"},{"name":"job_type","value":"python"},{"name":"main_application_code","value":"wordcount.py"}],"engineVersion":"3.0.1","region":"GRA"}', // Request Body (type: cloud.project.dataProcessing.Job) )

    I guess the = is either missing the argument name, or should not be there.
    And // Request Body (type: cloud.project.dataProcessing.Job) ) should also not be there.
    When I remove them, I get this error:
    ovh.exceptions.BadParametersError: Body parameter region is mandatory
    EricB
    @Pooouf
    How should I rewrite this request?
    EricB
    @Pooouf

    What that error actually says is that 'region' is expected as a named parameter.

    So instead of
    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', ='{"containerName":"spark-files-01","engine":"spark","engineParameters":[{"name":"driver_cores","value":"1"},{"name":"driver_memory","value":"2048"},{"name":"executor_cores","value":"1"},{"name":"executor_memory","value":"2048"},{"name":"executor_num","value":"1"},{"name":"job_type","value":"python"},{"name":"main_application_code","value":"wordcount.py"}],"engineVersion":"3.0.1","region":"GRA"}', // Request Body (type: cloud.project.dataProcessing.Job) )

    the post() call should look more like this:
    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', '{"containerName":"spark-files-01","engine":"spark","engineParameters":[{"name":"driver_cores","value":"1"},{"name":"driver_memory","value":"2048"},{"name":"executor_cores","value":"1"},{"name":"executor_memory","value":"2048"},{"name":"executor_num","value":"1"},{"name":"job_type","value":"python"},{"name":"main_application_code","value":"wordcount.py"}],"engineVersion":"3.0.1"}', region="GRA")

    This gives a better result: ovh.exceptions.BadParametersError: Body parameter engineVersion is mandatory
    So let's continue:

    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', '{"containerName":"spark-files-01","engine":"spark","engineParameters":[{"name":"driver_cores","value":"1"},{"name":"driver_memory","value":"2048"},{"name":"executor_cores","value":"1"},{"name":"executor_memory","value":"2048"},{"name":"executor_num","value":"1"},{"name":"job_type","value":"python"},{"name":"main_application_code","value":"wordcount.py"}], engineVersion="3.0.1", region="GRA")

    Now I get this error: ovh.exceptions.BadParametersError: Body parameter engineParameters isn't formated correctly

    EricB
    @Pooouf
    Is there a doc that I may have missed and that explains the parameters and their format ?
    impolitepanda
    @impolitepanda
    Hello @Pooouf! Unfortunately there is no documentation on using the service through the API or via Python yet. It is scheduled and will be produced ASAP. In the meantime, we will try and see why your code is not working, but the body that @morind gave you is the one expected by the API. (Just to be sure, I created a container with the same name in one of our projects and ran it without a hitch with a curl using the following body:
    {
        "containerName": "spark-files-01",
        "engine": "spark",
        "engineVersion": "3.0.1",
        "region": "GRA",
        "engineParameters": [
            { "name": "main_application_code", "value": "wordcount.py" },
            { "name": "driver_cores", "value": "1" },
            { "name": "driver_memory", "value": "2048" },
            { "name": "executor_cores", "value": "1" },
            { "name": "executor_memory", "value": "2048" },
            { "name": "executor_num", "value": "1" },
            { "name": "job_type", "value": "python" }
        ]
    }
    Which means the problem is probably coming from the way the Python call is made, somehow. I'll work on a reproducer in Python to see if I can find what's going on. Can you just tell me if you're using a specific library to make your REST calls (to be more accurate in my testing)?
    EricB
    @Pooouf

    Hi @impolitepanda

    I'm using the code template that is generated on the API documentation page once you fill in the form and run the request successfully, under the Python tab: https://eu.api.ovh.com/console/#/cloud/project/%7BserviceName%7D/dataProcessing/jobs#POST

    The entire code is :

    # -*- encoding: utf-8 -*-
    '''
    First, install the latest release of Python wrapper: $ pip install ovh
    '''
    import json
    import ovh
    
    # Instanciate an OVH Client.
    # You can generate new credentials with full access to your account on
    # the token creation page
    client = ovh.Client(
        endpoint='ovh-eu',               # Endpoint of API OVH Europe (List of available endpoints)
        application_key='xxxxxxxxxx',    # Application Key
        application_secret='xxxxxxxxxx', # Application Secret
        consumer_key='xxxxxxxxxx',       # Consumer Key
    )
    
    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', 
        ='{"containerName":"spark-files-01","engine":"Spark","engineParameters":[{"name":"driver-cores","value":"1"},{"name":"driver-memory","value":"4G"},{"name":"executor-cores","value":"1"},{"name":"executor-memory","value":"4G"},{"name":"num-executors","value":"1"}],"engineVersion":"3.0.1","name":"wordcount.py","region":"ovh-eu"}', // Request Body (type: cloud.project.dataProcessing.Job)
    )
    
    # Pretty print
    print json.dumps(result, indent=4)

    Note that there are syntactic issues in that generated code snippet:

    • the = character
    • the // Request Body (type: cloud.project.dataProcessing.Job) comment
    • missing parentheses for the print function call
    EricB
    @Pooouf

    Sorry, wrong copy/paste.

    The actual snippet is :

    # -*- encoding: utf-8 -*-
    '''
    First, install the latest release of Python wrapper: $ pip install ovh
    '''
    import json
    import ovh
    
    # Instanciate an OVH Client.
    # You can generate new credentials with full access to your account on
    # the token creation page
    client = ovh.Client(
        endpoint='ovh-eu',               # Endpoint of API OVH Europe (List of available endpoints)
        application_key='xxxxxxxxxx',    # Application Key
        application_secret='xxxxxxxxxx', # Application Secret
        consumer_key='xxxxxxxxxx',       # Consumer Key
    )
    
    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs', 
        ='{"containerName":"spark-files-01","engine":"spark","engineParameters":[{"name":"driver_cores","value":"1"},{"name":"driver_memory","value":"2048"},{"name":"executor_cores","value":"1"},{"name":"executor_memory","value":"2048"},{"name":"executor_num","value":"1"},{"name":"job_type","value":"python"},{"name":"main_application_code","value":"wordcount.py"}],"engineVersion":"3.0.1","region":"GRA"}', // Request Body (type: cloud.project.dataProcessing.Job)
    )
    
    # Pretty print
    print json.dumps(result, indent=4)
    EricB
    @Pooouf

    BTW, thanks @impolitepanda

    It works when I call post() with named parameters:

    result = client.post('/cloud/project/999999999999999/dataProcessing/jobs',
        containerName="spark-files-01",
        engine="spark",
        region="GRA",
        engineVersion="3.0.1",
        engineParameters= [{
            "name": "main_application_code",
            "value": "wordcount.py"
            }, {
            "name": "driver_cores",
            "value": "1"
            }, {
            "name": "driver_memory",
            "value": "2048"
            }, {
            "name": "executor_cores",
            "value": "1"
            }, {
            "name": "executor_memory",
            "value": "2048"
            }, {
            "name": "executor_num",
            "value": "1"
            }, {
            "name": "job_type",
            "value": "python"
            }]
        )
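
    As a hedged follow-up (the "id" field below is an assumption, not confirmed in this thread), the returned value can be inspected directly, since the ovh client gives back the parsed JSON response as a Python dict:

        import json

        # Inspect the job object returned by the API.
        print(json.dumps(result, indent=4))

        # Hypothetical field: the job identifier, handy for follow-up status calls.
        job_id = result.get("id")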

    Thanks a lot!

    impolitepanda
    @impolitepanda
    @Pooouf No problem, even though I didn't do much :) Glad to hear it's working well. Don't hesitate to give us your feedback as it will help us improve the service :)