    Marco Balduini
    @mbalduini
    Code to mount the S3 bucket as a local directory
    val AccessKey = "AKIAJ2V7YHDNKYWK7LBA"
    val SecretKey = "2zTAvniwV8wRGXECwa2DVnVq/u5OtX+9au5UY5hj"
    val EncodedSecretKey = SecretKey.replace("/", "%2F")
    val AwsBucketName = "hackaton-cefriel-20181114"
    val MountName = "hackaton-cefriel-20181114"
    
    dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
    Test to check whether it is mounted
    display(dbutils.fs.ls(s"/mnt/$MountName"))
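    A hedged side note, not from the chat: dbutils.fs.mount can fail if the target directory is already mounted, so when re-running the notebook it may help to unmount first (dbutils.fs.unmount is the standard Databricks utility for this):

    // remove a stale mount point before re-mounting (uses the MountName value above)
    dbutils.fs.unmount(s"/mnt/$MountName")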
    Marco Balduini
    @mbalduini

    Schema People Counter (a loading sketch follows the list)

    • idMin: 5-minute index (all rows in the same 5-minute range share the same idMin)
    • dow: day of the week (1-7)
    • h: hour of the day
    • woy: week of the year
    • moy: month of the year
    • festivita: 0 (not a holiday) or 1 (holiday)
    • fw2: forward count of the sensor with id = 2
    • bw2: backward count of the sensor with id = 2
    • ...
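    A minimal sketch, not from the chat, for loading the People Counter data and checking the columns above; it assumes the mount from the first message and the parquet path listed further down:

    // load the People Counter parquet and confirm the schema described above
    val peopleCounter = spark.read.parquet(s"/mnt/$MountName/peopleCounter.parquet")
    peopleCounter.printSchema() // expect idMin, dow, h, woy, moy, festivita, fw2, bw2, ...
    display(peopleCounter.limit(10))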
    Marco Balduini
    @mbalduini
    import org.apache.spark.sql.functions._

    List of files on S3 (a hedged loading sketch follows the list)

    Weather

    • /mnt/hackaton-cefriel-20181114/weather.csv

    Telco Data

    • /mnt/hackaton-cefriel-20181114/telcoData/agerange_hour_city.csv
    • /mnt/hackaton-cefriel-20181114/telcoData/country_hour_city.csv
    • /mnt/hackaton-cefriel-20181114/telcoData/customertype_hour_city.csv
    • /mnt/hackaton-cefriel-20181114/telcoData/gender_hour_city.csv

    Parking Data

    • /mnt/hackaton-cefriel-20181114/parking/log_Autosilo Auguadri.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Autosilo Valduce.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Autosilo Valmulini.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Parcheggio Castelnuovo.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Parcheggio Centro Lago.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Parcheggio Como San Giovanni.txt
    • /mnt/hackaton-cefriel-20181114/parking/log_Parcheggio Sirtori.txt

    People Counter

    • /mnt/hackaton-cefriel-20181114/peopleCounter.parquet/
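    A hedged sketch, not from the chat, of one way to open each kind of file above; it assumes the mount from the first message and that the CSVs carry a header row, as the weather snippet later in the chat suggests:

    // CSV datasets: weather and the four telco breakdowns
    val weatherCsv = spark.read.option("header", "true").option("inferSchema", "true").csv(s"/mnt/$MountName/weather.csv")
    val telcoAge = spark.read.option("header", "true").option("inferSchema", "true").csv(s"/mnt/$MountName/telcoData/agerange_hour_city.csv")
    // parking logs are raw text; see the parsing snippet in the next message
    val parkingRaw = sc.textFile(s"/mnt/$MountName/parking/log_Autosilo Auguadri.txt")
    // the People Counter data is already columnar parquet
    val peopleCounter = spark.read.parquet(s"/mnt/$MountName/peopleCounter.parquet")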
    Marco Balduini
    @mbalduini
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    //Read file as text file from s3
    val rawdata = sc.textFile(s"/mnt/$MountName/parking/log_Parcheggio Centro Lago.txt")
    
    //strip parenthesized text, split each line on spaces, and keep the id (before the colon), the count, and the six date tokens
    val rddData = rawdata.map(l => l.replaceAll("\\(.*?\\)","").split(" ")).map(l => (l(1).substring(0, l(1).indexOf(":")), l(2).toInt, l(6)+" "+l(7)+" "+l(8)+" "+l(9)+" "+l(10)+" "+l(11)))
    
    //create the data frame from the RDD
    val df = spark.createDataFrame(rddData)
    
    //select and cast the needed columns
    val parking = df.select($"_1".as("id"), $"_2".as("cnt"), unix_timestamp($"_3", "EEE MMM dd yyyy HH:mm:ss 'GMT'X").cast("timestamp").as("timestamp"))
    
    display(parking)
    Marco Balduini
    @mbalduini
    val filtered_parking = parking.filter(col("timestamp") > "2016-06-01T00:00:00.000+0000")
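    A small follow-on sketch, not from the chat: average occupancy per hour from the filtered parking frame (window and avg come from org.apache.spark.sql.functions, imported earlier):

    // average parking count per hour and per lot id
    val hourlyParking = filtered_parking
      .groupBy(window($"timestamp", "1 hour"), $"id")
      .agg(avg($"cnt").as("avgCnt"))
    display(hourlyParking)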
    Marco Balduini
    @mbalduini
    %sh
    exec <&- 2> /dev/null
    echo "=Look for big s3 files:"
    du --human-readable --max-depth=2 --apparent-size --exclude='/dbfs/mnt' \
    --exclude='/dbfs/databricks-*' /dbfs
    echo
    echo "=Look for big local files:"
    du --human-readable --max-depth=1 --exclude='/dbfs' /
    Marco Balduini
    @mbalduini
    val pcWithYear = <PeopleCounterDataFrame>.withColumn("year", year(to_timestamp((col("idMin") *300) + 1420070400)))
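    A short gloss on the constants, as a sketch; the reading of idMin is an assumption based on the schema message above:

    // 1420070400 = unix seconds of 2015-01-01T00:00:00Z, the apparent epoch of idMin
    // idMin * 300 = seconds elapsed, since each idMin slot covers 5 minutes = 300 s
    // e.g. idMin = 105120 -> 1420070400 + 105120 * 300 = 2016-01-01T00:00:00Z (365 days later)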
    Marco Balduini
    @mbalduini
    df.join(df1, df.col("...") === df1.col("..."))
    Marco Balduini
    @mbalduini
    Categorization of the weather icons
    import org.apache.spark.sql.functions._
    
    //map an icon string to 1 (good weather) or 0 (bad weather) via its two-digit numeric prefix
    def matchWeatherType(icon: String): Integer = {
      val code = icon.substring(0, 2).toInt
      if (code < 4 || code > 13) 1 else 0
    }
    
    //Register UDF
    val matchWeatherTypeUDF = sqlContext.udf.register("matchWeatherType",(s: String) =>  matchWeatherType(s))
    
    val weatherRaw = spark.read.option("header", "true").option("inferSchema","true").csv(s"/mnt/$MountName/weather.csv")
    //Add calendar columns and the goodWeather flag derived from icon
    val weather = weatherRaw
      .withColumn("hour", hour($"date"))
      .withColumn("dow", date_format($"date", "u").cast(org.apache.spark.sql.types.IntegerType))
      .withColumn("dom", dayofmonth($"date"))
      .withColumn("month", date_format($"date", "M"))
      .withColumn("year", date_format($"date", "y")) // "y" = calendar year; "Y" (week-based year) misfires around New Year
      .withColumn("goodWeather", matchWeatherTypeUDF($"icon"))
    
    //Keep the 2016-11-26 to 2017-01-08 window and add doe = days since 2016-11-26
    val weather2016 = weather.filter($"date" >= lit("2016-11-26")).filter($"date" <= lit("2017-01-08")).withColumn("doe", datediff($"date", lit("2016-11-26")))
    //display(weather2016.groupBy($"doe").agg(sum($"goodWeather").as("weather")).orderBy($"doe"))
    Emanuele Della Valle
    @emanueledellavalle
    Cast to integer
    import org.apache.spark.sql.types.IntegerType
    df.select( df("year").cast(IntegerType).as("year"), ... )
    Marco Balduini
    @mbalduini
    <df>.withColumn("idHour",((unix_timestamp(col("timestamp"))-1420070400)/3600).cast("int"))
    Emanuele Della Valle
    @emanueledellavalle
    Read a table (called someTable) from SparkR
    `someTable <- sql("SELECT * FROM someTable")`
    Marco Balduini
    @mbalduini
    from_unixtime(unix_timestamp(date) + hour * 60 * 60)
    Emanuele Della Valle
    @emanueledellavalle
    How to save a temporary table in SparkR
    registerTempTable(dataframe, "nomeTabella")
    Marco Balduini
    @mbalduini

    How to add hours to a date

    SQL

    from_unixtime(unix_timestamp(date) + hour * 60 * 60)

    DF

    <df>.withColumn("completeDate", to_timestamp(unix_timestamp(col("date"))+ (col("hour") * 60 * 60)))
    Marco Balduini
    @mbalduini
    telco.join(weather, telco.col("date") === weather.col("date"))
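    A hedged footnote, not from the chat: after this join both inputs keep their own date column, so later references to col("date") are ambiguous; one common fix is to drop the right-hand copy:

    // drop the duplicated join column coming from the weather side
    val joined = telco.join(weather, telco.col("date") === weather.col("date")).drop(weather.col("date"))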