Bug 1120380 - Update the retention logic for archived pings. r=gfritzsche

This commit is contained in:
Alessio Placitelli 2015-05-19 05:24:00 -04:00
parent 9222b51622
commit 4464c17c83
3 changed files with 239 additions and 81 deletions

View File

@ -60,13 +60,6 @@ this.TelemetryArchive = {
promiseArchivePing: function(ping) { promiseArchivePing: function(ping) {
return TelemetryArchiveImpl.promiseArchivePing(ping); return TelemetryArchiveImpl.promiseArchivePing(ping);
}, },
/**
* Used in tests only to fake a restart of the module.
*/
_testReset: function() {
TelemetryArchiveImpl._testReset();
},
}; };
/** /**
@ -81,12 +74,6 @@ function shouldArchivePings() {
let TelemetryArchiveImpl = { let TelemetryArchiveImpl = {
_logger: null, _logger: null,
// Tracks the archived pings in a Map of (id -> {timestampCreated, type}).
// We use this to cache info on archived pings to avoid scanning the disk more than once.
_archivedPings: new Map(),
// Whether we already scanned the archived pings on disk.
_scannedArchiveDirectory: false,
get _log() { get _log() {
if (!this._logger) { if (!this._logger) {
this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX); this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX);
@ -95,11 +82,6 @@ let TelemetryArchiveImpl = {
return this._logger; return this._logger;
}, },
_testReset: function() {
this._archivedPings = new Map();
this._scannedArchiveDirectory = false;
},
promiseArchivePing: function(ping) { promiseArchivePing: function(ping) {
if (!shouldArchivePings()) { if (!shouldArchivePings()) {
this._log.trace("promiseArchivePing - archiving is disabled"); this._log.trace("promiseArchivePing - archiving is disabled");
@ -113,27 +95,11 @@ let TelemetryArchiveImpl = {
} }
} }
const creationDate = new Date(ping.creationDate);
if (this._archivedPings.has(ping.id)) {
const data = this._archivedPings.get(ping.id);
if (data.timestampCreated > creationDate.getTime()) {
this._log.error("promiseArchivePing - trying to overwrite newer ping with the same id");
return Promise.reject(new Error("trying to overwrite newer ping with the same id"));
} else {
this._log.warn("promiseArchivePing - overwriting older ping with the same id");
}
}
this._archivedPings.set(ping.id, {
timestampCreated: creationDate.getTime(),
type: ping.type,
});
return TelemetryStorage.saveArchivedPing(ping); return TelemetryStorage.saveArchivedPing(ping);
}, },
_buildArchivedPingList: function() { _buildArchivedPingList: function(archivedPingsMap) {
let list = [for (p of this._archivedPings) { let list = [for (p of archivedPingsMap) {
id: p[0], id: p[0],
timestampCreated: p[1].timestampCreated, timestampCreated: p[1].timestampCreated,
type: p[1].type, type: p[1].type,
@ -147,33 +113,13 @@ let TelemetryArchiveImpl = {
promiseArchivedPingList: function() { promiseArchivedPingList: function() {
this._log.trace("promiseArchivedPingList"); this._log.trace("promiseArchivedPingList");
if (this._scannedArchiveDirectory) { return TelemetryStorage.loadArchivedPingList().then(loadedInfo => {
return Promise.resolve(this._buildArchivedPingList()) return this._buildArchivedPingList(loadedInfo);
}
return TelemetryStorage.loadArchivedPingList().then((loadedInfo) => {
// Add the ping info from scanning to the existing info.
// We might have pings added before lazily loading this list.
for (let [id, info] of loadedInfo) {
this._archivedPings.set(id, {
timestampCreated: info.timestampCreated,
type: info.type,
});
}
this._scannedArchiveDirectory = true;
return this._buildArchivedPingList();
}); });
}, },
promiseArchivedPingById: function(id) { promiseArchivedPingById: function(id) {
this._log.trace("promiseArchivedPingById - id: " + id); this._log.trace("promiseArchivedPingById - id: " + id);
const data = this._archivedPings.get(id); return TelemetryStorage.loadArchivedPing(id);
if (!data) {
this._log.trace("promiseArchivedPingById - no ping with id: " + id);
return Promise.reject(new Error("TelemetryArchive.promiseArchivedPingById - no ping with id " + id));
}
return TelemetryStorage.loadArchivedPing(id, data.timestampCreated, data.type);
}, },
}; };

View File

@ -174,6 +174,7 @@ this.TelemetryController = Object.freeze({
*/ */
reset: function() { reset: function() {
Impl._clientID = null; Impl._clientID = null;
TelemetryStorage.reset();
return this.setup(); return this.setup();
}, },
/** /**
@ -1067,6 +1068,10 @@ let Impl = {
this._clientID = yield ClientID.getClientID(); this._clientID = yield ClientID.getClientID();
Preferences.set(PREF_CACHED_CLIENTID, this._clientID); Preferences.set(PREF_CACHED_CLIENTID, this._clientID);
// Purge the pings archive by removing outdated pings. We don't wait for this
// task to complete, but TelemetryStorage blocks on it during shutdown.
TelemetryStorage.runCleanPingArchiveTask();
Telemetry.asyncFetchTelemetryData(function () {}); Telemetry.asyncFetchTelemetryData(function () {});
this._delayedInitTaskDeferred.resolve(); this._delayedInitTaskDeferred.resolve();
} catch (e) { } catch (e) {

View File

@ -17,6 +17,7 @@ Cu.import("resource://gre/modules/Services.jsm", this);
Cu.import("resource://gre/modules/XPCOMUtils.jsm", this); Cu.import("resource://gre/modules/XPCOMUtils.jsm", this);
Cu.import("resource://gre/modules/osfile.jsm", this); Cu.import("resource://gre/modules/osfile.jsm", this);
Cu.import("resource://gre/modules/Task.jsm", this); Cu.import("resource://gre/modules/Task.jsm", this);
Cu.import("resource://gre/modules/TelemetryUtils.jsm", this);
Cu.import("resource://gre/modules/Promise.jsm", this); Cu.import("resource://gre/modules/Promise.jsm", this);
XPCOMUtils.defineLazyModuleGetter(this, 'Deprecated', XPCOMUtils.defineLazyModuleGetter(this, 'Deprecated',
@ -52,6 +53,9 @@ const OVERDUE_PING_FILE_AGE = 7 * 24 * 60 * 60 * 1000; // 1 week
// Maximum number of pings to save. // Maximum number of pings to save.
const MAX_LRU_PINGS = 50; const MAX_LRU_PINGS = 50;
// Maxmimum time, in milliseconds, archive pings should be retained.
const MAX_ARCHIVED_PINGS_RETENTION_MS = 180 * 24 * 60 * 60 * 1000; // 180 days
// The number of outstanding saved pings that we have issued loading // The number of outstanding saved pings that we have issued loading
// requests for. // requests for.
let pingsLoaded = 0; let pingsLoaded = 0;
@ -69,6 +73,13 @@ let pendingPings = [];
let isPingDirectoryCreated = false; let isPingDirectoryCreated = false;
/**
* This is a policy object used to override behavior for testing.
*/
let Policy = {
now: () => new Date(),
};
this.TelemetryStorage = { this.TelemetryStorage = {
get MAX_PING_FILE_AGE() { get MAX_PING_FILE_AGE() {
return MAX_PING_FILE_AGE; return MAX_PING_FILE_AGE;
@ -109,12 +120,34 @@ this.TelemetryStorage = {
* Load an archived ping from disk. * Load an archived ping from disk.
* *
* @param {string} id The pings id. * @param {string} id The pings id.
* @param {number} timestampCreated The pings creation timestamp.
* @param {string} type The pings type.
* @return {promise<object>} Promise that is resolved with the ping data. * @return {promise<object>} Promise that is resolved with the ping data.
*/ */
loadArchivedPing: function(id, timestampCreated, type) { loadArchivedPing: function(id) {
return TelemetryStorageImpl.loadArchivedPing(id, timestampCreated, type); return TelemetryStorageImpl.loadArchivedPing(id);
},
/**
* Clean the pings archive by removing old pings.
* This will scan the archive directory.
*
* @return {Promise} Resolved when the cleanup task completes.
*/
runCleanPingArchiveTask: function() {
return TelemetryStorageImpl.runCleanPingArchiveTask();
},
/**
* Reset the storage state in tests.
*/
reset: function() {
return TelemetryStorageImpl.reset();
},
/**
* Test method that allows waiting on the archive clean task to finish.
*/
testCleanupTaskPromise: function() {
return (TelemetryStorageImpl._archiveCleanTask || Promise.resolve());
}, },
/** /**
@ -398,6 +431,19 @@ let TelemetryStorageImpl = {
// Used to serialize aborted session ping writes to disk. // Used to serialize aborted session ping writes to disk.
_abortedSessionSerializer: new SaveSerializer(), _abortedSessionSerializer: new SaveSerializer(),
// Tracks the archived pings in a Map of (id -> {timestampCreated, type}).
// We use this to cache info on archived pings to avoid scanning the disk more than once.
_archivedPings: new Map(),
// Track the archive loading task to prevent multiple tasks from being executed.
_archiveCleanTaskArchiveLoadingTask: null,
// Track the archive cleanup task.
_archiveCleanTask: null,
// Whether we already scanned the archived pings on disk.
_scannedArchiveDirectory: false,
// Track the shutdown process to bail out of the clean up task quickly.
_shutdown: false,
get _log() { get _log() {
if (!this._logger) { if (!this._logger) {
this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX); this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX);
@ -412,7 +458,11 @@ let TelemetryStorageImpl = {
* @return {Promise} Promise that is resolved when shutdown is complete. * @return {Promise} Promise that is resolved when shutdown is complete.
*/ */
shutdown: Task.async(function*() { shutdown: Task.async(function*() {
this._shutdown = true;
yield this._abortedSessionSerializer.flushTasks(); yield this._abortedSessionSerializer.flushTasks();
// If the archive cleaning task is running, block on it. It should bail out as soon
// as possible.
yield this._archiveCleanTask;
}), }),
/** /**
@ -423,24 +473,44 @@ let TelemetryStorageImpl = {
*/ */
saveArchivedPing: Task.async(function*(ping) { saveArchivedPing: Task.async(function*(ping) {
const creationDate = new Date(ping.creationDate); const creationDate = new Date(ping.creationDate);
if (this._archivedPings.has(ping.id)) {
const data = this._archivedPings.get(ping.id);
if (data.timestampCreated > creationDate.getTime()) {
this._log.error("saveArchivedPing - trying to overwrite newer ping with the same id");
return Promise.reject(new Error("trying to overwrite newer ping with the same id"));
} else {
this._log.warn("saveArchivedPing - overwriting older ping with the same id");
}
}
// Get the archived ping path and append the lz4 suffix to it (so we have 'jsonlz4'). // Get the archived ping path and append the lz4 suffix to it (so we have 'jsonlz4').
const filePath = getArchivedPingPath(ping.id, creationDate, ping.type) + "lz4"; const filePath = getArchivedPingPath(ping.id, creationDate, ping.type) + "lz4";
yield OS.File.makeDir(OS.Path.dirname(filePath), { ignoreExisting: true, yield OS.File.makeDir(OS.Path.dirname(filePath), { ignoreExisting: true,
from: OS.Constants.Path.profileDir }); from: OS.Constants.Path.profileDir });
yield this.savePingToFile(ping, filePath, /*overwrite*/ true, /*compressed*/ true); yield this.savePingToFile(ping, filePath, /*overwrite*/ true, /*compressed*/ true);
this._archivedPings.set(ping.id, {
timestampCreated: creationDate.getTime(),
type: ping.type,
});
}), }),
/** /**
* Load an archived ping from disk. * Load an archived ping from disk.
* *
* @param {string} id The pings id. * @param {string} id The pings id.
* @param {number} timestampCreated The pings creation timestamp.
* @param {string} type The pings type.
* @return {promise<object>} Promise that is resolved with the ping data. * @return {promise<object>} Promise that is resolved with the ping data.
*/ */
loadArchivedPing: Task.async(function*(id, timestampCreated, type) { loadArchivedPing: Task.async(function*(id) {
this._log.trace("loadArchivedPing - id: " + id + ", timestampCreated: " + timestampCreated + ", type: " + type); this._log.trace("loadArchivedPing - id: " + id);
const path = getArchivedPingPath(id, new Date(timestampCreated), type);
const data = this._archivedPings.get(id);
if (!data) {
this._log.trace("loadArchivedPing - no ping with id: " + id);
return Promise.reject(new Error("TelemetryStorage.loadArchivedPing - no ping with id " + id));
}
const path = getArchivedPingPath(id, new Date(data.timestampCreated), data.type);
const pathCompressed = path + "lz4"; const pathCompressed = path + "lz4";
try { try {
@ -472,6 +542,95 @@ let TelemetryStorageImpl = {
yield OS.File.remove(pathCompressed, {ignoreAbsent: true}); yield OS.File.remove(pathCompressed, {ignoreAbsent: true});
}), }),
/**
* Clean the pings archive by removing old pings.
*
* @return {Promise} Resolved when the cleanup task completes.
*/
runCleanPingArchiveTask: function() {
// If there's an archive cleaning task already running, return it.
if (this._archiveCleanTask) {
return this._archiveCleanTask;
}
// Make sure to clear |_archiveCleanTask| once done.
let clear = () => this._archiveCleanTask = null;
// Since there's no archive cleaning task running, start it.
this._archiveCleanTask = this.cleanArchiveTask().then(clear, clear);
return this._archiveCleanTask;
},
cleanArchiveTask: Task.async(function*() {
this._log.trace("cleanArchiveTask");
if (!(yield OS.File.exists(gPingsArchivePath))) {
return;
}
const now = Policy.now().getTime();
let dirIterator = new OS.File.DirectoryIterator(gPingsArchivePath);
let subdirs = (yield dirIterator.nextBatch()).filter(e => e.isDir);
// Keep track of the newest removed month to update the cache, if needed.
let newestRemovedMonth = null;
// Walk through the monthly subdirs of the form <YYYY-MM>/
for (let dir of subdirs) {
if (this._shutdown) {
this._log.trace("cleanArchiveTask - Terminating the clean up task due to shutdown");
return;
}
if (!isValidArchiveDir(dir.name)) {
this._log.warn("cleanArchiveTask - skipping invalidly named subdirectory " + dir.path);
continue;
}
const archiveDate = getDateFromArchiveDir(dir.name);
if (!archiveDate) {
this._log.warn("cleanArchiveTask - skipping invalid subdirectory date " + dir.path);
continue;
}
// If this archive directory is older than 180 days, remove it.
if (!TelemetryUtils.areTimesClose(archiveDate.getTime(), now,
MAX_ARCHIVED_PINGS_RETENTION_MS)) {
try {
yield OS.File.removeDir(dir.path);
// Update the newest removed month.
if (archiveDate > newestRemovedMonth) {
newestRemovedMonth = archiveDate;
}
} catch (ex) {
this._log.error("cleanArchiveTask - Unable to remove " + dir.path, ex);
}
}
}
// If the archive directory was already scanned, filter the ping archive cache.
if (this._scannedArchiveDirectory && newestRemovedMonth) {
// Scan the archive cache for pings older than the newest directory pruned above.
for (let [id, info] of this._archivedPings) {
const timestampCreated = new Date(info.timestampCreated);
if (timestampCreated.getTime() > newestRemovedMonth.getTime()) {
continue;
}
// Remove outdated pings from the cache.
this._archivedPings.delete(id);
}
}
}),
/**
* Reset the storage state in tests.
*/
reset: function() {
this._shutdown = false;
this._scannedArchiveDirectory = false;
this._archivedPings = new Map();
},
/** /**
* Get a list of info on the archived pings. * Get a list of info on the archived pings.
* This will scan the archive directory and grab basic data about the existing * This will scan the archive directory and grab basic data about the existing
@ -479,26 +638,45 @@ let TelemetryStorageImpl = {
* *
* @return {promise<sequence<object>>} * @return {promise<sequence<object>>}
*/ */
loadArchivedPingList: Task.async(function*() { loadArchivedPingList: function() {
this._log.trace("loadArchivedPingList"); // If there's an archive loading task already running, return it.
if (this._archiveScanningTask) {
return this._archiveScanningTask;
}
if (this._scannedArchiveDirectory) {
this._log.trace("loadArchivedPingList - Archive already scanned, hitting cache.");
return Promise.resolve(this._archivedPings);
}
// Make sure to clear |_archiveScanningTask| once done.
let clear = pingList => {
this._archiveScanningTask = null;
return pingList;
};
// Since there's no archive loading task running, start it.
this._archiveScanningTask = this._scanArchive().then(clear, clear);
return this._archiveScanningTask;
},
_scanArchive: Task.async(function*() {
this._log.trace("_scanArchive");
if (!(yield OS.File.exists(gPingsArchivePath))) { if (!(yield OS.File.exists(gPingsArchivePath))) {
return new Map(); return new Map();
} }
let archivedPings = new Map();
let dirIterator = new OS.File.DirectoryIterator(gPingsArchivePath); let dirIterator = new OS.File.DirectoryIterator(gPingsArchivePath);
let subdirs = (yield dirIterator.nextBatch()).filter(e => e.isDir); let subdirs = (yield dirIterator.nextBatch()).filter(e => e.isDir);
// Walk through the monthly subdirs of the form <YYYY-MM>/ // Walk through the monthly subdirs of the form <YYYY-MM>/
for (let dir of subdirs) { for (let dir of subdirs) {
const dirRegEx = /^[0-9]{4}-[0-9]{2}$/; if (!isValidArchiveDir(dir.name)) {
if (!dirRegEx.test(dir.name)) { this._log.warn("_scanArchive - skipping invalidly named subdirectory " + dir.path);
this._log.warn("loadArchivedPingList - skipping invalidly named subdirectory " + dir.path);
continue; continue;
} }
this._log.trace("loadArchivedPingList - checking in subdir: " + dir.path); this._log.trace("_scanArchive - checking in subdir: " + dir.path);
let pingIterator = new OS.File.DirectoryIterator(dir.path); let pingIterator = new OS.File.DirectoryIterator(dir.path);
let pings = (yield pingIterator.nextBatch()).filter(e => !e.isDir); let pings = (yield pingIterator.nextBatch()).filter(e => !e.isDir);
@ -511,26 +689,28 @@ let TelemetryStorageImpl = {
} }
// In case of conflicts, overwrite only with newer pings. // In case of conflicts, overwrite only with newer pings.
if (archivedPings.has(data.id)) { if (this._archivedPings.has(data.id)) {
const overwrite = data.timestamp > archivedPings.get(data.id).timestampCreated; const overwrite = data.timestamp > this._archivedPings.get(data.id).timestampCreated;
this._log.warn("loadArchivedPingList - have seen this id before: " + data.id + this._log.warn("_scanArchive - have seen this id before: " + data.id +
", overwrite: " + overwrite); ", overwrite: " + overwrite);
if (!overwrite) { if (!overwrite) {
continue; continue;
} }
yield this._removeArchivedPing(data.id, data.timestampCreated, data.type) yield this._removeArchivedPing(data.id, data.timestampCreated, data.type)
.catch((e) => this._log.warn("loadArchivedPingList - failed to remove ping", e)); .catch((e) => this._log.warn("_scanArchive - failed to remove ping", e));
} }
archivedPings.set(data.id, { this._archivedPings.set(data.id, {
timestampCreated: data.timestamp, timestampCreated: data.timestamp,
type: data.type, type: data.type,
}); });
} }
} }
return archivedPings; // Mark the archive as scanned, so we no longer hit the disk.
this._scannedArchiveDirectory = true;
return this._archivedPings;
}), }),
/** /**
@ -915,3 +1095,30 @@ function getArchivedPingPath(aPingId, aDate, aType) {
let fileName = [aDate.getTime(), aPingId, aType, "json"].join("."); let fileName = [aDate.getTime(), aPingId, aType, "json"].join(".");
return OS.Path.join(archivedPingDir, fileName); return OS.Path.join(archivedPingDir, fileName);
} }
/**
* Check if a directory name is in the "YYYY-MM" format.
* @param {String} aDirName The name of the pings archive directory.
* @return {Boolean} True if the directory name is in the right format, false otherwise.
*/
function isValidArchiveDir(aDirName) {
const dirRegEx = /^[0-9]{4}-[0-9]{2}$/;
return dirRegEx.test(aDirName);
}
/**
* Gets a date object from an archive directory name.
* @param {String} aDirName The name of the pings archive directory. Must be in the YYYY-MM
* format.
* @return {Object} A Date object or null if the dir name is not valid.
*/
function getDateFromArchiveDir(aDirName) {
let [year, month] = aDirName.split("-");
year = parseInt(year);
month = parseInt(month);
// Make sure to have sane numbers.
if (!Number.isFinite(month) || !Number.isFinite(year) || month < 1 || month > 12) {
return null;
}
return new Date(year, month - 1, 1, 0, 0, 0);
}