使用Slick,您可以执行以下操作以从表中生成结果流:
val q = for (e <- events) yield e.name val p: DatabasePublisher[String] = db.stream(q.result) p.foreach { s => println(s"Event: $s") }
这将打印events
表中的所有事件并在最后一行之后终止.
假设您可以通过某种方式通知您何时在events
表中输入新行,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f
用于DB表的.
我认为Slick本身不支持这个,但我认为应该可以使用Akka流媒体来提供帮助.因此,如果您可以从Slick Source获取某些东西,直到它为空,那么等待一个事件来指示表中的更多数据,然后流式传输新数据.可能通过使用一个ActorPublisher
绑定这个逻辑?
只是想知道某人是否有这方面的经验或任何建议?
您是对的ActorPublisher
:)这是一个使用PostgreSQL,异步DB驱动程序和LISTEN / NOTIFY机制的简单示例:
演员:
class PostgresListener extends ActorPublisher[String] { override def receive = { case _ ? val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password") val connection = new PostgreSQLConnection(configuration) Await.result(connection.connect, 5.seconds) connection.sendQuery(s"LISTEN $channel") connection.registerNotifyListener { message ? onNext(message.payload) } } }
服务:
def stream: Source[ServerSentEvent, Unit] = { val dataPublisherRef = Props[PostgresListener] val dataPublisher = ActorPublisher[String](dataPublisherRef) dataPublisherRef ! "go" Source(dataPublisher) .map(ServerSentEvent(_)) .via(WithHeartbeats(10.second)) }
build.sbt
在libraryDependencies
:
"com.github.mauricio" %% "postgresql-async" % "0.2.18"
Postgres触发器应该调用 select pg_notify('foo', 'payload')
据我所知,Slick不支持LISTEN
。