    Christopher Metts
    @cmetts
    this generates an error:
    org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
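    For context, this Spark exception is raised when a batch-style action (for example show() or a plain write) is called on a streaming DataFrame; a minimal sketch of the call chain Spark expects, assuming df is the streaming DataFrame (the console sink is used only for illustration):
    ```
    // Batch-style actions (show(), write, collect()) raise this AnalysisException
    // on a streaming DataFrame; the stream has to go through writeStream.start().
    val query = df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")      // any sink works; console is just for illustration
      .outputMode("append")
      .start()

    query.awaitTermination()
    ```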
    Faiaz Sanaulla
    @fsanaulla
    Oh, sorry then leave as it is
    Christopher Metts
    @cmetts
    ok
    everything looks like it is working; what are the concerns?
    Faiaz Sanaulla
    @fsanaulla
    import scala.util.{Failure, Success, Try}
    
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      override def write(o: Row): ErrorOr[String] = {
        Try {
          val sb = StringBuilder.newBuilder
    
          val value_sb = StringBuilder.newBuilder
    
          val pw_measurement_index = o.fieldIndex("pw_measurement") // gets the column number for 'pw_measurement'
    
          val participant_id_index = o.fieldIndex("participant_id")
    
          val column_names = o.schema.names
    
          // InfluxDB line protocol
          // the way data in influx appears is:
          // MEASUREMENT,TAG=A VALUE=B
    
          sb.append(s"${o(pw_measurement_index)}")
          sb.append(",")
          sb.append(s"participant_id=${o(0)}")
          sb.append(" ")
    
          value_sb.append("\"{")
    
          for (column_name <- column_names)
            if (column_name != "json")
              if (o.getAs(column_name) != null)
                value_sb.append(s"${column_name}:${o.getAs(column_name)},")
    
          value_sb.append("}\"")
    
          sb.append("value=")
          sb.append(value_sb)
    
          println(sb.toString())
          sb.toString()
        } match {
          case Failure(exception) => Left(exception)
          case Success(value)     => Right(value)
        }
      }
    }
    Wrap your writer in Try
    to properly catch all write issues.
    And also I see that participant_id_index
    is not used anywhere.
    Also you can use one string builder instead of two,
    but these are just minor improvements.
    Everything else looks great.
    Christopher Metts
    @cmetts
    ok thanks much!
    Faiaz Sanaulla
    @fsanaulla
    You can just replace value_sb with sb
    Christopher Metts
    @cmetts
    ok
    Faiaz Sanaulla
    @fsanaulla
    And it should work the same.
    But better to check it :)
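    A sketch of the writer with those suggestions applied (single StringBuilder, Try kept, the unused participant_id_index dropped); the field names are the ones from the snippet above:
    ```
    import scala.util.{Failure, Success, Try}

    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      override def write(o: Row): ErrorOr[String] = {
        Try {
          // Single builder for the whole line-protocol string:
          // MEASUREMENT,TAG=A VALUE=B
          val sb = StringBuilder.newBuilder

          val pw_measurement_index = o.fieldIndex("pw_measurement")
          val column_names         = o.schema.names

          sb.append(s"${o(pw_measurement_index)}")
            .append(",")
            .append(s"participant_id=${o(0)}")
            .append(" ")
            .append("value=")
            .append("\"{")

          for (column_name <- column_names)
            if (column_name != "json" && o.getAs(column_name) != null)
              sb.append(s"$column_name:${o.getAs(column_name)},")

          sb.append("}\"")
          sb.toString()
        } match {
          case Failure(exception) => Left(exception)
          case Success(value)     => Right(value)
        }
      }
    }
    ```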
    Piotr Kosecki
    @piotrkosecki

    hello! I've got a problem with creating CQs using the lib; I get the following error:

    com.github.fsanaulla.chronicler.core.model.InfluxException: error parsing query: found CREATE, expected SELECT at line 1

    I tried changing credentials to admin credentials, but it still does not work.

    Faiaz Sanaulla
    @fsanaulla
    Hello, can you share additional information? What does the query look like?
    Piotr Kosecki
    @piotrkosecki
    "CREATE CONTINUOUS QUERY cq_five_minute_queries ON ncmetering BEGIN SELECT count(countable) INTO ncmetering.a_day.five_minute_queries_measurement FROM ncmetering.autogen.decisions GROUP BY apiKey, time(5m) END"
    Piotr Kosecki
    @piotrkosecki
    also, running this query manually works
    Piotr Kosecki
    @piotrkosecki
    basically I've listed the queries that I created manually (which are working) and tried to add a check during the startup of my application, so that if there is no CQ in the database I'd create it. I performed similar steps for the retention policies and it worked; unfortunately, for CQs those steps are not working
    Faiaz Sanaulla
    @fsanaulla
    I'll check it
    Piotr Kosecki
    @piotrkosecki
    thanks!
    Piotr Kosecki
    @piotrkosecki
    @fsanaulla any findings?
    Faiaz Sanaulla
    @fsanaulla
    Sorry, didn't have time to check it. I'll check it today evening or tomorrow morning. Thank you!
    Faiaz Sanaulla
    @fsanaulla
    @piotrkosecki Can you also create github issue?
    Which module do you use?
    Luca Morino
    @lukem1971_gitlab

    Hello Faiaz, I'm trying to store some json temperature values in a stream from Kafka to influxDB. I've been able to connect to the DB, but I still have some problems connecting the data coming from the Kafka stream to the InfluxWriter.
    This is the Kafka stream
    ```
    val measJsonDf = df.selectExpr("CAST(value AS STRING)")

    val struct = new StructType()
      .add("type", DataTypes.StringType)
      .add("value", DataTypes.IntegerType)
      .add("time", DataTypes.StringType)

    val measNestedDf = measJsonDf.select(from_json($"value", struct).as("meas"))
    val measFlattenedDf = measNestedDf.selectExpr("meas.type", "meas.value", "meas.time")
    ```

    and this is the InfluxWriter
    ```
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      override def write(o: Row): ErrorOr[String] = {
        val sb = StringBuilder.newBuilder
        sb.append(s"sensor=${o(0)}")
          .append(" ")
          .append("tempVal=")
          .append(o(2))
          .append(",")
          .append("humVal=")
          .append(o(3))

        Right(sb.toString())
      }
    }
    ```

    If I run the code I get: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Writing job aborted.
    What am I doing wrong?

    Faiaz Sanaulla
    @fsanaulla
    Hey, can you share code related to chronicler?
    Luca Morino
    @lukem1971_gitlab

    Here is all the code related to chronicler

    ```
    import com.github.fsanaulla.chronicler.macros.annotations.{field, tag}
    import com.github.fsanaulla.chronicler.core.model.InfluxWriter
    import com.github.fsanaulla.chronicler.core.model.Point
    import com.github.fsanaulla.chronicler.core.enums.{Consistencies, Epochs, Precisions}
    import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig
    import com.github.fsanaulla.chronicler.core.model.InfluxCredentials
    import com.github.fsanaulla.chronicler.core.alias.ErrorOr
    import com.github.fsanaulla.chronicler.spark.ds._
    import com.github.fsanaulla.chronicler.spark.structured.streaming._

    import com.github.fsanaulla.chronicler.spark.core.{CallbackHandler, WriteConfig}

    implicit lazy val influxConf: InfluxConfig =
      InfluxConfig("http://127.0.0.1", 8086, Some(InfluxCredentials("luca", "luca")))

    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      override def write(o: Row): ErrorOr[String] = {
        val sb = StringBuilder.newBuilder

        sb.append(s"sensor=${o(0)}")
          .append(" ")
          .append("tempVal=")
          .append(o(2))
          .append(",")
          .append("humVal=")
          .append(o(3))

        Right(sb.toString())
      }
    }

    val handler: Option[CallbackHandler] = Some(
      CallbackHandler(
        code => println(s"OnSuccess code: $code"),
        err => println(s"Error on application level ${err.getMessage}"),
        err => println(s"Error on network level: cause: ${err.getCause}, msg: ${err.getMessage}")
      )
    )

    val stream = measFlattenedDf.writeStream
      .saveToInfluxDB("test_db", handler)
      .start()
    stream.awaitTermination()
    ```

    Faiaz Sanaulla
    @fsanaulla
    Also can you share your dependencies list?
    Luca Morino
    @lukem1971_gitlab

    name := "Test"
    version := "1.0"
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-sql_2.11" % "2.4.5",
    "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.4.5",
    "org.apache.spark" % "spark-streaming-kafka-0-10-assembly_2.11" % "2.4.5",
    "org.apache.kafka" % "kafka-clients" % "2.4.1",
    "com.github.fsanaulla" % "chronicler-macros_2.11" % "0.6.4",
    "com.github.fsanaulla" % "chronicler-spark-structured-streaming_2.11" % "0.4.1",
    "com.github.fsanaulla" % "chronicler-udp" % "0.6.4",
    "com.github.fsanaulla" % "chronicler-async-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-akka-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-url-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-spark-ds" % "0.4.1"
    )

    Faiaz Sanaulla
    @fsanaulla
    First of all, I see several unnecessary dependencies that may create conflicts.
    You should remove:
    "com.github.fsanaulla" % "chronicler-spark-ds" % "0.4.1",
    "com.github.fsanaulla" % "chronicler-url-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-akka-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-async-http" % "0.3.3",
    "com.github.fsanaulla" % "chronicler-udp" % "0.6.4",
    "com.github.fsanaulla" % "chronicler-macros_2.11" % "0.6.4",
    and leave just:
    "com.github.fsanaulla" %% "chronicler-spark-structured-streaming" % "0.4.1"
    Luca Morino
    @lukem1971_gitlab

    I've modified the InfluxWriter to
    ```
    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      override def write(o: Row): ErrorOr[String] = {
        val sb = StringBuilder.newBuilder
        sb.append(s"type=${o(0)}")
          .append(" ")
          .append("tempValue=")
          .append(o(1))
          .append(",")
          .append(s"time=${o(2)}")

        Right(sb.toString())
      }
    }
    ```

    and now I get the following error
    'Error on application level unable to parse 'type=temperature tempValue=1000,time=2020-05-14 22:14:00': invalid number'

    Faiaz Sanaulla
    @fsanaulla
    integer type should look like 100i
    Luca Morino
    @lukem1971_gitlab

    Forgive me, but it's not clear to me how I should format the StringBuilder.
    I've a json like this

    {"type":"temperature","value":20,"time":"2020-05-14 22:14:00"}

    I parse it on Kafka stream as

              .select(
                 $"meas.type".cast("string"),
                 $"meas.value".cast("integer") as "tempVal",
                 $"meas.time".cast("string")
              )

    and I try to write on Influx as

              sb.append(s"${o(0)}")
               .append(",")
               .append("tempValue=")
               .append(o(1))
               .append(" ")
               .append(s"time=${o(2)}")

    where the first item is the measurement and the following ones are the data...

    Faiaz Sanaulla
    @fsanaulla
    after .append(o(1)) you should add .append("i")
    because right now your integer looks like 100, but it should have an "i" modifier at the end, like 100i
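    For reference, a small sketch of appending the "i" suffix so the field is written as an integer; the sensor=kitchen tag is just an assumed example:
    ```
    // Sketch: appending the "i" suffix so InfluxDB parses the field as an integer.
    // Without the suffix, a bare number is treated as a float.
    val sb = StringBuilder.newBuilder
    sb.append("temperature")       // measurement
      .append(",sensor=kitchen")   // tag set (assumed example tag)
      .append(" ")
      .append("tempValue=")
      .append(20)
      .append("i")                 // field value becomes 20i

    println(sb.toString())         // temperature,sensor=kitchen tempValue=20i
    ```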
    Luca Morino
    @lukem1971_gitlab

    I've modified the code

          sb.append(s"${o(0)}")
           .append(",")
           .append("tempValue=")
           .append(o(1))
           .append("i")
           .append(" ")
           .append(s"time=${o(2)}")

    but now I receive

    Error on application level unable to parse 'temperature,tempValue=20i time=2020-05-14 22:14:00': invalid number

    Faiaz Sanaulla
    @fsanaulla
    which influx version do you use?
    Luca Morino
    @lukem1971_gitlab
    influxdb-1.8.0
    Luca Morino
    @lukem1971_gitlab
    I've resolved it. It's just a matter of order. If I change the code to
         sb.append(s"${o(0)},name=test value=")
           .append(o(1))
           .append("i")
    it works.
    But how can I define tags and fields?
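    To the last question, a sketch of a writer that separates tags from fields, following line protocol's measurement,tagKey=tagValue fieldKey=fieldValue timestamp layout and reusing the values from this thread; converting the time string to an epoch timestamp is an assumption, since the thread does not show it:
    ```
    import java.time.LocalDateTime
    import java.time.ZoneOffset
    import java.time.format.DateTimeFormatter

    implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
      // assumed format of the incoming time string, e.g. "2020-05-14 22:14:00"
      private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

      override def write(o: Row): ErrorOr[String] = {
        // line protocol expects a numeric epoch timestamp at the end of the line,
        // not a date string (default precision is nanoseconds)
        val epochNanos =
          LocalDateTime.parse(o.getString(2), fmt).toEpochSecond(ZoneOffset.UTC) * 1000000000L

        val sb = StringBuilder.newBuilder
        sb.append(o(0))          // measurement, e.g. temperature
          .append(",name=test")  // tag set: comma-separated, before the first space
          .append(" ")
          .append("value=")      // field set: after the first space
          .append(o(1))
          .append("i")           // integer fields need the "i" suffix
          .append(" ")
          .append(epochNanos)    // optional timestamp at the end of the line

        Right(sb.toString())
      }
    }
    ```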