Bug 569295 - limit the number of bytes we attempt to upload to the storage servers. r=rnewman

This commit is contained in:
Mark Hammond 2016-01-21 12:30:25 +11:00
parent 4dff654d4e
commit a804f0bd3e
4 changed files with 185 additions and 31 deletions

View File

@ -94,10 +94,10 @@ SCORE_UPDATE_DELAY: 100,
// observed spurious idle/back events and short enough to pre-empt user activity.
IDLE_OBSERVER_BACK_DELAY: 100,
// Number of records to upload in a single POST (multiple POSTS if exceeded)
// FIXME: Record size limit is 256k (new cluster), so this can be quite large!
// (Bug 569295)
// Max number of records or bytes to upload in a single POST - we'll do multiple POSTS if either
// MAX_UPLOAD_RECORDS or MAX_UPLOAD_BYTES is hit)
MAX_UPLOAD_RECORDS: 100,
MAX_UPLOAD_BYTES: 1024 * 1023, // just under 1MB
MAX_HISTORY_UPLOAD: 5000,
MAX_HISTORY_DOWNLOAD: 5000,

View File

@ -1434,13 +1434,7 @@ SyncEngine.prototype = {
// collection we'll upload
let up = new Collection(this.engineURL, null, this.service);
let count = 0;
// Upload what we've got so far in the collection
let doUpload = Utils.bind2(this, function(desc) {
this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
" records");
let resp = up.post();
let handleResponse = resp => {
if (!resp.success) {
this._log.debug("Uploading records failed: " + resp);
resp.failureCode = ENGINE_UPLOAD_FAIL;
@ -1463,32 +1457,29 @@ SyncEngine.prototype = {
let id = resp.obj.success[key];
delete this._modified[id];
}
}
up.clearRecords();
});
let postQueue = up.newPostQueue(this._log, handleResponse);
for (let id of modifiedIDs) {
let out;
let ok = false;
try {
let out = this._createRecord(id);
out = this._createRecord(id);
if (this._log.level <= Log.Level.Trace)
this._log.trace("Outgoing: " + out);
out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
up.pushData(out);
ok = true;
} catch (ex if !Async.isShutdownException(ex)) {
this._log.warn("Error creating record", ex);
}
// Partial upload
if ((++count % MAX_UPLOAD_RECORDS) == 0)
doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
if (ok) {
postQueue.enqueue(out);
}
this._store._sleep(0);
}
// Final upload
if (count % MAX_UPLOAD_RECORDS > 0)
doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
postQueue.flush();
}
},

View File

