/* Any copyright is dedicated to the Public Domain.
* http://creativecommons.org/publicdomain/zero/1.0/ */
const { Weave } = ChromeUtils.importESModule(
"resource://services-sync/main.sys.mjs"
);
const { WBORecord } = ChromeUtils.importESModule(
"resource://services-sync/record.sys.mjs"
);
const { Service } = ChromeUtils.importESModule(
"resource://services-sync/service.sys.mjs"
);
const { RotaryEngine } = ChromeUtils.importESModule(
"resource://testing-common/services/sync/rotaryengine.sys.mjs"
);
function makeRotaryEngine() {
return new RotaryEngine(Service);
}
async
function clean(engine) {
for (
const pref of Svc.PrefBranch.getChildList(
"")) {
Svc.PrefBranch.clearUserPref(pref);
}
Svc.PrefBranch.setStringPref(
"log.logger.engine.rotary",
"Trace");
Service.recordManager.clearCache();
await engine._tracker.clearChangedIDs();
await engine.finalize();
}
async
function cleanAndGo(engine, server) {
await clean(engine);
await promiseStopServer(server);
}
async
function promiseClean(engine, server) {
await clean(engine);
await promiseStopServer(server);
}
async
function createServerAndConfigureClient() {
let engine =
new RotaryEngine(Service);
let syncID = await engine.resetLocalSyncID();
let contents = {
meta: {
global: { engines: { rotary: { version: engine.version, syncID } } },
},
crypto: {},
rotary: {},
};
const USER =
"foo";
let server =
new SyncServer();
server.registerUser(USER,
"password");
server.createContents(USER, contents);
server.start();
await SyncTestingInfrastructure(server, USER);
Service._updateCachedURLs();
return [engine, server, USER];
}
/*
* Tests
*
* SyncEngine._sync() is divided into four rather independent steps:
*
* - _syncStartup()
* - _processIncoming()
* - _uploadOutgoing()
* - _syncFinish()
*
* In the spirit of unit testing, these are tested individually for
* different scenarios below.
*/
add_task(async
function setup() {
await generateNewKeys(Service.collectionKeys);
Svc.PrefBranch.setStringPref(
"log.logger.engine.rotary",
"Trace");
});
add_task(async
function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() {
_(
"SyncEngine._syncStartup resets sync and wipes server data if there's no or an outdated global record"
);
// Some server side data that's going to be wiped
let collection =
new ServerCollection();
collection.insert(
"flying",
encryptPayload({ id:
"flying", denomination:
"LNER Class A3 4472" })
);
collection.insert(
"scotsman",
encryptPayload({ id:
"scotsman", denomination:
"Flying Scotsman" })
);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
engine._store.items = { rekolok:
"Rekonstruktionslokomotive" };
try {
// Confirm initial environment
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.rekolok, undefined);
let metaGlobal = await Service.recordManager.get(engine.metaURL);
Assert.equal(metaGlobal.payload.engines, undefined);
Assert.ok(!!collection.payload(
"flying"));
Assert.ok(!!collection.payload(
"scotsman"));
await engine.setLastSync(Date.now() / 1000);
// Trying to prompt a wipe -- we no longer track CryptoMeta per engine,
// so it has nothing to check.
await engine._syncStartup();
// The meta/global WBO has been filled with data about the engine
let engineData = metaGlobal.payload.engines.rotary;
Assert.equal(engineData.version, engine.version);
Assert.equal(engineData.syncID, await engine.getSyncID());
// Sync was reset and server data was wiped
Assert.equal(await engine.getLastSync(), 0);
Assert.equal(collection.payload(
"flying"), undefined);
Assert.equal(collection.payload(
"scotsman"), undefined);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_syncStartup_serverHasNewerVersion() {
_(
"SyncEngine._syncStartup ");
let global =
new ServerWBO(
"global", {
engines: { rotary: { version: 23456 } },
});
let server = httpd_setup({
"/1.1/foo/storage/meta/global": global.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
// The server has a newer version of the data and our engine can
// handle. That should give us an exception.
let error;
try {
await engine._syncStartup();
}
catch (ex) {
error = ex;
}
Assert.equal(error.failureCode, VERSION_OUT_OF_DATE);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_syncStartup_syncIDMismatchResetsClient() {
_(
"SyncEngine._syncStartup resets sync if syncIDs don't match");
let server = sync_httpd_setup({});
await SyncTestingInfrastructure(server);
// global record with a different syncID than our engine has
let engine = makeRotaryEngine();
let global =
new ServerWBO(
"global", {
engines: { rotary: { version: engine.version, syncID:
"foobar" } },
});
server.registerPathHandler(
"/1.1/foo/storage/meta/global", global.handler());
try {
// Confirm initial environment
Assert.equal(await engine.getSyncID(),
"");
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.rekolok, undefined);
await engine.setLastSync(Date.now() / 1000);
await engine._syncStartup();
// The engine has assumed the server's syncID
Assert.equal(await engine.getSyncID(),
"foobar");
// Sync was reset
Assert.equal(await engine.getLastSync(), 0);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_emptyServer() {
_(
"SyncEngine._processIncoming working with an empty server backend");
let collection =
new ServerCollection();
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
// Merely ensure that this code path is run without any errors
await engine._processIncoming();
Assert.equal(await engine.getLastSync(), 0);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_createFromServer() {
_(
"SyncEngine._processIncoming creates new records from server data");
// Some server records that will be downloaded
let collection =
new ServerCollection();
collection.insert(
"flying",
encryptPayload({ id:
"flying", denomination:
"LNER Class A3 4472" })
);
collection.insert(
"scotsman",
encryptPayload({ id:
"scotsman", denomination:
"Flying Scotsman" })
);
// Two pathological cases involving relative URIs gone wrong.
let pathologicalPayload = encryptPayload({
id:
"../pathological",
denomination:
"Pathological Case",
});
collection.insert(
"../pathological", pathologicalPayload);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
"/1.1/foo/storage/rotary/flying": collection.wbo(
"flying").handler(),
"/1.1/foo/storage/rotary/scotsman": collection.wbo(
"scotsman").handler(),
});
await SyncTestingInfrastructure(server);
await generateNewKeys(Service.collectionKeys);
let engine = makeRotaryEngine();
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment
Assert.equal(await engine.getLastSync(), 0);
Assert.equal(engine.lastModified,
null);
Assert.equal(engine._store.items.flying, undefined);
Assert.equal(engine._store.items.scotsman, undefined);
Assert.equal(engine._store.items[
"../pathological"], undefined);
await engine._syncStartup();
await engine._processIncoming();
// Timestamps of last sync and last server modification are set.
Assert.ok((await engine.getLastSync()) > 0);
Assert.ok(engine.lastModified > 0);
// Local records have been created from the server data.
Assert.equal(engine._store.items.flying,
"LNER Class A3 4472");
Assert.equal(engine._store.items.scotsman,
"Flying Scotsman");
Assert.equal(engine._store.items[
"../pathological"],
"Pathological Case");
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_reconcile() {
_(
"SyncEngine._processIncoming updates local records");
let collection =
new ServerCollection();
// This server record is newer than the corresponding client one,
// so it'll update its data.
collection.insert(
"newrecord",
encryptPayload({ id:
"newrecord", denomination:
"New stuff..." })
);
// This server record is newer than the corresponding client one,
// so it'll update its data.
collection.insert(
"newerserver",
encryptPayload({ id:
"newerserver", denomination:
"New data!" })
);
// This server record is 2 mins older than the client counterpart
// but identical to it, so we're expecting the client record's
// changedID to be reset.
collection.insert(
"olderidentical",
encryptPayload({
id:
"olderidentical",
denomination:
"Older but identical",
})
);
collection._wbos.olderidentical.modified -= 120;
// This item simply has different data than the corresponding client
// record (which is unmodified), so it will update the client as well
collection.insert(
"updateclient",
encryptPayload({ id:
"updateclient", denomination:
"Get this!" })
);
// This is a dupe of 'original'.
collection.insert(
"duplication",
encryptPayload({ id:
"duplication", denomination:
"Original Entry" })
);
// This record is marked as deleted, so we're expecting the client
// record to be removed.
collection.insert(
"nukeme",
encryptPayload({ id:
"nukeme", denomination:
"Nuke me!", deleted:
true })
);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
engine._store.items = {
newerserver:
"New data, but not as new as server!",
olderidentical:
"Older but identical",
updateclient:
"Got data?",
original:
"Original Entry",
long_original:
"Long Original Entry",
nukeme:
"Nuke me!",
};
// Make this record 1 min old, thus older than the one on the server
await engine._tracker.addChangedID(
"newerserver", Date.now() / 1000 - 60);
// This record has been changed 2 mins later than the one on the server
await engine._tracker.addChangedID(
"olderidentical", Date.now() / 1000);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment
Assert.equal(engine._store.items.newrecord, undefined);
Assert.equal(
engine._store.items.newerserver,
"New data, but not as new as server!"
);
Assert.equal(engine._store.items.olderidentical,
"Older but identical");
Assert.equal(engine._store.items.updateclient,
"Got data?");
Assert.equal(engine._store.items.nukeme,
"Nuke me!");
let changes = await engine._tracker.getChangedIDs();
Assert.ok(changes.olderidentical > 0);
await engine._syncStartup();
await engine._processIncoming();
// Timestamps of last sync and last server modification are set.
Assert.ok((await engine.getLastSync()) > 0);
Assert.ok(engine.lastModified > 0);
// The new record is created.
Assert.equal(engine._store.items.newrecord,
"New stuff...");
// The 'newerserver' record is updated since the server data is newer.
Assert.equal(engine._store.items.newerserver,
"New data!");
// The data for 'olderidentical' is identical on the server, so
// it's no longer marked as changed anymore.
Assert.equal(engine._store.items.olderidentical,
"Older but identical");
changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.olderidentical, undefined);
// Updated with server data.
Assert.equal(engine._store.items.updateclient,
"Get this!");
// The incoming ID is preferred.
Assert.equal(engine._store.items.original, undefined);
Assert.equal(engine._store.items.duplication,
"Original Entry");
Assert.notEqual(engine._
delete.ids.indexOf(
"original"), -1);
// The 'nukeme' record marked as deleted is removed.
Assert.equal(engine._store.items.nukeme, undefined);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_reconcile_local_deleted() {
_(
"Ensure local, duplicate ID is deleted on server.");
// When a duplicate is resolved, the local ID (which is never taken) should
// be deleted on the server.
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
let record = encryptPayload({
id:
"DUPE_INCOMING",
denomination:
"incoming",
});
let wbo =
new ServerWBO(
"DUPE_INCOMING", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
record = encryptPayload({ id:
"DUPE_LOCAL", denomination:
"local" });
wbo =
new ServerWBO(
"DUPE_LOCAL", record, now - 1);
server.insertWBO(user,
"rotary", wbo);
await engine._store.create({ id:
"DUPE_LOCAL", denomination:
"local" });
Assert.ok(await engine._store.itemExists(
"DUPE_LOCAL"));
Assert.equal(
"DUPE_LOCAL", await engine._findDupe({ id:
"DUPE_INCOMING" }));
await engine._sync();
do_check_attribute_count(engine._store.items, 1);
Assert.ok(
"DUPE_INCOMING" in engine._store.items);
let collection = server.getCollection(user,
"rotary");
Assert.equal(1, collection.count());
Assert.notEqual(undefined, collection.wbo(
"DUPE_INCOMING"));
await cleanAndGo(engine, server);
});
add_task(async
function test_processIncoming_reconcile_equivalent() {
_(
"Ensure proper handling of incoming records that match local.");
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
let record = encryptPayload({ id:
"entry", denomination:
"denomination" });
let wbo =
new ServerWBO(
"entry", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
engine._store.items = { entry:
"denomination" };
Assert.ok(await engine._store.itemExists(
"entry"));
await engine._sync();
do_check_attribute_count(engine._store.items, 1);
await cleanAndGo(engine, server);
});
add_task(
async
function test_processIncoming_reconcile_locally_deleted_dupe_new() {
_(
"Ensure locally deleted duplicate record newer than incoming is handled."
);
// This is a somewhat complicated test. It ensures that if a client receives
// a modified record for an item that is deleted locally but with a different
// ID that the incoming record is ignored. This is a corner case for record
// handling, but it needs to be supported.
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
let record = encryptPayload({
id:
"DUPE_INCOMING",
denomination:
"incoming",
});
let wbo =
new ServerWBO(
"DUPE_INCOMING", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
// Simulate a locally-deleted item.
engine._store.items = {};
await engine._tracker.addChangedID(
"DUPE_LOCAL", now + 3);
Assert.equal(
false, await engine._store.itemExists(
"DUPE_LOCAL"));
Assert.equal(
false, await engine._store.itemExists(
"DUPE_INCOMING"));
Assert.equal(
"DUPE_LOCAL", await engine._findDupe({ id:
"DUPE_INCOMING" }));
engine.lastModified = server.getCollection(user, engine.name).timestamp;
await engine._sync();
// After the sync, the server's payload for the original ID should be marked
// as deleted.
do_check_empty(engine._store.items);
let collection = server.getCollection(user,
"rotary");
Assert.equal(1, collection.count());
wbo = collection.wbo(
"DUPE_INCOMING");
Assert.notEqual(
null, wbo);
let payload = wbo.getCleartext();
Assert.ok(payload.deleted);
await cleanAndGo(engine, server);
}
);
add_task(
async
function test_processIncoming_reconcile_locally_deleted_dupe_old() {
_(
"Ensure locally deleted duplicate record older than incoming is restored."
);
// This is similar to the above test except it tests the condition where the
// incoming record is newer than the local deletion, therefore overriding it.
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
let record = encryptPayload({
id:
"DUPE_INCOMING",
denomination:
"incoming",
});
let wbo =
new ServerWBO(
"DUPE_INCOMING", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
// Simulate a locally-deleted item.
engine._store.items = {};
await engine._tracker.addChangedID(
"DUPE_LOCAL", now + 1);
Assert.equal(
false, await engine._store.itemExists(
"DUPE_LOCAL"));
Assert.equal(
false, await engine._store.itemExists(
"DUPE_INCOMING"));
Assert.equal(
"DUPE_LOCAL", await engine._findDupe({ id:
"DUPE_INCOMING" }));
await engine._sync();
// Since the remote change is newer, the incoming item should exist locally.
do_check_attribute_count(engine._store.items, 1);
Assert.ok(
"DUPE_INCOMING" in engine._store.items);
Assert.equal(
"incoming", engine._store.items.DUPE_INCOMING);
let collection = server.getCollection(user,
"rotary");
Assert.equal(1, collection.count());
wbo = collection.wbo(
"DUPE_INCOMING");
let payload = wbo.getCleartext();
Assert.equal(
"incoming", payload.denomination);
await cleanAndGo(engine, server);
}
);
add_task(async
function test_processIncoming_reconcile_changed_dupe() {
_(
"Ensure that locally changed duplicate record is handled properly.");
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
// The local record is newer than the incoming one, so it should be retained.
let record = encryptPayload({
id:
"DUPE_INCOMING",
denomination:
"incoming",
});
let wbo =
new ServerWBO(
"DUPE_INCOMING", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
await engine._store.create({ id:
"DUPE_LOCAL", denomination:
"local" });
await engine._tracker.addChangedID(
"DUPE_LOCAL", now + 3);
Assert.ok(await engine._store.itemExists(
"DUPE_LOCAL"));
Assert.equal(
"DUPE_LOCAL", await engine._findDupe({ id:
"DUPE_INCOMING" }));
engine.lastModified = server.getCollection(user, engine.name).timestamp;
await engine._sync();
// The ID should have been changed to incoming.
do_check_attribute_count(engine._store.items, 1);
Assert.ok(
"DUPE_INCOMING" in engine._store.items);
// On the server, the local ID should be deleted and the incoming ID should
// have its payload set to what was in the local record.
let collection = server.getCollection(user,
"rotary");
Assert.equal(1, collection.count());
wbo = collection.wbo(
"DUPE_INCOMING");
Assert.notEqual(undefined, wbo);
let payload = wbo.getCleartext();
Assert.equal(
"local", payload.denomination);
await cleanAndGo(engine, server);
});
add_task(async
function test_processIncoming_reconcile_changed_dupe_new() {
_(
"Ensure locally changed duplicate record older than incoming is ignored.");
// This test is similar to the above except the incoming record is younger
// than the local record. The incoming record should be authoritative.
let [engine, server, user] = await createServerAndConfigureClient();
let now = Date.now() / 1000 - 10;
await engine.setLastSync(now);
engine.lastModified = now + 1;
let record = encryptPayload({
id:
"DUPE_INCOMING",
denomination:
"incoming",
});
let wbo =
new ServerWBO(
"DUPE_INCOMING", record, now + 2);
server.insertWBO(user,
"rotary", wbo);
await engine._store.create({ id:
"DUPE_LOCAL", denomination:
"local" });
await engine._tracker.addChangedID(
"DUPE_LOCAL", now + 1);
Assert.ok(await engine._store.itemExists(
"DUPE_LOCAL"));
Assert.equal(
"DUPE_LOCAL", await engine._findDupe({ id:
"DUPE_INCOMING" }));
engine.lastModified = server.getCollection(user, engine.name).timestamp;
await engine._sync();
// The ID should have been changed to incoming.
do_check_attribute_count(engine._store.items, 1);
Assert.ok(
"DUPE_INCOMING" in engine._store.items);
// On the server, the local ID should be deleted and the incoming ID should
// have its payload retained.
let collection = server.getCollection(user,
"rotary");
Assert.equal(1, collection.count());
wbo = collection.wbo(
"DUPE_INCOMING");
Assert.notEqual(undefined, wbo);
let payload = wbo.getCleartext();
Assert.equal(
"incoming", payload.denomination);
await cleanAndGo(engine, server);
});
add_task(async
function test_processIncoming_resume_toFetch() {
_(
"toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items."
);
const LASTSYNC = Date.now() / 1000;
// Server records that will be downloaded
let collection =
new ServerCollection();
collection.insert(
"flying",
encryptPayload({ id:
"flying", denomination:
"LNER Class A3 4472" })
);
collection.insert(
"scotsman",
encryptPayload({ id:
"scotsman", denomination:
"Flying Scotsman" })
);
collection.insert(
"rekolok",
encryptPayload({ id:
"rekolok", denomination:
"Rekonstruktionslokomotive" })
);
for (let i = 0; i < 3; i++) {
let id =
"failed" + i;
let payload = encryptPayload({ id, denomination:
"Record No. " + i });
let wbo =
new ServerWBO(id, payload);
wbo.modified = LASTSYNC - 10;
collection.insertWBO(wbo);
}
collection.wbo(
"flying").modified = collection.wbo(
"scotsman").modified =
LASTSYNC - 10;
collection._wbos.rekolok.modified = LASTSYNC + 10;
// Time travel 10 seconds into the future but still download the above WBOs.
let engine = makeRotaryEngine();
await engine.setLastSync(LASTSYNC);
engine.toFetch =
new SerializableSet([
"flying",
"scotsman"]);
engine.previousFailed =
new SerializableSet([
"failed0",
"failed1",
"failed2",
]);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment
Assert.equal(engine._store.items.flying, undefined);
Assert.equal(engine._store.items.scotsman, undefined);
Assert.equal(engine._store.items.rekolok, undefined);
await engine._syncStartup();
await engine._processIncoming();
// Local records have been created from the server data.
Assert.equal(engine._store.items.flying,
"LNER Class A3 4472");
Assert.equal(engine._store.items.scotsman,
"Flying Scotsman");
Assert.equal(engine._store.items.rekolok,
"Rekonstruktionslokomotive");
Assert.equal(engine._store.items.failed0,
"Record No. 0");
Assert.equal(engine._store.items.failed1,
"Record No. 1");
Assert.equal(engine._store.items.failed2,
"Record No. 2");
Assert.equal(engine.previousFailed.size, 0);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_notify_count() {
_(
"Ensure that failed records are reported only once.");
const NUMBER_OF_RECORDS = 15;
// Engine that fails every 5 records.
let engine = makeRotaryEngine();
engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
engine._store.applyIncomingBatch = async
function (records, countTelemetry) {
let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
let recordsToApply = [],
recordsToFail = [];
for (let i = 0; i < sortedRecords.length; i++) {
(i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]);
}
recordsToFail.forEach(() => {
countTelemetry.addIncomingFailedReason(
"failed message");
});
await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
return recordsToFail.map(record => record.id);
};
// Create a batch of server side records.
let collection =
new ServerCollection();
for (
var i = 0; i < NUMBER_OF_RECORDS; i++) {
let id =
"record-no-" + i.toString(10).padStart(2,
"0");
let payload = encryptPayload({ id, denomination:
"Record No. " + id });
collection.insert(id, payload);
}
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment.
Assert.equal(await engine.getLastSync(), 0);
Assert.equal(engine.toFetch.size, 0);
Assert.equal(engine.previousFailed.size, 0);
do_check_empty(engine._store.items);
let called = 0;
let counts;
function onApplied(count) {
_(
"Called with " + JSON.stringify(counts));
counts = count;
called++;
}
Svc.Obs.add(
"weave:engine:sync:applied", onApplied);
// Do sync.
await engine._syncStartup();
await engine._processIncoming();
// Confirm failures.
do_check_attribute_count(engine._store.items, 12);
Assert.deepEqual(
Array.from(engine.previousFailed).sort(),
[
"record-no-00",
"record-no-05",
"record-no-10"].sort()
);
// There are newly failed records and they are reported.
Assert.equal(called, 1);
Assert.equal(counts.failed, 3);
Assert.equal(counts.failedReasons[0].count, 3);
Assert.equal(counts.failedReasons[0].name,
"failed message");
Assert.equal(counts.applied, 15);
Assert.equal(counts.newFailed, 3);
Assert.equal(counts.succeeded, 12);
// Sync again, 1 of the failed items are the same, the rest didn't fail.
await engine._processIncoming();
// Confirming removed failures.
do_check_attribute_count(engine._store.items, 14);
// After failing twice the record that failed again [record-no-00]
// should NOT be stored to try again
Assert.deepEqual(Array.from(engine.previousFailed), []);
Assert.equal(called, 2);
Assert.equal(counts.failed, 1);
Assert.equal(counts.failedReasons[0].count, 1);
Assert.equal(counts.failedReasons[0].name,
"failed message");
Assert.equal(counts.applied, 3);
Assert.equal(counts.newFailed, 0);
Assert.equal(counts.succeeded, 2);
Svc.Obs.remove(
"weave:engine:sync:applied", onApplied);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_previousFailed() {
_(
"Ensure that failed records are retried.");
const NUMBER_OF_RECORDS = 14;
// Engine that alternates between failing and applying every 2 records.
let engine = makeRotaryEngine();
engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
engine._store.applyIncomingBatch = async
function (records, countTelemetry) {
let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
let recordsToApply = [],
recordsToFail = [];
let chunks = Array.from(PlacesUtils.chunkArray(sortedRecords, 2));
for (let i = 0; i < chunks.length; i++) {
(i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]);
}
await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
return recordsToFail.map(record => record.id);
};
// Create a batch of server side records.
let collection =
new ServerCollection();
for (
var i = 0; i < NUMBER_OF_RECORDS; i++) {
let id =
"record-no-" + i.toString(10).padStart(2,
"0");
let payload = encryptPayload({ id, denomination:
"Record No. " + i });
collection.insert(id, payload);
}
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment.
Assert.equal(await engine.getLastSync(), 0);
Assert.equal(engine.toFetch.size, 0);
Assert.equal(engine.previousFailed.size, 0);
do_check_empty(engine._store.items);
// Initial failed items in previousFailed to be reset.
let previousFailed =
new SerializableSet([
Utils.makeGUID(),
Utils.makeGUID(),
Utils.makeGUID(),
]);
engine.previousFailed = previousFailed;
Assert.equal(engine.previousFailed, previousFailed);
// Do sync.
await engine._syncStartup();
await engine._processIncoming();
// Expected result: 4 sync batches with 2 failures each => 8 failures
do_check_attribute_count(engine._store.items, 6);
Assert.deepEqual(
Array.from(engine.previousFailed).sort(),
[
"record-no-00",
"record-no-01",
"record-no-04",
"record-no-05",
"record-no-08",
"record-no-09",
"record-no-12",
"record-no-13",
].sort()
);
// Sync again with the same failed items (records 0, 1, 8, 9).
await engine._processIncoming();
do_check_attribute_count(engine._store.items, 10);
// A second sync with the same failed items should NOT add the same items again.
// Items that did not fail a second time should no longer be in previousFailed.
Assert.deepEqual(Array.from(engine.previousFailed).sort(), []);
// Refetched items that didn't fail the second time are in engine._store.items.
Assert.equal(engine._store.items[
"record-no-04"],
"Record No. 4");
Assert.equal(engine._store.items[
"record-no-05"],
"Record No. 5");
Assert.equal(engine._store.items[
"record-no-12"],
"Record No. 12");
Assert.equal(engine._store.items[
"record-no-13"],
"Record No. 13");
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_failed_records() {
_(
"Ensure that failed records from _reconcile and applyIncomingBatch are refetched."
);
// Let's create three and a bit batches worth of server side records.
let APPLY_BATCH_SIZE = 50;
let collection =
new ServerCollection();
const NUMBER_OF_RECORDS = APPLY_BATCH_SIZE * 3 + 5;
for (let i = 0; i < NUMBER_OF_RECORDS; i++) {
let id =
"record-no-" + i;
let payload = encryptPayload({ id, denomination:
"Record No. " + id });
let wbo =
new ServerWBO(id, payload);
wbo.modified = Date.now() / 1000 + 60 * (i - APPLY_BATCH_SIZE * 3);
collection.insertWBO(wbo);
}
// Engine that batches but likes to throw on a couple of records,
// two in each batch: the even ones fail in reconcile, the odd ones
// in applyIncoming.
const BOGUS_RECORDS = [
"record-no-" + 42,
"record-no-" + 23,
"record-no-" + (42 + APPLY_BATCH_SIZE),
"record-no-" + (23 + APPLY_BATCH_SIZE),
"record-no-" + (42 + APPLY_BATCH_SIZE * 2),
"record-no-" + (23 + APPLY_BATCH_SIZE * 2),
"record-no-" + (2 + APPLY_BATCH_SIZE * 3),
"record-no-" + (1 + APPLY_BATCH_SIZE * 3),
];
let engine = makeRotaryEngine();
engine.__reconcile = engine._reconcile;
engine._reconcile = async
function _reconcile(record) {
if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
throw new Error(
"I don't like this record! Baaaaaah!");
}
return this.__reconcile.apply(
this, arguments);
};
engine._store._applyIncoming = engine._store.applyIncoming;
engine._store.applyIncoming = async
function (record) {
if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
throw new Error(
"I don't like this record! Baaaaaah!");
}
return this._applyIncoming.apply(
this, arguments);
};
// Keep track of requests made of a collection.
let count = 0;
let uris = [];
function recording_handler(recordedCollection) {
let h = recordedCollection.handler();
return function (req, res) {
++count;
uris.push(req.path +
"?" + req.queryString);
return h(req, res);
};
}
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": recording_handler(collection),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial environment
Assert.equal(await engine.getLastSync(), 0);
Assert.equal(engine.toFetch.size, 0);
Assert.equal(engine.previousFailed.size, 0);
do_check_empty(engine._store.items);
let observerSubject;
let observerData;
Svc.Obs.add(
"weave:engine:sync:applied",
function onApplied(subject, data) {
Svc.Obs.remove(
"weave:engine:sync:applied", onApplied);
observerSubject = subject;
observerData = data;
});
await engine._syncStartup();
await engine._processIncoming();
// Ensure that all records but the bogus 4 have been applied.
do_check_attribute_count(
engine._store.items,
NUMBER_OF_RECORDS - BOGUS_RECORDS.length
);
// Ensure that the bogus records will be fetched again on the next sync.
Assert.equal(engine.previousFailed.size, BOGUS_RECORDS.length);
Assert.deepEqual(
Array.from(engine.previousFailed).sort(),
BOGUS_RECORDS.sort()
);
// Ensure the observer was notified
Assert.equal(observerData, engine.name);
Assert.equal(observerSubject.failed, BOGUS_RECORDS.length);
Assert.equal(observerSubject.newFailed, BOGUS_RECORDS.length);
// Testing batching of failed item fetches.
// Try to sync again. Ensure that we split the request into chunks to avoid
// URI length limitations.
async
function batchDownload(batchSize) {
count = 0;
uris = [];
engine.guidFetchBatchSize = batchSize;
await engine._processIncoming();
_(
"Tried again. Requests: " + count +
"; URIs: " + JSON.stringify(uris));
return count;
}
// There are 8 bad records, so this needs 3 fetches.
_(
"Test batching with ID batch size 3, normal mobile batch size.");
Assert.equal(await batchDownload(3), 3);
// Since there the previous batch failed again, there should be
// no more records to fetch
_(
"Test that the second time a record failed to sync, gets ignored");
Assert.equal(await batchDownload(BOGUS_RECORDS.length), 0);
}
finally {
await cleanAndGo(engine, server);
}
});
add_task(async
function test_processIncoming_decrypt_failed() {
_(
"Ensure that records failing to decrypt are either replaced or refetched.");
// Some good and some bogus records. One doesn't contain valid JSON,
// the other will throw during decrypt.
let collection =
new ServerCollection();
collection._wbos.flying =
new ServerWBO(
"flying",
encryptPayload({ id:
"flying", denomination:
"LNER Class A3 4472" })
);
collection._wbos.nojson =
new ServerWBO(
"nojson",
"This is invalid JSON");
collection._wbos.nojson2 =
new ServerWBO(
"nojson2",
"This is invalid JSON");
collection._wbos.scotsman =
new ServerWBO(
"scotsman",
encryptPayload({ id:
"scotsman", denomination:
"Flying Scotsman" })
);
collection._wbos.nodecrypt =
new ServerWBO(
"nodecrypt",
"Decrypt this!");
collection._wbos.nodecrypt2 =
new ServerWBO(
"nodecrypt2",
"Decrypt this!");
// Patch the fake crypto service to throw on the record above.
Weave.Crypto._decrypt = Weave.Crypto.decrypt;
Weave.Crypto.decrypt =
function (ciphertext) {
if (ciphertext ==
"Decrypt this!") {
throw new Error(
"Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz."
);
}
return this._decrypt.apply(
this, arguments);
};
// Some broken records also exist locally.
let engine = makeRotaryEngine();
engine.enabled =
true;
engine._store.items = { nojson:
"Valid JSON", nodecrypt:
"Valid ciphertext" };
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
// Confirm initial state
Assert.equal(engine.toFetch.size, 0);
Assert.equal(engine.previousFailed.size, 0);
let observerSubject;
let observerData;
Svc.Obs.add(
"weave:engine:sync:applied",
function onApplied(subject, data) {
Svc.Obs.remove(
"weave:engine:sync:applied", onApplied);
observerSubject = subject;
observerData = data;
});
await engine.setLastSync(collection.wbo(
"nojson").modified - 1);
let ping = await sync_engine_and_validate_telem(engine,
true);
Assert.equal(ping.engines[0].incoming.applied, 2);
Assert.equal(ping.engines[0].incoming.failed, 4);
console.log(
"incoming telem: ", ping.engines[0].incoming);
Assert.equal(
ping.engines[0].incoming.failedReasons[0].name,
"No ciphertext: nothing to decrypt?"
);
// There should be 4 of the same error
Assert.equal(ping.engines[0].incoming.failedReasons[0].count, 4);
Assert.equal(engine.previousFailed.size, 4);
Assert.ok(engine.previousFailed.has(
"nojson"));
Assert.ok(engine.previousFailed.has(
"nojson2"));
Assert.ok(engine.previousFailed.has(
"nodecrypt"));
Assert.ok(engine.previousFailed.has(
"nodecrypt2"));
// Ensure the observer was notified
Assert.equal(observerData, engine.name);
Assert.equal(observerSubject.applied, 2);
Assert.equal(observerSubject.failed, 4);
Assert.equal(observerSubject.failedReasons[0].count, 4);
}
finally {
await promiseClean(engine, server);
}
});
add_task(async
function test_uploadOutgoing_toEmptyServer() {
_(
"SyncEngine._uploadOutgoing uploads new records to server");
let collection =
new ServerCollection();
collection._wbos.flying =
new ServerWBO(
"flying");
collection._wbos.scotsman =
new ServerWBO(
"scotsman");
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
"/1.1/foo/storage/rotary/flying": collection.wbo(
"flying").handler(),
"/1.1/foo/storage/rotary/scotsman": collection.wbo(
"scotsman").handler(),
});
await SyncTestingInfrastructure(server);
await generateNewKeys(Service.collectionKeys);
let engine = makeRotaryEngine();
engine._store.items = {
flying:
"LNER Class A3 4472",
scotsman:
"Flying Scotsman",
};
// Mark one of these records as changed
await engine._tracker.addChangedID(
"scotsman", 0);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
await engine.setLastSync(123);
// needs to be non-zero so that tracker is queried
// Confirm initial environment
Assert.equal(collection.payload(
"flying"), undefined);
Assert.equal(collection.payload(
"scotsman"), undefined);
await engine._syncStartup();
await engine._uploadOutgoing();
// Ensure the marked record ('scotsman') has been uploaded and is
// no longer marked.
Assert.equal(collection.payload(
"flying"), undefined);
Assert.ok(!!collection.payload(
"scotsman"));
Assert.equal(collection.cleartext(
"scotsman").id,
"scotsman");
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.scotsman, undefined);
// The 'flying' record wasn't marked so it wasn't uploaded
Assert.equal(collection.payload(
"flying"), undefined);
}
finally {
await cleanAndGo(engine, server);
}
});
async
function test_uploadOutgoing_max_record_payload_bytes(
allowSkippedRecord
) {
_(
"SyncEngine._uploadOutgoing throws when payload is bigger than max_record_payload_bytes"
);
let collection =
new ServerCollection();
collection._wbos.flying =
new ServerWBO(
"flying");
collection._wbos.scotsman =
new ServerWBO(
"scotsman");
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
"/1.1/foo/storage/rotary/flying": collection.wbo(
"flying").handler(),
"/1.1/foo/storage/rotary/scotsman": collection.wbo(
"scotsman").handler(),
});
await SyncTestingInfrastructure(server);
await generateNewKeys(Service.collectionKeys);
let engine = makeRotaryEngine();
engine.allowSkippedRecord = allowSkippedRecord;
engine._store.items = { flying:
"a".repeat(1024 * 1024), scotsman:
"abcd" };
await engine._tracker.addChangedID(
"flying", 1000);
await engine._tracker.addChangedID(
"scotsman", 1000);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
await engine.setLastSync(1);
// needs to be non-zero so that tracker is queried
// Confirm initial environment
Assert.equal(collection.payload(
"flying"), undefined);
Assert.equal(collection.payload(
"scotsman"), undefined);
await engine._syncStartup();
await engine._uploadOutgoing();
if (!allowSkippedRecord) {
do_throw(
"should not get here");
}
await engine.trackRemainingChanges();
// Check we uploaded the other record to the server
Assert.ok(collection.payload(
"scotsman"));
// And that we won't try to upload the huge record next time.
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, undefined);
}
catch (e) {
if (allowSkippedRecord) {
do_throw(
"should not get here");
}
await engine.trackRemainingChanges();
// Check that we will try to upload the huge record next time
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, 1000);
}
finally {
// Check we didn't upload the oversized record to the server
Assert.equal(collection.payload(
"flying"), undefined);
await cleanAndGo(engine, server);
}
}
add_task(
async
function test_uploadOutgoing_max_record_payload_bytes_disallowSkippedRecords
() {
return test_uploadOutgoing_max_record_payload_bytes(false);
}
);
add_task(
async function test_uploadOutgoing_max_record_payload_bytes_allowSkippedRecords() {
return test_uploadOutgoing_max_record_payload_bytes(true);
}
);
add_task(async function test_uploadOutgoing_failed() {
_(
"SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload."
);
let collection = new ServerCollection();
// We only define the "flying" WBO on the server, not the "scotsman"
// and "peppercorn" ones.
collection._wbos.flying = new ServerWBO("flying");
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
engine._store.items = {
flying: "LNER Class A3 4472",
scotsman: "Flying Scotsman",
peppercorn: "Peppercorn Class",
};
// Mark these records as changed
const FLYING_CHANGED = 12345;
const SCOTSMAN_CHANGED = 23456;
const PEPPERCORN_CHANGED = 34567;
await engine._tracker.addChangedID("flying", FLYING_CHANGED);
await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
await engine._tracker.addChangedID("peppercorn", PEPPERCORN_CHANGED);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
// Confirm initial environment
Assert.equal(collection.payload("flying"), undefined);
let changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, FLYING_CHANGED);
Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);
engine.enabled = true;
await sync_engine_and_validate_telem(engine, true);
// Ensure the 'flying' record has been uploaded and is no longer marked.
Assert.ok(!!collection.payload("flying"));
changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, undefined);
// The 'scotsman' and 'peppercorn' records couldn't be uploaded so
// they weren't cleared from the tracker.
Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);
} finally {
await promiseClean(engine, server);
}
});
async function createRecordFailTelemetry(allowSkippedRecord) {
Services.prefs.setStringPref("services.sync.username", "foo");
let collection = new ServerCollection();
collection._wbos.flying = new ServerWBO("flying");
collection._wbos.scotsman = new ServerWBO("scotsman");
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
engine.allowSkippedRecord = allowSkippedRecord;
let oldCreateRecord = engine._store.createRecord;
engine._store.createRecord = async (id, col) => {
if (id != "flying") {
throw new Error("oops");
}
return oldCreateRecord.call(engine._store, id, col);
};
engine._store.items = {
flying: "LNER Class A3 4472",
scotsman: "Flying Scotsman",
};
// Mark these records as changed
const FLYING_CHANGED = 12345;
const SCOTSMAN_CHANGED = 23456;
await engine._tracker.addChangedID("flying", FLYING_CHANGED);
await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
let ping;
try {
await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
// Confirm initial environment
Assert.equal(collection.payload("flying"), undefined);
let changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, FLYING_CHANGED);
Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
engine.enabled = true;
ping = await sync_engine_and_validate_telem(engine, true, onErrorPing => {
ping = onErrorPing;
});
if (!allowSkippedRecord) {
do_throw("should not get here");
}
// Ensure the 'flying' record has been uploaded and is no longer marked.
Assert.ok(!!collection.payload("flying"));
changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.flying, undefined);
} catch (err) {
if (allowSkippedRecord) {
do_throw("should not get here");
}
// Ensure the 'flying' record has not been uploaded and is still marked
Assert.ok(!collection.payload("flying"));
const changes = await engine._tracker.getChangedIDs();
Assert.ok(changes.flying);
} finally {
// We reported in telemetry that we failed a record
Assert.equal(ping.engines[0].outgoing[0].failed, 1);
Assert.equal(ping.engines[0].outgoing[0].failedReasons[0].name, "oops");
// In any case, the 'scotsman' record couldn't be created so it wasn't
// uploaded nor it was not cleared from the tracker.
Assert.ok(!collection.payload("scotsman"));
const changes = await engine._tracker.getChangedIDs();
Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
engine._store.createRecord = oldCreateRecord;
await promiseClean(engine, server);
}
}
add_task(
async function test_uploadOutgoing_createRecord_throws_reported_telemetry() {
_(
"SyncEngine._uploadOutgoing reports a failed record to telemetry if createRecord throws"
);
await createRecordFailTelemetry(true);
}
);
add_task(
async function test_uploadOutgoing_createRecord_throws_dontAllowSkipRecord() {
_(
"SyncEngine._uploadOutgoing will throw if createRecord throws and allowSkipRecord is set to false"
);
await createRecordFailTelemetry(false);
}
);
add_task(async function test_uploadOutgoing_largeRecords() {
_(
"SyncEngine._uploadOutgoing throws on records larger than the max record payload size"
);
let collection = new ServerCollection();
let engine = makeRotaryEngine();
engine.allowSkippedRecord = false;
engine._store.items["large-item"] = "Y".repeat(
Service.getMaxRecordPayloadSize() * 2
);
await engine._tracker.addChangedID("large-item", 0);
collection.insert("large-item");
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
try {
await engine._syncStartup();
let error = null;
try {
await engine._uploadOutgoing();
} catch (e) {
error = e;
}
ok(!!error);
} finally {
await cleanAndGo(engine, server);
}
});
add_task(async function test_syncFinish_deleteByIds() {
_(
"SyncEngine._syncFinish deletes server records slated for deletion (list of record IDs)."
);
let collection = new ServerCollection();
collection._wbos.flying = new ServerWBO(
"flying",
encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
);
collection._wbos.scotsman = new ServerWBO(
"scotsman",
encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
);
collection._wbos.rekolok = new ServerWBO(
"rekolok",
encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" })
);
let server = httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
engine._delete = { ids: ["flying", "rekolok"] };
await engine._syncFinish();
// The 'flying' and 'rekolok' records were deleted while the
// 'scotsman' one wasn't.
Assert.equal(collection.payload("flying"), undefined);
Assert.ok(!!collection.payload("scotsman"));
Assert.equal(collection.payload("rekolok"), undefined);
// The deletion todo list has been reset.
Assert.equal(engine._delete.ids, undefined);
} finally {
await cleanAndGo(engine, server);
}
});
add_task(async function test_syncFinish_deleteLotsInBatches() {
_(
"SyncEngine._syncFinish deletes server records in batches of 100 (list of record IDs)."
);
let collection = new ServerCollection();
// Let's count how many times the client does a DELETE request to the server
var noOfUploads = 0;
collection.delete = (function (orig) {
return function () {
noOfUploads++;
return orig.apply(this, arguments);
};
})(collection.delete);
// Create a bunch of records on the server
let now = Date.now();
for (var i = 0; i < 234; i++) {
let id = "record-no-" + i;
let payload = encryptPayload({ id, denomination: "Record No. " + i });
let wbo = new ServerWBO(id, payload);
wbo.modified = now / 1000 - 60 * (i + 110);
collection.insertWBO(wbo);
}
let server = httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
// Confirm initial environment
Assert.equal(noOfUploads, 0);
// Declare what we want to have deleted: all records no. 100 and
// up and all records that are less than 200 mins old (which are
// records 0 thru 90).
engine._delete = { ids: [], newer: now / 1000 - 60 * 200.5 };
for (i = 100; i < 234; i++) {
engine._delete.ids.push("record-no-" + i);
}
await engine._syncFinish();
// Ensure that the appropriate server data has been wiped while
// preserving records 90 thru 200.
for (i = 0; i < 234; i++) {
let id = "record-no-" + i;
if (i <= 90 || i >= 100) {
Assert.equal(collection.payload(id), undefined);
} else {
Assert.ok(!!collection.payload(id));
}
}
// The deletion was done in batches
Assert.equal(noOfUploads, 2 + 1);
// The deletion todo list has been reset.
Assert.equal(engine._delete.ids, undefined);
} finally {
await cleanAndGo(engine, server);
}
});
add_task(async function test_sync_partialUpload() {
_("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
let collection = new ServerCollection();
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
let oldServerConfiguration = Service.serverConfiguration;
Service.serverConfiguration = {
max_post_records: 100,
};
await SyncTestingInfrastructure(server);
await generateNewKeys(Service.collectionKeys);
let engine = makeRotaryEngine();
// Let the third upload fail completely
var noOfUploads = 0;
collection.post = (function (orig) {
return function () {
if (noOfUploads == 2) {
throw new Error("FAIL!");
}
noOfUploads++;
return orig.apply(this, arguments);
};
})(collection.post);
// Create a bunch of records (and server side handlers)
for (let i = 0; i < 234; i++) {
let id = "record-no-" + i;
engine._store.items[id] = "Record No. " + i;
await engine._tracker.addChangedID(id, i);
// Let two items in the first upload batch fail.
if (i != 23 && i != 42) {
collection.insert(id);
}
}
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
try {
await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
engine.enabled = true;
let error;
try {
await sync_engine_and_validate_telem(engine, true);
} catch (ex) {
error = ex;
}
ok(!!error);
const changes = await engine._tracker.getChangedIDs();
for (let i = 0; i < 234; i++) {
let id = "record-no-" + i;
// Ensure failed records are back in the tracker:
// * records no. 23 and 42 were rejected by the server,
// * records after the third batch and higher couldn't be uploaded because
// we failed hard on the 3rd upload.
if (i == 23 || i == 42 || i >= 200) {
Assert.equal(changes[id], i);
} else {
Assert.equal(false, id in changes);
}
}
} finally {
Service.serverConfiguration = oldServerConfiguration;
await promiseClean(engine, server);
}
});
add_task(async function test_canDecrypt_noCryptoKeys() {
_(
"SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection."
);
// Wipe collection keys so we can test the desired scenario.
Service.collectionKeys.clear();
let collection = new ServerCollection();
collection._wbos.flying = new ServerWBO(
"flying",
encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
Assert.equal(false, await engine.canDecrypt());
} finally {
await cleanAndGo(engine, server);
}
});
add_task(async function test_canDecrypt_true() {
_(
"SyncEngine.canDecrypt returns true if the engine can decrypt the items on the server."
);
await generateNewKeys(Service.collectionKeys);
let collection = new ServerCollection();
collection._wbos.flying = new ServerWBO(
"flying",
encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let engine = makeRotaryEngine();
try {
Assert.ok(await engine.canDecrypt());
} finally {
await cleanAndGo(engine, server);
}
});
add_task(async function test_syncapplied_observer() {
const NUMBER_OF_RECORDS = 10;
let engine = makeRotaryEngine();
// Create a batch of server side records.
let collection = new ServerCollection();
for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
let id = "record-no-" + i;
let payload = encryptPayload({ id, denomination: "Record No. " + id });
collection.insert(id, payload);
}
let server = httpd_setup({
"/1.1/foo/storage/rotary": collection.handler(),
});
await SyncTestingInfrastructure(server);
let syncID = await engine.resetLocalSyncID();
let meta_global = Service.recordManager.set(
engine.metaURL,
new WBORecord(engine.metaURL)
);
meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
let numApplyCalls = 0;
let engine_name;
let count;
function onApplied(subject, data) {
numApplyCalls++;
engine_name = data;
count = subject;
}
Svc.Obs.add("weave:engine:sync:applied", onApplied);
try {
Service.scheduler.hasIncomingItems = false;
// Do sync.
await engine._syncStartup();
await engine._processIncoming();
do_check_attribute_count(engine._store.items, 10);
Assert.equal(numApplyCalls, 1);
Assert.equal(engine_name, "rotary");
Assert.equal(count.applied, 10);
Assert.ok(Service.scheduler.hasIncomingItems);
} finally {
await cleanAndGo(engine, server);
Service.scheduler.hasIncomingItems = false;
Svc.Obs.remove("weave:engine:sync:applied", onApplied);
}
});