NodeJS · 2017/05/30 2

使用async库实现Node.js异步流程控制

背景

在实际工作中使用到Node.js最令人头疼的就是非常丑陋的回调嵌套和由此带来的排查问题的复杂化。

这里就需要借助Node.js的异步流程控制来让异步流程实现同步化编码,主流有async库,co库,es7新支持的async,await方法。

这里我介绍下我比较用的比较多的async库,async库具备对Node版本兼容好,无性能损耗,语法简单的特点。

async库主要分为对集合和对控制流两大类方法集。对于集合可以简单理解成对数组遍历,并对每个数组元素进行或串行或并行的相同函数处理。而控制流和集合最大的不同就是对非数组类数据进行流程化也就是多个不同函数的处理。

实战

在这里我们挑选集合方法中map类的方法来作为示例,对于串行、并行和限制并行数量的各情景都会覆盖到,其他方法请根据官方文档(见参考链接)以此类推

//使用npm或者yarn先安装示例模块,我这里使用yarn
yarn add async
yarn add moment
var async = require('async')
var moment = require('moment')

/**
 * 对集合中的每一个元素,执行某个异步操作,得到结果。所有的结果将汇总到最终的callback里。
 * 提供了两种方式:
 * 1. 并行执行。同时对集合中所有元素进行操作,结果汇总到最终callback里。
 * 如果出错,则立刻返回错误以及已经执行完的任务的结果,未执行完的占个空位
 * 2. 顺序执行。对集合中的元素一个一个执行操作,结果汇总到最终callback里。
 * 如果出错,则立刻返回错误以及已经执行完的结果,未执行的被忽略。
 */

/**
 * 这里初始化一个包含key为name和delay的数组集合,其中delay用于后续例子的setTimeout来判断是否并发执行以及执行时长
 */
var arr = [{name:'chenjie0', delay:100}, {name:'chenjie1', delay:150}, {name:'chenjie2', delay:200}]

/**
 * 1.1 并行执行。同时对集合中所有元素进行操作,所有操作均正确执行,未出错。
 * 所有结果按元素顺序汇总给最终的callback。
 */
console.log('1.1>' + moment().format('x'))
async.map(arr, function(row, callback){
    setTimeout(function() {
        callback(null, row.name);
    }, row.delay);
},function(err, results){
    console.log('1.1>' + moment().format('x'))
    console.log('1.1>' + results)
})
// 结果输出 执行完耗时200ms多一点,符合预期
// 1.1>1496063693757
// 1.1>1496063693964
// 1.1>[ 'chenjie0', 'chenjie1', 'chenjie2' ]

/**
 * 1.2 并行执行。同时对集合中所有元素进行操作,如果中途出错,立刻将错误、以及已经执行完成的结果汇总给最终callback。
 * 未执行完的将会在结果数组中用占个空位。
 */
console.log('1.2>' +  moment().format('x'))
async.map(arr, function(row, callback){
    setTimeout(function() {
        if(row.name==='chenjie1') callback('occur a err')
        else callback(null, row.name);
    }, row.delay);
},function(err, results){
    console.log('1.2>' +  moment().format('x'))
    console.log('1.2>' + err)
    console.log('1.2>' + results)
})
// 结果输出 执行完耗时150ms多一点,说明在按照delay大小执行到集合第二个元素的时候因为错误立刻返回结果给callback,
// 那么在结果中第二个数组元素就被undefined 占位了,耗时较长的第三个集合元素则没有机会返回。
// 1.2>1496064404599
// 1.2>1496064404758
// 1.2>occur a err
// 1.2>[ 'chenjie0', undefined ]

/**
 * 1.3 并行执行。同时最多x个函数并行
 */
console.log('1.3>' +  moment().format('x'))
async.mapLimit(arr, 2, function(row, callback){
    setTimeout(function() {
        callback(null, row.name);
    }, row.delay);
},function(err, results){
    console.log('1.3>' +  moment().format('x'))
    console.log('1.3>' + results)
})
// 结果输出 执行完耗时314ms,略大于300ms,因为会始终保持2个在并行执行,假设执行先执行1,2后执行3,流程大致如下
// ---150---
// --100------200----
// 而时长总和就在第一轮较快执行完的时长+第二轮执行的时长。其他情况结果时长也是如此,所以符合预期
// 1.3>1496065470119
// 1.3>1496065470433
// 1.3>[ 'chenjie0', 'chenjie1', 'chenjie2' ]

/**
 * 2.1 顺序执行,顺序对集合中所有元素进行操作,所有操作均正确执行,未出错。
 * 所有结果按元素顺序汇总给最终的callback。
 */
console.log('2.1>' +  moment().format('x'))
async.mapSeries(arr, function(row, callback){
    setTimeout(function() {
        callback(null, row.name);
    }, row.delay);
},function(err, results){
    console.log('2.1>' +  moment().format('x'))
    console.log('2.1>' + results)
})
// 结果输出 执行完耗时471ms,略超过三元素的delay综合,符合预期
// 2.1>1496064797796
// 2.1>1496064798267
// 2.1>[ 'chenjie0', 'chenjie1', 'chenjie2' ]


/**
 * 2.2 顺序执行,只把错误以及执行完的传给最终callback,未执行的忽略。
 */
console.log('2.2>' +  moment().format('x'))
async.mapSeries(arr, function(row, callback){
    setTimeout(function() {
        if(row.name==='chenjie1') callback('occur a err')
        else callback(null, row.name);
    }, row.delay);
},function(err, results){
    console.log('2.2>' +   moment().format('x'))
    console.log('2.2>' +  err)
    console.log('2.2>' +  results)
})
// 结果输出 执行完耗时263ms 略高于chenjie0和chenjie1的总delay值250,说明在顺序执行到集合第二个元素的时候因为错误立刻返回
// 并忽略下面的集合元素的操作
// 2.2>1496065009844
// 2.2>1496065010107
// 2.2>occur a err
// 2.2>[ 'chenjie0', undefined ]

注意

  1. 使用map并行执行时候,迭代函数不能保证按迭代顺序完成,但是结果数组与原数组将保持相同的顺序。
  2. 使用map并行执行时如果中途遇到错误立刻将错误、以及已经执行完成的结果汇总给最终callback,但是剩余的任务仍然会继续执行只是不会返回值给最终callback了。

参考:

官网文档 https://caolan.github.io/async/docs.html

官方github issues https://github.com/caolan/async/issues/248