Grigory
@pomadchin
Hi @tosen1990, add the @JsonCodec annotation right above the case class
GT uses circe for the JSON encoding; check out the docs: https://circe.github.io/circe/codecs/semiauto-derivation.html#jsoncodec
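For illustration, a minimal sketch of the annotation (the case class is made up; it needs circe-generic plus -Ymacro-annotations on Scala 2.13, or the macro paradise plugin on 2.12):

import io.circe.generic.JsonCodec
import io.circe.syntax._

// hypothetical case class, just to show where the annotation goes
@JsonCodec case class Layer(name: String, zoom: Int)

Layer("Norway", 0).asJson.noSpaces // {"name":"Norway","zoom":0}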
Grigory
@pomadchin
@zacharyDez thanks for creating an issue! :tada:
Sithril
@Sithril
hello, so not sure if this is worthy of a bug ticket but there seems to be a hiccup in the documentation:
https://geotrellis.readthedocs.io/en/latest/guide/vectors.html
the path for geotrellis.jts.precision.scale=<scale factor> should actually be geotrellis.jts.simplification.scale=<scale factor>
I'm using version 3.5.2 if that helps
Grigory
@pomadchin
@Sithril yep that is definitely not up to date
tosen1990
@tosen1990

@pomadchin Hey, Grigory. I just encountered an issue that I've never seen before. Do you have any idea?

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    val date: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)

    implicit val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(getClass.getSimpleName + " " + date)
      .withKryoSerialization
      .getOrCreate()
      .withRasterFrames

    import spark.implicits._

It fails to compile with the error:

 value setLevel is not a member of org.apache.log4j.Logger
[error]  Note: implicit value spark is not applicable here because it comes after the application point and it lacks an explicit result type
Grigory
@pomadchin
hey @tosen1990 try renaming the spark value to ss
I think it is conflicting with one of your imports
    implicit val ss: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(getClass.getSimpleName + " " + date)
      .withKryoSerialization
      .getOrCreate()
      .withRasterFrames

    import ss.implicits._
StrongSteve
@StrongSteve

hello ;)

i am having an issue with a union over various singleband layers
the groupByKey function is very, very slow (the documentation even says that it is very heavy)
any ideas on how i can get rid of the groupByKey after the union? what i need is some kind of iterable over the singleband tiles - that's why i am doing the groupByKey, but i need another approach....

//Case class extending a tile with an Int (band index) so that we can identify where a specific tile belongs to
case class TileWithBandIndex(tile: Tile, bandIndex: Int)

//Read input layers
val inputLayers = Array.ofDim[RDD[(SpatialKey, TileWithBandIndex)]](2)
inputLayers(0) = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](LayerId("Norway", 0)).map {
          case (key, tile) => (key, TileWithBandIndex(tile, 0))
        }
inputLayers(1) = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](LayerId("Austria", 0)).map {
          case (key, tile) => (key, TileWithBandIndex(tile, 1))
        }

//Combine (union) the input layers and perform some logic
LOG.info(s"Combine input layers and perform some logic")
val extractedLayer =
  sc.union(inputLayers)
    .groupByKey()
    .map { case (key, tilesWithBandIndex) =>
      //Create an array of tiles based on the tiles available at this key. Fill missing bands with empty tiles
      val tiles = Array.ofDim[Tile](inputLayers.size)
      var refTile: Tile = null
      tilesWithBandIndex.foreach { element =>
        tiles(element.bandIndex) = element.tile
        if (refTile == null) { refTile = element.tile }
      }
      for (bandIndex <- 0 until inputLayers.size) {
        if (tiles(bandIndex) == null) {
          tiles(bandIndex) = ArrayTile.empty(refTile.cellType, refTile.cols, refTile.rows)
        }
      }
      //Create a multiband tile from the tiles at the given key and perform the extraction logic on it
      (key, myVeryOwnMapTileFunctionThatDoesSomethingWithAMultibandTileAndReturnsASinglebandTile(MultibandTile(tiles)))
    }
could i maybe do a join or something over n singleband layers?
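For illustration, an untested sketch (not from the thread) of what that join could look like for two layers: cogroup by SpatialKey instead of union + groupByKey, reusing the names from the snippet above.

// untested sketch: cogroup the two singleband layers by key;
// inputLayers is the Array[RDD[(SpatialKey, TileWithBandIndex)]] from above
val left = inputLayers(0).mapValues(_.tile)
val right = inputLayers(1).mapValues(_.tile)

val joined = left.cogroup(right).map { case (key, (ls, rs)) =>
  // take the tile where present, otherwise synthesize an empty band
  val ref = (ls ++ rs).head
  val empty = ArrayTile.empty(ref.cellType, ref.cols, ref.rows)
  (key, MultibandTile(ls.headOption.getOrElse(empty), rs.headOption.getOrElse(empty)))
}

Note that cogroup still shuffles, so this mainly simplifies the code; as it turns out further down the thread, the real bottleneck was the S3 IO.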
Jan Van den bosch
@bossie
Good morning everyone
I'm using the RasterSources API to read in some geotiffs that don't have a NODATA defined, but I know that I should interpret 0-values as NODATA. Therefore I pass an InterpretAsTargetCellType("float32ud0") to the GeoTiffRasterSource. The subsequent RasterRegions still carry this CellType, but at the point where I extract the Tiles, these Tiles have the "float32raw" CellType instead (no NODATA again), and merging them gives obvious artifacts.
It seems I can work around it by doing an additional tile.interpretAs("float32ud0")
Is this a bug?
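A sketch of the scenario for illustration (the uri is a placeholder, and RasterSource.interpretAs is assumed here as the entry point for the target cell type):

import geotrellis.raster._
import geotrellis.raster.geotiff.GeoTiffRasterSource

// placeholder uri; "float32ud0" parses to FloatUserDefinedNoDataCellType(0)
val source = GeoTiffRasterSource("s3://bucket/no-nodata.tif")
  .interpretAs(CellType.fromName("float32ud0"))

val raster = source.read() // Option[Raster[MultibandTile]]

// the workaround described above: re-interpret the extracted tile's cell type
val fixed = raster.map(_.mapTile(_.interpretAs(CellType.fromName("float32ud0"))))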
Grigory
@pomadchin
hey hey @bossie
could you show the exact sequence of steps to me?
but really it sounds like a bug
Jan Van den bosch
@bossie
I'll see if I can come up with a snippet
Grigory
@pomadchin
hey @StrongSteve, how many partitions do you have in each rdd before union, on union & after, on group by and after? what is the read/write shuffle size in the Spark UI?
from your message I only hear that groupBy is slow (which is expected to me as well), but it is unclear to me ‘why’ it is slow (what the root cause of the slowness is)
Grigory
@pomadchin

there is a way in general to avoid repartitioning I think.
you already know the keys range for the query

just parallelize them in Spark (set the number of partitions depending on the amount of keys), map over the partitions and within each partition read chips by key (i.e. via GeoTrellisRasterSource or via ValueReader);

it would be something like (I haven’t tried the code at all, read it as a pseudo-coded prototype):

import cats.syntax.option._ // for .some

val keys: List[SpatialKey] = ???
val partitionsNumber: Int = ???

sc
  .parallelize(keys, partitionsNumber)
  .mapPartitions { partition =>
    // this can be optimized: instead of RasterSources you can use ValueReader directly
    // and read the layout and other metadata on the driver;
    // I don’t know whether it would give you any benefits compared to RasterSources
    val source1 = GeoTrellisRasterSource(...).tileToLayout(layout)
    val source2 = GeoTrellisRasterSource(...).tileToLayout(layout)
    val empty = ArrayTile.empty(
      source1.cellType.union(source2.cellType),
      layout.tileCols,
      layout.tileRows
    )
    partition.flatMap { key =>
      (source1.read(key), source2.read(key)) match {
        case (Some(l), Some(r)) => (key, MultibandTile(l, r)).some
        case (_, Some(r))       => (key, MultibandTile(empty, r)).some
        case (Some(l), _)       => (key, MultibandTile(l, empty)).some
        case _                  => None
      }
    }
  }
StrongSteve
@StrongSteve

i did some further analysis over the weekend
it turned out that it is neither the union nor the groupByKey() that makes the "union" slow, but the amount of data that has to be fetched from S3 to build the union.
in my case we are talking about 11k tiles per layer (512x512) and 48 layers being combined in the union. there is hardly any CPU load or network traffic during the union process, so I guess most of the overhead comes from building up new connections. one tile is ~300 bytes.

Increasing the tile size significantly increases the performance but at the cost of very, very large RAM consumption. Of course. ;)

So I am wondering how others are dealing with the issue of serving tiles from S3 storage.
Would more spark nodes and more parallelism make the union (download) step faster?

Grigory
@pomadchin
@StrongSteve sounds about right; if that’s an S3LayerReader, use the numPartitions argument to increase the number of partitions
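A hedged sketch of that suggestion (the bucket, prefix and layer id are placeholders, and the exact S3LayerReader constructor varies across GT versions):

// placeholders throughout; the point is the numPartitions argument on read
val reader = S3LayerReader("my-bucket", "my-catalog")
val layer =
  reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](
    LayerId("Norway", 0),
    numPartitions = 128 // more, smaller read tasks => more concurrent S3 fetches
  )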
StrongSteve
@StrongSteve
@pomadchin what would the benefit of increasing the partitions be? from what i understand the number of partitions is calculated by spark based on the cores and executors available - of course i can make smaller tasks by increasing the number of partitions, but i do not directly see the benefit of it....
Grigory
@pomadchin
@StrongSteve a partition is the smallest Spark unit that can be parallelized. Right above you were talking about increasing parallelism; the more partitions you have, the smaller the tasks, and the more of your spark cores you can keep busy with IO (if it is really stuck on IO)
another option would be to increase parallelism within partitions; for IO-heavy applications it makes sense
Try increasing the geotrellis.blocking-thread-pool.threads conf setting for that
StrongSteve
@StrongSteve
where could i increase it? not 100% sure what you mean. do you mean to extend the number of working threads in the executor pool?
Grigory
@pomadchin

what do you mean by the working threads in the executor pool?

geotrellis.blocking-thread-pool.threads is the GT configuration setting, you can set it in your application.conf file
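For instance (a sketch; 32 is an arbitrary value), either on the classpath or as a system property:

// 1) in application.conf (HOCON):
//      geotrellis.blocking-thread-pool.threads = 32
// 2) or as a JVM system property, which Typesafe Config lets override the file;
//    on a cluster it has to reach the executors too, e.g. via spark.executor.extraJavaOptions
System.setProperty("geotrellis.blocking-thread-pool.threads", "32")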

StrongSteve
@StrongSteve
i saw there is a setting in BlockingThreadPool where the number of threads is read from a config and set in the executor pool
Grigory
@pomadchin
That is the parallelism per executor, which is used in a pool that handles reading keys from S3, it is set here
S3LayerReader fetches keys from S3 using the dedicated blocking thread pool (with the size configured through the configuration option)
so it would make sense to increase its size if you experience IO problems
StrongSteve
@StrongSteve
will give it a try, thx!
Grigory
@pomadchin
:+1:
StrongSteve
@StrongSteve

so blocking-thread-pool.threads seems to help transfer stuff from S3 faster
if i understood it correctly it is used inside BlockingThreadPool which is used in a variety of S3Layer readers and writers

so simple example
having an executor with 4 cores and setting geotrellis.blocking-thread-pool.threads to 20 means
reading/writing to/from S3 is done with 20 threads in parallel but mapping steps are done with 3 parallel threads
that's why i see the 16 geotrellis threads as waiting in a map step

did i get it right?

Grigory
@pomadchin
yea that is right

and the reason why it makes sense in your case to allocate more threads is that you have lots of unused CPUs and a nice total bandwidth, but each individual connection is pretty limited

so you can slowly fetch more data in parallel

StrongSteve
@StrongSteve
and to clarify the last grey spot ;) - having more threads during read means needing more RAM, right?
how is this handled in spark - just curious. because i still see - let's stick to the example above - 4 running tasks, but making a thread dump of the executor i see 20 geotrellis-io threads running
Grigory
@pomadchin
@StrongSteve not really, imagine you have a partition of length 100 that contains keys
it doesn't really matter much in terms of RAM whether you load all the keys into 256x256 tiles at once or sequentially
each task can process 1 partition at a time
within a single task we can unfold the partition in parallel (that’s what we use the FixedThreadPool for)
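Roughly this idea, as a sketch (keysRdd and readTile are stand-ins, and GT wires this up internally with a shared pool; this only illustrates per-partition IO parallelism):

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// stand-ins: an RDD[SpatialKey] and some blocking read per key
def readTile(key: SpatialKey): Tile = ???

keysRdd.mapPartitions { partition =>
  // one pool per task here for simplicity; GT shares a single pool per executor
  val pool = Executors.newFixedThreadPool(20) // > cores, because the reads are IO-bound
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  val futures = partition.map(key => Future(key -> readTile(key))).toList // kick off all reads
  val tiles = Await.result(Future.sequence(futures), Duration.Inf)
  pool.shutdown()
  tiles.iterator
}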
StrongSteve
@StrongSteve
and a partition consists of n tiles, and having more threads to work on this task makes it faster (in my case, the download)
Grigory
@pomadchin
yep

the threadpool is allocated in such a way that there is a single thread pool per executor, so it’s not that expensive and you don’t need to worry about the number of threadpools

that is not a regular practice in spark; usually parallelism is achieved more traditionally,
by having smaller partitions and more thin executors

but empirically the approach of parallelizing within a single partition works nicely for IO ops + allows us to handle exceptions and retries much better
StrongSteve
@StrongSteve
of course, but the traditional approach would mean having more executors with more cores to achieve the same effect
Grigory
@pomadchin
yep
StrongSteve
@StrongSteve
and most of the cores would be idle (in my case), as there is nothing CPU-intensive going on, only network-based IO
(at least in that step of the processing)
thx for the clarifications - going in the right direction ;)
Grigory
@pomadchin
:+1: you’re welcome