Bug 1185544 - Add data delivery to the WebSocket backend. r=dragana,nsm

This commit is contained in:
Kit Cambridge 2015-09-17 05:08:50 -07:00
parent 142f948e0e
commit a368186f2e
9 changed files with 325 additions and 114 deletions

View File

@ -9,11 +9,43 @@ const Cu = Components.utils;
Cu.importGlobalProperties(['crypto']);
this.EXPORTED_SYMBOLS = ['PushServiceHttp2Crypto', 'concatArray'];
this.EXPORTED_SYMBOLS = ['PushCrypto', 'concatArray',
'getEncryptionKeyParams', 'getEncryptionParams',
'base64UrlDecode'];
var ENCRYPT_INFO = new TextEncoder('utf-8').encode('Content-Encoding: aesgcm128');
var NONCE_INFO = new TextEncoder('utf-8').encode('Content-Encoding: nonce');
this.getEncryptionKeyParams = function(encryptKeyField) {
var params = encryptKeyField.split(',');
return params.reduce((m, p) => {
var pmap = p.split(';').reduce(parseHeaderFieldParams, {});
if (pmap.keyid && pmap.dh) {
m[pmap.keyid] = pmap.dh;
}
return m;
}, {});
};
this.getEncryptionParams = function(encryptField) {
var p = encryptField.split(',', 1)[0];
if (!p) {
return null;
}
return p.split(';').reduce(parseHeaderFieldParams, {});
};
var parseHeaderFieldParams = (m, v) => {
var i = v.indexOf('=');
if (i >= 0) {
// A quoted string with internal quotes is invalid for all the possible
// values of this header field.
m[v.substring(0, i).trim()] = v.substring(i + 1).trim()
.replace(/^"(.*)"$/, '$1');
}
return m;
};
function chunkArray(array, size) {
var start = array.byteOffset || 0;
array = array.buffer || array;
@ -29,7 +61,7 @@ function chunkArray(array, size) {
return result;
}
function base64UrlDecode(s) {
this.base64UrlDecode = function(s) {
s = s.replace(/-/g, '+').replace(/_/g, '/');
// Replace padding if it was stripped by the sender.
@ -55,7 +87,7 @@ function base64UrlDecode(s) {
array[i] = decoded.charCodeAt(i);
}
return array;
}
};
this.concatArray = function(arrays) {
var size = arrays.reduce((total, a) => total + a.byteLength, 0);
@ -108,7 +140,7 @@ function generateNonce(base, index) {
return nonce;
}
this.PushServiceHttp2Crypto = {
this.PushCrypto = {
generateKeys: function() {
return crypto.subtle.generateKey({ name: 'ECDH', namedCurve: 'P-256'},

View File

@ -44,6 +44,8 @@ function PushRecord(props) {
this.originAttributes = props.originAttributes;
this.pushCount = props.pushCount || 0;
this.lastPush = props.lastPush || 0;
this.p256dhPublicKey = props.p256dhPublicKey;
this.p256dhPrivateKey = props.p256dhPrivateKey;
this.setQuota(props.quota);
this.ctime = (typeof props.ctime === "number") ? props.ctime : 0;
}
@ -191,12 +193,14 @@ PushRecord.prototype = {
pushEndpoint: this.pushEndpoint,
lastPush: this.lastPush,
pushCount: this.pushCount,
p256dhKey: this.p256dhPublicKey,
};
},
toRegister() {
return {
pushEndpoint: this.pushEndpoint,
p256dhKey: this.p256dhPublicKey,
};
},
};

View File

@ -28,6 +28,7 @@ Cu.import("resource://gre/modules/Promise.jsm");
const {PushServiceWebSocket} = Cu.import("resource://gre/modules/PushServiceWebSocket.jsm");
const {PushServiceHttp2} = Cu.import("resource://gre/modules/PushServiceHttp2.jsm");
const {PushCrypto} = Cu.import("resource://gre/modules/PushCrypto.jsm");
// Currently supported protocols: WebSocket.
const CONNECTION_PROTOCOLS = [PushServiceWebSocket, PushServiceHttp2];
@ -710,11 +711,11 @@ this.PushService = {
_notifyAllAppsRegister: function() {
debug("notifyAllAppsRegister()");
// records are objects describing the registration as stored in IndexedDB.
return this._db.getAllUnexpired().then(records =>
records.forEach(record =>
this._notifySubscriptionChangeObservers(record)
)
);
return this._db.getAllUnexpired().then(records => {
records.forEach(record => {
this._notifySubscriptionChangeObservers(record);
});
});
},
dropRegistrationAndNotifyApp: function(aKeyId) {
@ -730,9 +731,31 @@ this.PushService = {
.then(record => this._notifySubscriptionChangeObservers(record));
},
ensureP256dhKey: function(record) {
if (record.p256dhPublicKey && record.p256dhPrivateKey) {
return Promise.resolve(record);
}
// We do not have a encryption key. so we need to generate it. This
// is only going to happen on db upgrade from version 4 to higher.
return PushCrypto.generateKeys()
.then(exportedKeys => {
return this.updateRecordAndNotifyApp(record.keyID, record => {
record.p256dhPublicKey = exportedKeys[0];
record.p256dhPrivateKey = exportedKeys[1];
return record;
});
}, error => {
return this.dropRegistrationAndNotifyApp(record.keyID).then(
() => Promise.reject(error));
});
},
updateRecordAndNotifyApp: function(aKeyID, aUpdateFunc) {
return this._db.update(aKeyID, aUpdateFunc)
.then(record => this._notifySubscriptionChangeObservers(record));
.then(record => {
this._notifySubscriptionChangeObservers(record);
return record;
});
},
_recordDidNotNotify: function(reason) {
@ -749,13 +772,14 @@ this.PushService = {
*
* @param {String} keyID The push registration ID.
* @param {String} message The message contents.
* @param {Object} cryptoParams The message encryption settings.
* @param {Function} updateFunc A function that receives the existing
* registration record as its argument, and returns a new record. If the
* function returns `null` or `undefined`, the record will not be updated.
* `PushServiceWebSocket` uses this to drop incoming updates with older
* versions.
*/
receivedPushMessage: function(keyID, message, updateFunc) {
receivedPushMessage: function(keyID, message, cryptoParams, updateFunc) {
debug("receivedPushMessage()");
Services.telemetry.getHistogramById("PUSH_API_NOTIFICATION_RECEIVED").add();
@ -779,10 +803,11 @@ this.PushService = {
this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_NO_VERSION_INCREMENT);
return null;
}
// FIXME(nsm): WHY IS expired checked here but then also checked in the next case?
// Because `unregister` is advisory only, we can still receive messages
// for stale Simple Push registrations from the server. To work around
// this, we check if the record has expired before *and* after updating
// the quota.
if (newRecord.isExpired()) {
// Because `unregister` is advisory only, we can still receive messages
// for stale registrations from the server.
debug("receivedPushMessage: Ignoring update for expired key ID " + keyID);
return null;
}
@ -794,20 +819,33 @@ this.PushService = {
if (!record) {
return notified;
}
if (shouldNotify) {
notified = this._notifyApp(record, message);
let decodedPromise;
if (cryptoParams) {
decodedPromise = PushCrypto.decodeMsg(
message,
record.p256dhPrivateKey,
cryptoParams.dh,
cryptoParams.salt,
cryptoParams.rs
).then(bytes => new TextDecoder("utf-8").decode(bytes));
} else {
decodedPromise = Promise.resolve("");
}
if (record.isExpired()) {
this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_EXPIRED);
// Drop the registration in the background. If the user returns to the
// site, the service worker will be notified on the next `idle-daily`
// event.
this._sendUnregister(record).catch(error => {
debug("receivedPushMessage: Unregister error: " + error);
});
}
return notified;
return decodedPromise.then(message => {
if (shouldNotify) {
notified = this._notifyApp(record, message);
}
if (record.isExpired()) {
this._recordDidNotNotify(kDROP_NOTIFICATION_REASON_EXPIRED);
// Drop the registration in the background. If the user returns to the
// site, the service worker will be notified on the next `idle-daily`
// event.
this._sendUnregister(record).catch(error => {
debug("receivedPushMessage: Unregister error: " + error);
});
}
return notified;
});
}).catch(error => {
debug("receivedPushMessage: Error notifying app: " + error);
});

View File

@ -19,8 +19,12 @@ Cu.import("resource://gre/modules/Timer.jsm");
Cu.import("resource://gre/modules/Preferences.jsm");
Cu.import("resource://gre/modules/Promise.jsm");
const {PushServiceHttp2Crypto, concatArray} =
Cu.import("resource://gre/modules/PushServiceHttp2Crypto.jsm");
const {
PushCrypto,
concatArray,
getEncryptionKeyParams,
getEncryptionParams,
} = Cu.import("resource://gre/modules/PushCrypto.jsm");
this.EXPORTED_SYMBOLS = ["PushServiceHttp2"];
@ -176,30 +180,10 @@ PushChannelListener.prototype = {
}
};
var parseHeaderFieldParams = (m, v) => {
var i = v.indexOf('=');
if (i >= 0) {
// A quoted string with internal quotes is invalid for all the possible
// values of this header field.
m[v.substring(0, i).trim()] = v.substring(i + 1).trim()
.replace(/^"(.*)"$/, '$1');
}
return m;
};
function encryptKeyFieldParser(aRequest) {
try {
var encryptKeyField = aRequest.getRequestHeader("Encryption-Key");
var params = encryptKeyField.split(',');
return params.reduce((m, p) => {
var pmap = p.split(';').reduce(parseHeaderFieldParams, {});
if (pmap.keyid && pmap.dh) {
m[pmap.keyid] = pmap.dh;
}
return m;
}, {});
return getEncryptionKeyParams(encryptKeyField);
} catch(e) {
// getRequestHeader can throw.
return null;
@ -208,10 +192,8 @@ function encryptKeyFieldParser(aRequest) {
function encryptFieldParser(aRequest) {
try {
return aRequest.getRequestHeader("Encryption")
.split(',', 1)[0]
.split(';')
.reduce(parseHeaderFieldParams, {});
var encryptField = aRequest.getRequestHeader("Encryption");
return getEncryptionParams(encryptField);
} catch(e) {
// getRequestHeader can throw.
return null;
@ -533,7 +515,7 @@ this.PushServiceHttp2 = {
retries: 0
})
.then(result =>
PushServiceHttp2Crypto.generateKeys()
PushCrypto.generateKeys()
.then(exportedKeys => {
result.p256dhPublicKey = exportedKeys[0];
result.p256dhPrivateKey = exportedKeys[1];
@ -724,34 +706,11 @@ this.PushServiceHttp2 = {
for (let i = 0; i < aSubscriptions.length; i++) {
let record = aSubscriptions[i];
if (record.p256dhPublicKey && record.p256dhPrivateKey) {
this._mainPushService.ensureP256dhKey(record).then(record => {
this._startSingleConnection(record);
} else {
// We do not have a encryption key. so we need to generate it. This
// is only going to happen on db upgrade from version 4 to higher.
PushServiceHttp2Crypto.generateKeys()
.then(exportedKeys => {
if (this._mainPushService) {
return this._mainPushService
.updateRecordAndNotifyApp(record.subscriptionUri, record => {
record.p256dhPublicKey = exportedKeys[0];
record.p256dhPrivateKey = exportedKeys[1];
return record;
});
}
}, error => {
record = null;
if (this._mainPushService) {
this._mainPushService
.dropRegistrationAndNotifyApp(record.subscriptionUri);
}
})
.then(_ => {
if (record) {
this._startSingleConnection(record);
}
});
}
}, error => {
debug("startConnections: Error updating record " + record.keyID);
});
}
},
@ -875,22 +834,16 @@ this.PushServiceHttp2 = {
_pushChannelOnStop: function(aUri, aAckUri, aMessage, dh, salt, rs) {
debug("pushChannelOnStop() ");
this._mainPushService.getByKeyID(aUri)
.then(aPushRecord =>
PushServiceHttp2Crypto.decodeMsg(aMessage, aPushRecord.p256dhPrivateKey,
dh, salt, rs)
.then(msg => {
var msgString = '';
for (var i=0; i<msg.length; i++) {
msgString += String.fromCharCode(msg[i]);
}
return this._mainPushService.receivedPushMessage(aUri,
msgString,
record => {
// Always update the stored record.
return record;
});
})
let cryptoParams = {
dh: dh,
salt: salt,
rs: rs,
};
this._mainPushService.receivedPushMessage(
aUri, aMessage, cryptoParams, record => {
// Always update the stored record.
return record;
}
)
.then(_ => this._ackMsgRecv(aAckUri))
.catch(err => {
@ -907,8 +860,6 @@ function PushRecordHttp2(record) {
PushRecord.call(this, record);
this.subscriptionUri = record.subscriptionUri;
this.pushReceiptEndpoint = record.pushReceiptEndpoint;
this.p256dhPublicKey = record.p256dhPublicKey;
this.p256dhPrivateKey = record.p256dhPrivateKey;
}
PushRecordHttp2.prototype = Object.create(PushRecord.prototype, {
@ -922,13 +873,11 @@ PushRecordHttp2.prototype = Object.create(PushRecord.prototype, {
PushRecordHttp2.prototype.toRegistration = function() {
let registration = PushRecord.prototype.toRegistration.call(this);
registration.pushReceiptEndpoint = this.pushReceiptEndpoint;
registration.p256dhKey = this.p256dhPublicKey;
return registration;
};
PushRecordHttp2.prototype.toRegister = function() {
let register = PushRecord.prototype.toRegister.call(this);
register.pushReceiptEndpoint = this.pushReceiptEndpoint;
register.p256dhKey = this.p256dhPublicKey;
return register;
};

View File

@ -12,6 +12,12 @@ const Cr = Components.results;
const {PushDB} = Cu.import("resource://gre/modules/PushDB.jsm");
const {PushRecord} = Cu.import("resource://gre/modules/PushRecord.jsm");
const {
PushCrypto,
base64UrlDecode,
getEncryptionKeyParams,
getEncryptionParams,
} = Cu.import("resource://gre/modules/PushCrypto.jsm");
Cu.import("resource://gre/modules/Preferences.jsm");
Cu.import("resource://gre/modules/Timer.jsm");
Cu.import("resource://gre/modules/Promise.jsm");
@ -33,7 +39,7 @@ var threadManager = Cc["@mozilla.org/thread-manager;1"]
.getService(Ci.nsIThreadManager);
const kPUSHWSDB_DB_NAME = "pushapi";
const kPUSHWSDB_DB_VERSION = 4; // Change this if the IndexedDB format changes
const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
const kPUSHWSDB_STORE_NAME = "pushapi";
const kUDP_WAKEUP_WS_STATUS_CODE = 4774; // WebSocket Close status code sent
@ -47,6 +53,27 @@ this.EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
// Don't modify this, instead set dom.push.debug.
var gDebuggingEnabled = true;
function getCryptoParams(headers) {
if (!headers) {
return null;
}
var keymap = getEncryptionKeyParams(headers.encryption_key);
if (!keymap) {
return null;
}
var enc = getEncryptionParams(headers.encryption);
if (!enc || !enc.keyid) {
return null;
}
var dh = keymap[enc.keyid];
var salt = enc.salt;
var rs = (enc.rs)? parseInt(enc.rs, 10) : 4096;
if (!dh || !salt || isNaN(rs) || (rs <= 1)) {
return null;
}
return {dh, salt, rs};
}
function debug(s) {
if (gDebuggingEnabled) {
dump("-*- PushServiceWebSocket.jsm: " + s + "\n");
@ -136,11 +163,11 @@ this.PushServiceWebSocket = {
observe: function(aSubject, aTopic, aData) {
switch (aTopic) {
case "nsPref:changed":
if (aData == "dom.push.debug") {
gDebuggingEnabled = prefs.get("debug");
}
break;
case "nsPref:changed":
if (aData == "dom.push.debug") {
gDebuggingEnabled = prefs.get("debug");
}
break;
case "timer-callback":
if (aSubject == this._requestTimeoutTimer) {
if (Object.keys(this._pendingRequests).length === 0) {
@ -265,6 +292,9 @@ this.PushServiceWebSocket = {
*/
_upperLimit: 0,
/** Indicates whether the server supports Web Push-style message delivery. */
_dataEnabled: false,
/**
* Sends a message to the Push Server through an open websocket.
* typeof(msg) shall be an object
@ -356,6 +386,8 @@ this.PushServiceWebSocket = {
}
this._mainPushService = null;
this._dataEnabled = false;
},
/**
@ -749,13 +781,28 @@ this.PushServiceWebSocket = {
return;
}
function finishHandshake() {
this._UAID = reply.uaid;
this._currentState = STATE_READY;
let notifyRequestQueue = () => {
if (this._notifyRequestQueue) {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
};
function finishHandshake() {
this._UAID = reply.uaid;
this._currentState = STATE_READY;
this._dataEnabled = !!reply.use_webpush;
if (this._dataEnabled) {
this._mainPushService.getAllUnexpired().then(records =>
Promise.all(records.map(record =>
this._mainPushService.ensureP256dhKey(record).catch(error => {
debug("finishHandshake: Error updating record " + record.keyID);
})
))
).then(notifyRequestQueue);
} else {
notifyRequestQueue();
}
}
// By this point we've got a UAID from the server that we are ready to
@ -821,11 +868,48 @@ this.PushServiceWebSocket = {
}
},
_handleDataUpdate: function(update) {
let promise;
if (typeof update.channelID != "string") {
debug("handleDataUpdate: Discarding message without channel ID");
return;
}
if (typeof update.data != "string") {
promise = this._mainPushService.receivedPushMessage(
update.channelID,
null,
null,
record => record
);
} else {
let params = getCryptoParams(update.headers);
if (!params) {
debug("handleDataUpdate: Discarding invalid encrypted message");
return;
}
let message = base64UrlDecode(update.data);
promise = this._mainPushService.receivedPushMessage(
update.channelID,
message,
params,
record => record
);
}
promise.then(() => this._sendAck(update.channelID)).catch(err => {
debug("handleDataUpdate: Error delivering message: " + err);
});
},
/**
* Protocol handler invoked by server message.
*/
_handleNotificationReply: function(reply) {
debug("handleNotificationReply()");
if (this._dataEnabled) {
this._handleDataUpdate(reply);
return;
}
if (typeof reply.updates !== 'object') {
debug("No 'updates' field in response. Type = " + typeof reply.updates);
return;
@ -902,6 +986,16 @@ this.PushServiceWebSocket = {
ctime: Date.now()
};
this._queueRequest(data);
}).then(record => {
if (!this._dataEnabled) {
return record;
}
return PushCrypto.generateKeys()
.then(([publicKey, privateKey]) => {
record.p256dhPublicKey = publicKey;
record.p256dhPrivateKey = privateKey;
return record;
});
});
}
@ -961,7 +1055,7 @@ this.PushServiceWebSocket = {
_receivedUpdate: function(aChannelID, aLatestVersion) {
debug("Updating: " + aChannelID + " -> " + aLatestVersion);
this._mainPushService.receivedPushMessage(aChannelID, "", record => {
this._mainPushService.receivedPushMessage(aChannelID, null, null, record => {
if (record.version === null ||
record.version < aLatestVersion) {
debug("Version changed for " + aChannelID + ": " + aLatestVersion);
@ -990,6 +1084,7 @@ this.PushServiceWebSocket = {
let data = {
messageType: "hello",
use_webpush: true,
};
if (this._UAID) {

View File

@ -15,12 +15,12 @@ EXTRA_PP_JS_MODULES += [
]
EXTRA_JS_MODULES += [
'PushCrypto.jsm',
'PushDB.jsm',
'PushRecord.jsm',
'PushService.jsm',
'PushServiceChildPreload.jsm',
'PushServiceHttp2.jsm',
'PushServiceHttp2Crypto.jsm',
]
MOCHITEST_MANIFESTS += [

View File

@ -0,0 +1,92 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
'use strict';
const {PushDB, PushService, PushServiceWebSocket, PushCrypto} = serviceExports;
const userAgentID = '4dffd396-6582-471d-8c0c-84f394e9f7db';
function run_test() {
do_get_profile();
setPrefs({
userAgentID,
});
disableServiceWorkerEvents(
'https://example.com/page/1',
'https://example.com/page/2',
'https://example.com/page/3'
);
run_next_test();
}
add_task(function* test_with_data_enabled() {
let db = PushServiceWebSocket.newPushDB();
do_register_cleanup(() => {return db.drop().then(_ => db.close());});
let [publicKey, privateKey] = yield PushCrypto.generateKeys();
let records = [{
channelID: 'eb18f12a-cc42-4f14-accb-3bfc1227f1aa',
pushEndpoint: 'https://example.org/push/no-key/1',
scope: 'https://example.com/page/1',
originAttributes: '',
quota: Infinity,
}, {
channelID: '0d8886b9-8da1-4778-8f5d-1cf93a877ed6',
pushEndpoint: 'https://example.org/push/key',
scope: 'https://example.com/page/2',
originAttributes: '',
p256dhPublicKey: publicKey,
p256dhPrivateKey: privateKey,
quota: Infinity,
}];
for (let record of records) {
yield db.put(record);
}
PushService.init({
serverURI: "wss://push.example.org/",
networkInfo: new MockDesktopNetworkInfo(),
db,
makeWebSocket(uri) {
return new MockWebSocket(uri, {
onHello(request) {
ok(request.use_webpush,
'Should use Web Push if data delivery is enabled');
this.serverSendMsg(JSON.stringify({
messageType: 'hello',
status: 200,
uaid: request.uaid,
use_webpush: true,
}));
},
onRegister(request) {
this.serverSendMsg(JSON.stringify({
messageType: 'register',
status: 200,
uaid: userAgentID,
channelID: request.channelID,
pushEndpoint: 'https://example.org/push/new',
}));
}
});
},
});
let newRecord = yield PushNotificationService.register(
'https://example.com/page/3',
ChromeUtils.originAttributesToSuffix({ appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inBrowser: false })
);
ok(newRecord.p256dhPublicKey, 'Should generate public keys for new records');
ok(newRecord.p256dhPrivateKey, 'Should generate private keys for new records');
let record = yield db.getByKeyID('eb18f12a-cc42-4f14-accb-3bfc1227f1aa');
ok(record.p256dhPublicKey, 'Should add public key to partial record');
ok(record.p256dhPrivateKey, 'Should add private key to partial record');
record = yield db.getByKeyID('0d8886b9-8da1-4778-8f5d-1cf93a877ed6');
deepEqual(record.p256dhPublicKey, publicKey,
'Should leave existing public key');
deepEqual(record.p256dhPrivateKey, privateKey,
'Should leave existing private key');
});

View File

@ -33,12 +33,13 @@ skip-if = toolkit == 'android'
[test_unregister_not_found.js]
[test_unregister_success.js]
[test_webapps_cleardata.js]
[test_updateRecordNoEncryptionKeys_ws.js]
#http2 test
[test_resubscribe_4xxCode_http2.js]
[test_resubscribe_5xxCode_http2.js]
[test_resubscribe_listening_for_msg_error_http2.js]
[test_register_5xxCode_http2.js]
[test_updateRecordNoEncryptionKeys.js]
[test_updateRecordNoEncryptionKeys_http2.js]
[test_register_success_http2.js]
skip-if = !hasNode
run-sequentially = node server exceptions dont replay well