我为我的Spark作业启用了Kryo序列化,启用了设置以要求注册,并确保我的所有类型都已注册.
val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrationRequired", "true") conf.registerKryoClasses(classes) conf.registerAvroSchemas(avroSchemas: _*)
作业的Wallclock-time性能恶化了大约20%,并且洗牌的字节数增加了近400%.
鉴于Spark文档建议Kryo应该更好,这对我来说似乎真的很令人惊讶.
Kryo比Java序列化更快,更紧凑(通常高达10倍)
我手动调用serialize
Spark的实例上的方法org.apache.spark.serializer.KryoSerializer
和org.apache.spark.serializer.JavaSerializer
我的数据示例.结果与Spark文档中的建议一致:Kryo生成了98个字节; Java产生了993个字节.这确实是10倍的改进.
一个可能混淆的因素是被序列化和混洗的对象实现了Avro GenericRecord
接口.我尝试注册Avro架构SparkConf
,但没有显示出任何改进.
我尝试制作新的类来改组简单的Scala数据case class
,不包括任何Avro机器.它没有改善shuffle性能或交换的字节数.
Spark代码最终沸腾到以下:
case class A( f1: Long, f2: Option[Long], f3: Int, f4: Int, f5: Option[String], f6: Option[Int], f7: Option[String], f8: Option[Int], f9: Option[Int], f10: Option[Int], f11: Option[Int], f12: String, f13: Option[Double], f14: Option[Int], f15: Option[Double], f16: Option[Double], f17: List[String], f18: String) extends org.apache.avro.specific.SpecificRecordBase { def get(f: Int) : AnyRef = ??? def put(f: Int, value: Any) : Unit = ??? def getSchema(): org.apache.avro.Schema = A.SCHEMA$ } object A extends AnyRef with Serializable { val SCHEMA$: org.apache.avro.Schema = ??? } case class B( f1: Long f2: Long f3: String f4: String) extends org.apache.avro.specific.SpecificRecordBase { def get(field$ : Int) : AnyRef = ??? def getSchema() : org.apache.avro.Schema = B.SCHEMA$ def put(field$ : Int, value : Any) : Unit = ??? } object B extends AnyRef with Serializable { val SCHEMA$ : org.apache.avro.Schema = ??? } def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = { val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b) joined.map { case (_, asAndBs) => asAndBs } }
你知道可能会发生什么,或者我怎样才能获得Kryo应该提供的更好的性能?