我是后台流程的新手,所以如果我做错了假设,请随时指出。
我正在尝试编写一个脚本,用于将导入数据从大型CSV文件导入Neo4j db(将其视为数据流,无休止地)。csv文件仅包含两列-user_a_id和user_b_id,它们映射有向关系。需要考虑的几件事:
数据可能重复
同一用户可以映射到其他多个用户,并且不能保证该用户何时会再次显示。
我当前的解决方案:我正在使用sidekiq,并且有一个工作人员批量读取文件,并分派工作人员在数据库中创建边。
我遇到的问题:
由于我正在接收数据流,因此无法对文件进行预排序并为一个用户分配建立关系的作业。
由于作业是异步执行的,因此如果两个工作人员正在处理同一节点的关系,我将获得Neo4j的写锁。
假设我绕过写锁,如果两个工作人员正在处理重复的记录,那么我将构建重复的边。
可能的解决方案:建立一个同步队列,并且只有一个工作线程来执行写操作(似乎sidekiq或resque都没有该选项)。这可能非常慢,因为只有一个线程在工作。
或者,我可以编写自己的实现,该实现创建一个工作程序以根据user_id(每个队列一个唯一的ID)构建多个作业队列,并使用Redis进行存储。然后,为每个队列分配一个工作线程以写入数据库。设置最大队列数,这样我就不会用完内存,并在队列耗尽所有作业后将其删除(如果以后会看到相同的user_id,请重新构建它)。-虽然听起来并不简单,所以我更喜欢在使用之前使用现有的库。
我的问题是-是否可以使用现有的宝石?处理此问题的良好做法是什么?
您有很多选择;)
如果您的数据确实在文件中而不是在流中,那么我绝对建议您检查一下Neo4j随附的neo4j-import命令。它允许您以每秒一百万行的速度导入CSV数据。两个警告:您可能需要稍微修改文件格式,并且需要生成一个新的数据库(它不会将新数据导入到现有数据库中)
我也会熟悉该LOAD CSV
命令。这将采用任何格式的CSV,并允许您编写一些Cypher命令来转换和导入数据。它的速度不如neo4j-import
,但非常快,它可以从磁盘或URL流式传输CSV文件。
由于您使用的是Ruby,因此我也建议您查看neo4apis。这是我编写的一个gem,它使批量导入数据更加容易,这样您就不会对文件中的每个条目都发出单个请求。它允许您使用导入程序在DSL中定义类。这些导入器可以使用任何种类的Ruby对象,并且鉴于该Ruby对象,它们将定义应使用add_node
和add_relationship
方法导入的内容。在幕后,这将生成Cypher查询,这些查询将被批量缓冲和执行,这样您就不会有太多往返Neo4j的路程。
在考虑异步处理之前,我将首先研究所有这些内容。但是,如果确实确实有永无止境的数据输入。该MERGE
子句应帮助您解决任何竞争状况或锁定。它允许您创建对象和关系(如果尚不存在)。它基本上是一个find_or_create
,但是在数据库级别。如果你使用LOAD CSV
,你可能会想合并的欢迎,并neo4apis
使用MERGE
在幕后。
希望有帮助!