我一直在研究Spark如何在Parquet中存储统计信息(最小/最大)以及它如何使用信息进行查询优化.我有几个问题.首先设置:Spark 2.1.0,下面设置一个1000行的Dataframe,一个long类型和一个字符串类型列.但是,它们按不同的列排序.
scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("id").write.parquet("/secret/spark21-sortById") scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
我在镶木地板工具中添加了一些代码来打印出统计数据并检查生成的镶木地板文件:
hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet file: file:/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet creator: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} file schema: spark_schema -------------------------------------------------------------------------------- id: REQUIRED INT64 R:0 D:0 text: REQUIRED BINARY O:UTF8 R:0 D:0 row group 1: RC:5 TS:133 OFFSET:4 -------------------------------------------------------------------------------- id: INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0] text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5 ENC:PLAIN,BIT_PACKED hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet file: file:/secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet creator: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} file schema: spark_schema -------------------------------------------------------------------------------- id: REQUIRED INT64 R:0 D:0 text: REQUIRED BINARY O:UTF8 R:0 D:0 row group 1: RC:5 TS:140 OFFSET:4 -------------------------------------------------------------------------------- id: INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0] text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5 ENC:PLAIN,BIT_PACKED
所以问题是为什么Spark,特别是2.1.0,只为数字列生成最小值/最大值,而不是字符串(BINARY)字段,即使字符串字段包含在排序中?也许我错过了一个configuraiton?
第二个问题是,如何确认Spark是否正在使用最小值/最大值?
scala> sc.setLogLevel("INFO") scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").show
我有很多这样的行:
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4)) 17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-558, partition values: [empty row] ... 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4)) 17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-574, partition values: [empty row] ...
问题是看起来Spark正在扫描每个文件,即使从最小/最大,Spark也应该能够确定只有part-00000具有相关数据.或者我读错了,Spark正在跳过文件?也许Spark只能使用分区值进行数据跳过?
PARQUET-686进行了更改,以便在看起来合适时故意忽略二进制字段的统计信息.您可以通过设置parquet.strings.signed-min-max.enabled
为覆盖此行为true
.
设置该配置后,您可以使用实木复合板工具读取二进制字段中的最小/最大值.
我的另一个stackoverflow问题中的更多细节