在深入研究问题的具体细节之前,有关Apache Flink中函数可序列化的一些背景知识:
Apache Flink使用Java Serialization(java.io.Serializable)将函数对象(此处MapFunction
)发送给并行执行它们的worker.因此,函数需要是可序列化的:函数可能不包含任何非可序列化的字段,即非原始(int,long,double,...)和不实现的类型java.io.Serializable
.
使用非可序列化构造的典型方法是懒惰地初始化它们.
在Flink函数中使用非可序列化类型的一种方法是懒惰地初始化它们.保存这些类型的字段仍然null
是在序列化要发送的函数时,并且仅在工作人员对函数进行反序列化后设置.
例如,在Scala中,您可以简单地使用惰性字段lazy val x = new NonSerializableType()
.该NonSerializableType
类型实际上只在首次访问变量时创建,该变量x
通常在worker上.因此,类型可以是不可序列化的,因为x
当函数序列化为运送给工作者时,该类型为null.
在Java中,open()
如果将其设置为Rich Function,则可以在函数方法上初始化非可序列化字段.丰富的功能(如RichMapFunction
)是基本功能的扩展版本(这里MapFunction
),让您可以访问生命周期方法,如open()
和close()
.
我不太熟悉依赖注入,但是dagger似乎也提供了类似于懒惰依赖的东西,这可能有助于解决方法,就像Scala中的惰性变量一样:
new MapFunction() { @Inject Lazy dep; public Long map(Long value) { return dep.get().doSomething(value); } }