I have a Spark (v1.2.1) job that inserts the contents of an RDD into Postgres from Scala using org.postgresql.Driver:
    import java.sql.{Connection, DriverManager}
    import org.postgresql.util.PSQLException

    rdd.foreachPartition(iter => {
      // connect to the postgres database on the localhost
      val driver = "org.postgresql.Driver"
      var connection: Connection = null
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      val statement = connection.createStatement()
      iter.foreach(row => {
        val mapRequest = Utils.getInsertMap(row)
        val query = Utils.getInsertRequest(squares_table, mapRequest)
        try {
          statement.execute(query)
        } catch {
          case pe: PSQLException => println("exception caught: " + pe)
        }
      })
      connection.close()
    })
In the code above I open a new connection to Postgres for every partition of the RDD and then close it. I think the right way to do this would be to use a connection pool for Postgres from which I can get connections (as described here), but that is only pseudo-code:
    rdd.foreachPartition { partitionOfRecords =>
      // ConnectionPool is a static, lazily initialized pool of connections
      val connection = ConnectionPool.getConnection()
      partitionOfRecords.foreach(record => connection.send(record))
      ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
    }
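For illustration, here is a minimal sketch of what such a ConnectionPool object could look like: a Scala singleton (so each executor JVM initializes it once and all partitions on that executor reuse it), backed here by Apache Commons DBCP2's BasicDataSource. The DBCP2 dependency and the url/username/password values are assumptions for this sketch, not something from my actual setup:

    import java.sql.Connection
    import org.apache.commons.dbcp2.BasicDataSource

    // Lazily initialized, JVM-wide pool: created once per executor on first
    // use, then shared by every partition that executor processes.
    object ConnectionPool {
      private lazy val dataSource: BasicDataSource = {
        val ds = new BasicDataSource()
        ds.setDriverClassName("org.postgresql.Driver")
        ds.setUrl("jdbc:postgresql://localhost:5432/mydb") // hypothetical URL
        ds.setUsername("username")                          // hypothetical credentials
        ds.setPassword("password")
        ds.setMaxTotal(8) // cap on concurrent connections per executor
        ds
      }

      def getConnection(): Connection = dataSource.getConnection()

      // With DBCP, close() on a pooled connection returns it to the pool
      // instead of physically closing it.
      def returnConnection(connection: Connection): Unit = connection.close()
    }

The foreachPartition body would then stay essentially the same as my first snippet, except it would call ConnectionPool.getConnection() instead of DriverManager.getConnection, and return the connection (ideally in a finally block) instead of closing it outright. Is something along these lines the intended pattern?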
What is the correct way to connect to Postgres from Spark using a connection pool?