当前位置:  开发笔记 > 编程语言 > 正文

如何使用连接池在Spark中进行Postgresql

如何解决《如何使用连接池在Spark中进行Postgresql》经验,为你挑选了0个好方法。

我有一个spark(1.2.1 v)作业,使用scala的 postgresql.Driver将rdd的内容插入到postgres中:

rdd.foreachPartition(iter => {

        //connect to postgres database on the localhost
        val driver = "org.postgresql.Driver"
        var connection:Connection = null
        Class.forName(driver)
        connection = DriverManager.getConnection(url, username, password)
        val statement = connection.createStatement()

        iter.foreach(row => {
            val mapRequest = Utils.getInsertMap(row)
            val query = Utils.getInsertRequest(squares_table, mapRequest)

            try { statement.execute(query) } 
            catch {
                case pe: PSQLException => println("exception caught: " + pe);
            }
        })
        connection.close()
})

在上面的代码中,我为rdd的每个分区打开了到postgres的新连接,然后关闭它。我认为,以正确的方式去将是使用连接池,Postgres的,我可以从连接(如描述在这里),但它只是伪代码:

rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
}

从Spark连接池连接Postgres的正确方法是什么?

推荐阅读
依然-狠幸福
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有