Merge m-c to inbound.

This commit is contained in:
Ryan VanderMeulen 2013-01-27 15:23:35 -05:00
commit f88aeca1c6
13 changed files with 446 additions and 45 deletions

View File

@ -32,7 +32,7 @@ const DEFAULT_LOAD_DELAY_MSEC = 10 * 1000;
* EXAMPLE USAGE
* =============
*
* let reporter = Cc["@mozilla.org/healthreport/service;1"]
* let reporter = Cc["@mozilla.org/datareporting/service;1"]
* .getService(Ci.nsISupports)
* .wrappedJSObject
* .healthReporter;

View File

@ -38,7 +38,8 @@ const DEFAULT_DATABASE_NAME = "healthreport.sqlite";
* lower-level components (such as collection and submission) together.
*
* An instance of this type is created as an XPCOM service. See
* HealthReportService.js and HealthReportComponents.manifest.
* DataReportingService.js and
* DataReporting.manifest/HealthReportComponents.manifest.
*
* It is theoretically possible to have multiple instances of this running
* in the application. For example, this type may one day handle submission
@ -306,6 +307,9 @@ HealthReporter.prototype = Object.freeze({
this._log.info("HealthReporter started.");
this._initialized = true;
Services.obs.addObserver(this, "idle-daily", false);
// Clean up caches and reduce memory usage.
this._storage.compact();
this._initializedDeferred.resolve(this);
},
@ -483,7 +487,7 @@ HealthReporter.prototype = Object.freeze({
* Register a `Metrics.Provider` with this instance.
*
* This needs to be called or no data will be collected. See also
* registerProvidersFromCategoryManager`.
* `registerProvidersFromCategoryManager`.
*
* @param provider
* (Metrics.Provider) The provider to register for collection.
@ -626,10 +630,12 @@ HealthReporter.prototype = Object.freeze({
};
for (let [measurementKey, measurement] of provider.measurements) {
let name = providerName + "." + measurement.name + "." + measurement.version;
let name = providerName + "." + measurement.name;
let serializer;
try {
// The measurement is responsible for returning a serializer which
// is aware of the measurement version.
serializer = measurement.serializer(measurement.SERIALIZE_JSON);
} catch (ex) {
this._log.warn("Error obtaining serializer for measurement: " + name +
@ -692,6 +698,7 @@ HealthReporter.prototype = Object.freeze({
o.errors = errors;
}
this._storage.compact();
throw new Task.Result(JSON.stringify(o));
},

View File

@ -394,7 +394,7 @@ CurrentSessionMeasurement.prototype = Object.freeze({
},
_serializeJSONSingular: function (data) {
let result = {};
let result = {"_v": this.version};
for (let [field, value] of data) {
result[field] = value[1];
@ -535,7 +535,9 @@ ActiveAddonsMeasurement.prototype = Object.freeze({
}
// Exceptions are caught in the caller.
return JSON.parse(data.get("addons")[1]);
let result = JSON.parse(data.get("addons")[1]);
result._v = this.version;
return result;
},
});

View File

@ -204,8 +204,10 @@ add_task(function test_json_payload_dummy_provider() {
print(payload);
let o = JSON.parse(payload);
let name = "DummyProvider.DummyMeasurement";
do_check_eq(Object.keys(o.data.last).length, 1);
do_check_true("DummyProvider.DummyMeasurement.1" in o.data.last);
do_check_true(name in o.data.last);
do_check_eq(o.data.last[name]._v, 1);
reporter._shutdown();
});

View File

@ -111,9 +111,10 @@ add_task(function test_collect() {
let serializer = active.serializer(active.SERIALIZE_JSON);
let serialized = serializer.singular(data.singular);
do_check_eq(typeof(serialized), "object");
do_check_eq(Object.keys(serialized).length, 2);
do_check_eq(Object.keys(serialized).length, 3); // Our two keys, plus _v.
do_check_true("addon0" in serialized);
do_check_true("addon1" in serialized);
do_check_eq(serialized._v, 1);
let counts = provider.getMeasurement("counts", 1);
data = yield counts.getValues();

View File

@ -34,6 +34,7 @@ add_task(function test_collect_smoketest() {
let serializer = m.serializer(m.SERIALIZE_JSON);
let d = serializer.singular(data.singular);
do_check_eq(d._v, 1);
do_check_eq(d.vendor, "Mozilla");
do_check_eq(d.name, "xpcshell");
do_check_eq(d.id, "xpcshell@tests.mozilla.org");

View File

@ -170,6 +170,7 @@ add_task(function test_serialization() {
let serializer = current.serializer(current.SERIALIZE_JSON);
let fields = serializer.singular(data.singular);
do_check_eq(fields._v, 2);
do_check_eq(fields.activeTicks, 0);
do_check_eq(fields.startDay, Metrics.dateToDays(recorder.startDate));
do_check_eq(fields.main, 500);

View File

@ -32,6 +32,7 @@ add_task(function test_collect_smoketest() {
let serializer = m.serializer(m.SERIALIZE_JSON);
let d = serializer.singular(data.singular);
do_check_eq(d._v, 1);
do_check_true(d.cpuCount > 0);
do_check_neq(d.name, null);

View File

@ -246,7 +246,7 @@ Measurement.prototype = Object.freeze({
},
_serializeJSONSingular: function (data) {
let result = {};
let result = {"_v": this.version};
for (let [field, data] of data) {
// There could be legacy fields in storage we no longer care about.
@ -278,7 +278,7 @@ Measurement.prototype = Object.freeze({
},
_serializeJSONDay: function (data) {
let result = {};
let result = {"_v": this.version};
for (let [field, data] of data) {
if (!this._fieldsByName.has(field)) {

View File

@ -1204,6 +1204,21 @@ MetricsStorageSqliteBackend.prototype = Object.freeze({
});
},
/**
* Reduce memory usage as much as possible.
*
* This returns a promise that will be resolved on completion.
*
* @return Promise<>
*/
compact: function () {
let self = this;
return this.enqueueOperation(function doCompact() {
self._connection.discardCachedStatements();
return self._connection.shrinkMemory();
});
},
/**
* Ensure a field ID matches a specified type.
*

View File

@ -256,15 +256,17 @@ add_task(function test_serialize_json_default() {
let serializer = m.serializer(m.SERIALIZE_JSON);
let formatted = serializer.singular(data.singular);
do_check_eq(Object.keys(formatted).length, 2);
do_check_eq(Object.keys(formatted).length, 3); // Our keys + _v.
do_check_true("last-numeric" in formatted);
do_check_true("last-text" in formatted);
do_check_eq(formatted["last-numeric"], 6);
do_check_eq(formatted["last-text"], "hello");
do_check_eq(formatted["_v"], 1);
formatted = serializer.daily(data.days.getDay(now));
do_check_eq(Object.keys(formatted).length, 5);
do_check_eq(Object.keys(formatted).length, 6); // Our keys + _v.
do_check_eq(formatted["daily-counter"], 2);
do_check_eq(formatted["_v"], 1);
do_check_true(Array.isArray(formatted["daily-discrete-numeric"]));
do_check_eq(formatted["daily-discrete-numeric"].length, 2);

View File

@ -8,7 +8,7 @@ this.EXPORTED_SYMBOLS = [
"Sqlite",
];
const {interfaces: Ci, utils: Cu} = Components;
const {classes: Cc, interfaces: Ci, utils: Cu} = Components;
Cu.import("resource://gre/modules/commonjs/promise/core.js");
Cu.import("resource://gre/modules/osfile.jsm");
@ -43,6 +43,13 @@ let connectionCounters = {};
* to obtain a lock, possibly making database access slower. Defaults to
* true.
*
* shrinkMemoryOnConnectionIdleMS -- (integer) If defined, the connection
* will attempt to minimize its memory usage after this many
* milliseconds of connection idle. The connection is idle when no
* statements are executing. There is no default value which means no
* automatic memory minimization will occur. Please note that this is
* *not* a timer on the idle service and this could fire while the
* application is active.
*
* FUTURE options to control:
*
@ -69,6 +76,18 @@ function openConnection(options) {
let sharedMemoryCache = "sharedMemoryCache" in options ?
options.sharedMemoryCache : true;
let openedOptions = {};
if ("shrinkMemoryOnConnectionIdleMS" in options) {
if (!Number.isInteger(options.shrinkMemoryOnConnectionIdleMS)) {
throw new Error("shrinkMemoryOnConnectionIdleMS must be an integer. " +
"Got: " + options.shrinkMemoryOnConnectionIdleMS);
}
openedOptions.shrinkMemoryOnConnectionIdleMS =
options.shrinkMemoryOnConnectionIdleMS;
}
let file = FileUtils.File(path);
let openDatabaseFn = sharedMemoryCache ?
Services.storage.openDatabase :
@ -92,7 +111,8 @@ function openConnection(options) {
return Promise.reject(new Error("Connection is not ready."));
}
return Promise.resolve(new OpenedConnection(connection, basename, number));
return Promise.resolve(new OpenedConnection(connection, basename, number,
openedOptions));
} catch (ex) {
log.warn("Could not open database: " + CommonUtils.exceptionStr(ex));
return Promise.reject(ex);
@ -143,8 +163,11 @@ function openConnection(options) {
* (string) The basename of this database name. Used for logging.
* @param number
* (Number) The connection number to this database.
* @param options
* (object) Options to control behavior of connection. See
* `openConnection`.
*/
function OpenedConnection(connection, basename, number) {
function OpenedConnection(connection, basename, number, options) {
let log = Log4Moz.repository.getLogger("Sqlite.Connection." + basename);
// getLogger() returns a shared object. We can't modify the functions on this
@ -176,10 +199,23 @@ function OpenedConnection(connection, basename, number) {
this._cachedStatements = new Map();
this._anonymousStatements = new Map();
this._anonymousCounter = 0;
this._inProgressStatements = new Map();
this._inProgressCounter = 0;
// A map from statement index to mozIStoragePendingStatement, to allow for
// canceling prior to finalizing the mozIStorageStatements.
this._pendingStatements = new Map();
// Increments for each executed statement for the life of the connection.
this._statementCounter = 0;
this._inProgressTransaction = null;
this._idleShrinkMS = options.shrinkMemoryOnConnectionIdleMS;
if (this._idleShrinkMS) {
this._idleShrinkTimer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
// We wait for the first statement execute to start the timer because
// shrinking now would not do anything.
}
}
OpenedConnection.prototype = Object.freeze({
@ -259,7 +295,7 @@ OpenedConnection.prototype = Object.freeze({
}
this._log.debug("Request to close connection.");
this._clearIdleShrinkTimer();
let deferred = Promise.defer();
// We need to take extra care with transactions during shutdown.
@ -287,11 +323,14 @@ OpenedConnection.prototype = Object.freeze({
_finalize: function (deferred) {
this._log.debug("Finalizing connection.");
// Cancel any in-progress statements.
for (let [k, statement] of this._inProgressStatements) {
// Cancel any pending statements.
for (let [k, statement] of this._pendingStatements) {
statement.cancel();
}
this._inProgressStatements.clear();
this._pendingStatements.clear();
// We no longer need to track these.
this._statementCounter = 0;
// Next we finalize all active statements.
for (let [k, statement] of this._anonymousStatements) {
@ -389,7 +428,27 @@ OpenedConnection.prototype = Object.freeze({
this._cachedStatements.set(sql, statement);
}
return this._executeStatement(sql, statement, params, onRow);
this._clearIdleShrinkTimer();
let deferred = Promise.defer();
try {
this._executeStatement(sql, statement, params, onRow).then(
function onResult(result) {
this._startIdleShrinkTimer();
deferred.resolve(result);
}.bind(this),
function onError(error) {
this._startIdleShrinkTimer();
deferred.reject(error);
}.bind(this)
);
} catch (ex) {
this._startIdleShrinkTimer();
throw ex;
}
return deferred.promise;
},
/**
@ -418,22 +477,32 @@ OpenedConnection.prototype = Object.freeze({
let index = this._anonymousCounter++;
this._anonymousStatements.set(index, statement);
this._clearIdleShrinkTimer();
let onFinished = function () {
this._anonymousStatements.delete(index);
statement.finalize();
this._startIdleShrinkTimer();
}.bind(this);
let deferred = Promise.defer();
this._executeStatement(sql, statement, params, onRow).then(
function onResult(rows) {
this._anonymousStatements.delete(index);
statement.finalize();
deferred.resolve(rows);
}.bind(this),
try {
this._executeStatement(sql, statement, params, onRow).then(
function onResult(rows) {
onFinished();
deferred.resolve(rows);
}.bind(this),
function onError(error) {
this._anonymousStatements.delete(index);
statement.finalize();
deferred.reject(error);
}.bind(this)
);
function onError(error) {
onFinished();
deferred.reject(error);
}.bind(this)
);
} catch (ex) {
onFinished();
throw ex;
}
return deferred.promise;
},
@ -586,6 +655,40 @@ OpenedConnection.prototype = Object.freeze({
);
},
/**
* Free up as much memory from the underlying database connection as possible.
*
* @return Promise<>
*/
shrinkMemory: function () {
this._log.info("Shrinking memory usage.");
let onShrunk = this._clearIdleShrinkTimer.bind(this);
return this.execute("PRAGMA shrink_memory").then(onShrunk, onShrunk);
},
/**
* Discard all cached statements.
*
* Note that this relies on us being non-interruptible between
* the insertion or retrieval of a statement in the cache and its
* execution: we finalize all statements, which is only safe if
* they will not be executed again.
*
* @return (integer) the number of statements discarded.
*/
discardCachedStatements: function () {
let count = 0;
for (let [k, statement] of this._cachedStatements) {
++count;
statement.finalize();
}
this._cachedStatements.clear();
this._log.debug("Discarded " + count + " cached statements.");
return count;
},
_executeStatement: function (sql, statement, params, onRow) {
if (statement.state != statement.MOZ_STORAGE_STATEMENT_READY) {
throw new Error("Statement is not ready for execution.");
@ -608,7 +711,7 @@ OpenedConnection.prototype = Object.freeze({
"object. Got: " + params);
}
let index = this._inProgressCounter++;
let index = this._statementCounter++;
let deferred = Promise.defer();
let userCancelled = false;
@ -659,8 +762,8 @@ OpenedConnection.prototype = Object.freeze({
},
handleCompletion: function (reason) {
self._log.debug("Stmt #" + index + " finished");
self._inProgressStatements.delete(index);
self._log.debug("Stmt #" + index + " finished.");
self._pendingStatements.delete(index);
switch (reason) {
case Ci.mozIStorageStatementCallback.REASON_FINISHED:
@ -695,8 +798,7 @@ OpenedConnection.prototype = Object.freeze({
},
});
this._inProgressStatements.set(index, pending);
this._pendingStatements.set(index, pending);
return deferred.promise;
},
@ -705,6 +807,24 @@ OpenedConnection.prototype = Object.freeze({
throw new Error("Connection is not open.");
}
},
_clearIdleShrinkTimer: function () {
if (!this._idleShrinkTimer) {
return;
}
this._idleShrinkTimer.cancel();
},
_startIdleShrinkTimer: function () {
if (!this._idleShrinkTimer) {
return;
}
this._idleShrinkTimer.initWithCallback(this.shrinkMemory.bind(this),
this._idleShrinkMS,
this._idleShrinkTimer.TYPE_ONE_SHOT);
},
});
this.Sqlite = {

View File

@ -3,7 +3,7 @@
"use strict";
const {utils: Cu} = Components;
const {classes: Cc, interfaces: Ci, utils: Cu} = Components;
do_get_profile();
@ -12,23 +12,46 @@ Cu.import("resource://gre/modules/osfile.jsm");
Cu.import("resource://gre/modules/Sqlite.jsm");
Cu.import("resource://gre/modules/Task.jsm");
// To spin the event loop in test.
Cu.import("resource://services-common/async.js");
function getConnection(dbName) {
let path = dbName + ".sqlite";
function sleep(ms) {
let deferred = Promise.defer();
return Sqlite.openConnection({path: path});
let timer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
timer.initWithCallback({
notify: function () {
deferred.resolve();
},
}, ms, timer.TYPE_ONE_SHOT);
return deferred.promise;
}
function getDummyDatabase(name) {
function getConnection(dbName, extraOptions={}) {
let path = dbName + ".sqlite";
let options = {path: path};
for (let [k, v] in Iterator(extraOptions)) {
options[k] = v;
}
return Sqlite.openConnection(options);
}
function getDummyDatabase(name, extraOptions={}) {
const TABLES = {
dirs: "id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT",
files: "id INTEGER PRIMARY KEY AUTOINCREMENT, dir_id INTEGER, path TEXT",
};
let c = yield getConnection(name);
let c = yield getConnection(name, extraOptions);
c._initialStatementCount = 0;
for (let [k, v] in Iterator(TABLES)) {
yield c.execute("CREATE TABLE " + k + "(" + v + ")");
c._initialStatementCount++;
}
throw new Task.Result(c);
@ -161,11 +184,17 @@ add_task(function test_execute_invalid_statement() {
let deferred = Promise.defer();
do_check_eq(c._anonymousStatements.size, 0);
c.execute("SELECT invalid FROM unknown").then(do_throw, function onError(error) {
deferred.resolve();
});
yield deferred.promise;
// Ensure we don't leak the statement instance.
do_check_eq(c._anonymousStatements.size, 0);
yield c.close();
});
@ -323,3 +352,223 @@ add_task(function test_detect_multiple_transactions() {
yield c.close();
});
add_task(function test_shrink_memory() {
let c = yield getDummyDatabase("shrink_memory");
// It's just a simple sanity test. We have no way of measuring whether this
// actually does anything.
yield c.shrinkMemory();
yield c.close();
});
add_task(function test_no_shrink_on_init() {
let c = yield getConnection("no_shrink_on_init",
{shrinkMemoryOnConnectionIdleMS: 200});
let oldShrink = c.shrinkMemory;
let count = 0;
Object.defineProperty(c, "shrinkMemory", {
value: function () {
count++;
},
});
// We should not shrink until a statement has been executed.
yield sleep(220);
do_check_eq(count, 0);
yield c.execute("SELECT 1");
yield sleep(220);
do_check_eq(count, 1);
yield c.close();
});
add_task(function test_idle_shrink_fires() {
let c = yield getDummyDatabase("idle_shrink_fires",
{shrinkMemoryOnConnectionIdleMS: 200});
c._clearIdleShrinkTimer();
let oldShrink = c.shrinkMemory;
let shrinkPromises = [];
let count = 0;
Object.defineProperty(c, "shrinkMemory", {
value: function () {
count++;
let promise = oldShrink.call(c);
shrinkPromises.push(promise);
return promise;
},
});
// We reset the idle shrink timer after monkeypatching because otherwise the
// installed timer callback will reference the non-monkeypatched function.
c._startIdleShrinkTimer();
yield sleep(220);
do_check_eq(count, 1);
do_check_eq(shrinkPromises.length, 1);
yield shrinkPromises[0];
shrinkPromises.shift();
// We shouldn't shrink again unless a statement was executed.
yield sleep(300);
do_check_eq(count, 1);
yield c.execute("SELECT 1");
yield sleep(300);
do_check_eq(count, 2);
do_check_eq(shrinkPromises.length, 1);
yield shrinkPromises[0];
yield c.close();
});
add_task(function test_idle_shrink_reset_on_operation() {
const INTERVAL = 500;
let c = yield getDummyDatabase("idle_shrink_reset_on_operation",
{shrinkMemoryOnConnectionIdleMS: INTERVAL});
c._clearIdleShrinkTimer();
let oldShrink = c.shrinkMemory;
let shrinkPromises = [];
let count = 0;
Object.defineProperty(c, "shrinkMemory", {
value: function () {
count++;
let promise = oldShrink.call(c);
shrinkPromises.push(promise);
return promise;
},
});
let now = new Date();
c._startIdleShrinkTimer();
let initialIdle = new Date(now.getTime() + INTERVAL);
// Perform database operations until initial scheduled time has been passed.
let i = 0;
while (new Date() < initialIdle) {
yield c.execute("INSERT INTO dirs (path) VALUES (?)", ["" + i]);
i++;
}
do_check_true(i > 0);
// We should not have performed an idle while doing operations.
do_check_eq(count, 0);
// Wait for idle timer.
yield sleep(INTERVAL);
// Ensure we fired.
do_check_eq(count, 1);
do_check_eq(shrinkPromises.length, 1);
yield shrinkPromises[0];
yield c.close();
});
add_task(function test_in_progress_counts() {
let c = yield getDummyDatabase("in_progress_counts");
do_check_eq(c._statementCounter, c._initialStatementCount);
do_check_eq(c._pendingStatements.size, 0);
yield c.executeCached("INSERT INTO dirs (path) VALUES ('foo')");
do_check_eq(c._statementCounter, c._initialStatementCount + 1);
do_check_eq(c._pendingStatements.size, 0);
let expectOne;
let expectTwo;
// Please forgive me.
let inner = Async.makeSpinningCallback();
let outer = Async.makeSpinningCallback();
// We want to make sure that two queries executing simultaneously
// result in `_pendingStatements.size` reaching 2, then dropping back to 0.
//
// To do so, we kick off a second statement within the row handler
// of the first, then wait for both to finish.
yield c.executeCached("SELECT * from dirs", null, function onRow() {
// In the onRow handler, we're still an outstanding query.
// Expect a single in-progress entry.
expectOne = c._pendingStatements.size;
// Start another query, checking that after its statement has been created
// there are two statements in progress.
let p = c.executeCached("SELECT 10, path from dirs");
expectTwo = c._pendingStatements.size;
// Now wait for it to be done before we return from the row handler …
p.then(function onInner() {
inner();
});
}).then(function onOuter() {
// … and wait for the inner to be done before we finish …
inner.wait();
outer();
});
// … and wait for both queries to have finished before we go on and
// test postconditions.
outer.wait();
do_check_eq(expectOne, 1);
do_check_eq(expectTwo, 2);
do_check_eq(c._statementCounter, c._initialStatementCount + 3);
do_check_eq(c._pendingStatements.size, 0);
yield c.close();
});
add_task(function test_discard_while_active() {
let c = yield getDummyDatabase("discard_while_active");
yield c.executeCached("INSERT INTO dirs (path) VALUES ('foo')");
yield c.executeCached("INSERT INTO dirs (path) VALUES ('bar')");
let discarded = -1;
let first = true;
let sql = "SELECT * FROM dirs";
yield c.executeCached(sql, null, function onRow(row) {
if (!first) {
return;
}
first = false;
discarded = c.discardCachedStatements();
});
// We discarded everything, because the SELECT had already started to run.
do_check_eq(3, discarded);
// And again is safe.
do_check_eq(0, c.discardCachedStatements());
yield c.close();
});
add_task(function test_discard_cached() {
let c = yield getDummyDatabase("discard_cached");
yield c.executeCached("SELECT * from dirs");
do_check_eq(1, c._cachedStatements.size);
yield c.executeCached("SELECT * from files");
do_check_eq(2, c._cachedStatements.size);
yield c.executeCached("SELECT * from dirs");
do_check_eq(2, c._cachedStatements.size);
c.discardCachedStatements();
do_check_eq(0, c._cachedStatements.size);
yield c.close();
});