123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- var async = require('./async.js');
- // API
- module.exports = {
- iterator: wrapIterator,
- callback: wrapCallback
- };
/**
 * Wraps an iterator so that its completion callback
 * is routed through the stream before reaching the original callback.
 * Supports both the short `(item, cb)`
 * and the long `(item, key, cb)` iterator signatures.
 *
 * @this    ReadableAsyncKit#
 * @param   {function} iterator - function to wrap
 * @returns {function} - wrapped function, always called as `(item, key, cb)`
 */
function wrapIterator(iterator)
{
  var stream = this;

  return function(item, key, cb)
  {
    // `async` is the helper imported from `./async.js`;
    // NOTE(review): presumably it guarantees asynchronous invocation – confirm there
    var wrappedCb = async(wrapIteratorCallback.call(stream, cb, key));

    // register the job, so the first callback invocation for this key
    // can be told apart from any later ones
    stream.jobs[key] = wrappedCb;

    // number of declared parameters tells the formats apart:
    // it's either shortcut (item, cb)
    if (iterator.length === 2)
    {
      return iterator(item, wrappedCb);
    }

    // or long format (item, key, cb);
    // in both cases iterator's return value is handed back untouched
    return iterator(item, key, wrappedCb);
  };
}
/**
 * Produces a stand-in for the final callback
 * that lets `finisher` run against the stream first
 * and then hands control over to the real callback
 *
 * @this    ReadableAsyncKit#
 * @param   {function} callback - function to wrap
 * @returns {function} - wrapped function
 */
function wrapCallback(callback)
{
  var self = this;

  function wrapped(error, result)
  {
    // `finisher` needs the stream as its context
    return finisher.apply(self, [error, result, callback]);
  }

  return wrapped;
}
/**
 * Wraps provided iterator callback function,
 * makes sure snitch (`streamer`) is only called once per job,
 * but passes secondary calls to the original callback
 *
 * @this    ReadableAsyncKit#
 * @param   {function} callback - callback to wrap
 * @param   {number|string} key - iteration key
 * @returns {function} wrapped callback
 */
function wrapIteratorCallback(callback, key)
{
  var stream = this;

  return function(error, output)
  {
    // don't repeat yourself:
    // job is not (or no longer) registered, so skip the stream
    // and go straight to the original callback.
    // Own-property check is used instead of `in`, otherwise keys
    // matching inherited members (e.g. `toString`, `constructor`)
    // would look like pending jobs forever
    if (!Object.prototype.hasOwnProperty.call(stream.jobs, key))
    {
      callback(error, output);
      return;
    }

    // clean up jobs
    delete stream.jobs[key];

    // first invocation for this key, let the stream know about the result
    return streamer.call(stream, error, {key: key, value: output}, callback);
  };
}
/**
 * Feeds iterator results into the stream
 * before handing them over to the original callback.
 *
 * First error seen by the stream gets stored in `this.error`,
 * pauses the stream and is emitted as `error` event;
 * anything else is pushed into the stream as is.
 *
 * @this  ReadableAsyncKit#
 * @param {mixed} error - error response
 * @param {mixed} output - iterator output
 * @param {function} callback - callback that expects iterator results
 */
function streamer(error, output, callback)
{
  var isFirstError = Boolean(error) && !this.error;

  if (isFirstError)
  {
    this.error = error;
    this.pause();
    this.emit('error', error);
  }
  else
  {
    // stream stuff
    this.push(output);
  }

  // back to original track,
  // original callback expects the value only
  callback(error, output && output.value);
}
/**
 * Runs ahead of the final callback:
 * pushes `null` into the stream to signal its end
 * when everything finished without an error,
 * then invokes the original callback with the same arguments
 *
 * @this  ReadableAsyncKit#
 * @param {mixed} error - error response
 * @param {mixed} output - iterator output
 * @param {function} callback - callback that expects final results
 */
function finisher(error, output, callback)
{
  // failed runs go straight back to the original track,
  // end of the stream is not signaled for them
  if (error)
  {
    callback(error, output);
    return;
  }

  // signal end of the stream
  this.push(null);

  // back to original track
  callback(error, output);
}
|