    Javier Luraschi
    You can indeed have two connections in sparklyr, but no support for that yet. To set up the connections:
    sc1 <- spark_connect(master = "local", app_name = "app1")
    sc2 <- spark_connect(master = "local", app_name = "app2")
    Nikunj Maheshwari

    I tried it, but got an error box when I executed the 2nd spark_connect command saying "R code execution error", followed by a bunch of errors in the console in red, starting with "Error: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;"

    Should I open this up as an issue?
    Javier Luraschi
    Feel free to open an issue; however, this worked for me… so please include operating system, R version, sessionInfo(), etc:
    > sc1 <- spark_connect(master = "local", app_name = "app1")
    * Using Spark: 2.3.3
    > sc2 <- spark_connect(master = "local", app_name = "app2")
    * Using Spark: 2.3.3
    Nikunj Maheshwari
    Hmmm. I am using Spark 2.3.2
    I will open up an issue today with the required info
    Also tried on Spark 2.4.0 with same error
    Nikunj Maheshwari
    On a similar note, assuming I am able to start two Spark connections, can I access Spark tables loaded in sc1 from sc2?
    Javier Luraschi
    No, you can't, unless you save the tables to disk (say, to HDFS) and then reload them in the other connection.
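    A minimal sketch of that round trip (the table and path names here are hypothetical, not from the thread):

```r
library(sparklyr)

sc1 <- spark_connect(master = "local", app_name = "app1")
sc2 <- spark_connect(master = "local", app_name = "app2")

# Write the table from the first connection to shared storage
# (HDFS, S3, or a local path both connections can reach)...
tbl1 <- sdf_copy_to(sc1, iris, name = "iris_tbl")
spark_write_parquet(tbl1, "/tmp/iris_parquet", mode = "overwrite")

# ...then reload it in the second connection.
tbl2 <- spark_read_parquet(sc2, name = "iris_reloaded", path = "/tmp/iris_parquet")
```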
    @here I am trying to run RStudio as a docker container on an EC2 instance with an IAM role attached to it; however, when I run spark_read_csv, which reads from S3, it fails with a 403.
    Here is the example code I am using:
    spark_install(version = "2.3.2", hadoop_version = "2.7")
    conf <- spark_config()
    conf$sparklyr.defaultPackages <- c("com.databricks:spark-csv_2.10:1.5.0",
                                       "org.apache.hadoop:hadoop-aws:2.7.3") # second entry assumed; the original line was truncated
    conf$`sparklyr.cores.local` <- 8
    conf$`sparklyr.shell.driver-memory` <- "32G"
    conf$spark.memory.fraction <- 0.9
    sc <- spark_connect(master = "local", 
                        version = "2.3.2",
                        config = conf)
    ctx <- sparklyr::spark_context(sc)
    jsc <- invoke_static(sc,
                         "org.apache.spark.api.java.JavaSparkContext",
                         "fromSparkContext", ctx) # call completed; the original line was truncated
    hconf <- jsc %>% invoke("hadoopConfiguration")
    hconf %>% invoke("set", "com.amazonaws.services.s3a.enableV4", "true")
    hconf %>% invoke("set", "fs.s3a.fast.upload", "true")
    hconf %>% invoke("set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hconf %>% invoke("set","fs.s3a.access.key", "accesskey") 
    hconf %>% invoke("set","fs.s3a.secret.key", "secretkey")
    curr_csv <- spark_read_csv(sc, name = "csv_ex",
                                            path = "s3a://test_dir/csv_read_test")
    Can someone point out what's missing here?
    Javier Luraschi
    Could you share the stacktrace of this error?
    @javierluraschi here is the stacktrace
    Error: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXX, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: EXXXXXXXXXX at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798) at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:718) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sparklyr.Invoke.invoke(invoke.scala:147) at sparklyr.StreamHandler.handleMethodCall(stream.scala:123) at sparklyr.StreamHandler.read(stream.scala:66) at sparklyr.BackendHandler.channelRead0(handler.scala:51) at sparklyr.BackendHandler.channelRead0(handler.scala:4) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.nett
    Javier Luraschi
    I would try specifying the credentials as environment variables using Sys.setenv() and then reconnect; see https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
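    For example (placeholder values, not real credentials), setting the standard AWS environment variables before connecting lets the SDK's environment-variable credential provider pick them up:

```r
library(sparklyr)

# Placeholder values; use real credentials or the instance's IAM role instead.
Sys.setenv(
  AWS_ACCESS_KEY_ID     = "accesskey",
  AWS_SECRET_ACCESS_KEY = "secretkey"
)

sc <- spark_connect(master = "local", version = "2.3.2")
```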
    What happens when running sdf_collect()? We've been seeing out-of-memory errors when running it through Livy :( and it wasn't too much data (40M of data). Any thoughts as to what we need to check?
    Has anyone successfully tried spark_connect on a k8s cluster?
    I successfully created the spark-pi-driver pod on k8s, but the connection failed, as the log below shows:
    19/10/16 02:37:30 INFO sparklyr: Gateway (34847) is terminating backend since no client has connected after 60 seconds to
    Jordan Bentley
    When using the invoke methods, there seem to be some automatic conversions; for example, Scala maps become R lists.
    Are these documented anywhere?
    I can't seem to find a list of valid conversions.
    Javier Luraschi
    This is the best documentation we have so far:
    The conversion tables are not that hard to read:
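    As an illustrative sketch (not an exhaustive list of conversions), the behavior can be observed directly by round-tripping a JVM collection through invoke():

```r
library(sparklyr)
sc <- spark_connect(master = "local")

# Build a java.util.ArrayList on the JVM side; when retrieved,
# sparklyr converts it to a plain R list.
jlist <- invoke_new(sc, "java.util.ArrayList")
invoke(jlist, "add", "a")
invoke(jlist, "add", "b")
invoke(jlist, "toString")  # JVM-side string representation of the list
```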
    Hi everyone, I am doing a project for my university and I am facing a small issue when I try integrating sparklyr with Shiny. I googled the issue but could not find a relevant solution. Any thoughts as to where I am going wrong would be most appreciated. Sample code is as follows:
    library(sparklyr)
    library(shiny)
    library(dplyr, warn.conflicts = FALSE)
    sc <- spark_connect(master = "local", version = "2.4.3")
    if(file.exists("source")) unlink("source", TRUE)
    if(file.exists("source-out")) unlink("source-out", TRUE)
    stream_generate_test(iterations = 1)
    read_folder <- stream_read_csv(sc, "/Users/happy/Documents/Project/planes_final.csv") 
    partition <- read_folder %>%
      filter(total_arr_delay >= 5) %>%
      mutate(hour = floor(act_dep_time/100)) %>%
      sdf_random_split(training = 0.8, test = 0.2)
    data_training <- partition$training
    data_test <- partition$test
    ui <- function(){
      # (UI body truncated in the original message)
    }
    server <- function(input, output, session){
      ps <- reactiveSpark(partition)
      output$table <- renderTable({
        # Machine Learning Application
        # Naive Bayes
        fit_nb_model <- data_training %>%
          ml_naive_bayes(total_arr_delay ~ hour + day_of_week + total_dep_delay)
        pred4 <- ml_predict(fit_nb_model, data_test)
        pred4
      })
    }
    runGadget(ui, server)
    The error is as follows:
    Warning: Error in : Unable to retrieve a spark_connection from object of class list
      56: stop
      55: spark_connection.default
      53: reactiveSpark
      52: <Anonymous> [#3]
      50: server
    Error : Unable to retrieve a spark_connection from object of class list
    Javier Luraschi
    A few comments:
    1) I would try to get this working without Shiny first.
    2) I don't think you want to train over a stream: first, it's not supported; second, it's too slow. Instead, train with a fixed dataset first and save the model. Then load the model in Shiny and only use the model for predictions over the stream.
    Thank you so much. I have created the model. Now, I am not sure how to load it into Shiny. Any thoughts would be a great help.
    partition <- data %>%
      filter(total_arr_delay >= 5) %>%
      mutate(hour = floor(act_dep_time/100)) %>%
      mutate(act_arr_time = as.double(act_arr_time),
             wheels_off_time = as.double(wheels_off_time),
             wheels_on_time = as.double(wheels_on_time),
             sch_dep_time = as.double(sch_dep_time),
             sch_arr_time = as.double(sch_arr_time),
             sch_elapsed_time = as.double(sch_elapsed_time),
             total_taxi_in_mins = as.double(total_taxi_in_mins),
             total_taxi_out_mins = as.double(total_taxi_out_mins),
             total_air_time = as.double(total_air_time),
             act_dep_time = as.double(act_dep_time),
             act_elapsed_time = as.double(act_elapsed_time),
             cancelled = as.double(cancelled),
             nas_delay = as.double(nas_delay),
             origin_airport_id = as.double(origin_airport_id),
             dest_airport_id = as.double(dest_airport_id),
             diverted = as.double(diverted),
             security_delay = as.double(security_delay),
             late_aircraft_delay = as.double(late_aircraft_delay)) %>%
      sdf_random_split(training = 0.8, test = 0.2, seed = 2222)
    data_training <- partition$training
    data_test <- partition$test
    # Linear Regression
    fit_linear_train <- data_training %>%
      ml_linear_regression(
        response = "total_arr_delay",
        features = c("hour", "day_of_week", "total_dep_delay")
      )
    pred <- ml_predict(fit_linear_train, data_test)
    ml_regression_evaluator(pred, label_col = "total_arr_delay")
    # Logistic Regression
    fit_lr_model <- data_training %>%
      ml_logistic_regression(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    pred <- ml_predict(fit_lr_model, data_test)
    # Gradient Boost Trees
    fit_gbt_model <- data_training %>%
      ml_gradient_boosted_trees(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    pred <- ml_predict(fit_gbt_model, data_test)
    ml_regression_evaluator(pred, label_col = "total_arr_delay")
    # Naive Bayes
    fit_nb_model <- data_training %>%
      ml_naive_bayes(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    pred <- ml_predict(fit_nb_model, data_test)
    # Calculating Lift, Gain, Accuracy, Performance Metrics and Feature Importance
    ml_models <- list(
      "Logistic" = fit_lr_model,
      "Linear" = fit_linear_train,
      "Gradient Boosted Trees" = fit_gbt_model,
      "Naive Bayes" = fit_nb_model
      #"Decision Tree" = fit_ml_dt,
      #"Random Forest" = fit_ml_rf
    )
    score_test_data <- function(model, data = data_test) {
      pred <- ml_predict(model, data)
      select(pred, late_aircraft_delay, prediction)
    }
    ml_score <- lapply(ml_models, score_test_data)
    # Calculate lift
    calculate_lift <- function(scored_data) {
      scored_data %>%
        mutate(bin = ntile(desc(prediction), 10)) %>%
        group_by(bin) %>%
        summarize(count = sum(late_aircraft_delay)) %>%
        mutate(prop = count / sum(count)) %>%
        arrange(bin) %>%
        mutate(prop = cumsum(prop)) %>%
        select(-count) %>%
        collect()
    }
    # Initialize results
    ml_gains <- data.frame(bin = 1:10, prop = seq(0, 1, len = 10), model = "Base")
    for (i in names(ml_score)) {
      ml_gains <- ml_score[[i]] %>%
        calculate_lift() %>%
        mutate(model = i) %>%
        rbind(ml_gains, .)
    }
    I am able to plot them in R using ggplot2, but I would like to do the same in Shiny instead.
    # Plot results
    ggplot(ml_gains, aes(x = bin, y = prop, colour = model)) +
      geom_point() + geom_line() +
      ggtitle("Lift Chart for Predicting Late Aircraft Delay - Test Data Set") + 
      xlab("") + ylab("")
    # Function for calculating accuracy
    calc_accuracy <- function(data, cutpoint = 0.5) {
      data %>%
        mutate(prediction = if_else(prediction > cutpoint, 1.0, 0.0)) %>%
        ml_binary_classification_evaluator("prediction", "late_aircraft_delay")
    }
    # Calculate AUC and accuracy
    perf_metrics <- data.frame(model = names(ml_score), 
                               AUC = 100 * sapply(ml_score, ml_binary_classification_evaluator, 
                                                   "prediction", "late_aircraft_delay"), 
                               Accuracy = 100 * sapply(ml_score, calc_accuracy), 
                               row.names = NULL, stringsAsFactors = FALSE)
    # Plot results
    gather(perf_metrics, metric, value, AUC, Accuracy) %>%
      ggplot(aes(reorder(model, value), value, fill = metric)) + 
      geom_bar(stat = "identity", position = "dodge") + 
      coord_flip() +
      xlab("") +
      ylab("Percent") +
      ggtitle("Performance Metrics")
    Any cheatsheet for Shiny input/output commands would be greatly helpful. Thanks a million!
    Javier Luraschi
    You need to use ml_save() and ml_load(). So Shiny would still use a Spark connection, but it can be a master = "local" connection to reload the model.
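    A minimal sketch of that flow (the path and the `training_tbl`/`new_data_tbl` names here are hypothetical):

```r
library(sparklyr)
sc <- spark_connect(master = "local")

# Train once against a fixed dataset and persist the fitted model...
model <- ml_logistic_regression(training_tbl, total_arr_delay ~ hour + day_of_week)
ml_save(model, "saved_delay_model", overwrite = TRUE)

# ...then, inside the Shiny app, reconnect and reload it for predictions only.
reloaded <- ml_load(sc, "saved_delay_model")
pred <- ml_predict(reloaded, new_data_tbl)
```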
    Sorry for the delay in replying. Thank you so much for sharing the book 'The R in Spark' !!!
    This has been greatly helpful in many ways.
    I am just following step by step and learned many new things
    I just have one query at this stage: how will I know that the "callr" service has been initiated? When I run the code, I don't see any indicator in RStudio that something is going on in the background. So I waited for some time and then tried to query the service, but I am getting the error message: Error in curl::curl_fetch_memory(url, handle = handle) : Failed to connect to port 8000: Connection refused
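    One way to check (a generic callr sketch; the book's actual service may differ) is to keep the handle returned by the background process and poll it:

```r
library(callr)

# Start a background R process (a stand-in for the long-running service).
proc <- r_bg(function() Sys.sleep(60))

# Background processes show no indicator in RStudio; inspect the handle instead:
proc$is_alive()    # TRUE while the process is still running
proc$read_error()  # any startup errors the process has emitted so far
```

A "Connection refused" on the service port usually means the background process either has not started listening yet or exited with an error, which `read_error()` would surface.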

    I am trying to generate a k-means clustering graph on the spark model in shiny. My server code is as follows:
    server <- function(input, output, session) {
      planes_final <- ml_load(sc, "spark-model") # (call truncated in the original message)
      selectedData <- reactive(planes_final[, c(input$xcol, input$ycol)])
      clusters <- reactive(kmeans(selectedData(), input$clusters))
      output$plot1 <- renderPlot({
        par(mar = c(5.1, 4.1, 0, 1))
        plot(selectedData(), col = clusters()$cluster, pch = 20, cex = 3)
        points(clusters()$centers, pch = 4, cex = 4, lwd = 4)
      })
    }

    When I try to run the Shiny App, I get the following error:


    Re-using existing Spark connection to local
    Error in func(fname, ...) : app.R did not return a shiny.appobj object.
    Francisco Palomares
    Hi, R room ?
    Hello, I am having trouble when I load the sparkxgb library; I cannot create the Spark session.
    It says:
    "Failed while connecting to sparklyr to port 8880 for session id: Gateway in localhost:8880 did not respond"