当前位置:  开发笔记 > 前端 > 正文

如何清理Flink流状态为非活动密钥?

如何解决《如何清理Flink流状态为非活动密钥?》经验,为你挑选了1个好方法。

我的目标是建立一个Flink流程序来保存最后的N ID,其中id是从事件中提取的.接收器是Cassandra存储器,因此可以随时获取id列表.重要的是Cassandra会在每次活动后立即更新.

这可以很容易地实现mapWithState(参见下面的代码).但是,此代码存在重要问题.国家是关键userid.有些用户可能会活动一段时间,然后再也不会.我担心的是状态存储将永远增长.

如何清除非活动密钥的状态?

case class MyEvent(userId: Int, id: String)

env
  .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
  .keyBy(_.userId)
  .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
    val keepNIds = currentIds match {
      case None => Seq(in.id)
      case Some(cids) => (cids :+ in.id).takeRight(100)
    }
    ((in.userId, keepNIds), Some(keepNIds))
  }
  .addSink { in: (Int, Seq[String]) =>
    CassandraSink.appDatabase.idsTable.store(...)
  }

Fabian Huesk.. 6

不断增长的状态是一个重要而正确的观察.如果您的键空间正在移动,这肯定会发生.

Flink 1.2.0添加了ProcessFunction解决此问题的方法.A ProcessFunction类似于a FlatMapFunction但可以访问计时器服务.您可以注册onTimer()在到期时调用回调函数的计时器.回调可用于清理状态.



1> Fabian Huesk..:

不断增长的状态是一个重要而正确的观察.如果您的键空间正在移动,这肯定会发生.

Flink 1.2.0添加了ProcessFunction解决此问题的方法.A ProcessFunction类似于a FlatMapFunction但可以访问计时器服务.您可以注册onTimer()在到期时调用回调函数的计时器.回调可用于清理状态.

推荐阅读
赛亚兔备_393
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有