我们正在使用Spark加载文件目录的层次结构并将它们转换为Parquet.数百个管道分隔文件中有数十GB.有些人自己很大.
例如,每个第100个文件都有一行或两行,它们有一个额外的分隔符,使整个进程(或文件)中止.
我们正在加载使用:
sqlContext.read .format("com.databricks.spark.csv") .option("header", format("header")) .option("delimiter", format("delimeter")) .option("quote", format("quote")) .option("escape", format("escape")) .option("charset", "UTF-8") // Column types are unnecessary for our current use cases. //.option("inferschema", "true") .load(glob)
是否有任何扩展或事件处理机制与Spark,我们可以附加到读取行的逻辑,如果遇到格式错误的行,只是跳过行而不是失败进程?
(我们计划进行更多的预处理,但这将是最直接和最关键的解决方案.)
在你的情况下,它可能不是它的Spark解析部分失败,而是默认实际上是PERMISSIVE
这样的事实,即它将尽力而为的语法解析为格式错误的记录,然后在处理逻辑中进一步导致下游问题.
您应该只需添加选项:
.option("mode", "DROPMALFORMED")
像这样:
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", format("header"))
.option("delimiter", format("delimeter"))
.option("quote", format("quote"))
.option("escape", format("escape"))
.option("charset", "UTF-8")
// Column types are unnecessary for our current use cases.
//.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load(glob)
并且它将跳过具有错误数量的分隔符或与模式不匹配的行,而不是让它们在代码中稍后导致错误.