我正在从和类型的gz
压缩json
文件中创建图形。edge
vertices
我已将文件放在此处的保管箱文件夹中
我加载并映射这些json
记录以创建所需的vertices
和edge
类型,graphx
如下所示:
val vertices_raw = sqlContext.read.json("path/vertices.json.gz") val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index"))) val verticesRDD: RDD[(VertexId, Long)] = vertices val edges_raw = sqlContext.read.json("path/edges.json.gz") val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length")))) val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
然后,我使用dijkstra
发现的这种实现来计算两个顶点之间的最短路径:
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = { var g2 = g.mapVertices( (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]()) ) for (i <- 1L to g.vertices.count - 1) { val currentVertexId: VertexId = g2.vertices.filter(!_._2._1) .fold((0L, (false, Double.MaxValue, List[VertexId]())))( (a, b) => if (a._2._2 < b._2._2) a else b) ._1 val newDistances: VertexRDD[(Double, List[VertexId])] = g2.aggregateMessages[(Double, List[VertexId])]( ctx => if (ctx.srcId == currentVertexId) { ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId)) }, (a, b) => if (a._1 < b._1) a else b ) g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => { val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]())) ( vd._1 || vid == currentVertexId, math.min(vd._2, newSumVal._1), if (vd._2 < newSumVal._1) vd._3 else newSumVal._2 ) }) } g.outerJoinVertices(g2.vertices)((vid, vd, dist) => (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]())) .productIterator.toList.tail )) }
我采用两个随机顶点ID:
val v1 = 4000000028222916L val v2 = 4000000031019012L
并计算它们之间的路径:
val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
我无法在笔记本电脑上本地计算此数据,而不会出现stackoverflow错误。我可以看到它正在使用4个可用内核中的3个。我可以加载该图,并igraph
使用完全相同的图上的Python库每秒计算最短的10条路径。这是计算路径的低效手段吗?在规模上,将在多个节点上计算路径(无堆栈溢出错误),但每次路径计算仍为30/40秒。