In Spark Streaming it is possible (and mandatory when using stateful operations) to set the StreamingContext to perform checkpoints into a reliable data store (S3, HDFS, ...) of (AND):
Metadata
DStream lineage
As documented here, setting the output data store requires calling yourSparkStreamingCtx.checkpoint(datastoreURL).
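For context, a minimal sketch of that call; the application name, batch interval and HDFS URL are made-up placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a streaming context and point its checkpointing at a reliable store
// (the HDFS path below is a hypothetical placeholder).
val conf = new SparkConf().setAppName("checkpoint-demo")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/checkpoint-demo")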
On the other hand, a lineage checkpoint interval can be set for each DataStream by calling checkpoint(timeInterval) on it. In fact, the recommendation is to set the lineage checkpoint interval to between 5 and 10 times the DataStream's sliding interval:
dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
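Continuing the sketch above, setting a per-DStream lineage checkpoint interval along those lines could look like this (the socket source, window and intervals are hypothetical):

import org.apache.spark.streaming.Seconds

// A windowed stream sliding every 2 seconds; its lineage checkpoint interval
// is set to 5x the sliding interval, i.e. 10 seconds.
val lines = ssc.socketTextStream("localhost", 9999)
val windowed = lines.window(Seconds(10), Seconds(2))
windowed.checkpoint(Seconds(10))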
My question is:
When the streaming context is set to perform checkpointing and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all DStreams with a default checkpointInterval equal to batchInterval? Or, on the contrary, is only metadata checkpointing enabled?
Checking the Spark code (v1.5), I found that a DStream's checkpointing gets enabled under two circumstances:
By an explicit call to its checkpoint method (not the StreamingContext's):
/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}
At DStream initialization, whenever the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):
private[streaming] def initialize(time: Time) {
  ...
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
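As a worked example of that default (my own arithmetic, not from the Spark source): for a 3-second slide the interval becomes 3s * ceil(10/3) = 12 seconds, while for a 15-second slide it becomes 15s * ceil(10/15) = 15 seconds, i.e. the larger of the slide duration and roughly 10 seconds:

import org.apache.spark.streaming.{Duration, Seconds}

// Stand-alone re-implementation of the formula above, for illustration only.
def defaultCheckpointDuration(slideDuration: Duration): Duration =
  slideDuration * math.ceil(Seconds(10) / slideDuration).toInt

defaultCheckpointDuration(Seconds(3))  // 12 seconds
defaultCheckpointDuration(Seconds(15)) // 15 seconds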
The first case is obvious. Performing a naive analysis of the Spark Streaming code:
grep "val mustCheckpoint = true" $(find -type f -name "*.scala") > ./org/apache/spark/streaming/api/python/PythonDStream.scala: override val mustCheckpoint = true >./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true >./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true
I can find that, in general (ignoring PythonDStream), the StreamingContext checkpoint only enables lineage checkpointing for StateDStream and ReducedWindowedDStream instances. These instances are the result of the following transformations (respectively, AND), illustrated in the sketch after this list:
updateStateByKey: that is, the stream carrying state across several windows.
reduceByKeyAndWindow
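For illustration, a minimal sketch of those two transformations; the wordCounts stream and update function are hypothetical, reusing the lines stream from the earlier sketch:

import org.apache.spark.streaming.Seconds

// A keyed stream of (word, 1) pairs built from the hypothetical lines stream above.
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1))

// updateStateByKey produces a StateDStream, so its lineage is checkpointed automatically.
val runningCounts = wordCounts.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

// reduceByKeyAndWindow with an inverse reduce function produces a ReducedWindowedDStream.
val windowedCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))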