    Javier Luraschi
    @javierluraschi
    The name parameter is not documented very accurately for spark_read_table()… All the read functions share the same parameters/docs; in this case you want to interpret name as the table to read (not the one to write to).
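    For illustration, a minimal sketch (the table name "flights" is only an assumed example of a table already registered in the connection's catalog):
    # `name` here is the existing Spark table being read into a dplyr reference
    flights_tbl <- spark_read_table(sc, name = "flights")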
    Ying
    @ying1
    Ok. cool. Thank you!
    Nikunj Maheshwari
    @nik-maheshwari
    Hi all. Can I have 2 Spark connections on 2 different R processes at the same time? I am in an R session, and I start 2 additional R workers. On one of these workers, I am sending out some Spark jobs via future(). Scenario 1 - I also have a Spark connection running in my R session. The future() calls always break. Scenario 2 - I don't have a Spark connection in my R session, and future() runs fine.
    Nikunj Maheshwari
    @nik-maheshwari
    So short answer, I can't on Windows.
    Javier Luraschi
    @javierluraschi
    You can indeed have two connections in sparklyr, but no support for later yet. To set up the connections:
    sc1 <- spark_connect(master = "local", app_name = "app1")
    sc2 <- spark_connect(master = "local", app_name = "app2")
    Nikunj Maheshwari
    @nik-maheshwari

    You can indeed have two connections in sparklyr, but no support for later yet. To set up the connections:

    I tried it, but got an error box when I executed the 2nd spark_connect command saying "R code execution error". Then a bunch of errors in the console in red, starting with "Error: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;"

    Should I open this up as an issue?
    Javier Luraschi
    @javierluraschi
    Feel free to open an issue; however, this worked for me… so please include your operating system, R version, sessionInfo(), etc.:
    > sc1 <- spark_connect(master = "local", app_name = "app1")
    * Using Spark: 2.3.3
    > sc2 <- spark_connect(master = "local", app_name = "app2")
    * Using Spark: 2.3.3
    Nikunj Maheshwari
    @nik-maheshwari
    Hmmm. I am using Spark 2.3.2
    I will open up an issue today with the required info
    Also tried on Spark 2.4.0 with the same error
    Nikunj Maheshwari
    @nik-maheshwari
    On a similar note, assuming I am able to start two Spark connections, can I access Spark tables loaded in sc1 from sc2?
    Javier Luraschi
    @javierluraschi
    No, you can't, unless you save the tables to disk (to HDFS, say) and then reload them in the other connection.
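    For example, a minimal sketch (the path and table names are assumptions for illustration only): write the table out from the first connection, then read it back in the second:
    # in sc1: persist the table to shared storage (HDFS in this example)
    spark_write_parquet(flights_tbl, path = "hdfs:///tmp/flights_parquet")
    # in sc2: reload the data as a new Spark table
    flights_tbl2 <- spark_read_parquet(sc2, name = "flights",
                                       path = "hdfs:///tmp/flights_parquet")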
    Vineel
    @vineel258
    @here I am trying to run RStudio as a Docker container on an EC2 instance with an IAM role attached to it; however, when I run spark_read_csv, which reads from S3, it fails with a 403.
    Here is the example code that I am using to achieve this:
    library(sparklyr)
    spark_install(version = "2.3.2", hadoop_version = "2.7")
    conf <- spark_config()
    conf$sparklyr.defaultPackages <- c("com.databricks:spark-csv_2.10:1.5.0",
                                       "com.amazonaws:aws-java-sdk-pom:1.10.34",
                                       "org.apache.hadoop:hadoop-aws:2.7.3")
    conf$`sparklyr.cores.local` <- 8
    conf$`sparklyr.shell.driver-memory` <- "32G"
    conf$spark.memory.fraction <- 0.9
    sc <- spark_connect(master = "local", 
                        version = "2.3.2",
                        config = conf)
    ctx <- sparklyr::spark_context(sc)
    jsc <- invoke_static(sc, 
                         "org.apache.spark.api.java.JavaSparkContext", 
                         "fromSparkContext", 
                         ctx)
    hconf <- jsc %>% invoke("hadoopConfiguration")
    hconf %>% invoke("set", "com.amazonaws.services.s3a.enableV4", "true")
    hconf %>% invoke("set", "fs.s3a.fast.upload", "true")
    hconf %>% invoke("set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hconf %>% invoke("set","fs.s3a.access.key", "accesskey") 
    hconf %>% invoke("set","fs.s3a.secret.key", "secretkey")
    sparklyr::spark_connection_is_open(sc=sc)
    curr_csv <- spark_read_csv(sc, name = "csv_ex",
                               path = "s3a://test_dir/csv_read_test")
    Vineel
    @vineel258
    Can someone point out what's missing here?
    Javier Luraschi
    @javierluraschi
    Could you share the stacktrace of this error?
    Vineel
    @vineel258
    @javierluraschi here is the stacktrace
    Error: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXX, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: EXXXXXXXXXX at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798) at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:718) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sparklyr.Invoke.invoke(invoke.scala:147) at sparklyr.StreamHandler.handleMethodCall(stream.scala:123) at sparklyr.StreamHandler.read(stream.scala:66) at sparklyr.BackendHandler.channelRead0(handler.scala:51) at sparklyr.BackendHandler.channelRead0(handler.scala:4) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.nett
    Javier Luraschi
    @javierluraschi
    I would try specifying the credentials as environment variables using Sys.setenv() and then reconnect; see https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
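    A minimal sketch, assuming the default AWS credentials chain picks the keys up from these environment variables (values are placeholders):
    Sys.setenv(AWS_ACCESS_KEY_ID = "accesskey",
               AWS_SECRET_ACCESS_KEY = "secretkey")
    spark_disconnect(sc)                    # drop the old connection
    sc <- spark_connect(master = "local",   # reconnect so the new environment is used
                        version = "2.3.2",
                        config = conf)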
    Ying
    @ying1
    What happens when running sdf_collect? We've been seeing some out-of-memory errors when running this through Livy :( .. and it wasn't too much data (40M of data). Any thoughts as to what we need to check?
    Benjamin
    @bmkor
    Has anyone successfully used spark_connect on a k8s cluster?
    Benjamin
    @bmkor
    I succeeded in creating the spark-pi-driver pod on k8s but failed to connect, as the log below shows:
    19/10/16 02:37:30 INFO sparklyr: Gateway (34847) is terminating backend since no client has connected after 60 seconds to 10.244.9.17/8880.
    Jordan Bentley
    @jbentleyEG
    when using the invoke methods, there seem to be some automatic conversions, such as Scala maps becoming R lists
    are these documented anywhere?
    I can't seem to find a list of valid conversions
    Javier Luraschi
    @javierluraschi
    This is the best documentation we have so far:
    The conversion tables are not that hard to read:
    happy725
    @happy725
    Hi everyone, I am doing a project for my university and I am facing a small issue when I try to integrate sparklyr with Shiny. I googled the issue but could not find a relevant solution. Any thoughts as to where I am going wrong would be most appreciated. Sample code is as follows:
    library(sparklyr)
    library(dplyr, warn.conflicts = FALSE)
    library(ggplot2)
    
    sc <- spark_connect(master = "local", version = "2.4.3")
    
    if(file.exists("source")) unlink("source", TRUE)
    if(file.exists("source-out")) unlink("source-out", TRUE)
    
    stream_generate_test(iterations = 1)
    read_folder <- stream_read_csv(sc, "/Users/happy/Documents/Project/planes_final.csv") 
    
    partition <- read_folder %>%
      filter(total_arr_delay >= 5) %>%
      mutate(hour = floor(act_dep_time/100)) %>%
      sdf_random_split(training = 0.8, test = 0.2)
    
    data_training <- partition$training
    data_test <- partition$test
    
    library(shiny)
    ui <- function(){
      tableOutput("Table")
    }
    server <- function(input, output, session){
    
      ps <- reactiveSpark(partition)
    
      output$table <- renderTable({
        # Machine Learning Application
        # Naive Bayes
        fit_nb_model <- data_training %>%
          ml_naive_bayes(total_arr_delay ~ hour + day_of_week + total_dep_delay)
        pred4 <- ml_predict(fit_nb_model, data_test)
       ml_multiclass_classification_evaluator(pred4)
      })
    }
    runGadget(ui, server)
    The error is as follows:
    Warning: Error in : Unable to retrieve a spark_connection from object of class list
      56: stop
      55: spark_connection.default
      53: reactiveSpark
      52: <Anonymous> [#3]
      50: server
    Error : Unable to retrieve a spark_connection from object of class list
    Javier Luraschi
    @javierluraschi
    A few comments:
    1) I would try to get this working without Shiny first.
    2) I don't think you want to train over a stream: first, it is not supported; second, it is too slow. Instead, you want to train on a fixed dataset first and save the model. Then load the model in Shiny and only use it for predictions over the stream (rough sketch below).
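    A rough sketch of the offline half of that workflow, reusing the columns from your code (the output path "nb_model" is just an assumed example):
    # train once on a static dataset, outside of Shiny
    fit_nb_model <- data_training %>%
      ml_naive_bayes(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    # persist the fitted model so a Shiny app can reload it later
    ml_save(fit_nb_model, path = "nb_model", overwrite = TRUE)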
    happy725
    @happy725
    Thank you so much. I have created the model. Now I am not sure how to load it into Shiny. Any thoughts would be a great help.
    partition <- data %>%
      filter(total_arr_delay >= 5) %>%
      mutate(hour = floor(act_dep_time/100)) %>%
      mutate(act_arr_time = as.double(act_arr_time),
             wheels_off_time = as.double(wheels_off_time),
             wheels_on_time = as.double(wheels_on_time),
             sch_dep_time = as.double(sch_dep_time),
             sch_arr_time = as.double(sch_arr_time),
             sch_elapsed_time = as.double(sch_elapsed_time),
             total_taxi_in_mins = as.double(total_taxi_in_mins),
             total_taxi_out_mins = as.double(total_taxi_out_mins),
             total_air_time = as.double(total_air_time),
             act_dep_time = as.double(act_dep_time),
             act_elapsed_time = as.double(act_elapsed_time),
             cancelled = as.double(cancelled),
             nas_delay = as.double(nas_delay),
             origin_airport_id = as.double(origin_airport_id),
             dest_airport_id = as.double(dest_airport_id),
             diverted = as.double(diverted),
             security_delay = as.double(security_delay),
             late_aircraft_delay = as.double(late_aircraft_delay)) %>%
      sdf_random_split(training = 0.8, test = 0.2, seed = 2222)
    
    data_training <- partition$training
    data_test <- partition$test
    
    # Linear Regression
    fit_linear_train <- data_training %>%
      ml_linear_regression(
        response = "total_arr_delay",
        features = c("hour", "day_of_week", "total_dep_delay")
      )
    #summary(fit_linear_train)
    pred <- ml_predict(fit_linear_train, data_test)
    ml_regression_evaluator(pred, label_col = "total_arr_delay")
    
    # Logistic Regression
    fit_lr_model <- data_training %>%
      ml_logistic_regression(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    #summary(fit_lr_model)
    pred <- ml_predict(fit_lr_model, data_test)
    ml_binary_classification_evaluator(pred)
    
    # Gradient Boost Trees
    fit_gbt_model <- data_training %>%
      ml_gradient_boosted_trees(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    summary(fit_gbt_model)
    pred <- ml_predict(fit_gbt_model, data_test)
    ml_regression_evaluator(pred, label_col = "total_arr_delay")
    
    # Naive Bayes
    fit_nb_model <- data_training %>%
      ml_naive_bayes(total_arr_delay ~ hour + day_of_week + total_dep_delay)
    #summary(fit_nb_model)
    pred <- ml_predict(fit_nb_model, data_test)
    ml_multiclass_classification_evaluator(pred)
    
    # Calculating Lift, Gain, Accuracy, Performance Metrics and Feature Importance
    ml_models <- list(
      "Logistic" = fit_lr_model,
      "Linear" = fit_linear_train,
      "Gradient Boosted Trees" = fit_gbt_model,
      "Naive Bayes" = fit_nb_model
      #"Decision Tree" = fit_ml_dt,
      #"Random Forest" = fit_ml_rf
    )
    
    score_test_data <- function(model, data = data_test)
    {
      pred <- ml_predict(model, data) 
      select(pred, late_aircraft_delay, prediction)
    }
    
    ml_score <- lapply(ml_models, score_test_data)
    # Calculate lift
    calculate_lift <- function(scored_data)
    {
      scored_data %>%
        mutate(bin = ntile(desc(prediction), 10)) %>% 
        group_by(bin) %>% 
        summarize(count = sum(late_aircraft_delay)) %>% 
        mutate(prop = count / sum(count)) %>% 
        arrange(bin) %>% 
        mutate(prop = cumsum(prop)) %>% 
        select(-count) %>% 
        collect() %>% 
        as.data.frame()
    }
    
    # Initialize results
    ml_gains <- data.frame(bin = 1:10, prop = seq(0, 1, len = 10), model = "Base")
    for(i in names(ml_score))
    {
      ml_gains <- ml_score[[i]] %>%
        calculate_lift %>%
        mutate(model = i) %>%
        rbind(ml_gains, .)
    }
    I am able to plot them in R using ggplot2, but I would like to do the same in Shiny instead.
    # Plot results
    ggplot(ml_gains, aes(x = bin, y = prop, colour = model)) +
      geom_point() + geom_line() +
      ggtitle("Lift Chart for Predicting Late Aircraft Delay - Test Data Set") + 
      xlab("") + ylab("")
    # Function for calculating accuracy
    calc_accuracy <- function(data, cutpoint = 0.5)
    {
      data %>%  
        mutate(prediction = if_else(prediction > cutpoint, 1.0, 0.01)) %>%
        ml_binary_classification_evaluator("prediction", "late_aircraft_delay")
    }
    
    # Calculate AUC and accuracy
    perf_metrics <- data.frame(model = names(ml_score), 
                               AUC = 100 * sapply(ml_score, ml_binary_classification_evaluator, 
                                                   "prediction", "late_aircraft_delay"), 
                               Accuracy = 100 * sapply(ml_score, calc_accuracy), 
                               row.names = NULL, stringsAsFactors = FALSE)
    
    library(purrr)
    library(tidyr)
    # Plot results
    gather(perf_metrics, metric, value, AUC, Accuracy) %>%
      ggplot(aes(reorder(model, value), value, fill = metric)) + 
      geom_bar(stat = "identity", position = "dodge") + 
      coord_flip() +
      xlab("") +
      ylab("Percent") +
      ggtitle("Performance Metrics")
    happy725
    @happy725
    Any cheatsheet for Shiny input/output commands would be greatly helpful. Thanks a million!
    Javier Luraschi
    @javierluraschi
    You need to use ml_save() and ml_load(). So Shiny would still use a Spark connection, but it can be a master = "local" connection to reload the model.
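    A minimal sketch of the Shiny side, assuming the model was saved with ml_save(fit_nb_model, "nb_model") as above (paths and column names are illustrative only):
    library(sparklyr)
    library(dplyr)
    library(shiny)

    sc <- spark_connect(master = "local")      # a lightweight local connection is enough
    model <- ml_load(sc, path = "nb_model")    # reload the saved pipeline model

    ui <- fluidPage(tableOutput("scores"))
    server <- function(input, output, session) {
      output$scores <- renderTable({
        # score a fresh batch of data with the reloaded model, then bring a sample into R
        new_batch <- spark_read_csv(sc, name = "new_batch", path = "new-batches/")
        ml_transform(model, new_batch) %>%
          select(total_arr_delay, prediction) %>%
          head(20) %>%
          collect()
      })
    }
    shinyApp(ui, server)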
    happy725
    @happy725
    Sorry for the delay in replying. Thank you so much for sharing the book 'The R in Spark' !!!
    This has been greatly helpful in many ways.
    I am just following step by step and learned many new things
    I just have one query at this stage: how will I know that the "callr" service has been initiated? When I run the code, I don't see any symbol in RStudio indicating that there is something going on in the background. So I waited for some time and then tried to query the service, but I am getting the error message Error in curl::curl_fetch_memory(url, handle = handle) : Failed to connect to 127.0.0.1 port 8000: Connection refused
    happy725
    @happy725

    I am trying to generate a k-means clustering graph from the Spark model in Shiny. My server code is as follows:
    server <- function(input, output, session) {
      planes_final = ml_load(sc, "spark-model",
                             columns = sapply(datasets::planes_final, class)) %>%
        reactiveSpark()
      selectedData <- reactive(planes_final()[, c(input$xcol, input$ycol)])
      clusters <- reactive(kmeans(selectedData(), input$clusters))
      output$plot1 <- renderPlot({
        par(mar = c(5.1, 4.1, 0, 1))
        plot(selectedData(), col = clusters()$cluster, pch = 20, cex = 3)
        points(clusters()$centers, pch = 4, cex = 4, lwd = 4)
      })
    }

    When I try to run the Shiny App, I get the following error:

    shiny::runApp("Documents/project/shiny/planes-stream.R")

    Re-using existing Spark connection to local
    Error in func(fname, ...) : app.R did not return a shiny.appobj object.