GeoTrellis is a geographic data processing engine for high performance applications.
pomadchin on master
Fix AWS S3LayerDeleter (#3378) …
geotrellis.jts.precision.scale=<scale factor>
should actually be geotrellis.jts.simplification.scale=<scale factor>
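for reference, assuming the setting is picked up from a Typesafe-config (HOCON) file as most `geotrellis.*` settings are, it would sit in `application.conf` roughly like this (the value is just a made-up example, `<scale factor>` is whatever precision you need):

```
geotrellis {
  jts.simplification.scale = 1e12
}
```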
@pomadchin Hey, Grigory. I just ran into an issue that I've never seen before. Do you have any idea?
def main(args: Array[String]): Unit = {
  Logger.getLogger("org").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  val date: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
  implicit val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(getClass.getSimpleName + " " + date)
    .withKryoSerialization
    .getOrCreate()
    .withRasterFrames
  import spark.implicits._
which fails to compile with:
value setLevel is not a member of org.apache.log4j.Logger
[error] Note: implicit value spark is not applicable here because it comes after the application point and it lacks an explicit result type
implicit val ss: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName(getClass.getSimpleName + " " + date)
  .withKryoSerialization
  .getOrCreate()
  .withRasterFrames
import ss.implicits._
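the second compiler message points at Scala's rule that a local implicit is only eligible at a given point if it is defined earlier in the block or carries an explicit result type. A tiny, self-contained illustration of the working shape (plain Scala, no Spark, all names made up):

```scala
object ImplicitOrderDemo {
  // a function that pulls its argument from implicit scope
  def greet(implicit name: String): String = s"hello, $name"

  def run(): String = {
    // defined before the point of use AND with an explicit result type:
    // the two properties the compiler message asks for
    implicit val who: String = "world"
    greet
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints "hello, world"
}
```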
hello ;)
i am having an issue with a union over various singleband layers
the groupByKey function is very, very slow (the documentation even says that it is very heavy)
any ideas on how i can get rid of the groupByKey after the union? what i need is some kind of iterable over the singleband tiles - that's why i am doing the groupByKey, but i need another approach....
// Case class extending a tile with an Int (band index) so that we can identify which band a specific tile belongs to
case class TileWithBandIndex(tile: Tile, bandIndex: Int)

// Read input layers
val inputLayers = Array.ofDim[RDD[(SpatialKey, TileWithBandIndex)]](2)
inputLayers(0) = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](LayerId("Norway", 0)).map {
  case (key, tile) => (key, TileWithBandIndex(tile, 0))
}
inputLayers(1) = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](LayerId("Austria", 0)).map {
  case (key, tile) => (key, TileWithBandIndex(tile, 1))
}
// Combine (union) the input layers and perform some logic
LOG.info("Combine input layers and perform some logic")
val extractedLayer =
  sc.union(inputLayers)
    .groupByKey()
    .map { case (key, tilesWithBandIndex) =>
      // Create an array of tiles based on the tiles available at this key. Fill missing bands with empty tiles
      val tiles = Array.ofDim[Tile](inputLayers.size)
      var refTile: Tile = null
      tilesWithBandIndex.foreach { element =>
        tiles(element.bandIndex) = element.tile
        if (refTile == null) { refTile = element.tile }
      }
      for (bandIndex <- 0 until inputLayers.size) {
        if (tiles(bandIndex) == null) {
          tiles(bandIndex) = ArrayTile.empty(refTile.cellType, refTile.cols, refTile.rows)
        }
      }
      // Create a multiband tile from the tiles at the given key and perform the extraction logic on it
      (key, myVeryOwnMapTileFunctionThatDoesSomethingWithAMultibandTileAndReturnsASinglebandTile(MultibandTile(tiles)))
    }
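the per-key band-alignment step in the snippet above can be exercised with plain Scala collections, which makes it easy to unit-test in isolation. This is only a sketch (no Spark; the `Tile` here is a tiny stand-in type, and `"empty"` marks the fill tile):

```scala
object BandAlignDemo {
  // tiny stand-in for a GeoTrellis Tile: just carries a label
  final case class Tile(label: String)
  type SpatialKey = (Int, Int)

  // each layer is a seq of (key, (tile, bandIndex)); one layer per band
  def align(layers: Seq[Seq[(SpatialKey, (Tile, Int))]]): Map[SpatialKey, Seq[Tile]] = {
    val numBands = layers.size
    layers.flatten
      .groupBy { case (key, _) => key }
      .map { case (key, entries) =>
        // slot each tile into its band index; fill gaps with an "empty" tile
        val bands = Array.fill[Option[Tile]](numBands)(None)
        entries.foreach { case (_, (tile, bandIndex)) => bands(bandIndex) = Some(tile) }
        key -> bands.toSeq.map(_.getOrElse(Tile("empty")))
      }
  }

  def main(args: Array[String]): Unit = {
    val norway  = Seq(((0, 0), (Tile("no"), 0)), ((0, 1), (Tile("no"), 0)))
    val austria = Seq(((0, 0), (Tile("at"), 1)))
    // key (0,0) gets both bands; key (0,1) gets band 0 plus an empty fill
    println(align(Seq(norway, austria)))
  }
}
```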
there is a way in general to avoid repartitioning I think.
you already know the keys range for the query
just parallelize the keys in Spark (set the number of partitions depending on the amount of keys), map over partitions, and within each partition read chips by key (i.e. via GeoTrellisRasterSource or via ValueReader);
it would be something like this (I haven't tried the code at all, read it as a pseudo-coded prototype):
val keys: List[SpatialKey] = ???
val partitionsNumber: Int = ???
sc
  .parallelize(keys, partitionsNumber)
  .mapPartitions { partition =>
    // this can be optimized: instead of RasterSources you can use ValueReader directly
    // and read the layout and other metadata on the driver;
    // I don't know whether that would give you any benefit compared to RasterSources though
    val source1 = GeoTrellisRasterSource(...).tileToLayout(layout)
    val source2 = GeoTrellisRasterSource(...).tileToLayout(layout)
    val empty = ArrayTile.empty(
      source1.cellType.union(source2.cellType),
      layout.tileCols,
      layout.tileRows
    )
    partition.flatMap { key =>
      (source1.read(key), source2.read(key)) match {
        case (Some(l), Some(r)) => Some((key, MultibandTile(l, r)))
        case (_, Some(r))       => Some((key, MultibandTile(empty, r)))
        case (Some(l), _)       => Some((key, MultibandTile(l, empty)))
        case _                  => None
      }
    }
  }
i did some further analysis over the weekend
it turned out it is neither the union nor the groupByKey() that makes the "union" slow, but the amount of data that has to be fetched from S3 to build it.
in my case we are talking about 11k tiles per layer (512x512), and 48 layers are being combined in the union. there is hardly any CPU load or network traffic during the union process, so I guess most of the overhead comes from building up new connections. one tile is ~300 bytes.
Increasing the tile size significantly increases the performance, but at the cost of very, very large RAM consumption. Of course. ;)
So I am wondering how others are dealing with the issue of serving tiles from S3 storage.
Would more spark nodes and more parallelism make the union (download) step faster?
so blocking-thread-pool.threads seems to help transfer stuff from S3 faster
if i understood it correctly it is used inside BlockingThreadPool, which is used in a variety of S3 layer readers and writers
so a simple example:
having an executor with 4 cores and setting geotrellis.blocking-thread-pool.threads to 20 means
reading/writing to/from S3 is done with 20 threads in parallel, but mapping steps run on only 4 parallel tasks
that's why i see 16 of the geotrellis threads waiting in a map step
did i get it right?
and the reason why it makes sense in your case to allocate more threads is that you have lots of unused CPUs and nice bandwidth, but the internet connection is pretty limited
so you can slowly fetch more data in parallel
the thread pool is allocated in such a way that there is a single thread pool per executor, so it's not that expensive and you don't need to worry about the number of thread pools
that is not a regular practice in spark though; usually parallelism is achieved more traditionally
by having smaller partitions and more thin executors
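the single per-executor pool for blocking I/O described above can be imitated with a plain JVM fixed thread pool. This is only a sketch of the idea, not GeoTrellis's actual implementation: the thread count mirrors the `geotrellis.blocking-thread-pool.threads` setting, and `Thread.sleep` stands in for a blocking S3 read:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingPoolDemo {
  // hypothetical stand-in for geotrellis.blocking-thread-pool.threads
  val threads = 20

  def fetchAll(n: Int): List[Int] = {
    // one fixed pool shared by all "reads", mirroring the single pool per executor
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // the sleep stands in for a blocking S3 GET; up to 20 run concurrently,
    // regardless of how many cores the "executor" has for CPU work
    def fetch(i: Int): Int = { Thread.sleep(10); i }
    val result =
      Await.result(Future.traverse((1 to n).toList)(i => Future(fetch(i))), 1.minute)
    pool.shutdown(); pool.awaitTermination(5, TimeUnit.SECONDS)
    result
  }

  def main(args: Array[String]): Unit =
    println(fetchAll(100).sum)
}
```

with many small, slow fetches this over-subscription pays off because the threads spend almost all their time blocked on the network, not competing for CPU.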