Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,9 @@ public void run() {
}

Path layerStatsPath = arguments.file("layer_stats", "layer stats output path",
// default to <output file>.layerstats.tsv.gz
TileSizeStats.getDefaultLayerstatsPath(Optional.ofNullable(output.getLocalPath()).orElse(Path.of("output"))));
// default to <output file>.layerstats.tsv.gz or .layerstats.parquet based on format
TileSizeStats.getDefaultLayerstatsPath(Optional.ofNullable(output.getLocalPath()).orElse(Path.of("output")),
config.layerstatsFormat()));

if (config.tileWriteThreads() < 1) {
throw new IllegalArgumentException("require tile_write_threads >= 1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu

if (config.outputLayerStats()) {
layerStatsBranch = pipeline.readFromQueue(layerStatsQueue)
.sinkTo("stats", 1, tileStatsWriter(layerStatsPath));
// instance method: needs config to resolve layerstats format
.sinkTo("stats", 1, writer.tileStatsWriter(layerStatsPath));
}

var loggers = ProgressLoggers.create()
Expand Down Expand Up @@ -210,14 +211,17 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
timer.stop();
}

private static WorkerPipeline.SinkStep<TileBatch> tileStatsWriter(Path layerStatsPath) {
private WorkerPipeline.SinkStep<TileBatch> tileStatsWriter(Path layerStatsPath) {
return prev -> {
try (var statsWriter = TileSizeStats.newWriter(layerStatsPath)) {
statsWriter.write(TileSizeStats.headerRow());
try (var statsWriter = TileSizeStats.createWriter(config.layerstatsFormat(), layerStatsPath)) {
for (var batch : prev) {
for (var encodedTile : batch.out().get()) {
for (var line : encodedTile.layerStats()) {
statsWriter.write(line);
if (encodedTile.layerStatsData() != null) {
statsWriter.write(
encodedTile.layerStatsData().coord(),
encodedTile.layerStatsData().archivedBytes(),
encodedTile.layerStatsData().layerStats()
);
}
}
}
Expand Down Expand Up @@ -280,7 +284,6 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
boolean lastIsFill = false;
List<TileSizeStats.LayerStats> lastLayerStats = null;
boolean skipFilled = config.skipFilledTiles();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();
boolean includeIds = !config.excludeIds();

var tileStatsUpdater = tileStats.threadLocalUpdater();
Expand Down Expand Up @@ -373,16 +376,17 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
}
if (!(skipFilled && lastIsFill) && bytes != null) {
tileStatsUpdater.recordTile(tileFeatures.tileCoord(), bytes.length, layerStats);
List<String> layerStatsRows = config.outputLayerStats() ?
layerStatsSerializer.formatOutputRows(tileFeatures.tileCoord(), bytes.length, layerStats) :
List.of();
// store raw stats; writer decides format (tsv or parquet)
LayerStatsData layerStatsData = config.outputLayerStats() ?
new LayerStatsData(tileFeatures.tileCoord(), bytes.length, layerStats) :
null;
result.add(
new TileEncodingResult(
tileFeatures.tileCoord(),
bytes,
encoded.length,
tileDataHash == null ? OptionalLong.empty() : OptionalLong.of(tileDataHash),
layerStatsRows
layerStatsData
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
package com.onthegomap.planetiler.archive;

import com.onthegomap.planetiler.geo.TileCoord;
import com.onthegomap.planetiler.util.TileSizeStats;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;

/** Layer statistics data for a tile. */
record LayerStatsData(
TileCoord coord,
int archivedBytes,
List<TileSizeStats.LayerStats> layerStats
) {}

public record TileEncodingResult(
TileCoord coord,
byte[] tileData,
int rawTileSize,
/* will always be empty in non-compact mode and might also be empty in compact mode */
OptionalLong tileDataHash,
List<String> layerStats
LayerStatsData layerStatsData
) {
public TileEncodingResult(
TileCoord coord,
byte[] tileData,
OptionalLong tileDataHash
) {
this(coord, tileData, tileData.length, tileDataHash, List.of());
this(coord, tileData, tileData.length, tileDataHash, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public record PlanetilerConfig(
boolean mltReorderFeature,
boolean mltSharedDictionaries,
boolean outputLayerStats,
String layerstatsFormat,
String debugUrlPattern,
Path tmpDir,
Path tileWeights,
Expand Down Expand Up @@ -237,6 +238,8 @@ public static PlanetilerConfig from(Arguments arguments) {
"Allow re-ordering output features within each layer when tile format=MLT to reduce tile sizes", false),
arguments.getBoolean("mlt_shared_dict", "Share dictionaries between string fields when tile format=MLT", false),
arguments.getBoolean("output_layerstats", "output a tsv.gz file for each tile/layer size", false),
arguments.getString("layerstats_format|layerstats-format",
"layerstats output format (tsv or parquet)", "tsv"),
arguments.getString("debug_url", "debug url to use for displaying tiles with {z} {lat} {lon} placeholders",
"https://onthegomap.github.io/planetiler-demo/#{z}/{lat}/{lon}"),
tmpDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
import java.util.concurrent.atomic.AtomicLong;
import me.lemire.integercompression.IntWrapper;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryCollection;
import org.maplibre.mlt.converter.encodings.MltTypeMap;
Expand Down Expand Up @@ -82,6 +90,21 @@ public static Path getDefaultLayerstatsPath(Path archive) {
return archive.resolveSibling(archive.getFileName() + ".layerstats.tsv.gz");
}

/** Returns the default path for layerstats based on the archive and output format. */
public static Path getDefaultLayerstatsPath(Path archive, String format) {
Comment thread
1Ninad marked this conversation as resolved.
Outdated
String extension = "parquet".equalsIgnoreCase(format) ? ".layerstats.parquet" : ".layerstats.tsv.gz";
return archive.resolveSibling(archive.getFileName() + extension);
}

/** Creates a layerstats writer based on the output format. */
public static LayerStatsWriter createWriter(String format, Path output) throws IOException {
Comment thread
1Ninad marked this conversation as resolved.
Outdated
if ("parquet".equalsIgnoreCase(format)) {
return new ParquetLayerStatsWriter(output);
} else {
return new TsvLayerStatsWriter(output);
}
}

public static void main(String... args) {
var arguments = Arguments.fromArgsOrConfigFile(args);
var config = PlanetilerConfig.from(arguments);
Expand All @@ -94,12 +117,14 @@ public static void main(String... args) {
var inputString = arguments.getString("input", "input file");
var input = TileArchiveConfig.from(inputString);
var localPath = input.getLocalPath();
String format = config.layerstatsFormat();
var output = localPath == null ?
arguments.file("output", "output file") :
arguments.file("output", "output file", getDefaultLayerstatsPath(localPath));
arguments.file("output", "output file", getDefaultLayerstatsPath(localPath, format));
var counter = new AtomicLong(0);
var timer = stats.startStage("tilestats");
record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
record BatchData(TileCoord coord, int archivedBytes, List<LayerStats> layerStats) {}
record Batch(List<Tile> tiles, CompletableFuture<List<BatchData>> stats) {}
WorkQueue<Batch> writerQueue = new WorkQueue<>("tilestats_write_queue", 1_000, 1, stats);
var pipeline = WorkerPipeline.start("tilestats", stats);
var readBranch = pipeline
Expand Down Expand Up @@ -137,9 +162,9 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
List<LayerStats> layerStats = null;

var updater = tileStats.threadLocalUpdater();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();
for (var batch : prev) {
List<String> lines = new ArrayList<>(batch.tiles.size());
// collect raw stats per tile; serialization happens in write stage
List<BatchData> batchData = new ArrayList<>(batch.tiles.size());
for (var tile : batch.tiles) {
if (!Arrays.equals(zipped, tile.bytes())) {
zipped = tile.bytes();
Expand All @@ -148,19 +173,18 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
layerStats = computeTileStats(decoded);
}
updater.recordTile(tile.coord(), zipped.length, layerStats);
lines.addAll(layerStatsSerializer.formatOutputRows(tile.coord(), zipped.length, layerStats));
batchData.add(new BatchData(tile.coord(), zipped.length, layerStats));
}
batch.stats.complete(lines);
batch.stats.complete(batchData);
}
});

var writeBranch = pipeline.readFromQueue(writerQueue)
.sinkTo("write", 1, prev -> {
try (var writer = newWriter(output)) {
writer.write(headerRow());
try (var writer = createWriter(format, output)) {
for (var batch : prev) {
for (var line : batch.stats.get()) {
writer.write(line);
for (var data : batch.stats.get()) {
writer.write(data.coord, data.archivedBytes, data.layerStats);
}
}
}
Expand Down Expand Up @@ -419,4 +443,103 @@ public int compareTo(LayerStats o) {
return layer.compareTo(o.layer);
}
}

/**
* Writer abstraction for layerstats output.
*/
public interface LayerStatsWriter extends AutoCloseable {

/** Write layerstats for a tile. */
void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException;

@Override
void close() throws IOException;
}

/**
* TSV writer for layerstats.
*/
private static class TsvLayerStatsWriter implements LayerStatsWriter {

private final Writer writer;
private final TsvSerializer serializer;

TsvLayerStatsWriter(Path output) throws IOException {
this.writer = newWriter(output);
this.serializer = newThreadLocalSerializer();
writer.write(headerRow());
}

@Override
public void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException {
for (var line : serializer.formatOutputRows(tileCoord, archivedBytes, layerStats)) {
writer.write(line);
}
}

@Override
public void close() throws IOException {
writer.close();
}
}

/**
* Parquet writer for layerstats.
*/
private static class ParquetLayerStatsWriter implements LayerStatsWriter {

private static final MessageType SCHEMA = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("z")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("x")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("y")
.required(PrimitiveType.PrimitiveTypeName.INT64).named("hilbert")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("archived_tile_bytes")
.required(PrimitiveType.PrimitiveTypeName.BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType())
.named("layer")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_bytes")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_features")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_geometries")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_bytes")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_keys")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_values")
.named("layerstats");

private final ParquetWriter<Group> writer;
private final SimpleGroupFactory groupFactory;

ParquetLayerStatsWriter(Path output) throws IOException {
this.groupFactory = new SimpleGroupFactory(SCHEMA);
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(output.toString());
this.writer = ExampleParquetWriter.builder(hadoopPath)
.withType(SCHEMA)
.withCompressionCodec(CompressionCodecName.SNAPPY)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bdon any preference on encoding/row group size here? What do you usually use when converting?

.build();
}

@Override
public void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException {
long hilbert = tileCoord.hilbertEncoded();
for (var layer : layerStats) {
Group group = groupFactory.newGroup()
.append("z", tileCoord.z())
.append("x", tileCoord.x())
.append("y", tileCoord.y())
.append("hilbert", hilbert)
.append("archived_tile_bytes", archivedBytes)
.append("layer", layer.layer)
.append("layer_bytes", layer.layerBytes)
.append("layer_features", layer.layerFeatures)
.append("layer_geometries", layer.layerGeometries)
.append("layer_attr_bytes", layer.layerAttrBytes)
.append("layer_attr_keys", layer.layerAttrKeys)
.append("layer_attr_values", layer.layerAttrValues);
writer.write(group);
}
}

@Override
public void close() throws IOException {
writer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

import static com.onthegomap.planetiler.TestUtils.newPoint;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.onthegomap.planetiler.VectorTile;
import com.onthegomap.planetiler.geo.TileCoord;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.maplibre.mlt.converter.ConversionConfig;
import org.maplibre.mlt.converter.FeatureTableOptimizations;
import org.maplibre.mlt.converter.MltConverter;
Expand Down Expand Up @@ -221,4 +225,39 @@ void computeStats2FeaturesNested() throws IOException {
.trim(),
(TileSizeStats.headerRow() + String.join("", formatted)).trim());
}

@Test
void testParquetOutput(@TempDir Path tempDir) throws IOException {
Path parquetFile = tempDir.resolve("layerstats.parquet");
var stats = List.of(
new TileSizeStats.LayerStats("layer1", 100, 5, 5, 20, 2, 3),
new TileSizeStats.LayerStats("layer2", 150, 10, 10, 30, 3, 4)
);

// Write Parquet file
try (var writer = TileSizeStats.createWriter("parquet", parquetFile)) {
writer.write(TileCoord.ofXYZ(1, 2, 3), 999, stats);
}

// Verify file exists and has content
assertTrue(Files.exists(parquetFile));
assertTrue(Files.size(parquetFile) > 0);
Comment thread
1Ninad marked this conversation as resolved.
Outdated
}

@Test
void testTsvOutput(@TempDir Path tempDir) throws IOException {
Path tsvFile = tempDir.resolve("layerstats.tsv.gz");
var stats = List.of(
new TileSizeStats.LayerStats("layer1", 100, 5, 5, 20, 2, 3)
);

// Write TSV file
try (var writer = TileSizeStats.createWriter("tsv", tsvFile)) {
writer.write(TileCoord.ofXYZ(1, 2, 3), 999, stats);
}

// Verify file exists and has content
assertTrue(Files.exists(tsvFile));
assertTrue(Files.size(tsvFile) > 0);
}
}
Loading