@ -600,14 +600,6 @@ Collection.prototype = {
this._rebuildURL();
},
pushData: function Coll_pushData(data) {
this._data.push(data);
},
clearRecords: function Coll_clearRecords() {
this._data = [];
},
set recordHandler(onRecord) {
// Save this because onProgress is called with this as the ChannelListener
let coll = this;
@ -629,4 +621,82 @@ Collection.prototype = {
}
};
},
// This object only supports posting via the postQueue object.
post() {
throw new Error("Don't directly post to a collection - use newPostQueue instead");
},
newPostQueue(log, postCallback) {
let poster = data => {
return Resource.prototype.post.call(this, data);
}
return new PostQueue(poster, log, postCallback);
},
};
/* A helper to manage the posting of records while respecting the various
size limits.
*/
function PostQueue(poster, log, postCallback) {
// The "post" function we should use when it comes time to do the post.
this.poster = poster;
this.log = log;
// The callback we make with the response when we do get around to making the
// post (which could be during any of the enqueue() calls or the final flush())
// This callback may be called multiple times and must not add new items to
// the queue.
this.postCallback = postCallback;
// The string where we are capturing the stringified version of the records
// queued so far. It will always be invalid JSON as it is always missing the
// close bracket.
this.queued = "";
// The number of records we've queued so far.
this.numQueued = 0;
}
PostQueue.prototype = {
enqueue(record) {
// We want to ensure the record has a .toJSON() method defined - even
// though JSON.stringify() would implicitly call it, the stringify might
// still work even if it isn't defined, which isn't what we want.
let jsonRepr = record.toJSON();
if (!jsonRepr) {
throw new Error("You must only call this with objects that explicitly support JSON");
}
let bytes = JSON.stringify(jsonRepr);
// Note that we purposely don't check if a single record would exceed our
// limit - we still attempt the post and if it sees a 413 like we think it
// will, we just let that do whatever it does (which is probably cause
// ongoing sync failures for that engine - bug 1241356 exists to fix this)
// (Note that counter-intuitively, the post of the oversized record will
// not happen here but on the next .enqueue/.flush.)
// Do a flush if we can't add this record without exceeding our limits.
let newLength = this.queued.length + bytes.length + 1; // extra 1 for trailing "]"
if (this.numQueued >= MAX_UPLOAD_RECORDS || newLength >= MAX_UPLOAD_BYTES) {
this.log.trace("PostQueue flushing"); // flush logs more info...
// We need to write the queue out before handling this one.
this.flush();
}
// Either a ',' or a '[' depending on whether this is the first record.
this.queued += this.numQueued ? "," : "[";
this.queued += bytes;
this.numQueued++;
},
flush() {
if (!this.queued) {
// nothing queued.
return;
}
this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes`);
let queued = this.queued + "]";
this.queued = "";
this.numQueued = 0;
this.postCallback(this.poster(queued));
},
}

View File

@ -1486,6 +1486,99 @@ add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
}
});
add_test(function test_uploadOutgoing_MAX_UPLOAD_BYTES() {
_("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_BYTES");
Service.identity.username = "foo";
let collection = new ServerCollection();
// Let's count how many times the client posts to the server
let uploadCounts = [];
collection.post = (function(orig) {
return function(data) {
uploadCounts.push(JSON.parse(data).length);
return orig.call(this, data);
};
}(collection.post));
let engine = makeRotaryEngine();
// A helper function that calculates the overhead of a record as uploaded
// to the server - it returns the size of a record with an empty string.
// This is so we can calculate exactly how many records we can fit into a
// batch (ie, we expect the record size that's actually uploaded to be the
// result of this function + the length of the data)
let calculateRecordOverhead = function() {
engine._store.items["string-no-x"] = "";
let x = engine._createRecord("string-no-x");
x.encrypt(Service.collectionKeys.keyForCollection(engine.name));
delete engine._store.items["string-no-x"];
return JSON.stringify(x).length;
}
let allIds = [];
// Create a bunch of records (and server side handlers) - make 20 that will
// fit inside our byte limit.
let fullItemSize = (MAX_UPLOAD_BYTES - 2) / 20;
// fullItemSize includes the "," between records and quote characters (as we
// will use strings)
let itemSize = fullItemSize - calculateRecordOverhead() - (3 * 20);
// Add 21 of this size - the first 20 should fit in the first batch.
for (let i = 0; i < 21; i++) {
let id = 'string-no-' + i;
engine._store.items[id] = "X".repeat(itemSize);
engine._tracker.addChangedID(id, 0);
collection.insert(id);
allIds.push(id);
}
// Now a single large item that's greater than MAX_UPLOAD_BYTES. This should
// cause the 1 item that didn't fit in the previous batch to be uploaded
// by itself, then this large one by itself.
engine._store.items["large-item"] = "Y".repeat(MAX_UPLOAD_BYTES*2);
engine._tracker.addChangedID("large-item", 0);
collection.insert("large-item");
allIds.push("large-item");
// And a few more small items - these should all be uploaded together.
for (let i = 0; i < 20; i++) {
let id = 'small-no-' + i;
engine._store.items[id] = "ZZZZ";
engine._tracker.addChangedID(id, 0);
collection.insert(id);
allIds.push(id);
}
let meta_global = Service.recordManager.set(engine.metaURL,
new WBORecord(engine.metaURL));
meta_global.payload.engines = {rotary: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler()
});
let syncTesting = new SyncTestingInfrastructure(server);
try {
// Confirm initial environment.
do_check_eq(uploadCounts.length, 0);
engine._syncStartup();
engine._uploadOutgoing();
// Ensure all records have been uploaded.
for (let checkId of allIds) {
do_check_true(!!collection.payload(checkId));
}
// Ensure that the uploads were performed in the batch sizes we expect.
Assert.deepEqual(uploadCounts, [20, 1, 1, 20]);
} finally {
cleanAndGo(server);
}
});
add_test(function test_syncFinish_noDelete() {
_("SyncEngine._syncFinish resets tracker's score");