我正在使用fast-csv使用a来迭代CSV文件stream
.对于CSV文件的每一行,我想在redis中创建一个作业,我使用kue.解析一行是同步函数.整件事看起来像这样:
var csvStream = fastCsv(_config.csvOptions) .on('data', function(data) { var stream = this; stream.pause(); var payload = parseRow(data, _config); console.log(payload); // the payload is always printed to the console var job = kue.create('csv-row', { payload: payload }) .save(function(err) { if (!err) console.log('Enqueued at ' + job.id); else console.log('Redis error ' + JSON.stringify(err)); stream.resume(); }); }) .on('end', function() { callback(); // this ends my parsing });
console.log(payload);
我的CSV文件的每一行都有简单的显示,但是没有创建作业.即,回调中的输出save
都没有被打印,并且作业不在我的redis中.
我假设,因为它是CSV文件的最后一行,流已经发出end
,所以kue.create()
在进程终止之前无法执行最后一行?
有没有办法停止流,end
直到kue完成?
您可以使用异步库解决此问题.您可以将以下模式用于任何流.
var AsyncLib = require('async'); var worker = function (payload, cb) { //do something with payload and call callback return cb(); }; var concurrency = 5; var streamQueue = AsyncLib.queue(worker, concurrency); var stream = //some readable stream; stream.on('data', function(data) { //no need to pause and resume var payload = '//some payload'; streamQueue.push(payload); }) .on('end', function() { //register drain event on end and callback streamQueue.drain = function () { callback(); }; });