    Luca Morino
    @lukem1971_gitlab
    influxdb-1.8.0
    Luca Morino
    @lukem1971_gitlab
    I've resolved it. It was just a matter of ordering. If I change the code to
         sb.append(s"${o(0)},name=test value=")
           .append(o(1))
           .append("i")
    it works.
    But how can I define tags and fields?
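    For reference, InfluxDB's line protocol puts tags before the first unescaped space (comma-joined with the measurement) and fields after it. A minimal sketch extending the snippet above with a second tag and a second field (the names region and rate are illustrative):

        // line protocol: measurement,<tags> <fields> [timestamp]
        // tags join the measurement with commas; fields come after the space
        sb.append(s"${o(0)},name=test,region=eu ")  // name, region are tags
          .append("value=")
          .append(o(1))
          .append("i")                              // 'i' suffix marks an integer field
          .append(",rate=0.5")                      // second field, comma-separated
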
    Marco Graziano
    @marcoeg
    I am getting a compiler error when writing a structured stream to InfluxDB, using the code posted by @cmetts on Jan 04 2020 13:03:

    AssertionError: assertion failed:
      hostsGroup.writeStream.saveToInfluxDB("myMetrics")
         while compiling: command-3369522456414636
            during phase: superaccessors
         library version: version 2.12.10
        compiler version: version 2.12.10
    Faiaz Sanaulla
    @fsanaulla
    Hey, let me check. Could you provide additional information?
    Marco Graziano
    @marcoeg

    Hi Faiaz, certainly and thank you very much for your work and for getting back to me.
    First of all, I am using Databricks, runtime 7.5 (Scala 2.12, Spark 3.0.1). I have configured the cluster to install the following libraries, as downloaded from Maven: chronicler_spark_structured_streaming_2_12_0_4_1-2d5d8.jar, chronicler_core_2_12_0_2_9-ad449.jar, chronicler_async_http_2_12_0_3_3-7efd4.jar, chronicler_macros_2_13_0_6_5-9bf6d.jar.
    Code:

    import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
    import com.github.fsanaulla.chronicler.spark.structured.streaming._
    
    final case class InfluxConfig(
        host: String = s"http://54.215.156.234",
        port: Int = 8086,
        credentials: Option[InfluxCredentials] = Some(InfluxCredentials("tms_monitor", "9bT1.P\"VbH64|oSWi0A")),
        compress: Boolean = false,
        ssl: Boolean = false)
    
    implicit lazy val influxConf: InfluxConfig = InfluxConfig("http://40.121.44.97", 8086, Some(InfluxCredentials("tms_monitor", "9bT1.P\"VbH64|oSWi0A")))
    
    import org.apache.spark.sql.streaming.DataStreamWriter
    import org.apache.spark.sql.Row
    
    import org.joda.time.DateTime
    import org.joda.time.format.DateTimeFormat
    // import com.github.fsanaulla.chronicler.core.alias.ErrorOr // ErrorOr not found
    
    import com.github.fsanaulla.chronicler.macros.Influx
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
    //   override def write(o: Row): ErrorOr[String] = {
       override def write(o: Row): String = {
         val dt = DateTime.parse(o(0).toString(), DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss"))      
         val sb = StringBuilder.newBuilder
    
         sb.append(s"hosts-stats,")
           .append(s"host=")
           .append(o(1).toString())
           .append(s",")
           .append(s"protocol=")
           .append(o(2).toString())
           .append(s" ")
           .append(s"packetsOut=")
           .append(o(3).asInstanceOf[Int].toString())
           .append(s",")     
           .append(s"packetsIn=")
           .append(o(4).asInstanceOf[Int].toString())
           .append(s",")
           .append(s"flows=")
           .append(o(5).asInstanceOf[Int].toString())
           .append(s" ")
           .append(dt.toString())
    
         sb.toString()
    //       Right(sb.toString())
       }
    }
    
    val stream = hostsGroupDF
      .writeStream 
      .saveToInfluxDB("myMetrics")
      .start()

    hostsGroupDF is a structured streaming DataFrame obtained as follows:

    var hostsGroup = finalFlowsDF
      .withWatermark("startTime", "15 seconds")
      .filter($"IPV4_SRC_ADDR" =!= "0.0.0.0")
      .groupBy($"IPV4_SRC_ADDR", $"PROTOCOL", window($"startTime", "1 minute"))
      .agg(
        sum("OUT_PKTS").alias("PacketsOut"),
        sum("IN_PKTS").alias("PacketsIn"),
        count(lit(1)).alias("Flows")
      )
      .select(
        $"window.start".alias("Timestamp"),
        $"IPV4_SRC_ADDR".alias("Address"),
        $"PROTOCOL".alias("Protocol"),
        $"PacketsOut",
        $"PacketsIn",
        $"Flows"
      )

    I am considering trying an older Databricks runtime.

    Cheers.
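
    A side note on the Influx macro import in the snippet above, which the code never uses: chronicler's macros can derive an InfluxWriter from an annotated case class instead of hand-building the line-protocol string. A hedged sketch, with the annotation names taken from chronicler's documentation (treat the exact API as an assumption):

        import com.github.fsanaulla.chronicler.core.model.InfluxWriter
        import com.github.fsanaulla.chronicler.macros.Influx
        import com.github.fsanaulla.chronicler.macros.annotations.{field, tag, timestamp}

        // tags and fields are declared by annotation instead of string concatenation
        final case class HostStats(
            @tag host: String,
            @tag protocol: String,
            @field packetsOut: Int,
            @field packetsIn: Int,
            @field flows: Int,
            @timestamp time: Long)

        implicit val hostStatsWriter: InfluxWriter[HostStats] = Influx.writer[HostStats]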

    Marco Graziano
    @marcoeg

    @fsanaulla this is the latest error:

    command-43609577110807:1: error: Symbol 'term com.github.fsanaulla.chronicler.spark.core' is missing from the classpath.
    This symbol is required by 'value com.github.fsanaulla.chronicler.spark.structured.streaming.DataStreamWriterOps.ch'.
    Make sure that term core is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
    A full rebuild may help if 'package.class' was compiled against an incompatible version of com.github.fsanaulla.chronicler.spark.
    val stream = hostsGroupDF

    After a second attempt the error becomes:

    java.lang.AssertionError: assertion failed: 
      hostsGroupDF.writeStream.saveToInfluxDB("myMetrics").start()
         while compiling: command-43609577110807
            during phase: superaccessors
         library version: version 2.11.12
        compiler version: version 2.11.12
       reconstructed args: -target:jvm-1.8 -deprecation -classpath  [ ..... a long list of jars]

    Also, the hostsGroupDF is a structured stream from a Kafka data source.
    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    Hey, sorry for the late response, I'll check it tomorrow.
    Faiaz Sanaulla
    @fsanaulla
    @marcoeg could you provide your dependency list?

    You need to have in place:

    • chronicler-spark-structured-streaming
    • chronicler-spark-rdd
    • chronicler-spark-core

    Marco Graziano
    @marcoeg
    @fsanaulla thanks for the suggestion. In the meantime, a different approach using the InfluxDB line protocol and HTTP POSTs is working. However, I am concerned that, with every point in a separate HTTP request and possibly a large number of points at once, it will soon use up all the resources and slow down the pipeline. I was wondering whether your library buffers the points of individual writes and sends HTTP requests with multiple points at once. Cheers.
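    For comparison, batching over plain HTTP just means newline-joining the line-protocol points and issuing one POST per batch against the InfluxDB 1.x /write endpoint. A minimal sketch (host and database name are placeholders; no error handling):

        import java.net.{HttpURLConnection, URL}
        import java.nio.charset.StandardCharsets

        // one request per batch of points instead of one request per point
        def writeBatch(host: String, db: String, points: Seq[String]): Int = {
          val conn = new URL(s"$host/write?db=$db")
            .openConnection().asInstanceOf[HttpURLConnection]
          conn.setRequestMethod("POST")
          conn.setDoOutput(true)
          val out = conn.getOutputStream
          out.write(points.mkString("\n").getBytes(StandardCharsets.UTF_8))
          out.close()
          conn.getResponseCode // InfluxDB answers 204 No Content on success
        }
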
    Faiaz Sanaulla
    @fsanaulla
    @marcoeg try 0.4.2 with batch processing support for structured streaming
    it will be available to download from maven in a few hours
    Marco Graziano
    @marcoeg

    @fsanaulla thanks for the suggestion. Not on Maven yet, but I built it from source. I am loading the libraries you suggested, but I have some unresolved dependencies:

    //import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
    import com.github.fsanaulla.chronicler.spark.core.model.{InfluxCredentials, InfluxWriter}
    
    import com.github.fsanaulla.chronicler.spark.structured.streaming._
    
    final case class InfluxConfig(
        host: String = s"http://xx.xx.xx.xx",
        port: Int = 8086,
        credentials: Option[InfluxCredentials] = Some(InfluxCredentials("Y", "X")),
        compress: Boolean = false,
        ssl: Boolean = false)
    
    implicit lazy val influxConf: InfluxConfig = InfluxConfig("http://xx.xx.xx.xx", 8086, Some(InfluxCredentials("Y", "X")))
    command-43609577110803:2: error: object model is not a member of package com.github.fsanaulla.chronicler.spark.core
    import com.github.fsanaulla.chronicler.spark.core.model.{InfluxCredentials, InfluxWriter}
                                                      ^
    command-43609577110803:3: error: object structured is not a member of package com.github.fsanaulla.chronicler.spark
    import com.github.fsanaulla.chronicler.spark.structured.streaming._
                                                 ^
    command-43609577110803:8: error: not found: type InfluxCredentials
        credentials: Option[InfluxCredentials] = Some(InfluxCredentials("Y", "X")),
                            ^
    command-43609577110803:8: error: not found: value InfluxCredentials
        credentials: Option[InfluxCredentials] = Some(InfluxCredentials("Y", "X"))

    Also, in relation to the code in my previous message, I am getting these other undefined symbols:

    command-43609577110806:8: error: object macros is not a member of package com.github.fsanaulla.chronicler
    import com.github.fsanaulla.chronicler.macros.Influx
                                           ^
    command-43609577110806:9: error: not found: type InfluxWriter
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
                     ^
    command-43609577110806:9: error: not found: type InfluxWriter
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {

    Please advise what other libraries I need to load and how to import the symbols.
    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    @marcoeg could you tell me what you already have in your dependency list (for chronicler)?
    Marco Graziano
    @marcoeg
    @fsanaulla I am not sure I understand your question. All my symbol dependencies are in the code and the errors I have posted. I am using Databricks, in which I "install" the libraries needed in my deployment environment. Cheers.
    Marco Graziano
    @marcoeg
    @fsanaulla perhaps I can make it clearer: I need to know which of your libraries to add to the Databricks deployment, and which import statements to use for the symbols InfluxCredentials, InfluxWriter, and saveToInfluxDB. Please quickly browse my previous posts for the details. Cheers.
    Faiaz Sanaulla
    @fsanaulla
    chronicler-url-io + chronicler-macros should be added
    Marco Graziano
    @marcoeg
    @fsanaulla how about the import statements for the symbols that are undefined? For instance, to address these errors:

    error: object model is not a member of package com.github.fsanaulla.chronicler.spark.core

    error: object structured is not a member of package com.github.fsanaulla.chronicler.spark
    import com.github.fsanaulla.chronicler.spark.structured.streaming._
    Faiaz Sanaulla
    @fsanaulla
    It should be covered by the dependencies I described above.
    To summarize:
    chronicler-spark-structured-streaming
    chronicler-spark-rdd
    chronicler-spark-core
    chronicler-url-io
    chronicler-macros
    I was asking which chronicler-related libraries were installed on the Databricks environment.
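    In sbt terms that list would look roughly as follows (the version numbers are inferred from the artifacts mentioned later in this thread, so treat them as assumptions):

        libraryDependencies ++= Seq(
          "com.github.fsanaulla" %% "chronicler-spark-structured-streaming" % "0.4.2",
          "com.github.fsanaulla" %% "chronicler-spark-rdd"                  % "0.4.2",
          "com.github.fsanaulla" %% "chronicler-spark-core"                 % "0.4.2",
          "com.github.fsanaulla" %% "chronicler-url-io"                     % "0.6.5",
          "com.github.fsanaulla" %% "chronicler-macros"                     % "0.6.5"
        )
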
    Marco Graziano
    @marcoeg
    I have added to my Databricks deployment exactly the three libraries you recommended. I understand it should be covered, and that was my expectation, but the package names are not obvious, hence the undefined symbols. Can you please specify the import statements to use for the errors I am getting? I am not making any progress just by adding the libraries if I then cannot import the symbols. Thanks.
    Faiaz Sanaulla
    @fsanaulla
    @marcoeg did it help you?
    Marco Graziano
    @marcoeg

    @fsanaulla not really. For example:

    import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
    import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig

    still give me the errors:

    error: object management is not a member of package com.github.fsanaulla.chronicler.urlhttp
    import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
    
    error: object shared is not a member of package com.github.fsanaulla.chronicler.urlhttp
    import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig

    Also, I had to install chronicler-core as well, to resolve the symbols in the import statement:

    import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}

    I have installed in the Databricks deployment the five libraries you suggested, plus chronicler-core, all downloaded from Maven:

    chronicler_spark_rdd_2_12_0_4_1.jar
    chronicler_macros_2_13_0_6_5__1_.jar
    chronicler_url_io_2_13_0_6_5.jar
    chronicler_spark_core_2_12_0_4_2.jar
    chronicler_spark_structured_streaming_2_12_0_4_2.jar
    chronicler_core_2_12_0_2_9__1_.jar

    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    Hey, @marcoeg
    I see several problems in your installation: you are mixing Scala binary versions. chronicler-macros and chronicler-url-io are _2.13 artifacts while the rest are _2.12; they all need to match your cluster's Scala version (2.12).
    Marco Graziano
    @marcoeg

    @fsanaulla I have changed all packages to Scala 2.12, and these are the artifacts, all from Maven, that I am using:

    chronicler-macros_2.12-0.6.5.jar
    chronicler-spark-structured-streaming_2.12-0.4.2.jar
    chronicler-url-io_2.12-0.6.5.jar
    chronicler-spark-core_2.12-0.4.2.jar
    chronicler-spark-rdd_2.12-0.4.2.jar

    I still see a problem with package names and symbols when executing:

    import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
    import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig

    failing with errors that seem to point to symbol-resolution issues in the packages:

    command-2871893778827394:1: error: object management is not a member of package com.github.fsanaulla.chronicler.urlhttp
    
    command-2871893778827394:2: error: object shared is not a member of package com.github.fsanaulla.chronicler.urlhttp

    If I do not add chronicler-core_2.12-0.2.9, importing {InfluxCredentials, InfluxWriter} with the statement below gives the same kind of error:

    import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}

    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    To solve this:
    import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
    you should add chronicler-core-shared_2.12-0.6.5.
    To solve this:
    command-2871893778827394:2: error: object shared is not a member of package com.github.fsanaulla.chronicler.urlhttp
    you should add chronicler-url-shared_2.12-0.6.5.
    Based on your code I assume that you didn’t use the management API, so you can remove this line:
    import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
    I hope it helps.
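    Pulling these answers together, the imports at issue and the artifacts stated above to provide them are:

        // from chronicler-core-shared: credentials and the writer typeclass
        import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
        // from chronicler-url-shared: the connection config
        import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig
        // from chronicler-spark-structured-streaming: the saveToInfluxDB syntax
        import com.github.fsanaulla.chronicler.spark.structured.streaming._
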
    Marco Graziano
    @marcoeg

    @fsanaulla your latest suggestions addressed the errors reported. However, I am now getting another undefined symbol:

    command-43609577110807:3: error: value saveToInfluxDB is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]

    when executing this code:

    val stream = hostsGroup
      .writeStream 
      .saveToInfluxDB("myMetrics")
      .start()

    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    It’s an implicit method; you should have this import:
    import com.github.fsanaulla.chronicler.spark.structured.streaming._
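    The mechanism, in a simplified sketch (chronicler's real DataStreamWriterOps, visible in the earlier classpath error, does more than this): an implicit class wraps DataStreamWriter, and the wildcard import brings that wrapper into scope so saveToInfluxDB resolves.

        import org.apache.spark.sql.streaming.DataStreamWriter

        // illustrative only: the import above is what puts a wrapper like
        // this into scope, making saveToInfluxDB callable on any writer
        implicit class DataStreamWriterOps[T](writer: DataStreamWriter[T]) {
          def saveToInfluxDB(measurement: String): DataStreamWriter[T] = {
            // the real implementation attaches an InfluxDB sink here
            writer
          }
        }
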
    Marco Graziano
    @marcoeg

    @fsanaulla with the additional import the symbol is defined, but unfortunately I get another error about a parameter conf:

    command-43609577110807:3: error: could not find implicit value for parameter conf: com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig
      .saveToInfluxDB("_monitoring")

    InfluxConfig has been initialized as follows:

    implicit lazy val influxConf: InfluxConfig =
        InfluxConfig(s"http://$host", port, Some(InfluxCredentials(user, password)))

    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    Probably your implicit parameters are out of implicit search scope; try to locate them in one worksheet/notebook.
    Marco Graziano
    @marcoeg

    @fsanaulla Unlike Spark 1.6, where you had to create an instance of SparkConf and use SparkContext, in Spark 2.0 that same level of functionality is offered via SparkSession, and the instance variable in the Databricks notebook and REPL is spark. In other words, there is no need to explicitly define a conf in Databricks; the configuration is available in spark.conf. However, adding a definition for an implicit conf gave me another error:

    implicit val conf = spark.conf
    val stream = hostsGroup
      .writeStream 
      .saveToInfluxDB("_monitoring")
      .start()

    returns with the following error:

    NoClassDefFoundError: enumeratum/EnumEntry
    Caused by: ClassNotFoundException: enumeratum.EnumEntry

    the stack trace goes back to this line:

        at com.github.fsanaulla.chronicler.spark.structured.streaming.package$DataStreamWriterOps$.saveToInfluxDB$default$3$extension(package.scala:61)
    Marco Graziano
    @marcoeg

    One more clue:

    import org.apache.spark.SparkConf
    implicit val conf: SparkConf = spark.conf
    val stream = hostsGroup
      .writeStream 
      .saveToInfluxDB("_monitoring")
      .start()
    error: type mismatch;
     found   : org.apache.spark.sql.RuntimeConfig
     required: org.apache.spark.SparkConf
    implicit val conf: SparkConf = spark.conf

    Cheers.

    Faiaz Sanaulla
    @fsanaulla
    I was talking about the Influx config.
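    In other words, the implicit the compiler is searching for is chronicler's InfluxConfig from chronicler-url-shared, not Spark's configuration. Roughly, with host and credentials as placeholders, and defined in the same cell as the call site so implicit search can see it:

        import com.github.fsanaulla.chronicler.core.model.InfluxCredentials
        import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig

        implicit val influxConf: InfluxConfig =
          InfluxConfig("http://xx.xx.xx.xx", 8086, Some(InfluxCredentials("Y", "X")))

        val stream = hostsGroup.writeStream
          .saveToInfluxDB("_monitoring")  // picks up influxConf implicitly
          .start()
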
    Marco Graziano
    @marcoeg

    @fsanaulla after installing the enumeratum library, using:

    implicit val conf = spark.conf
    val stream = hostsGroup
      .writeStream 
      .saveToInfluxDB("_monitoring")
      .start()

    I get the error:

    NoClassDefFoundError: Could not initialize class com.github.fsanaulla.chronicler.spark.core.WriteConfig$

    same with just:

    val stream = hostsGroup
      .writeStream 
      .saveToInfluxDB("_monitoring")
      .start()

    Any chance you can get a free account with Databricks, community edition? I could put together a test notebook that would run on the community edition.

    Faiaz Sanaulla
    @fsanaulla
    I’ll try to take a look at it.
    Marco Graziano
    @marcoeg

    The Databricks notebook to test structured streaming into InfluxDB using the chronicler library is at: databricks notebook
    It runs in the Databricks community edition with this error in the last cell:

    NoClassDefFoundError: Could not initialize class com.github.fsanaulla.chronicler.spark.core.WriteConfig$

    Using the following libraries in a cluster with runtime version 7.4 (Scala 2.12, Spark 3.0.1):

    
    -rw-r--r-- 1 marco marco 323905 Apr 16 08:44 chronicler-core_2.12-0.2.9.jar
    -rw-r--r-- 1 marco marco 303680 Apr 16 08:44 chronicler-core-shared_2.12-0.6.5.jar
    -rw-r--r-- 1 marco marco  58810 Apr 16 08:45 chronicler-macros_2.12-0.6.5.jar
    -rw-r--r-- 1 marco marco  11477 Apr 16 08:45 chronicler-spark-core_2.12-0.4.2.jar
    -rw-r--r-- 1 marco marco  11244 Apr 16 08:46 chronicler-spark-rdd_2.12-0.4.2.jar
    -rw-r--r-- 1 marco marco   9575 Apr 16 08:45 chronicler-spark-structured-streaming_2.12-0.4.2.jar
    -rw-r--r-- 1 marco marco  15018 Apr 16 08:45 chronicler-url-io_2.12-0.6.5.jar
    -rw-r--r-- 1 marco marco  32055 Apr 16 08:44 chronicler-url-shared_2.12-0.6.5.jar
    -rw-r--r-- 1 marco marco  59922 Apr 16 08:44 enumeratum_2.12-1.6.1.jar

    Cheers.

    Marco Graziano
    @marcoeg
    @fsanaulla would it be more convenient for you to look at the problem if I made it into a standalone app?
    Faiaz Sanaulla
    @fsanaulla
    dm to you