当前位置:  开发笔记 > 编程语言 > 正文

等待最后一个stream.on('data')事件中的异步函数回调

如何解决《等待最后一个stream.on('data')事件中的异步函数回调》经验,为你挑选了1个好方法。

我正在使用fast-csv使用a来迭代CSV文件stream.对于CSV文件的每一行,我想在redis中创建一个作业,我使用kue.解析一行是同步函数.整件事看起来像这样:

var csvStream = fastCsv(_config.csvOptions)
  .on('data', function(data) {
    var stream = this;
    stream.pause();

    var payload = parseRow(data, _config);
    console.log(payload); // the payload is always printed to the console

    var job = kue.create('csv-row', {
        payload: payload
      })
      .save(function(err) {
        if (!err) console.log('Enqueued at ' + job.id);
        else console.log('Redis error ' + JSON.stringify(err));

        stream.resume();
      });
  })
  .on('end', function() {
    callback(); // this ends my parsing
  });

console.log(payload);我的CSV文件的每一行都有简单的显示,但是没有创建作业.即,回调中的输出save都没有被打印,并且作业不在我的redis中.

我假设,因为它是CSV文件的最后一行,流已经发出end,所以kue.create()在进程终止之前无法执行最后一行?

有没有办法停止流,end直到kue完成?



1> 小智..:

您可以使用异步库解决此问题.您可以将以下模式用于任何流.

var AsyncLib = require('async');

var worker = function (payload, cb) {
    //do something with payload and call callback
    return cb();
};

var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);

var stream = //some readable stream;

stream.on('data', function(data) {
    //no need to pause and resume
    var payload = '//some payload';
    streamQueue.push(payload);
})
.on('end', function() {
    //register drain event on end and callback
    streamQueue.drain = function () {
        callback();
    };
});

推荐阅读
wurtjq
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有