144 lines
3.6 KiB
JavaScript
144 lines
3.6 KiB
JavaScript
'use strict';
|
|
|
|
import stream from 'stream';
|
|
import utils from '../utils.js';
|
|
|
|
const kInternals = Symbol('internals');
|
|
|
|
class AxiosTransformStream extends stream.Transform{
|
|
constructor(options) {
|
|
options = utils.toFlatObject(options, {
|
|
maxRate: 0,
|
|
chunkSize: 64 * 1024,
|
|
minChunkSize: 100,
|
|
timeWindow: 500,
|
|
ticksRate: 2,
|
|
samplesCount: 15
|
|
}, null, (prop, source) => {
|
|
return !utils.isUndefined(source[prop]);
|
|
});
|
|
|
|
super({
|
|
readableHighWaterMark: options.chunkSize
|
|
});
|
|
|
|
const internals = this[kInternals] = {
|
|
timeWindow: options.timeWindow,
|
|
chunkSize: options.chunkSize,
|
|
maxRate: options.maxRate,
|
|
minChunkSize: options.minChunkSize,
|
|
bytesSeen: 0,
|
|
isCaptured: false,
|
|
notifiedBytesLoaded: 0,
|
|
ts: Date.now(),
|
|
bytes: 0,
|
|
onReadCallback: null
|
|
};
|
|
|
|
this.on('newListener', event => {
|
|
if (event === 'progress') {
|
|
if (!internals.isCaptured) {
|
|
internals.isCaptured = true;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
_read(size) {
|
|
const internals = this[kInternals];
|
|
|
|
if (internals.onReadCallback) {
|
|
internals.onReadCallback();
|
|
}
|
|
|
|
return super._read(size);
|
|
}
|
|
|
|
_transform(chunk, encoding, callback) {
|
|
const internals = this[kInternals];
|
|
const maxRate = internals.maxRate;
|
|
|
|
const readableHighWaterMark = this.readableHighWaterMark;
|
|
|
|
const timeWindow = internals.timeWindow;
|
|
|
|
const divider = 1000 / timeWindow;
|
|
const bytesThreshold = (maxRate / divider);
|
|
const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
|
|
|
|
const pushChunk = (_chunk, _callback) => {
|
|
const bytes = Buffer.byteLength(_chunk);
|
|
internals.bytesSeen += bytes;
|
|
internals.bytes += bytes;
|
|
|
|
internals.isCaptured && this.emit('progress', internals.bytesSeen);
|
|
|
|
if (this.push(_chunk)) {
|
|
process.nextTick(_callback);
|
|
} else {
|
|
internals.onReadCallback = () => {
|
|
internals.onReadCallback = null;
|
|
process.nextTick(_callback);
|
|
};
|
|
}
|
|
}
|
|
|
|
const transformChunk = (_chunk, _callback) => {
|
|
const chunkSize = Buffer.byteLength(_chunk);
|
|
let chunkRemainder = null;
|
|
let maxChunkSize = readableHighWaterMark;
|
|
let bytesLeft;
|
|
let passed = 0;
|
|
|
|
if (maxRate) {
|
|
const now = Date.now();
|
|
|
|
if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
|
|
internals.ts = now;
|
|
bytesLeft = bytesThreshold - internals.bytes;
|
|
internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
|
|
passed = 0;
|
|
}
|
|
|
|
bytesLeft = bytesThreshold - internals.bytes;
|
|
}
|
|
|
|
if (maxRate) {
|
|
if (bytesLeft <= 0) {
|
|
// next time window
|
|
return setTimeout(() => {
|
|
_callback(null, _chunk);
|
|
}, timeWindow - passed);
|
|
}
|
|
|
|
if (bytesLeft < maxChunkSize) {
|
|
maxChunkSize = bytesLeft;
|
|
}
|
|
}
|
|
|
|
if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
|
|
chunkRemainder = _chunk.subarray(maxChunkSize);
|
|
_chunk = _chunk.subarray(0, maxChunkSize);
|
|
}
|
|
|
|
pushChunk(_chunk, chunkRemainder ? () => {
|
|
process.nextTick(_callback, null, chunkRemainder);
|
|
} : _callback);
|
|
};
|
|
|
|
transformChunk(chunk, function transformNextChunk(err, _chunk) {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
|
|
if (_chunk) {
|
|
transformChunk(_chunk, transformNextChunk);
|
|
} else {
|
|
callback(null);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
export default AxiosTransformStream;
|