// Number of JSON-encoded bytes a makeRecord() record occupies beyond its
// payload (field names, quotes, braces, …), measured from an empty record.
// Note: This is 14 bytes. Tests make assumptions about this (even if it's just
// in setting config.max_request_bytes to a specific value).
makeRecord.nonPayloadOverhead = JSON.stringify(makeRecord(0).toJSON()).length;
// Gives how many encoded bytes a request with the given payload sizes will be
// (assuming the records were created by makeRecord).
// requestBytesFor([20]) => 22, requestBytesFor([20, 20]) => 43
function requestBytesFor(recordPayloadByteCounts) {
  // Start at 1 for the opening "[" of the JSON array; each record then adds
  // its payload, the per-record encoding overhead, and one byte for the ","
  // separator (or the closing "]").
  return [...recordPayloadByteCounts].reduce(
    (total, payloadSize) =>
      total + payloadSize + makeRecord.nonPayloadOverhead + 1,
    1
  );
}
/**
 * Builds a PostQueue plus a `stats` object that records every POST the queue
 * makes (stats.posts) and how those POSTs were grouped into batches
 * (stats.batches), asserting along the way that the queue respected the
 * limits given in `config`.
 *
 * @param {Object} config - PostQueue limits (max_post_bytes, max_post_records,
 *   max_total_bytes, max_total_records, max_record_payload_bytes,
 *   max_request_bytes).
 * @param {number} lastModTime - initial last-modified timestamp for the queue.
 * @param {Generator} responseGenerator - yields one server response per POST.
 * @returns {{pq: PostQueue, stats: {posts: Array, batches: Array}}}
 */
