Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions layerstats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ This page describes how to generate and analyze layer stats data to find ways to

### Generating Layer Stats

Run planetiler with `--output-layerstats` to generate an extra `<output>.layerstats.tsv.gz` file with a row for each
Run planetiler with `--output-layerstats` to generate an extra `<output>.layerstats.parquet` file with a row for each
layer in each tile that can be used to analyze tile sizes. You can also get stats for an existing archive by running:

```bash
java -jar planetiler.jar stats --input=<path to mbtiles or pmtiles file> --output=layerstats.tsv.gz
java -jar planetiler.jar stats --input=<path to mbtiles or pmtiles file> --output=layerstats.parquet
```

The output is a gzipped tsv with a row per layer on each tile and the following columns:
The output is a Parquet file with a row per layer on each tile and the following columns:

| column | description |
|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
Expand All @@ -34,7 +34,7 @@ The output is a gzipped tsv with a row per layer on each tile and the following
Load a layer stats file in [duckdb](https://duckdb.org/):

```sql
CREATE TABLE layerstats AS SELECT * FROM 'output.pmtiles.layerstats.tsv.gz';
CREATE TABLE layerstats AS SELECT * FROM 'output.pmtiles.layerstats.parquet';
```

Then get the biggest layers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ public void run() {
}

Path layerStatsPath = arguments.file("layer_stats", "layer stats output path",
// default to <output file>.layerstats.tsv.gz
// default to <output file>.layerstats.parquet
TileSizeStats.getDefaultLayerstatsPath(Optional.ofNullable(output.getLocalPath()).orElse(Path.of("output"))));

if (config.tileWriteThreads() < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu

if (config.outputLayerStats()) {
layerStatsBranch = pipeline.readFromQueue(layerStatsQueue)
.sinkTo("stats", 1, tileStatsWriter(layerStatsPath));
// instance method: needs config to resolve layerstats format
.sinkTo("stats", 1, writer.tileStatsWriter(layerStatsPath));
}

var loggers = ProgressLoggers.create()
Expand Down Expand Up @@ -210,14 +211,17 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
timer.stop();
}

private static WorkerPipeline.SinkStep<TileBatch> tileStatsWriter(Path layerStatsPath) {
private WorkerPipeline.SinkStep<TileBatch> tileStatsWriter(Path layerStatsPath) {
return prev -> {
try (var statsWriter = TileSizeStats.newWriter(layerStatsPath)) {
statsWriter.write(TileSizeStats.headerRow());
try (var statsWriter = TileSizeStats.createWriter(layerStatsPath)) {
for (var batch : prev) {
for (var encodedTile : batch.out().get()) {
for (var line : encodedTile.layerStats()) {
statsWriter.write(line);
if (encodedTile.layerStatsData() != null) {
statsWriter.write(
encodedTile.layerStatsData().coord(),
encodedTile.layerStatsData().archivedBytes(),
encodedTile.layerStatsData().layerStats()
);
}
}
}
Expand Down Expand Up @@ -280,7 +284,6 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
boolean lastIsFill = false;
List<TileSizeStats.LayerStats> lastLayerStats = null;
boolean skipFilled = config.skipFilledTiles();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();
boolean includeIds = !config.excludeIds();

var tileStatsUpdater = tileStats.threadLocalUpdater();
Expand Down Expand Up @@ -373,16 +376,17 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
}
if (!(skipFilled && lastIsFill) && bytes != null) {
tileStatsUpdater.recordTile(tileFeatures.tileCoord(), bytes.length, layerStats);
List<String> layerStatsRows = config.outputLayerStats() ?
layerStatsSerializer.formatOutputRows(tileFeatures.tileCoord(), bytes.length, layerStats) :
List.of();
// store raw stats; writer decides format (tsv or parquet)
LayerStatsData layerStatsData = config.outputLayerStats() ?
new LayerStatsData(tileFeatures.tileCoord(), bytes.length, layerStats) :
null;
result.add(
new TileEncodingResult(
tileFeatures.tileCoord(),
bytes,
encoded.length,
tileDataHash == null ? OptionalLong.empty() : OptionalLong.of(tileDataHash),
layerStatsRows
layerStatsData
)
);
}
Expand Down Expand Up @@ -528,9 +532,5 @@ private record TileBatch(
public int size() {
return in.size();
}

public boolean isEmpty() {
return in.isEmpty();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
package com.onthegomap.planetiler.archive;

import com.onthegomap.planetiler.geo.TileCoord;
import com.onthegomap.planetiler.util.TileSizeStats;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;

/** Layer statistics data for a tile. */
record LayerStatsData(
TileCoord coord,
int archivedBytes,
List<TileSizeStats.LayerStats> layerStats
) {}

public record TileEncodingResult(
TileCoord coord,
byte[] tileData,
int rawTileSize,
/* will always be empty in non-compact mode and might also be empty in compact mode */
OptionalLong tileDataHash,
List<String> layerStats
LayerStatsData layerStatsData
) {
public TileEncodingResult(
TileCoord coord,
byte[] tileData,
OptionalLong tileDataHash
) {
this(coord, tileData, tileData.length, tileDataHash, List.of());
this(coord, tileData, tileData.length, tileDataHash, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public static PlanetilerConfig from(Arguments arguments) {
arguments.getBoolean("mlt_reorder_features",
"Allow re-ordering output features within each layer when tile format=MLT to reduce tile sizes", false),
arguments.getBoolean("mlt_shared_dict", "Share dictionaries between string fields when tile format=MLT", false),
arguments.getBoolean("output_layerstats", "output a tsv.gz file for each tile/layer size", false),
arguments.getBoolean("output_layerstats", "output a parquet file for each tile/layer size", false),
arguments.getString("debug_url", "debug url to use for displaying tiles with {z} {lat} {lon} placeholders",
"https://onthegomap.github.io/planetiler-demo/#{z}/{lat}/{lon}"),
tmpDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
import java.util.concurrent.atomic.AtomicLong;
import me.lemire.integercompression.IntWrapper;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryCollection;
import org.maplibre.mlt.converter.encodings.MltTypeMap;
Expand All @@ -57,14 +65,14 @@
* Utilities for extracting tile and layer size summaries from encoded vector tiles.
* <p>
* {@link #computeTileStats(VectorTileProto.Tile)} extracts statistics about each layer in a tile and
* {@link TsvSerializer} formats them as row of a TSV file to write.
* writes them to a Parquet file.
* <p>
* To generate a tsv.gz file with stats for each tile, you can add {@code --output-layerstats} option when generating an
* archive, or run the following an existing archive:
* To generate a Parquet file with stats for each tile, you can add {@code --output-layerstats} option when generating an
* archive, or run the following on an existing archive:
*
* <pre>
* {@code
* java -jar planetiler.jar stats --input=<path to pmtiles or mbtiles> --output=layerstats.tsv.gz
* java -jar planetiler.jar stats --input=<path to pmtiles or mbtiles> --output=layerstats.parquet
* }
* </pre>
*/
Expand All @@ -79,7 +87,12 @@ public class TileSizeStats {

/** Returns the default path that a layerstats file should go relative to an existing archive. */
public static Path getDefaultLayerstatsPath(Path archive) {
return archive.resolveSibling(archive.getFileName() + ".layerstats.tsv.gz");
return archive.resolveSibling(archive.getFileName() + ".layerstats.parquet");
}

/** Creates a Parquet layerstats writer. */
public static LayerStatsWriter createWriter(Path output) throws IOException {
return new ParquetLayerStatsWriter(output);
}

public static void main(String... args) {
Expand All @@ -99,7 +112,8 @@ public static void main(String... args) {
arguments.file("output", "output file", getDefaultLayerstatsPath(localPath));
var counter = new AtomicLong(0);
var timer = stats.startStage("tilestats");
record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
record BatchData(TileCoord coord, int archivedBytes, List<LayerStats> layerStats) {}
record Batch(List<Tile> tiles, CompletableFuture<List<BatchData>> stats) {}
WorkQueue<Batch> writerQueue = new WorkQueue<>("tilestats_write_queue", 1_000, 1, stats);
var pipeline = WorkerPipeline.start("tilestats", stats);
var readBranch = pipeline
Expand Down Expand Up @@ -137,9 +151,9 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
List<LayerStats> layerStats = null;

var updater = tileStats.threadLocalUpdater();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();
for (var batch : prev) {
List<String> lines = new ArrayList<>(batch.tiles.size());
// collect raw stats per tile; serialization happens in write stage
List<BatchData> batchData = new ArrayList<>(batch.tiles.size());
for (var tile : batch.tiles) {
if (!Arrays.equals(zipped, tile.bytes())) {
zipped = tile.bytes();
Expand All @@ -148,19 +162,18 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
layerStats = computeTileStats(decoded);
}
updater.recordTile(tile.coord(), zipped.length, layerStats);
lines.addAll(layerStatsSerializer.formatOutputRows(tile.coord(), zipped.length, layerStats));
batchData.add(new BatchData(tile.coord(), zipped.length, layerStats));
}
batch.stats.complete(lines);
batch.stats.complete(batchData);
}
});

var writeBranch = pipeline.readFromQueue(writerQueue)
.sinkTo("write", 1, prev -> {
try (var writer = newWriter(output)) {
writer.write(headerRow());
try (var writer = createWriter(output)) {
for (var batch : prev) {
for (var line : batch.stats.get()) {
writer.write(line);
for (var data : batch.stats.get()) {
writer.write(data.coord, data.archivedBytes, data.layerStats);
}
}
}
Expand Down Expand Up @@ -419,4 +432,76 @@ public int compareTo(LayerStats o) {
return layer.compareTo(o.layer);
}
}

/**
* Writer abstraction for layerstats output.
*/
public interface LayerStatsWriter extends AutoCloseable {

/** Write layerstats for a tile. */
void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException;

@Override
void close() throws IOException;
}

/**
* Parquet writer for layerstats.
*/
private static class ParquetLayerStatsWriter implements LayerStatsWriter {

private static final MessageType SCHEMA = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("z")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("x")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("y")
.required(PrimitiveType.PrimitiveTypeName.INT64).named("hilbert")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("archived_tile_bytes")
.required(PrimitiveType.PrimitiveTypeName.BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType())
.named("layer")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_bytes")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_features")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_geometries")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_bytes")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_keys")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_values")
.named("layerstats");

private final ParquetWriter<Group> writer;
private final SimpleGroupFactory groupFactory;

ParquetLayerStatsWriter(Path output) throws IOException {
this.groupFactory = new SimpleGroupFactory(SCHEMA);
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(output.toString());
this.writer = ExampleParquetWriter.builder(hadoopPath)
.withType(SCHEMA)
.withCompressionCodec(CompressionCodecName.SNAPPY)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bdon any preference on encoding/row group size here? What do you usually use when converting?

.build();
}

@Override
public void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException {
long hilbert = tileCoord.hilbertEncoded();
for (var layer : layerStats) {
Group group = groupFactory.newGroup()
.append("z", tileCoord.z())
.append("x", tileCoord.x())
.append("y", tileCoord.y())
.append("hilbert", hilbert)
.append("archived_tile_bytes", archivedBytes)
.append("layer", layer.layer)
.append("layer_bytes", layer.layerBytes)
.append("layer_features", layer.layerFeatures)
.append("layer_geometries", layer.layerGeometries)
.append("layer_attr_bytes", layer.layerAttrBytes)
.append("layer_attr_keys", layer.layerAttrKeys)
.append("layer_attr_values", layer.layerAttrValues);
writer.write(group);
}
}

@Override
public void close() throws IOException {
writer.close();
}
}
}
Loading
Loading