feat: add Parquet output format for layerstats #1546
Open
1Ninad wants to merge 7 commits into onthegomap:main from 1Ninad:write-parquet-layerstats
Changes from 4 commits
Commits
b66072f feat: add Parquet output format for layerstats (1Ninad)
8ed5a67 Make Parquet default, remove TSV support (1Ninad)
168d795 Fix test and update all TSV references (1Ninad)
d0e8922 Update planetiler-core/src/main/java/com/onthegomap/planetiler/config… (1Ninad)
051d275 Remove unused format arg, verify parquet row values (1Ninad)
f86b74f Fix compile: remove layerstatsFormat field, clean warnings (1Ninad)
c76eed1 Revert to addNaturalEarthSource, sqlite is not GeoPackage (1Ninad)
12 changes: 10 additions & 2 deletions
planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileEncodingResult.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,14 @@ | |
| import java.util.concurrent.atomic.AtomicLong; | ||
| import me.lemire.integercompression.IntWrapper; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.apache.parquet.example.data.Group; | ||
| import org.apache.parquet.example.data.simple.SimpleGroupFactory; | ||
| import org.apache.parquet.hadoop.ParquetWriter; | ||
| import org.apache.parquet.hadoop.example.ExampleParquetWriter; | ||
| import org.apache.parquet.hadoop.metadata.CompressionCodecName; | ||
| import org.apache.parquet.schema.MessageType; | ||
| import org.apache.parquet.schema.PrimitiveType; | ||
| import org.apache.parquet.schema.Types; | ||
| import org.locationtech.jts.geom.Geometry; | ||
| import org.locationtech.jts.geom.GeometryCollection; | ||
| import org.maplibre.mlt.converter.encodings.MltTypeMap; | ||
|
|
@@ -57,14 +65,14 @@ | |
| * Utilities for extracting tile and layer size summaries from encoded vector tiles. | ||
| * <p> | ||
| * {@link #computeTileStats(VectorTileProto.Tile)} extracts statistics about each layer in a tile and | ||
| * {@link TsvSerializer} formats them as row of a TSV file to write. | ||
| * writes them to a Parquet file. | ||
| * <p> | ||
| * To generate a tsv.gz file with stats for each tile, you can add {@code --output-layerstats} option when generating an | ||
| * archive, or run the following an existing archive: | ||
| * To generate a Parquet file with stats for each tile, you can add {@code --output-layerstats} option when generating an | ||
| * archive, or run the following on an existing archive: | ||
| * | ||
| * <pre> | ||
| * {@code | ||
| * java -jar planetiler.jar stats --input=<path to pmtiles or mbtiles> --output=layerstats.tsv.gz | ||
| * java -jar planetiler.jar stats --input=<path to pmtiles or mbtiles> --output=layerstats.parquet | ||
| * } | ||
| * </pre> | ||
| */ | ||
|
|
@@ -79,7 +87,17 @@ public class TileSizeStats { | |
|
|
||
| /** Returns the default path that a layerstats file should go relative to an existing archive. */ | ||
| public static Path getDefaultLayerstatsPath(Path archive) { | ||
| return archive.resolveSibling(archive.getFileName() + ".layerstats.tsv.gz"); | ||
| return archive.resolveSibling(archive.getFileName() + ".layerstats.parquet"); | ||
| } | ||
|
|
||
| /** Returns the default path for layerstats based on the archive and output format. */ | ||
| public static Path getDefaultLayerstatsPath(Path archive, String format) { | ||
| return archive.resolveSibling(archive.getFileName() + ".layerstats.parquet"); | ||
| } | ||
|
|
||
| /** Creates a Parquet layerstats writer. */ | ||
| public static LayerStatsWriter createWriter(String format, Path output) throws IOException { | ||
|
1Ninad marked this conversation as resolved.
Outdated
|
||
| return new ParquetLayerStatsWriter(output); | ||
| } | ||
|
|
||
| public static void main(String... args) { | ||
|
|
@@ -94,12 +112,14 @@ public static void main(String... args) { | |
| var inputString = arguments.getString("input", "input file"); | ||
| var input = TileArchiveConfig.from(inputString); | ||
| var localPath = input.getLocalPath(); | ||
| String format = config.layerstatsFormat(); | ||
| var output = localPath == null ? | ||
| arguments.file("output", "output file") : | ||
| arguments.file("output", "output file", getDefaultLayerstatsPath(localPath)); | ||
| arguments.file("output", "output file", getDefaultLayerstatsPath(localPath, format)); | ||
| var counter = new AtomicLong(0); | ||
| var timer = stats.startStage("tilestats"); | ||
| record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {} | ||
| record BatchData(TileCoord coord, int archivedBytes, List<LayerStats> layerStats) {} | ||
| record Batch(List<Tile> tiles, CompletableFuture<List<BatchData>> stats) {} | ||
| WorkQueue<Batch> writerQueue = new WorkQueue<>("tilestats_write_queue", 1_000, 1, stats); | ||
| var pipeline = WorkerPipeline.start("tilestats", stats); | ||
| var readBranch = pipeline | ||
|
|
@@ -137,9 +157,9 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {} | |
| List<LayerStats> layerStats = null; | ||
|
|
||
| var updater = tileStats.threadLocalUpdater(); | ||
| var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer(); | ||
| for (var batch : prev) { | ||
| List<String> lines = new ArrayList<>(batch.tiles.size()); | ||
| // collect raw stats per tile; serialization happens in write stage | ||
| List<BatchData> batchData = new ArrayList<>(batch.tiles.size()); | ||
| for (var tile : batch.tiles) { | ||
| if (!Arrays.equals(zipped, tile.bytes())) { | ||
| zipped = tile.bytes(); | ||
|
|
@@ -148,19 +168,18 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {} | |
| layerStats = computeTileStats(decoded); | ||
| } | ||
| updater.recordTile(tile.coord(), zipped.length, layerStats); | ||
| lines.addAll(layerStatsSerializer.formatOutputRows(tile.coord(), zipped.length, layerStats)); | ||
| batchData.add(new BatchData(tile.coord(), zipped.length, layerStats)); | ||
| } | ||
| batch.stats.complete(lines); | ||
| batch.stats.complete(batchData); | ||
| } | ||
| }); | ||
|
|
||
| var writeBranch = pipeline.readFromQueue(writerQueue) | ||
| .sinkTo("write", 1, prev -> { | ||
| try (var writer = newWriter(output)) { | ||
| writer.write(headerRow()); | ||
| try (var writer = createWriter(format, output)) { | ||
| for (var batch : prev) { | ||
| for (var line : batch.stats.get()) { | ||
| writer.write(line); | ||
| for (var data : batch.stats.get()) { | ||
| writer.write(data.coord, data.archivedBytes, data.layerStats); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -419,4 +438,76 @@ public int compareTo(LayerStats o) { | |
| return layer.compareTo(o.layer); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Writer abstraction for layerstats output. | ||
| */ | ||
| public interface LayerStatsWriter extends AutoCloseable { | ||
|
|
||
| /** Write layerstats for a tile. */ | ||
| void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException; | ||
|
|
||
| @Override | ||
| void close() throws IOException; | ||
| } | ||
|
|
||
| /** | ||
| * Parquet writer for layerstats. | ||
| */ | ||
| private static class ParquetLayerStatsWriter implements LayerStatsWriter { | ||
|
|
||
| private static final MessageType SCHEMA = Types.buildMessage() | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("z") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("x") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("y") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT64).named("hilbert") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("archived_tile_bytes") | ||
| .required(PrimitiveType.PrimitiveTypeName.BINARY).as(org.apache.parquet.schema.LogicalTypeAnnotation.stringType()) | ||
| .named("layer") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_bytes") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_features") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_geometries") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_bytes") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_keys") | ||
| .required(PrimitiveType.PrimitiveTypeName.INT32).named("layer_attr_values") | ||
| .named("layerstats"); | ||
|
|
||
| private final ParquetWriter<Group> writer; | ||
| private final SimpleGroupFactory groupFactory; | ||
|
|
||
| ParquetLayerStatsWriter(Path output) throws IOException { | ||
| this.groupFactory = new SimpleGroupFactory(SCHEMA); | ||
| org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(output.toString()); | ||
| this.writer = ExampleParquetWriter.builder(hadoopPath) | ||
| .withType(SCHEMA) | ||
| .withCompressionCodec(CompressionCodecName.SNAPPY) | ||
|
Contributor
@bdon any preference on encoding/row group size here? What do you usually use when converting?
||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| public void write(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats) throws IOException { | ||
| long hilbert = tileCoord.hilbertEncoded(); | ||
| for (var layer : layerStats) { | ||
| Group group = groupFactory.newGroup() | ||
| .append("z", tileCoord.z()) | ||
| .append("x", tileCoord.x()) | ||
| .append("y", tileCoord.y()) | ||
| .append("hilbert", hilbert) | ||
| .append("archived_tile_bytes", archivedBytes) | ||
| .append("layer", layer.layer) | ||
| .append("layer_bytes", layer.layerBytes) | ||
| .append("layer_features", layer.layerFeatures) | ||
| .append("layer_geometries", layer.layerGeometries) | ||
| .append("layer_attr_bytes", layer.layerAttrBytes) | ||
| .append("layer_attr_keys", layer.layerAttrKeys) | ||
| .append("layer_attr_values", layer.layerAttrValues); | ||
| writer.write(group); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| writer.close(); | ||
| } | ||
| } | ||
| } | ||
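The diff above moves serialization out of the compute stage: workers now hand raw per-tile stats (BatchData) to a single write stage, which pushes them through a LayerStatsWriter inside try-with-resources. The following is a minimal, standalone sketch of that shape, not the PR's code. Coord, Layer, StatsWriter, ListWriter, and run are hypothetical stand-ins for TileCoord, LayerStats, LayerStatsWriter, ParquetLayerStatsWriter, and the pipeline, and the in-memory writer replaces Parquet so the example needs only the JDK (16 or later, for records).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LayerStatsSketch {

  // Hypothetical, simplified stand-ins for the PR's TileCoord and LayerStats.
  record Coord(int z, int x, int y) {}
  record Layer(String name, int bytes) {}

  // Same idea as BatchData in the diff: raw stats for one tile, not yet serialized.
  record BatchData(Coord coord, int archivedBytes, List<Layer> layers) {}

  // Same shape as LayerStatsWriter in the diff: close() is narrowed to IOException.
  interface StatsWriter extends AutoCloseable {
    void write(Coord coord, int archivedBytes, List<Layer> layers) throws IOException;

    @Override
    void close() throws IOException;
  }

  // In-memory writer (assumption: replaces the Parquet writer); one row per tile and layer.
  static class ListWriter implements StatsWriter {
    final List<String> rows = new ArrayList<>();
    boolean closed = false;

    @Override
    public void write(Coord c, int archivedBytes, List<Layer> layers) {
      for (Layer layer : layers) {
        rows.add(c.z() + "/" + c.x() + "/" + c.y() + " " + archivedBytes
          + " " + layer.name() + " " + layer.bytes());
      }
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Same expression as getDefaultLayerstatsPath(Path) in the diff.
  static Path defaultLayerstatsPath(Path archive) {
    return archive.resolveSibling(archive.getFileName() + ".layerstats.parquet");
  }

  // Compute stage completes a future with raw data; write stage drains it into the writer.
  static List<String> run(ListWriter sink) {
    CompletableFuture<List<BatchData>> stats = new CompletableFuture<>();
    stats.complete(List.of(new BatchData(new Coord(0, 0, 0), 120,
      List.of(new Layer("water", 70), new Layer("roads", 50)))));

    try (StatsWriter writer = sink) {
      for (BatchData data : stats.join()) {
        writer.write(data.coord(), data.archivedBytes(), data.layers());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.rows;
  }

  public static void main(String[] args) {
    run(new ListWriter()).forEach(System.out::println);
    System.out.println(defaultLayerstatsPath(Path.of("out", "world.pmtiles")));
  }
}
```

Keeping the batch payload as structured records rather than pre-formatted strings is what lets the write stage choose the output encoding; the trade-off is that the single writer thread now does the per-row conversion work that the TSV version spread across the compute workers.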