function makePostQueue(config, lastModTime, responseGenerator) {
  let stats = {
    posts: [],
    batches: [],
  };
  let poster = (data, headers, batch, commit) => {
    // Tally the records in this POST, checking each against the per-record
    // payload limit.
    let payloadBytes = 0;
    let numRecords = 0;
    for (let record of JSON.parse(data)) {
      if (config.max_record_payload_bytes) {
        less(
          record.payload.length,
          config.max_record_payload_bytes,
          "PostQueue should respect max_record_payload_bytes"
        );
      }
      payloadBytes += record.payload.length;
      ++numRecords;
    }
    // Info about this POST, in the shape the tests' deepEqual() checks expect.
    // NOTE(review): the original chunk used `thisPost` without ever defining
    // it (a ReferenceError); reconstructed here from the shapes asserted by
    // the tests below — confirm against the original file.
    let thisPost = {
      nbytes: data.length,
      payloadBytes,
      numRecords,
      commit,
      batch,
    };
    if (headers.length) {
      thisPost.headers = headers;
    }
    // check that we respected the provided limits for the post
    if (config.max_post_records) {
      lessOrEqual(
        numRecords,
        config.max_post_records,
        "PostQueue should respect max_post_records"
      );
    }
    if (config.max_post_bytes) {
      less(
        payloadBytes,
        config.max_post_bytes,
        "PostQueue should respect max_post_bytes"
      );
    }
    if (config.max_request_bytes) {
      less(
        thisPost.nbytes,
        config.max_request_bytes,
        "PostQueue should respect max_request_bytes"
      );
    }
    stats.posts.push(thisPost);
    // Call this now so we can check if there's a batch id in it.
    // Kind of cludgey, but allows us to have the correct batch id even
    // before the next post is made.
    let nextResponse = responseGenerator.next().value;
    // Record info for the batch.
    let curBatch = stats.batches[stats.batches.length - 1];
    // If there's no batch, it committed, or we requested a new one,
    // then we need to start a new one.
    if (!curBatch || batch == "true" || curBatch.didCommit) {
      curBatch = {
        posts: 0,
        payloadBytes: 0,
        numRecords: 0,
        didCommit: false,
        batch,
        serverBatch: false,
      };
      // The server may hand us a real batch id; record that we're in a
      // server-side batch if so.
      if (nextResponse.obj && nextResponse.obj.batch) {
        curBatch.batch = nextResponse.obj.batch;
        curBatch.serverBatch = true;
      }
      stats.batches.push(curBatch);
    }
    // If we provided a batch id, it must be the same as the current batch
    if (batch && batch != "true") {
      equal(curBatch.batch, batch);
    }
    // Fold this POST into the running batch totals.
    // NOTE(review): these updates were missing from the original chunk, yet
    // the total-limit checks below and the tests' deepEqual(stats.batches, …)
    // assertions depend on them; restored — confirm against the original file.
    curBatch.posts += 1;
    curBatch.payloadBytes += payloadBytes;
    curBatch.numRecords += numRecords;
    curBatch.didCommit = commit;
    // if this is an actual server batch (or it's a one-shot batch), check that
    // we respected the provided total limits
    if (commit && (batch == "true" || curBatch.serverBatch)) {
      if (config.max_total_records) {
        lessOrEqual(
          curBatch.numRecords,
          config.max_total_records,
          "PostQueue should respect max_total_records"
        );
      }
      if (config.max_total_bytes) {
        less(
          curBatch.payloadBytes,
          config.max_total_bytes,
          "PostQueue should respect max_total_bytes"
        );
      }
    }
    return Promise.resolve(nextResponse);
  };
  let done = () => {};
  let pq = new PostQueue(poster, lastModTime, config, getTestLogger(), done);
  return { pq, stats };
}
add_task(async function test_simple() {
  const config = {
    max_request_bytes: 1000,
    max_record_payload_bytes: 1000,
  };
  const { pq: queue, stats } = makePostQueue(config, time, responseGenerator());
  await queue.enqueue(makeRecord(10));
  await queue.flush(true);

  // One small record fits comfortably within the limits, so we expect a
  // single POST...
  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10]),
      payloadBytes: 10,
      numRecords: 1,
      commit: true, // we don't know if we have batch semantics, so committed.
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
  ]);
  // ...making up a single, committed, client-side ("one-shot") batch.
  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 1,
      didCommit: true,
      batch: "true",
      serverBatch: false,
    },
  ]);
});
// Test we do the right thing when we need to make multiple posts when there
// are no batch semantics
add_task(async function test_max_request_bytes_no_batch() {
  const config = {
    max_request_bytes: 50,
    max_record_payload_bytes: 50,
  };
  const time = 11111111;
  // Two successful plain-200 responses (no batch id), each advancing the
  // server clock by a further 100ms.
  function* responseGenerator() {
    for (const offset of [100, 200]) {
      yield {
        success: true,
        status: 200,
        headers: {
          "x-weave-timestamp": time + offset,
          "x-last-modified": time + offset,
        },
      };
    }
  }
  const { pq, stats } = makePostQueue(config, time, responseGenerator());
  const payloadSize = 20 - makeRecord.nonPayloadOverhead;
  // total size now 22 bytes - "[" + record + "]"
  await pq.enqueue(makeRecord(payloadSize));
  // total size now 43 bytes - "[" + record + "," + record + "]"
  await pq.enqueue(makeRecord(payloadSize));
  // this will exceed our byte limit, so will be in the 2nd POST.
  await pq.enqueue(makeRecord(payloadSize));
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: 43, // 43 for the first part
      payloadBytes: payloadSize * 2,
      numRecords: 2,
      commit: false,
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
    {
      nbytes: 22,
      payloadBytes: payloadSize,
      numRecords: 1,
      commit: false, // we know we aren't in a batch, so never commit.
      headers: [["x-if-unmodified-since", time + 100]],
      batch: null,
    },
  ]);
  // Without batch semantics, no batch ever commits.
  equal(stats.batches.filter(x => x.didCommit).length, 0);
  equal(pq.lastModified, time + 200);
});
// Test that an over-large record payload fails to enqueue, while records whose
// *encoded* size (payload + overhead) exceeds the payload limit still succeed.
// NOTE(review): this add_task is truncated in this chunk — its closing `});`
// (and any trailing assertions) is missing before the next test's comment.
// Code kept token-identical; restore the missing tail from the original file.
// NOTE(review): `time` and `responseGenerator` are not defined in this chunk;
// presumably module-level or defined earlier — verify.
add_task(async function test_max_record_payload_bytes_no_batch() {
let config = {
max_request_bytes: 100,
max_record_payload_bytes: 50,
};
let { pq, stats } = makePostQueue(config, time, responseGenerator());
// Should trigger when the record really is too large to fit
let { enqueued } = await pq.enqueue(makeRecord(51));
ok(!enqueued);
// Shouldn't trigger when the encoded record is too big
ok(
(await pq.enqueue(makeRecord(50 - makeRecord.nonPayloadOverhead))).enqueued
); // total size now 52 bytes - "[" + record + "]"
ok(
(await pq.enqueue(makeRecord(46 - makeRecord.nonPayloadOverhead))).enqueued
); // total size now 99 bytes - "[" + record0 + "," + record1 + "]"
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 99,
payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
numRecords: 2,
// we don't know if we have batch semantics, so committed.
// NOTE(review): the original comment here ("never commit") contradicted the
// `commit: true` value; reworded to match test_simple.
commit: true,
batch: "true",
headers: [["x-if-unmodified-since", time]],
},
]);
// Test we do the right thing when we need to make multiple posts due to
// max_post_bytes when there are batch semantics in place.
// NOTE(review): this add_task is truncated in this chunk — only the config
// object survives; the test body and closing `});` are missing. Code kept
// token-identical; restore from the original file.
add_task(async function test_max_post_bytes_batch() {
let config = {
max_post_bytes: 50,
max_post_records: 4,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 50,
max_request_bytes: 4000,
};
// Test we do the right thing when we need to make multiple posts due to
// max_request_bytes when there are batch semantics in place.
// NOTE(review): this add_task is truncated in this chunk — only the config
// object survives; the test body and closing `});` are missing. Code kept
// token-identical; restore from the original file.
add_task(async function test_max_request_bytes_batch() {
let config = {
max_post_bytes: 60,
max_post_records: 40,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 500,
max_request_bytes: 100,
};
// Test we do the right thing when the batch bytes limit is exceeded.
// NOTE(review): this add_task is truncated in this chunk — the setup between
// the config object and the enqueue calls below (earlier enqueues, the
// responseGenerator, the makePostQueue call that defines `pq`) and the
// closing `});` are all missing. Code kept token-identical.
add_task(async function test_max_total_bytes_batch() {
let config = {
max_post_bytes: 50,
max_post_records: 20,
max_total_bytes: 70,
max_total_records: 100,
max_record_payload_bytes: 50,
max_request_bytes: 500,
};
// this will exceed our POST byte limit, so will be in the 2nd POST - but still in the first batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
// this will exceed our batch byte limit, so will be in a new batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 20
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 40, batch: 40
// This will exceed POST byte limit, so will be in the 4th post, part of the 2nd batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
await pq.flush(true);
// Test we split up the posts when we exceed the record limit when batch
// semantics are in place.
// NOTE(review): this add_task is truncated in this chunk — only the config
// object survives; the test body and closing `});` are missing. Code kept
// token-identical; restore from the original file.
add_task(async function test_max_post_records_batch() {
let config = {
max_post_bytes: 1000,
max_post_records: 2,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 1000,
max_request_bytes: 1000,
};
// Test we do the right thing when the batch record limit is exceeded.
// NOTE(review): this add_task is truncated in this chunk — only the config
// object survives; the test body and closing `});` are missing. Code kept
// token-identical; restore from the original file.
add_task(async function test_max_records_batch() {
let config = {
max_post_bytes: 1000,
max_post_records: 3,
max_total_bytes: 10000,
max_total_records: 5,
max_record_payload_bytes: 1000,
max_request_bytes: 10000,
};
// Test we do the right thing when the limits are met but not exceeded.
// NOTE(review): this add_task is truncated in this chunk — the config object
// itself is cut off mid-literal and the rest of the test is missing. Code
// kept token-identical; restore from the original file.
add_task(async function test_packed_batch() {
let config = {
max_post_bytes: 41,
max_post_records: 4,
// Tests that check that a single record fails to enqueue for the provided config
// NOTE(review): this function is truncated in this chunk — the closing `}` of
// test_enqueue_failure_case is missing (and it is fused onto one line with
// its local `time` and responseGenerator declarations). Code kept
// token-identical; restore the missing tail from the original file.
async function test_enqueue_failure_case(failureLimit, config) { const time = 11111111; function* responseGenerator() {
yield {
success: true,
status: 202,
obj: { batch: 1234 },
headers: { "x-last-modified": time + 100, "x-weave-timestamp": time + 100,
},
};
}
// Check on empty postqueue
let { pq, stats } = makePostQueue(config, time, responseGenerator());
let result = await pq.enqueue(makeRecord(failureLimit + 1));
ok(!result.enqueued);
notEqual(result.error, undefined);
ok((await pq.enqueue(makeRecord(5))).enqueued);
// check on nonempty postqueue
result = await pq.enqueue(makeRecord(failureLimit + 1));
ok(!result.enqueued);
notEqual(result.error, undefined);
// make sure that we keep working, skipping the bad record entirely
// (handling the error the queue reported is left up to caller)
ok((await pq.enqueue(makeRecord(5))).enqueued);
// NOTE(review): the following disclaimer appears to be stray non-code text
// accidentally appended to this file (it is not valid JavaScript); commented
// out and translated from German so the file remains parseable:
// "The information on this website has been carefully compiled to the best
// of our knowledge. However, neither the completeness, correctness, nor
// quality of the information provided is guaranteed.
// Remark: the colored syntax highlighting and the measurement are still
// experimental."