var expect = require('chai' ).expect;
var util = require('./util' );
var stream = require('../lib/protocol/stream' );
var Stream = stream.Stream;
/**
 * Creates the Stream instance used throughout these tests.
 *
 * The stream is constructed with the shared test logger (`util.log`) and `null`
 * as its second constructor argument. Its upstream `_window` is then forced to
 * Infinity (presumably so flow control never holds frames back - TODO confirm
 * against the flow-control implementation).
 *
 * @returns {Stream} a newly constructed stream
 */
function createStream() {
  var fresh_stream = new Stream(util.log, null);
  var upstream = fresh_stream.upstream;
  upstream._window = Infinity;
  return fresh_stream;
}
// Names of the stream events that execute_sequence() records for later verification.
var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise'];

/**
 * Execute a list of commands and assertions against a stream.
 *
 * Every step of `sequence` is an object. Steps carrying one of the keys
 * `method`, `incoming`, `outgoing`, `wait` or `set_state` are commands; steps
 * carrying `outgoing`, `event` or `active` are checks (`outgoing` is both: the
 * frame is read from the upstream while executing and verified afterwards).
 *
 * All commands run first, in order, starting on the next `setImmediate` tick.
 * Five milliseconds after the last command, all checks run in order and then
 * `done` is called.
 *
 * @param {Object} [stream] stream under test; when the function is called with
 *   two arguments `(sequence, done)` a new one is obtained from createStream()
 * @param {Array<Object>} sequence the steps to execute and verify
 * @param {Function} done called without arguments once every check has passed
 */
