Move stuff around.

- Rename fido2/pyu2f -> fido2/_pyu2f
- Partial docstrings, with type information.
- Rewrote server example.
This commit is contained in:
Dain Nilsson
2018-07-03 14:57:00 +02:00
parent 7b2b5ba232
commit 1edb0d9d4e
35 changed files with 1296 additions and 282 deletions

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
build/
dist/
.eggs/
.idea/
.ropeproject/
ChangeLog
man/*.1

View File

@@ -4,4 +4,4 @@ repos:
hooks:
- id: flake8
- id: double-quote-string-fixer
exclude: '^(fido2|test)/pyu2f/.*'
exclude: '^(fido2|test)/_pyu2f/.*'

View File

@@ -1,4 +1,4 @@
== fido2
== python-fido2
image:https://travis-ci.org/Yubico/python-fido2.svg?branch=master["Travis CI Status", link="https://travis-ci.org/Yubico/python-fido2"]
image:https://ci.appveyor.com/api/projects/status/8orx9nbdfq52w47s/branch/master?svg=true["Appveyor Status", link="https://ci.appveyor.com/project/Yubico53275/python-fido-host/branch/master"]

View File

@@ -38,193 +38,96 @@ Now navigate to https://localhost:5000 in a supported web browser.
from __future__ import print_function, absolute_import, unicode_literals
from fido2.client import ClientData
from fido2.server import Fido2Server
from fido2.ctap2 import AttestationObject, AuthenticatorData
from flask import Flask, request
from fido2 import cbor
from flask import Flask, session, request, redirect, abort
import os
HTML = """
<html>
<head><title>Fido 2.0 webauthn demo</title></head>
<body>
<h1>Webauthn demo</h1>
<p>
<strong>This demo requires a browser supporting the WebAuthn API!</strong>
</p>
<hr>
{content}
</body>
</html>
"""
app = Flask(__name__, static_url_path='')
app.secret_key = os.urandom(32) # Used for session.
INDEX_HTML = HTML.format(content="""
<a href="/register">Register</a><br>
<a href="/authenticate">Authenticate</a><br>
""")
REGISTER_HTML = HTML.format(content="""
<h2>Register a credential</h2>
<p>Touch your authenticator device now...</p>
<script>
navigator.credentials.create({{
publicKey: {{
rp: {{
id: document.domain,
name: 'Demo server'
}},
user: {{
id: {user_id},
name: 'a_user',
displayName: 'A. User',
icon: 'https://example.com/image.png'
}},
challenge: {challenge},
pubKeyCredParams: [
{{
alg: -7,
type: 'public-key'
}}
],
excludeCredentials: [],
attestation: 'direct',
timeout: 60000
}}
}}).then(function(attestation) {{
console.log(attestation);
console.log(JSON.stringify({{
attestationObject: Array.from(new Uint8Array(attestation.response.attestationObject)),
clientData: Array.from(new Uint8Array(attestation.response.clientDataJSON))
}}));
fetch('/register', {{
method: 'POST',
body: JSON.stringify({{
attestationObject: Array.from(new Uint8Array(attestation.response.attestationObject)),
clientData: Array.from(new Uint8Array(attestation.response.clientDataJSON))
}})
}}).then(function() {{
alert('Registration successful. More details in server log...');
window.location = '/';
}});
}}, function(reason) {{
console.log('Failed', reason);
}});
</script>
""") # noqa
server = Fido2Server('localhost')
rp = {
'id': 'localhost',
'name': 'Demo server'
}
AUTH_HTML = HTML.format(content="""
<h2>Authenticate using a credential</h2>
<p>Touch your authenticator device now...</p>
<script>
navigator.credentials.get({{
publicKey: {{
rpId: document.domain,
challenge: {challenge},
allowCredentials: [
{{
type: 'public-key',
id: {credential_id}
}}
],
timeout: 60000
}}
}}).then(function(attestation) {{
console.log(attestation);
fetch('/authenticate', {{
method: 'POST',
body: JSON.stringify({{
authenticatorData: Array.from(new Uint8Array(attestation.response.authenticatorData)),
clientData: Array.from(new Uint8Array(attestation.response.clientDataJSON)),
signature: Array.from(new Uint8Array(attestation.response.signature))
}})
}}).then(function() {{
alert('Authentication successful. More details in server log...');
window.location = '/';
}});
}}, function(reason) {{
console.log('Failed', reason);
}});
</script>
""") # noqa
def to_js_array(value):
return 'new Uint8Array(%r)' % list(bytearray(value))
def from_js_array(value):
return bytes(bytearray(value))
app = Flask(__name__)
global credential, last_challenge
credential, last_challenge = None, None
# Registered credentials are stored globally, in memory only. Single user
# support, state is lost when the server terminates.
credentials = []
@app.route('/')
def index():
return INDEX_HTML
return redirect('/index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
global credential, last_challenge
if request.method == 'POST':
data = request.get_json(force=True)
client_data = ClientData(from_js_array(data['clientData']))
att_obj = AttestationObject(from_js_array(data['attestationObject']))
print('clientData', client_data)
print('AttestationObject:', att_obj)
@app.route('/api/register/begin', methods=['POST'])
def register_begin():
registration_data = server.register_begin(rp, {
'id': b'user_id',
'name': 'a_user',
'displayName': 'A. User',
'icon': 'https://example.com/image.png'
}, credentials)
# Verify the challenge
if client_data.challenge != last_challenge:
raise ValueError('Challenge mismatch!')
# Verify the signature
att_obj.verify(client_data.hash)
credential = att_obj.auth_data.credential_data
print('REGISTERED CREDENTIAL:', credential)
return 'OK'
last_challenge = os.urandom(32)
return REGISTER_HTML.format(
user_id=to_js_array(b'user_id'),
challenge=to_js_array(last_challenge)
)
session['challenge'] = registration_data['publicKey']['challenge']
print('\n\n\n\n')
print(registration_data)
print('\n\n\n\n')
return cbor.dumps(registration_data)
@app.route('/authenticate', methods=['GET', 'POST'])
def authenticate():
global credential, last_challenge
if not credential:
return HTML.format(content='No credential registered!')
@app.route('/api/register/complete', methods=['POST'])
def register_complete():
data = cbor.loads(request.get_data())[0]
client_data = ClientData(data['clientDataJSON'])
att_obj = AttestationObject(data['attestationObject'])
print('clientData', client_data)
print('AttestationObject:', att_obj)
if request.method == 'POST':
data = request.get_json(force=True)
client_data = ClientData(from_js_array(data['clientData']))
auth_data = AuthenticatorData(from_js_array(data['authenticatorData']))
signature = from_js_array(data['signature'])
print('clientData', client_data)
print('AuthenticatorData', auth_data)
auth_data = server.register_complete(
session['challenge'], client_data, att_obj)
# Verify the challenge
if client_data.challenge != last_challenge:
raise ValueError('Challenge mismatch!')
credentials.append(auth_data.credential_data)
print('REGISTERED CREDENTIAL:', auth_data.credential_data)
return cbor.dumps({'status': 'OK'})
# Verify the signature
credential.public_key.verify(auth_data + client_data.hash, signature)
print('ASSERTION OK')
return 'OK'
last_challenge = os.urandom(32)
return AUTH_HTML.format(
challenge=to_js_array(last_challenge),
credential_id=to_js_array(credential.credential_id)
)
@app.route('/api/authenticate/begin', methods=['POST'])
def authenticate_begin():
if not credentials:
abort(404)
auth_data = server.authenticate_begin(rp['id'], credentials)
session['challenge'] = auth_data['publicKey']['challenge']
return cbor.dumps(auth_data)
@app.route('/api/authenticate/complete', methods=['POST'])
def authenticate_complete():
if not credentials:
abort(404)
data = cbor.loads(request.get_data())[0]
credential_id = data['credentialId']
client_data = ClientData(data['clientDataJSON'])
auth_data = AuthenticatorData(data['authenticatorData'])
signature = data['signature']
print('clientData', client_data)
print('AuthenticatorData', auth_data)
server.authenticate_complete(credentials, credential_id,
session.pop('challenge'), client_data,
auth_data, signature)
print('ASSERTION OK')
return cbor.dumps({'status': 'OK'})
if __name__ == '__main__':
print(__doc__)
app.run(ssl_context='adhoc', debug=True)

View File

@@ -0,0 +1,47 @@
<html>
<head>
<title>Fido 2.0 webauthn demo</title>
<script src="/cbor.js"></script>
</head>
<body>
<h1>Webauthn demo</h1>
<p>
<strong>This demo requires a browser supporting the WebAuthn API!</strong>
</p>
<hr>
<h2>Authenticate using a credential</h2>
<p>Touch your authenticator device now...</p>
<script>
fetch('/api/authenticate/begin', {
method: 'POST',
}).then(function(response) {
return response.arrayBuffer();
}).then(function(data) {
return CBOR.decode(data);
}).then(function(options) {
navigator.credentials.get(options).then(function(assertion) {
console.log(assertion);
fetch('/api/authenticate/complete', {
method: 'POST',
headers: {'Content-Type': 'application/cbor'},
body: CBOR.encode({
"credentialId": new Uint8Array(assertion.rawId),
"authenticatorData": new Uint8Array(assertion.response.authenticatorData),
"clientDataJSON": new Uint8Array(assertion.response.clientDataJSON),
"signature": new Uint8Array(assertion.response.signature)
})
}).then(function() {
alert('Authentication successful. More details in server log...');
window.location = '/index.html';
});
}, function(reason) {
console.log('Failed', reason);
alert(reason);
window.location = '/index.html';
});
});
</script>
</body>
</html>

View File

@@ -0,0 +1,406 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2014-2016 Patrick Gansterer <paroga@paroga.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
(function(global, undefined) { "use strict";
var POW_2_24 = 5.960464477539063e-8,
POW_2_32 = 4294967296,
POW_2_53 = 9007199254740992;
function encode(value) {
var data = new ArrayBuffer(256);
var dataView = new DataView(data);
var lastLength;
var offset = 0;
function prepareWrite(length) {
var newByteLength = data.byteLength;
var requiredLength = offset + length;
while (newByteLength < requiredLength)
newByteLength <<= 1;
if (newByteLength !== data.byteLength) {
var oldDataView = dataView;
data = new ArrayBuffer(newByteLength);
dataView = new DataView(data);
var uint32count = (offset + 3) >> 2;
for (var i = 0; i < uint32count; ++i)
dataView.setUint32(i << 2, oldDataView.getUint32(i << 2));
}
lastLength = length;
return dataView;
}
function commitWrite() {
offset += lastLength;
}
function writeFloat64(value) {
commitWrite(prepareWrite(8).setFloat64(offset, value));
}
function writeUint8(value) {
commitWrite(prepareWrite(1).setUint8(offset, value));
}
function writeUint8Array(value) {
var dataView = prepareWrite(value.length);
for (var i = 0; i < value.length; ++i)
dataView.setUint8(offset + i, value[i]);
commitWrite();
}
function writeUint16(value) {
commitWrite(prepareWrite(2).setUint16(offset, value));
}
function writeUint32(value) {
commitWrite(prepareWrite(4).setUint32(offset, value));
}
function writeUint64(value) {
var low = value % POW_2_32;
var high = (value - low) / POW_2_32;
var dataView = prepareWrite(8);
dataView.setUint32(offset, high);
dataView.setUint32(offset + 4, low);
commitWrite();
}
function writeTypeAndLength(type, length) {
if (length < 24) {
writeUint8(type << 5 | length);
} else if (length < 0x100) {
writeUint8(type << 5 | 24);
writeUint8(length);
} else if (length < 0x10000) {
writeUint8(type << 5 | 25);
writeUint16(length);
} else if (length < 0x100000000) {
writeUint8(type << 5 | 26);
writeUint32(length);
} else {
writeUint8(type << 5 | 27);
writeUint64(length);
}
}
function encodeItem(value) {
var i;
if (value === false)
return writeUint8(0xf4);
if (value === true)
return writeUint8(0xf5);
if (value === null)
return writeUint8(0xf6);
if (value === undefined)
return writeUint8(0xf7);
switch (typeof value) {
case "number":
if (Math.floor(value) === value) {
if (0 <= value && value <= POW_2_53)
return writeTypeAndLength(0, value);
if (-POW_2_53 <= value && value < 0)
return writeTypeAndLength(1, -(value + 1));
}
writeUint8(0xfb);
return writeFloat64(value);
case "string":
var utf8data = [];
for (i = 0; i < value.length; ++i) {
var charCode = value.charCodeAt(i);
if (charCode < 0x80) {
utf8data.push(charCode);
} else if (charCode < 0x800) {
utf8data.push(0xc0 | charCode >> 6);
utf8data.push(0x80 | charCode & 0x3f);
} else if (charCode < 0xd800) {
utf8data.push(0xe0 | charCode >> 12);
utf8data.push(0x80 | (charCode >> 6) & 0x3f);
utf8data.push(0x80 | charCode & 0x3f);
} else {
charCode = (charCode & 0x3ff) << 10;
charCode |= value.charCodeAt(++i) & 0x3ff;
charCode += 0x10000;
utf8data.push(0xf0 | charCode >> 18);
utf8data.push(0x80 | (charCode >> 12) & 0x3f);
utf8data.push(0x80 | (charCode >> 6) & 0x3f);
utf8data.push(0x80 | charCode & 0x3f);
}
}
writeTypeAndLength(3, utf8data.length);
return writeUint8Array(utf8data);
default:
var length;
if (Array.isArray(value)) {
length = value.length;
writeTypeAndLength(4, length);
for (i = 0; i < length; ++i)
encodeItem(value[i]);
} else if (value instanceof Uint8Array) {
writeTypeAndLength(2, value.length);
writeUint8Array(value);
} else {
var keys = Object.keys(value);
length = keys.length;
writeTypeAndLength(5, length);
for (i = 0; i < length; ++i) {
var key = keys[i];
encodeItem(key);
encodeItem(value[key]);
}
}
}
}
encodeItem(value);
if ("slice" in data)
return data.slice(0, offset);
var ret = new ArrayBuffer(offset);
var retView = new DataView(ret);
for (var i = 0; i < offset; ++i)
retView.setUint8(i, dataView.getUint8(i));
return ret;
}
function decode(data, tagger, simpleValue) {
var dataView = new DataView(data);
var offset = 0;
if (typeof tagger !== "function")
tagger = function(value) { return value; };
if (typeof simpleValue !== "function")
simpleValue = function() { return undefined; };
function commitRead(length, value) {
offset += length;
return value;
}
function readArrayBuffer(length) {
return commitRead(length, new Uint8Array(data, offset, length));
}
function readFloat16() {
var tempArrayBuffer = new ArrayBuffer(4);
var tempDataView = new DataView(tempArrayBuffer);
var value = readUint16();
var sign = value & 0x8000;
var exponent = value & 0x7c00;
var fraction = value & 0x03ff;
if (exponent === 0x7c00)
exponent = 0xff << 10;
else if (exponent !== 0)
exponent += (127 - 15) << 10;
else if (fraction !== 0)
return (sign ? -1 : 1) * fraction * POW_2_24;
tempDataView.setUint32(0, sign << 16 | exponent << 13 | fraction << 13);
return tempDataView.getFloat32(0);
}
function readFloat32() {
return commitRead(4, dataView.getFloat32(offset));
}
function readFloat64() {
return commitRead(8, dataView.getFloat64(offset));
}
function readUint8() {
return commitRead(1, dataView.getUint8(offset));
}
function readUint16() {
return commitRead(2, dataView.getUint16(offset));
}
function readUint32() {
return commitRead(4, dataView.getUint32(offset));
}
function readUint64() {
return readUint32() * POW_2_32 + readUint32();
}
function readBreak() {
if (dataView.getUint8(offset) !== 0xff)
return false;
offset += 1;
return true;
}
function readLength(additionalInformation) {
if (additionalInformation < 24)
return additionalInformation;
if (additionalInformation === 24)
return readUint8();
if (additionalInformation === 25)
return readUint16();
if (additionalInformation === 26)
return readUint32();
if (additionalInformation === 27)
return readUint64();
if (additionalInformation === 31)
return -1;
throw "Invalid length encoding";
}
function readIndefiniteStringLength(majorType) {
var initialByte = readUint8();
if (initialByte === 0xff)
return -1;
var length = readLength(initialByte & 0x1f);
if (length < 0 || (initialByte >> 5) !== majorType)
throw "Invalid indefinite length element";
return length;
}
function appendUtf16Data(utf16data, length) {
for (var i = 0; i < length; ++i) {
var value = readUint8();
if (value & 0x80) {
if (value < 0xe0) {
value = (value & 0x1f) << 6
| (readUint8() & 0x3f);
length -= 1;
} else if (value < 0xf0) {
value = (value & 0x0f) << 12
| (readUint8() & 0x3f) << 6
| (readUint8() & 0x3f);
length -= 2;
} else {
value = (value & 0x0f) << 18
| (readUint8() & 0x3f) << 12
| (readUint8() & 0x3f) << 6
| (readUint8() & 0x3f);
length -= 3;
}
}
if (value < 0x10000) {
utf16data.push(value);
} else {
value -= 0x10000;
utf16data.push(0xd800 | (value >> 10));
utf16data.push(0xdc00 | (value & 0x3ff));
}
}
}
function decodeItem() {
var initialByte = readUint8();
var majorType = initialByte >> 5;
var additionalInformation = initialByte & 0x1f;
var i;
var length;
if (majorType === 7) {
switch (additionalInformation) {
case 25:
return readFloat16();
case 26:
return readFloat32();
case 27:
return readFloat64();
}
}
length = readLength(additionalInformation);
if (length < 0 && (majorType < 2 || 6 < majorType))
throw "Invalid length";
switch (majorType) {
case 0:
return length;
case 1:
return -1 - length;
case 2:
if (length < 0) {
var elements = [];
var fullArrayLength = 0;
while ((length = readIndefiniteStringLength(majorType)) >= 0) {
fullArrayLength += length;
elements.push(readArrayBuffer(length));
}
var fullArray = new Uint8Array(fullArrayLength);
var fullArrayOffset = 0;
for (i = 0; i < elements.length; ++i) {
fullArray.set(elements[i], fullArrayOffset);
fullArrayOffset += elements[i].length;
}
return fullArray;
}
return readArrayBuffer(length);
case 3:
var utf16data = [];
if (length < 0) {
while ((length = readIndefiniteStringLength(majorType)) >= 0)
appendUtf16Data(utf16data, length);
} else
appendUtf16Data(utf16data, length);
return String.fromCharCode.apply(null, utf16data);
case 4:
var retArray;
if (length < 0) {
retArray = [];
while (!readBreak())
retArray.push(decodeItem());
} else {
retArray = new Array(length);
for (i = 0; i < length; ++i)
retArray[i] = decodeItem();
}
return retArray;
case 5:
var retObject = {};
for (i = 0; i < length || length < 0 && !readBreak(); ++i) {
var key = decodeItem();
retObject[key] = decodeItem();
}
return retObject;
case 6:
return tagger(decodeItem(), length);
case 7:
switch (length) {
case 20:
return false;
case 21:
return true;
case 22:
return null;
case 23:
return undefined;
default:
return simpleValue(length);
}
}
}
var ret = decodeItem();
if (offset !== data.byteLength)
throw "Remaining bytes";
return ret;
}
var obj = { encode: encode, decode: decode };
if (typeof define === "function" && define.amd)
define("cbor/cbor", obj);
else if (typeof module !== "undefined" && module.exports)
module.exports = obj;
else if (!global.CBOR)
global.CBOR = obj;
})(this);

View File

@@ -0,0 +1,17 @@
<html>
<head>
<title>Fido 2.0 webauthn demo</title>
<script src="/cbor.js"></script>
</head>
<body>
<h1>Webauthn demo</h1>
<p>
<strong>This demo requires a browser supporting the WebAuthn API!</strong>
</p>
<hr>
<a href="/register.html">Register</a><br>
<a href="/authenticate.html">Authenticate</a><br>
</body>
</html>

View File

@@ -0,0 +1,46 @@
<html>
<head>
<title>Fido 2.0 webauthn demo</title>
<script src="/cbor.js"></script>
</head>
<body>
<h1>Webauthn demo</h1>
<p>
<strong>This demo requires a browser supporting the WebAuthn API!</strong>
</p>
<hr>
<h2>Register a credential</h2>
<p>Touch your authenticator device now...</p>
<script>
fetch('/api/register/begin', {
method: 'POST',
}).then(function(response) {
return response.arrayBuffer();
}).then(function(data) {
return CBOR.decode(data);
}).then(function(options) {
navigator.credentials.create(options).then(function(attestation) {
console.log(attestation.response);
console.log(CBOR.encode(attestation.response));
fetch('/api/register/complete', {
method: 'POST',
headers: {'Content-Type': 'application/cbor'},
body: CBOR.encode({
"attestationObject": new Uint8Array(attestation.response.attestationObject),
"clientDataJSON": new Uint8Array(attestation.response.clientDataJSON),
})
}).then(function() {
alert('Registration successful. More details in server log...');
window.location = '/index.html';
});
}, function(reason) {
console.log('Failed', reason);
alert(reason);
window.location = '/index.html';
});
});
</script>
</body>
</html>

View File

@@ -34,7 +34,6 @@ if six.PY2:
class ABC(object):
pass
abc.ABC = ABC
abc.abstractclassmethod = abc.abstractmethod
__version__ = '0.3.1-dev0'

View File

@@ -210,7 +210,7 @@ class UsbHidTransport(object):
self.packet_size = in_size
self.read_timeout_secs = read_timeout_secs
self.logger = logging.getLogger('pyu2f.hidtransport')
self.logger = logging.getLogger('_pyu2f.hidtransport')
self.InternalInit()

View File

@@ -25,7 +25,7 @@ import threading
from . import base
logger = logging.getLogger('pyu2f.macos')
logger = logging.getLogger('_pyu2f.macos')
# Constants
DEVICE_PATH_BUFFER_SIZE = 512

View File

@@ -64,7 +64,7 @@ def dump_list(data):
def _sort_keys(entry):
key = entry[0]
return (six.indexbytes(key, 0), len(key), key)
return six.indexbytes(key, 0), len(key), key
def dump_dict(data):
@@ -78,8 +78,8 @@ def dump_bytes(data):
def dump_text(data):
data = data.encode('utf8')
return dump_int(len(data), mt=3) + data
data_bytes = data.encode('utf8')
return dump_int(len(data_bytes), mt=3) + data_bytes
_SERIALIZERS = [

View File

@@ -40,8 +40,9 @@ import six
class ClientData(bytes):
def __init__(self, data):
self.data = json.loads(data.decode())
def __init__(self, _):
super(ClientData, self).__init__()
self.data = json.loads(self.decode())
def get(self, key):
return self.data[key]
@@ -74,7 +75,6 @@ class ClientData(bytes):
class ClientError(Exception):
@unique
class ERR(IntEnum):
OTHER_ERROR = 1
@@ -161,7 +161,7 @@ class U2fClient(object):
self.poll_delay = 0.25
self.ctap = CTAP1(device)
self.origin = origin
self._verify = verify_app_id
self._verify = verify
def _verify_app_id(self, app_id):
try:
@@ -171,12 +171,12 @@ class U2fClient(object):
pass # Fall through to ClientError
raise ClientError.ERR.BAD_REQUEST()
def register(self, app_id, register_requests, registered_keys,
timeout=None, on_keepalive=None):
def register(self, app_id, register_requests, registered_keys, timeout=None,
on_keepalive=None):
self._verify_app_id(app_id)
version = self.ctap.get_version()
dummy_param = b'\0'*32
dummy_param = b'\0' * 32
for key in registered_keys:
if key['version'] != version:
continue
@@ -263,12 +263,12 @@ class Fido2Client(object):
self.origin = origin
self._verify = verify
try:
self.ctap = CTAP2(device)
self.pin_protocol = PinProtocolV1(self.ctap)
self.ctap2 = CTAP2(device)
self.pin_protocol = PinProtocolV1(self.ctap2)
self._do_make_credential = self._ctap2_make_credential
self._do_get_assertion = self._ctap2_get_assertion
except ValueError:
self.ctap = CTAP1(device)
self.ctap1 = CTAP1(device)
self._do_make_credential = self._ctap1_make_credential
self._do_get_assertion = self._ctap1_get_assertion
@@ -283,6 +283,7 @@ class Fido2Client(object):
def make_credential(self, rp, user, challenge, algos=[ES256.ALGORITHM],
exclude_list=None, extensions=None, rk=False, uv=False,
pin=None, timeout=None, on_keepalive=None):
self._verify_rp_id(rp['id'])
client_data = ClientData.build(
@@ -304,7 +305,7 @@ class Fido2Client(object):
extensions, rk, uv, pin, timeout, on_keepalive):
key_params = [{'type': 'public-key', 'alg': alg} for alg in algos]
info = self.ctap.get_info()
info = self.ctap2.get_info()
pin_auth = None
pin_protocol = None
if pin:
@@ -326,10 +327,10 @@ class Fido2Client(object):
if uv:
options['uv'] = True
return self.ctap.make_credential(client_data.hash, rp, user,
key_params, exclude_list,
extensions, options, pin_auth,
pin_protocol, timeout, on_keepalive)
return self.ctap2.make_credential(client_data.hash, rp, user,
key_params, exclude_list,
extensions, options, pin_auth,
pin_protocol, timeout, on_keepalive)
def _ctap1_make_credential(self, client_data, rp, user, algos, exclude_list,
extensions, rk, uv, pin, timeout, on_keepalive):
@@ -338,27 +339,29 @@ class Fido2Client(object):
app_param = sha256(rp['id'].encode())
dummy_param = b'\0'*32
dummy_param = b'\0' * 32
for cred in exclude_list or []:
key_handle = cred['id']
try:
self.ctap.authenticate(dummy_param, app_param, key_handle, True)
self.ctap1.authenticate(
dummy_param, app_param, key_handle, True)
raise ClientError.ERR.OTHER_ERROR() # Shouldn't happen
except ApduError as e:
if e.code == APDU.USE_NOT_SATISFIED:
_call_polling(self.ctap1_poll_delay, timeout, on_keepalive,
self.ctap.register, dummy_param, dummy_param)
self.ctap1.register, dummy_param, dummy_param)
raise ClientError.ERR.DEVICE_INELIGIBLE()
return AttestationObject.from_ctap1(
app_param,
_call_polling(self.ctap1_poll_delay, timeout, on_keepalive,
self.ctap.register, client_data.hash, app_param)
self.ctap1.register, client_data.hash, app_param)
)
def get_assertion(self, rp_id, challenge, allow_list=None, extensions=None,
up=True, uv=False, pin=None, timeout=None,
on_keepalive=None):
self._verify_rp_id(rp_id)
client_data = ClientData.build(
@@ -399,12 +402,12 @@ class Fido2Client(object):
if len(options) == 0:
options = None
assertions = [self.ctap.get_assertion(
assertions = [self.ctap2.get_assertion(
rp_id, client_data.hash, allow_list, extensions, options, pin_auth,
pin_protocol, timeout, on_keepalive
)]
for _ in range((assertions[0].number_of_credentials or 1) - 1):
assertions.append(self.ctap.get_next_assertion())
assertions.append(self.ctap2.get_next_assertion())
return assertions
def _ctap1_get_assertion(self, client_data, rp_id, allow_list, extensions,
@@ -418,7 +421,7 @@ class Fido2Client(object):
try:
auth_resp = _call_polling(
self.ctap1_poll_delay, timeout, on_keepalive,
self.ctap.authenticate, client_param, app_param, cred['id']
self.ctap1.authenticate, client_param, app_param, cred['id']
)
return [
AssertionResponse.from_ctap1(app_param, cred, auth_resp)

View File

@@ -34,27 +34,51 @@ from cryptography.hazmat.primitives.asymmetric import ec, rsa, padding
class CoseKey(dict):
"""A COSE formatted public key.
:param _: The COSE key paramters.
:cvar ALGORITHM: COSE algorithm identifier.
"""
ALGORITHM = None
def verify(self, message, signature):
"""Validates a digital signature over a given message.
:param message: The message which was signed.
:param signature: The signature to check.
"""
raise NotImplementedError('Signature verification not supported.')
@classmethod
def from_cryptography_key(cls, public_key):
"""Converts a PublicKey object from Cryptography into a COSE key.
:param public_key: Either an EC or RSA public key.
:return: A CoseKey.
"""
raise NotImplementedError('Creation from cryptography not supported.')
@staticmethod
def for_alg(alg):
"""Get a subclass of CoseKey corresponding to an algorithm identifier.
:param alg: The COSE identifier of the algorithm.
:return: A CoseKey.
"""
for cls in CoseKey.__subclasses__():
if getattr(cls, 'ALGORITHM', None) == alg:
if cls.ALGORITHM == alg:
return cls
return UnsupportedKey
@staticmethod
def parse(cose):
"""Create a CoseKey from a dict"""
return CoseKey.for_alg(cose[3])(cose)
class UnsupportedKey(CoseKey):
pass
"""A COSE key with an unsupported algorithm."""
class ES256(CoseKey):
@@ -80,6 +104,11 @@ class ES256(CoseKey):
@classmethod
def from_ctap1(cls, data):
"""Creates an ES256 key from a CTAP1 formatted public key byte string.
:param data: A 65 byte SECP256R1 public key.
:return: A ES256 key.
"""
return cls({
1: 2,
3: cls.ALGORITHM,

View File

@@ -39,22 +39,22 @@ class CtapDevice(abc.ABC):
@abc.abstractmethod
def call(self, cmd, data=b'', event=None, on_keepalive=None):
"""
cmd is the integer value of the command.
data is the binary string value of the payload.
event is an instance of threading.Event which can be used to cancel the
invocation.
on_keepalive is an optional callback function that is invoked on
keepalive message from the authenticator, with the keepalive status code
as an argument. The callback is only invoked once for consecutive
keepalive messages with the same status.
"""Sends a command to the authenticator, and reads the response.
:param cmd: The integer value of the command.
:param data: The payload of the command.
:param event: An optional threading.Event which can be used to cancel
the invocation.
:param on_keepalive: An optional callback to handle keep-alive messages
from the authenticator. The function is only called once for
consecutive keep-alive messages with the same status.
:return: The response from the authenticator.
"""
@abc.abstractclassmethod
@classmethod
@abc.abstractmethod
def list_devices(cls):
"""
Generates instances of cls for discoverable devices.
"""
"""Generates instances of cls for discoverable devices."""
class CtapError(Exception):

View File

@@ -39,12 +39,20 @@ import six
@unique
class APDU(IntEnum):
"""APDU response codes."""
OK = 0x9000
USE_NOT_SATISFIED = 0x6985
WRONG_DATA = 0x6a80
class ApduError(Exception):
"""An Exception thrown when a response APDU doesn't have an OK (0x9000)
status.
:param code: APDU response code.
:param data: APDU response body.
"""
def __init__(self, code, data=b''):
self.code = code
self.data = data
@@ -55,7 +63,18 @@ class ApduError(Exception):
class RegistrationData(bytes):
"""Binary response data for a CTAP1 registration.
:param _: The binary contents of the response data.
:ivar public_key: Binary representation of the credential public key.
:ivar key_handle: Binary key handle of the credential.
:ivar certificate: Attestation certificate of the authenticator, DER
encoded.
:ivar signature: Attestation signature.
"""
def __init__(self, _):
super(RegistrationData, self).__init__()
if six.indexbytes(self, 0) != 0x05:
raise ValueError('Reserved byte != 0x05')
@@ -75,9 +94,16 @@ class RegistrationData(bytes):
@property
def b64(self):
"""Websafe base64 encoded string of the RegistrationData."""
return websafe_encode(self)
def verify(self, app_param, client_param):
"""Verify the included signature with regard to the given app and client
params.
:param app_param: SHA256 hash of the app ID used for the request.
:param client_param: SHA256 hash of the ClientData used for the request.
"""
FidoU2FAttestation.verify_signature(
app_param, client_param, self.key_handle, self.public_key,
self.certificate, self.signature)
@@ -98,19 +124,42 @@ class RegistrationData(bytes):
@classmethod
def from_b64(cls, data):
"""Parse a RegistrationData from a websafe base64 encoded string.
:param data: Websafe base64 encoded string.
:return: The decoded and parsed RegistrationData.
"""
return cls(websafe_decode(data))
class SignatureData(bytes):
"""Binary response data for a CTAP1 authentication.
:param _: The binary contents of the response data.
:ivar user_presence: User presence byte.
:ivar counter: Signature counter.
:ivar signature: Cryptographic signature.
"""
def __init__(self, _):
self.user_presence, self.counter = struct.unpack('>BI', self[:5])
super(SignatureData, self).__init__()
self.user_presence = six.indexbytes(self, 0)
self.counter = struct.unpack('>I', self[1:5])[0]
self.signature = self[5:]
@property
def b64(self):
"""str: Websafe base64 encoded string of the SignatureData."""
return websafe_encode(self)
def verify(self, app_param, client_param, public_key):
"""Verify the included signature with regard to the given app and client
params, using the given public key.
:param app_param: SHA256 hash of the app ID used for the request.
:param client_param: SHA256 hash of the ClientData used for the request.
:param public_key: Binary representation of the credential public key.
"""
m = app_param + self[:5] + client_param
ES256.from_ctap1(public_key).verify(m, self.signature)
@@ -124,10 +173,19 @@ class SignatureData(bytes):
@classmethod
def from_b64(cls, data):
"""Parse a SignatureData from a websafe base64 encoded string.
:param data: Websafe base64 encoded string.
:return: The decoded and parsed SignatureData.
"""
return cls(websafe_decode(data))
class CTAP1(object):
"""Implementation of the CTAP1 specification.
:param device: A CtapHidDevice handle supporting CTAP1.
"""
@unique
class INS(IntEnum):
REGISTER = 0x01
@@ -135,9 +193,23 @@ class CTAP1(object):
VERSION = 0x03
def __init__(self, device):
self.device = device
def send_apdu(self, cla=0, ins=0, p1=0, p2=0, data=b''):
"""Packs and sends an APDU for use in CTAP1 commands.
This is a low-level method mainly used internally. Avoid calling it
directly if possible, and use the get_version, register, and
authenticate methods if possible instead.
:param cla: The CLA parameter of the request.
:param ins: The INS parameter of the request.
:param p1: The P1 parameter of the request.
:param p2: The P2 parameter of the request.
:param data: The body of the request.
:return: The response APDU data of a successful request.
:raise: ApduError
"""
size = len(data)
size_h = size >> 16 & 0xff
size_l = size & 0xffff
@@ -152,15 +224,35 @@ class CTAP1(object):
return data
def get_version(self):
"""Get the U2F version implemented by the authenticator.
The only version specified is "U2F_V2".
:return: A U2F version string.
"""
return self.send_apdu(ins=CTAP1.INS.VERSION).decode()
def register(self, client_param, app_param):
"""Register a new U2F credential.
:param client_param: SHA256 hash of the ClientData used for the request.
:param app_param: SHA256 hash of the app ID used for the request.
:return: The registration response from the authenticator.
"""
data = client_param + app_param
response = self.send_apdu(ins=CTAP1.INS.REGISTER, data=data)
return RegistrationData(response)
def authenticate(self, client_param, app_param, key_handle,
check_only=False):
"""Authenticate a previously registered credential.
:param client_param: SHA256 hash of the ClientData used for the request.
:param app_param: SHA256 hash of the app ID used for the request.
:param key_handle: The binary key handle of the credential.
:param check_only: True to send a "check-only" request, which is used to
determine if a key handle is known.
:return: The authentication response from the authenticator.
"""
data = client_param + app_param \
+ struct.pack('>B', len(key_handle)) + key_handle
p1 = 0x07 if check_only else 0x03

Some files were not shown because too many files have changed in this diff Show More