Bug 609398 - Get rid of partial sync [r=mconnor]

This commit is contained in:
Philipp von Weitershausen 2010-11-09 13:51:19 -08:00
parent f32561863b
commit 08efe3cb23
4 changed files with 47 additions and 135 deletions

View File

@ -62,6 +62,10 @@ MULTI_DESKTOP_SYNC: 60 * 60 * 1000, // 1 hour
MULTI_MOBILE_SYNC: 5 * 60 * 1000, // 5 minutes MULTI_MOBILE_SYNC: 5 * 60 * 1000, // 5 minutes
PARTIAL_DATA_SYNC: 60 * 1000, // 1 minute PARTIAL_DATA_SYNC: 60 * 1000, // 1 minute
// 50 is hardcoded here because of URL length restrictions.
// (GUIDs can be up to 64 chars long)
MOBILE_BATCH_SIZE: 50,
// score thresholds for early syncs // score thresholds for early syncs
SINGLE_USER_THRESHOLD: 1000, SINGLE_USER_THRESHOLD: 1000,
MULTI_DESKTOP_THRESHOLD: 500, MULTI_DESKTOP_THRESHOLD: 500,

View File

@ -282,7 +282,6 @@ Engine.prototype = {
function SyncEngine(name) { function SyncEngine(name) {
Engine.call(this, name || "SyncEngine"); Engine.call(this, name || "SyncEngine");
this.loadToFetch();
} }
SyncEngine.prototype = { SyncEngine.prototype = {
__proto__: Engine.prototype, __proto__: Engine.prototype,
@ -322,19 +321,6 @@ SyncEngine.prototype = {
Svc.Prefs.set(this.name + ".lastSync", "0"); Svc.Prefs.set(this.name + ".lastSync", "0");
}, },
get toFetch() this._toFetch,
set toFetch(val) {
this._toFetch = val;
Utils.jsonSave("toFetch/" + this.name, this, val);
},
loadToFetch: function loadToFetch() {
// Initialize to empty if there's no file
this._toFetch = [];
Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o)
this._toFetch = o));
},
// Create a new record using the store and add in crypto fields // Create a new record using the store and add in crypto fields
_createRecord: function SyncEngine__createRecord(id) { _createRecord: function SyncEngine__createRecord(id) {
let record = this._store.createRecord(id, this.engineURL + "/" + id); let record = this._store.createRecord(id, this.engineURL + "/" + id);
@ -439,23 +425,19 @@ SyncEngine.prototype = {
this._delete = {}; this._delete = {};
}, },
// Generate outgoing records // Process incoming records
_processIncoming: function SyncEngine__processIncoming() { _processIncoming: function SyncEngine__processIncoming() {
this._log.trace("Downloading & applying server changes"); this._log.trace("Downloading & applying server changes");
// Figure out how many total items to fetch this sync; do less on mobile. // Figure out how many total items to fetch this sync; do less on mobile.
// 50 is hardcoded here because of URL length restrictions. let batchSize = Infinity;
// (GUIDs can be up to 64 chars long)
let fetchNum = Infinity;
let newitems = new Collection(this.engineURL, this._recordObj); let newitems = new Collection(this.engineURL, this._recordObj);
if (Svc.Prefs.get("client.type") == "mobile") { if (Svc.Prefs.get("client.type") == "mobile") {
fetchNum = 50; batchSize = MOBILE_BATCH_SIZE;
newitems.sort = "index";
} }
newitems.newer = this.lastSync; newitems.newer = this.lastSync;
newitems.full = true; newitems.full = true;
newitems.limit = fetchNum; newitems.limit = batchSize;
let count = {applied: 0, reconciled: 0}; let count = {applied: 0, reconciled: 0};
let handled = []; let handled = [];
@ -502,16 +484,13 @@ SyncEngine.prototype = {
resp.failureCode = ENGINE_DOWNLOAD_FAIL; resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp; throw resp;
} }
// Subtract out the number of items we just got
fetchNum -= handled.length;
} }
// Check if we got the maximum that we requested; get the rest if so // Mobile: check if we got the maximum that we requested; get the rest if so.
let toFetch = [];
if (handled.length == newitems.limit) { if (handled.length == newitems.limit) {
let guidColl = new Collection(this.engineURL); let guidColl = new Collection(this.engineURL);
guidColl.newer = this.lastSync; guidColl.newer = this.lastSync;
guidColl.sort = "index";
let guids = guidColl.get(); let guids = guidColl.get();
if (!guids.success) if (!guids.success)
@ -521,20 +500,18 @@ SyncEngine.prototype = {
// were already waiting and prepend the new ones // were already waiting and prepend the new ones
let extra = Utils.arraySub(guids.obj, handled); let extra = Utils.arraySub(guids.obj, handled);
if (extra.length > 0) if (extra.length > 0)
this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra)); toFetch = extra.concat(Utils.arraySub(toFetch, extra));
} }
// Process any backlog of GUIDs if we haven't fetched too many this sync // Mobile: process any backlog of GUIDs
while (this.toFetch.length > 0 && fetchNum > 0) { while (toFetch.length) {
// Reuse the original query, but get rid of the restricting params // Reuse the original query, but get rid of the restricting params
newitems.limit = 0; newitems.limit = 0;
newitems.newer = 0; newitems.newer = 0;
// Get the first bunch of records and save the rest for later // Get the first bunch of records and save the rest for later
let minFetch = Math.min(150, this.toFetch.length, fetchNum); newitems.ids = toFetch.slice(0, batchSize);
newitems.ids = this.toFetch.slice(0, minFetch); toFetch = toFetch.slice(batchSize);
this.toFetch = this.toFetch.slice(minFetch);
fetchNum -= minFetch;
// Reuse the existing record handler set earlier // Reuse the existing record handler set earlier
let resp = newitems.get(); let resp = newitems.get();
@ -548,7 +525,7 @@ SyncEngine.prototype = {
this.lastSync = this.lastModified; this.lastSync = this.lastModified;
this._log.info(["Records:", count.applied, "applied,", count.reconciled, this._log.info(["Records:", count.applied, "applied,", count.reconciled,
"reconciled,", this.toFetch.length, "left to fetch"].join(" ")); "reconciled."].join(" "));
}, },
/** /**
@ -788,7 +765,6 @@ SyncEngine.prototype = {
_resetClient: function SyncEngine__resetClient() { _resetClient: function SyncEngine__resetClient() {
this.resetLastSync(); this.resetLastSync();
this.toFetch = [];
}, },
wipeServer: function wipeServer(ignoreCrypto) { wipeServer: function wipeServer(ignoreCrypto) {

View File

@ -64,48 +64,17 @@ function test_lastSync() {
} }
} }
function test_toFetch() {
_("SyncEngine.toFetch corresponds to file on disk");
let engine = makeSteamEngine();
try {
// Ensure pristine environment
do_check_eq(engine.toFetch.length, 0);
// Write file to disk
let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.toFetch = toFetch;
do_check_eq(engine.toFetch, toFetch);
let fakefile = syncTesting.fakeFilesystem.fakeContents[
"weave/toFetch/steam.json"];
do_check_eq(fakefile, JSON.stringify(toFetch));
// Read file from disk
toFetch = [Utils.makeGUID(), Utils.makeGUID()];
syncTesting.fakeFilesystem.fakeContents["weave/toFetch/steam.json"]
= JSON.stringify(toFetch);
engine.loadToFetch();
do_check_eq(engine.toFetch.length, 2);
do_check_eq(engine.toFetch[0], toFetch[0]);
do_check_eq(engine.toFetch[1], toFetch[1]);
} finally {
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
function test_resetClient() { function test_resetClient() {
_("SyncEngine.resetClient resets lastSync and toFetch"); _("SyncEngine.resetClient resets lastSync");
let engine = makeSteamEngine(); let engine = makeSteamEngine();
try { try {
// Ensure pristine environment // Ensure pristine environment
do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined); do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined);
do_check_eq(engine.toFetch.length, 0);
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.lastSync = 123.45; engine.lastSync = 123.45;
engine.resetClient(); engine.resetClient();
do_check_eq(engine.lastSync, 0); do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
} finally { } finally {
syncTesting = new SyncTestingInfrastructure(makeSteamEngine); syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
Svc.Prefs.resetBranch(""); Svc.Prefs.resetBranch("");
@ -128,14 +97,12 @@ function test_wipeServer() {
try { try {
// Some data to reset. // Some data to reset.
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.lastSync = 123.45; engine.lastSync = 123.45;
_("Wipe server data and reset client."); _("Wipe server data and reset client.");
engine.wipeServer(true); engine.wipeServer(true);
do_check_eq(steamCollection.payload, undefined); do_check_eq(steamCollection.payload, undefined);
do_check_eq(engine.lastSync, 0); do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
_("We passed a truthy arg earlier in which case it doesn't wipe the crypto collection."); _("We passed a truthy arg earlier in which case it doesn't wipe the crypto collection.");
do_check_eq(steamCrypto.payload, PAYLOAD); do_check_eq(steamCrypto.payload, PAYLOAD);
@ -153,7 +120,6 @@ function run_test() {
test_url_attributes(); test_url_attributes();
test_syncID(); test_syncID();
test_lastSync(); test_lastSync();
test_toFetch();
test_resetClient(); test_resetClient();
test_wipeServer(); test_wipeServer();
} }

View File

@ -473,7 +473,6 @@ function test_processIncoming_emptyServer() {
// Merely ensure that this code path is run without any errors // Merely ensure that this code path is run without any errors
engine._processIncoming(); engine._processIncoming();
do_check_eq(engine.lastSync, 0); do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
} finally { } finally {
server.stop(do_test_finished); server.stop(do_test_finished);
@ -676,14 +675,22 @@ function test_processIncoming_reconcile() {
} }
function test_processIncoming_fetchNum() { function test_processIncoming_mobile_batchSize() {
_("SyncEngine._processIncoming doesn't fetch everything at ones on mobile clients"); _("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients");
Svc.Prefs.set("clusterURL", "http://localhost:8080/"); Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo"); Svc.Prefs.set("username", "foo");
Svc.Prefs.set("client.type", "mobile"); Svc.Prefs.set("client.type", "mobile");
let crypto_steam = new ServerWBO('steam'); let crypto_steam = new ServerWBO('steam');
// A collection that logs each GET
let collection = new ServerCollection(); let collection = new ServerCollection();
collection.get_log = [];
collection._get = collection.get;
collection.get = function (options) {
this.get_log.push(options);
return this._get(options);
};
// Let's create some 234 server side records. They're all at least // Let's create some 234 server side records. They're all at least
// 10 minutes old. // 10 minutes old.
@ -707,72 +714,31 @@ function test_processIncoming_fetchNum() {
try { try {
// On a mobile client, the first sync will only get the first 50 // On a mobile client, we get new records from the server in batches of 50.
// objects from the server
engine._processIncoming(); engine._processIncoming();
do_check_eq([id for (id in engine._store.items)].length, 50); do_check_eq([id for (id in engine._store.items)].length, 234);
do_check_true('record-no-0' in engine._store.items); do_check_true('record-no-0' in engine._store.items);
do_check_true('record-no-49' in engine._store.items); do_check_true('record-no-49' in engine._store.items);
do_check_eq(engine.toFetch.length, 234 - 50);
// The next sync will get another 50 objects, assuming the server
// hasn't got any new data.
engine._processIncoming();
do_check_eq([id for (id in engine._store.items)].length, 100);
do_check_true('record-no-50' in engine._store.items); do_check_true('record-no-50' in engine._store.items);
do_check_true('record-no-99' in engine._store.items);
do_check_eq(engine.toFetch.length, 234 - 100);
// Now let's say there are some new items on the server
for (i=0; i < 5; i++) {
let id = 'new-record-no-' + i;
let payload = encryptPayload({id: id, denomination: "New record No. " + i});
let wbo = new ServerWBO(id, payload);
wbo.modified = Date.now()/1000 - 60*i;
collection.wbos[id] = wbo;
}
// Let's tell the engine the server has got newer data. This is
// normally done by the WeaveSvc after retrieving info/collections.
engine.lastModified = Date.now() / 1000 + 1;
// Now we'll fetch another 50 items, but 5 of those are the new
// ones, so we've only fetched another 45 of the older ones.
engine._processIncoming();
do_check_eq([id for (id in engine._store.items)].length, 150);
do_check_true('new-record-no-0' in engine._store.items);
do_check_true('new-record-no-4' in engine._store.items);
do_check_true('record-no-100' in engine._store.items);
do_check_true('record-no-144' in engine._store.items);
do_check_eq(engine.toFetch.length, 234 - 100 - 45);
// Now let's modify a few existing records on the server so that
// they have to be refetched.
collection.wbos['record-no-3'].modified = Date.now()/1000 + 1;
collection.wbos['record-no-41'].modified = Date.now()/1000 + 1;
collection.wbos['record-no-122'].modified = Date.now()/1000 + 1;
// Once again we'll tell the engine that the server's got newer data
// and once again we'll fetch 50 items, but 3 of those are the
// existing records, so we're only fetching 47 new ones.
engine.lastModified = Date.now() / 1000 + 2;
engine._processIncoming();
do_check_eq([id for (id in engine._store.items)].length, 197);
do_check_true('record-no-145' in engine._store.items);
do_check_true('record-no-191' in engine._store.items);
do_check_eq(engine.toFetch.length, 234 - 100 - 45 - 47);
// Finally let's fetch the rest, making sure that will fetch
// everything up to the last record.
while(engine.toFetch.length) {
engine._processIncoming();
}
do_check_eq([id for (id in engine._store.items)].length, 234 + 5);
do_check_true('record-no-233' in engine._store.items); do_check_true('record-no-233' in engine._store.items);
// Verify that the right number of GET requests with the right
// kind of parameters were made.
do_check_eq(collection.get_log.length,
Math.ceil(234 / MOBILE_BATCH_SIZE) + 1);
do_check_eq(collection.get_log[0].full, 1);
do_check_eq(collection.get_log[0].limit, MOBILE_BATCH_SIZE);
do_check_eq(collection.get_log[1].full, undefined);
do_check_eq(collection.get_log[1].limit, undefined);
for (let i = 1; i <= Math.floor(234 / MOBILE_BATCH_SIZE); i++) {
do_check_eq(collection.get_log[i+1].full, 1);
do_check_eq(collection.get_log[i+1].limit, undefined);
if (i < Math.floor(234 / MOBILE_BATCH_SIZE))
do_check_eq(collection.get_log[i+1].ids.length, MOBILE_BATCH_SIZE);
else
do_check_eq(collection.get_log[i+1].ids.length, 234 % MOBILE_BATCH_SIZE);
}
} finally { } finally {
server.stop(do_test_finished); server.stop(do_test_finished);
Svc.Prefs.resetBranch(""); Svc.Prefs.resetBranch("");
@ -1162,7 +1128,7 @@ function run_test() {
test_processIncoming_emptyServer(); test_processIncoming_emptyServer();
test_processIncoming_createFromServer(); test_processIncoming_createFromServer();
test_processIncoming_reconcile(); test_processIncoming_reconcile();
test_processIncoming_fetchNum(); test_processIncoming_mobile_batchSize();
test_uploadOutgoing_toEmptyServer(); test_uploadOutgoing_toEmptyServer();
test_uploadOutgoing_failed(); test_uploadOutgoing_failed();
test_uploadOutgoing_MAX_UPLOAD_RECORDS(); test_uploadOutgoing_MAX_UPLOAD_RECORDS();