function execute_sequence(stream, sequence, done) {
  // Two-argument form: shift the arguments and work on a fresh stream.
  if (!done) {
    done = sequence;
    sequence = stream;
    stream = createStream();
  }

  var sent_frames = [];
  var seen_events = [];
  var active_count = 0;

  // Wrap emit() so the interesting events are logged before being dispatched as usual.
  var original_emit = stream.emit;
  stream.emit = function (name) {
    if (recorded_events.indexOf(name) !== -1) {
      seen_events.push({ name: name, data: Array.prototype.slice.call(arguments, 1) });
    }
    return original_emit.apply(this, arguments);
  };

  // True when `step` carries at least one of the given keys.
  function has_any_key(step, keys) {
    return keys.some(function (key) { return key in step; });
  }

  var commands = sequence.filter(function (step) {
    return has_any_key(step, ['method', 'incoming', 'outgoing', 'wait', 'set_state']);
  });
  var checks = sequence.filter(function (step) {
    return has_any_key(step, ['outgoing', 'event', 'active']);
  });

  // Handed to incoming frames as `count_change` and fed with the `count_change`
  // value of outgoing frames, so `active_count` accumulates every reported change.
  function adjust_active_count(change) {
    active_count += change;
  }

  // Runs the remaining commands. A `wait` command suspends the loop and resumes
  // it from a timer; once the queue is empty `callback` is scheduled 5 ms later.
  function execute(callback) {
    var command;
    while ((command = commands.shift())) {
      if ('method' in command) {
        var call = command.method;
        var value = stream[call.name].apply(stream, call.arguments);
        if (call.ret) {
          call.ret(value);
        }
      } else if ('incoming' in command) {
        command.incoming.count_change = adjust_active_count;
        stream.upstream.write(command.incoming);
      } else if ('outgoing' in command) {
        sent_frames.push(stream.upstream.read());
      } else if ('set_state' in command) {
        stream.state = command.set_state;
      } else if ('wait' in command) {
        setTimeout(function () { execute(callback); }, command.wait);
        return;
      } else {
        throw new Error('Invalid command', command);
      }
    }
    setTimeout(callback, 5);
  }

  // Verifies one expectation, consuming recorded frames and events in arrival order.
  function verify(expected) {
    if ('outgoing' in expected) {
      var frame = sent_frames.shift();
      for (var key in expected.outgoing) {
        expect(frame).to.have.property(key).that.deep.equals(expected.outgoing[key]);
      }
      adjust_active_count(frame.count_change);
      return;
    }
    if ('event' in expected) {
      var event = seen_events.shift();
      expect(event.name).to.be.equal(expected.event.name);
      expected.event.data.forEach(function (data, index) {
        expect(event.data[index]).to.deep.equal(data);
      });
      return;
    }
    if ('active' in expected) {
      expect(active_count).to.be.equal(expected.active);
      return;
    }
    throw new Error('Invalid check', expected);
  }

  function check() {
    checks.forEach(function (expected) {
      verify(expected);
    });
    done();
  }

  setImmediate(function () { execute(check); });
}
// One sample frame per frame type, shared by the state-transition tests below.
// The order is significant: tests select subsets by position with slice(2),
// slice(3) and slice(0,3). Tests also attach a `count_change` property to
// these shared objects before passing them to _transition().
var example_frames = [
{ type: 'PRIORITY' , flags: {}, priority: 1 },
{ type: 'WINDOW_UPDATE' , flags: {}, settings: {} },
{ type: 'RST_STREAM' , flags: {}, error: 'CANCEL' },
{ type: 'HEADERS' , flags: {}, headers: {}, priority: undefined },
{ type: 'DATA' , flags: {}, data: Buffer.alloc(5) },
{ type: 'PUSH_PROMISE' , flags: {}, headers: {}, promised_stream: new Stream(util.log, null ) }
];
// Frames the tests treat as invalid to *receive*, keyed by the stream state in
// which they arrive. States with an empty list have no invalid sample here.
var invalid_incoming_frames = (function () {
  // Each factory returns a fresh object so no two table entries share a frame.
  function data() {
    return { type: 'DATA', flags: {}, data: Buffer.alloc(5) };
  }
  function headers() {
    return { type: 'HEADERS', flags: {}, headers: {}, priority: undefined };
  }
  function push_promise() {
    return { type: 'PUSH_PROMISE', flags: {}, headers: {} };
  }
  function rst_stream() {
    return { type: 'RST_STREAM', flags: {}, error: 'CANCEL' };
  }
  function window_update() {
    return { type: 'WINDOW_UPDATE', flags: {}, settings: {} };
  }

  return {
    IDLE: [data(), window_update(), push_promise(), rst_stream()],
    RESERVED_LOCAL: [data(), headers(), push_promise(), window_update()],
    RESERVED_REMOTE: [data(), push_promise(), window_update()],
    OPEN: [],
    HALF_CLOSED_LOCAL: [],
    HALF_CLOSED_REMOTE: [data(), headers(), push_promise()]
  };
})();
// Frames the tests treat as invalid to *send*, keyed by the stream state in
// which the send is attempted. States with an empty list have no invalid sample here.
var invalid_outgoing_frames = (function () {
  // Each factory returns a fresh object so no two table entries share a frame.
  function data() {
    return { type: 'DATA', flags: {}, data: Buffer.alloc(5) };
  }
  function headers() {
    return { type: 'HEADERS', flags: {}, headers: {}, priority: undefined };
  }
  function push_promise() {
    return { type: 'PUSH_PROMISE', flags: {}, headers: {} };
  }
  function window_update() {
    return { type: 'WINDOW_UPDATE', flags: {}, settings: {} };
  }

  return {
    IDLE: [data(), window_update(), push_promise()],
    RESERVED_LOCAL: [data(), push_promise(), window_update()],
    RESERVED_REMOTE: [data(), headers(), push_promise(), window_update()],
    OPEN: [],
    HALF_CLOSED_LOCAL: [data(), headers(), push_promise()],
    HALF_CLOSED_REMOTE: [],
    CLOSED: [
      window_update(),
      headers(),
      data(),
      // The only entry that carries a promised stream.
      { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log, null) }
    ]
  };
})();
describe('stream.js' , function () {
describe('Stream class' , function () {
describe('._transition(sending, frame) method' , function () {
// Verifies how _transition() reacts to frames received in a state that does not allow them.
it('should emit error, and answer RST_STREAM for invalid incoming frames', function () {
  // Part 1: for every (state, frame) pair of invalid_incoming_frames, receiving
  // the frame must make the stream emit 'connectionError'.
  Object.keys(invalid_incoming_frames).forEach(function (state) {
    invalid_incoming_frames[state].forEach(function (invalid_frame) {
      var stream = createStream();
      var connectionErrorHappened = false;
      stream.state = state;
      stream.once('connectionError', function () { connectionErrorHappened = true; });
      stream._transition(false, invalid_frame);
      // A bare `expect(value)` asserts nothing. A matcher has to be chained for
      // the check to be able to fail; the message names the offending pair.
      expect(
        connectionErrorHappened,
        invalid_frame.type + ' frame received in ' + state + ' state'
      ).to.equal(true);
    });
  });

  // CLOSED state as a result of incoming END_STREAM (or RST_STREAM)
  var stream = createStream();
  stream.headers({});
  stream.end();
  stream.upstream.write({ type: 'HEADERS', headers: {}, flags: { END_STREAM: true }, count_change: util.noop });
  // Every example frame from index 2 on is expected to raise an unhandled 'error' event.
  example_frames.slice(2).forEach(function (invalid_frame) {
    invalid_frame.count_change = util.noop;
    expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
  });

  // CLOSED state as a result of outgoing END_STREAM
  stream = createStream();
  stream.upstream.write({ type: 'HEADERS', headers: {}, flags: { END_STREAM: true }, count_change: util.noop });
  stream.headers({});
  stream.end();
  // Here only the example frames from index 3 on are expected to raise the error.
  example_frames.slice(3).forEach(function (invalid_frame) {
    invalid_frame.count_change = util.noop;
    expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
  });
});
// Every (state, frame) pair of invalid_outgoing_frames must make a send attempt throw.
it('should throw exception for invalid outgoing frames', function () {
  // Forces a fresh stream into `state` and expects sending `frame` to throw an Error.
  function expect_send_to_throw(state, frame) {
    var stream = createStream();
    stream.state = state;
    expect(function () { stream._transition(true, frame); }).to.throw(Error);
  }

  Object.keys(invalid_outgoing_frames).forEach(function (state) {
    var frames = invalid_outgoing_frames[state];
    for (var i = 0; i < frames.length; i++) {
      expect_send_to_throw(state, frames[i]);
    }
  });
});
// An RST_STREAM frame, whether sent or received, must move the stream to CLOSED
// from each of the listed states.
it('should close the stream when there\' s an incoming or outgoing RST_STREAM', function () {
  var starting_states = [
    'RESERVED_LOCAL',
    'RESERVED_REMOTE',
    'OPEN',
    'HALF_CLOSED_LOCAL',
    'HALF_CLOSED_REMOTE'
  ];
  // true = the frame is being sent, false = it is being received
  var directions = [true, false];

  starting_states.forEach(function (state) {
    directions.forEach(function (sending) {
      var stream = createStream();
      stream.state = state;
      stream._transition(sending, { type: 'RST_STREAM', flags: {} });
      expect(stream.state).to.be.equal('CLOSED');
    });
  });
});
// After a local reset(), receiving any of the example frames must not throw.
// The test has no explicit assertion; it passes when nothing is thrown.
it('should ignore any incoming frame after sending reset', function () {
  var reset_stream = createStream();
  reset_stream.reset();

  // Bound with sending=false, so each frame is handled as an incoming one.
  var receive = reset_stream._transition.bind(reset_stream, false);
  example_frames.forEach(receive);
});
// Once both directions were ended with END_STREAM, receiving the first three
// example frames must not throw. The test passes when nothing is thrown.
it('should ignore certain incoming frames after closing the stream with END_STREAM', function () {
  var ended_stream = createStream();

  // Remote side ends first, then the local side sends headers and ends.
  ended_stream.upstream.write({ type: 'HEADERS', flags: { END_STREAM: true }, headers: {} });
  ended_stream.headers({});
  ended_stream.end();

  var tolerated_frames = example_frames.slice(0, 3);
  for (var i = 0; i < tolerated_frames.length; i++) {
    var frame = tolerated_frames[i];
    frame.count_change = util.noop;
    ended_stream._transition(false, frame);
  }
});
});
});
describe('test scenario' , function () {
// Scenario: the stream under test sends a request and receives the response.
describe('sending request', function () {
  it('should trigger the appropriate state transitions and outgoing frames', function (done) {
    var steps = [
      // Sending the request headers opens the stream.
      { method: { name: 'headers', arguments: [{ ':path': '/' }] } },
      { outgoing: { type: 'HEADERS', flags: {}, headers: { ':path': '/' } } },
      { event: { name: 'state', data: ['OPEN'] } },

      // Ending without a body half-closes the local side with an empty DATA frame.
      { wait: 5 },
      { method: { name: 'end', arguments: [] } },
      { event: { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
      { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(0) } },

      // The response headers and the final DATA frame close the stream.
      { wait: 10 },
      { incoming: { type: 'HEADERS', flags: {}, headers: { ':status': 200 } } },
      { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(5) } },
      { event: { name: 'headers', data: [{ ':status': 200 }] } },
      { event: { name: 'state', data: ['CLOSED'] } },

      { active: 0 }
    ];

    execute_sequence(steps, done);
  });
});
// Scenario: the stream under test receives a request and sends the response.
describe('answering request', function () {
  it('should trigger the appropriate state transitions and outgoing frames', function (done) {
    // The same buffer is passed to end() and expected in the outgoing DATA frame.
    var response_body = Buffer.alloc(5);

    var steps = [
      // Incoming request headers open the stream.
      { incoming: { type: 'HEADERS', flags: {}, headers: { ':path': '/' } } },
      { event: { name: 'state', data: ['OPEN'] } },
      { event: { name: 'headers', data: [{ ':path': '/' }] } },

      // Request body; the second DATA frame ends the remote side.
      { wait: 5 },
      { incoming: { type: 'DATA', flags: {}, data: Buffer.alloc(5) } },
      { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(10) } },
      { event: { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },

      // Response headers.
      { wait: 5 },
      { method: { name: 'headers', arguments: [{ ':status': 200 }] } },
      { outgoing: { type: 'HEADERS', flags: {}, headers: { ':status': 200 } } },

      // Response body; ending the local side closes the stream.
      { wait: 5 },
      { method: { name: 'end', arguments: [response_body] } },
      { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: response_body } },
      { event: { name: 'state', data: ['CLOSED'] } },

      { active: 0 }
    ];

    execute_sequence(steps, done);
  });
});
// Scenario: while answering a request, the stream under test promises a pushed
// stream; the pushed stream is then driven through its own sequence.
describe('sending push stream', function () {
  it('should trigger the appropriate state transitions and outgoing frames', function (done) {
    var body = Buffer.alloc(5);
    var promised_stream; // set by the `ret` hook of the promise() step below

    function remember_promised_stream(result) {
      promised_stream = result;
    }

    // Runs after the original stream's sequence has been verified.
    function verify_promised_stream() {
      // initial state of the promised stream
      expect(promised_stream.state).to.equal('RESERVED_LOCAL');

      var push_steps = [
        // push headers
        { wait: 5 },
        { method: { name: 'headers', arguments: [{ ':status': '200' }] } },
        { outgoing: { type: 'HEADERS', flags: {}, headers: { ':status': '200' } } },
        { event: { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
        // push data
        { method: { name: 'end', arguments: [body] } },
        { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: body } },
        { event: { name: 'state', data: ['CLOSED'] } },
        { active: 1 }
      ];
      execute_sequence(promised_stream, push_steps, done);
    }

    var response_steps = [
      // receiving request
      { incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
      { event: { name: 'state', data: ['OPEN'] } },
      { event: { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
      { event: { name: 'headers', data: [{ ':path': '/' }] } },
      // sending response headers
      { wait: 5 },
      { method: { name: 'headers', arguments: [{ ':status': '200' }] } },
      { outgoing: { type: 'HEADERS', flags: {}, headers: { ':status': '200' } } },
      // sending push promise
      { method: { name: 'promise', arguments: [{ ':path': '/' }], ret: remember_promised_stream } },
      { outgoing: { type: 'PUSH_PROMISE', flags: {}, headers: { ':path': '/' } } },
      // sending response data
      { method: { name: 'end', arguments: [body] } },
      { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: body } },
      { event: { name: 'state', data: ['CLOSED'] } },
      { active: 0 }
    ];

    execute_sequence(response_steps, verify_promised_stream);
  });
});
// Scenario: the stream under test sends a request and receives a PUSH_PROMISE;
// the promised stream is driven through its own sequence in parallel.
describe('receiving push stream', function () {
  it('should trigger the appropriate state transitions and outgoing frames', function (done) {
    var body = Buffer.alloc(5);
    var request_stream = createStream();
    var pushed_stream = createStream();

    // Both sequences below report to the callback built by util.callNTimes(2, done).
    var sequence_finished = util.callNTimes(2, done);

    var request_steps = [
      // sending request headers
      { method: { name: 'headers', arguments: [{ ':path': '/' }] } },
      { method: { name: 'end', arguments: [] } },
      { outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
      { event: { name: 'state', data: ['OPEN'] } },
      { event: { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
      // receiving response headers
      { wait: 10 },
      { incoming: { type: 'HEADERS', flags: {}, headers: { ':status': 200 } } },
      { event: { name: 'headers', data: [{ ':status': 200 }] } },
      // receiving push promise
      { incoming: { type: 'PUSH_PROMISE', flags: {}, headers: { ':path': '/2.html' }, promised_stream: pushed_stream } },
      { event: { name: 'promise', data: [pushed_stream, { ':path': '/2.html' }] } },
      // receiving response data
      { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: body } },
      { event: { name: 'state', data: ['CLOSED'] } },
      { active: 0 }
    ];

    var push_steps = [
      // initial state of the promised stream
      { event: { name: 'state', data: ['RESERVED_REMOTE'] } },
      // push headers
      { wait: 10 },
      { incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } },
      { event: { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
      { event: { name: 'headers', data: [{ ':status': 200 }] } },
      // push data
      { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: body } },
      { event: { name: 'state', data: ['CLOSED'] } },
      { active: 0 }
    ];

    execute_sequence(request_stream, request_steps, sequence_finished);
    execute_sequence(pushed_stream, push_steps, sequence_finished);
  });
});
});
// Checks the `s` serializer exported by the stream module: serializing the same
// stream twice must give equal results, and two different streams must differ.
describe('bunyan formatter', function () {
  describe('`s`', function () {
    var serialize = stream.serializers.s;

    it('should assign a unique ID to each frame', function () {
      var first = createStream();
      var second = createStream();

      // Stable: the same stream always serializes to the same value.
      [first, second].forEach(function (subject) {
        expect(serialize(subject)).to.be.equal(serialize(subject));
      });

      // Unique: different streams serialize to different values.
      expect(serialize(first)).to.not.be.equal(serialize(second));
    });
  });
});
});
// Footer left behind by the formatting tool (not part of the test suite):
// quality 96%
// Processing time: 0.8 seconds
// © Formatika GbR, Germany