Commit 474e6498 by Scott Duckworth

implement import/export of PEM public keys

parent 043fe297
...@@ -84,9 +84,11 @@ class UserKey(models.Model): ...@@ -84,9 +84,11 @@ class UserKey(models.Model):
except type(self).DoesNotExist: except type(self).DoesNotExist:
pass pass
def export_openssh(self): def export(self, format='RFC4716'):
return self.key
def export_rfc4716(self):
pubkey = pubkey_parse(self.key) pubkey = pubkey_parse(self.key)
f = format.upper()
if f == 'RFC4716':
return pubkey.format_rfc4716() return pubkey.format_rfc4716()
if f == 'PEM':
return pubkey.format_pem()
raise ValueError("Invalid format")
...@@ -267,7 +267,6 @@ class RFC4716TestCase(BaseTestCase): ...@@ -267,7 +267,6 @@ class RFC4716TestCase(BaseTestCase):
) )
key.full_clean() key.full_clean()
key.save() key.save()
self.assertEqual(key.name, 'name')
self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2]) self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2])
def test_without_comment(self): def test_without_comment(self):
...@@ -278,9 +277,36 @@ class RFC4716TestCase(BaseTestCase): ...@@ -278,9 +277,36 @@ class RFC4716TestCase(BaseTestCase):
) )
key.full_clean() key.full_clean()
key.save() key.save()
self.assertEqual(key.name, 'name')
self.assertEqual(key.key.split()[:2], open(self.key2_path+'.pub').read().split()[:2]) self.assertEqual(key.key.split()[:2], open(self.key2_path+'.pub').read().split()[:2])
class PemTestCase(BaseTestCase):
@classmethod
def setUpClass(cls):
super(PemTestCase, cls).setUpClass()
cls.user1 = User.objects.create(username='user1')
cls.key1_path = os.path.join(cls.key_dir, 'key1')
cls.key1_pem_path = os.path.join(cls.key_dir, 'key1.pem')
ssh_keygen(comment='', file=cls.key1_path)
ssh_key_export(cls.key1_path, cls.key1_pem_path, 'PEM')
@classmethod
def tearDownClass(cls):
User.objects.all().delete()
super(PemTestCase, cls).tearDownClass()
def tearDown(self):
UserKey.objects.all().delete()
def test(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_pem_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2])
class UserKeyLookupTestCase(BaseTestCase): class UserKeyLookupTestCase(BaseTestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
......
...@@ -52,6 +52,21 @@ def wrap(text, width, wrap_end=None): ...@@ -52,6 +52,21 @@ def wrap(text, width, wrap_end=None):
n = m n = m
return t return t
def bin2hex(data):
return ''.join('%02x' % struct.unpack('B', b) for b in data)
hex2bin = bytearray.fromhex
def bytes2int(b):
h = ''.join('%02x' % struct.unpack('B', x) for x in b)
return int(h, 16)
def int2bytes(i):
h = '%x' % i
if len(h) & 1:
h = '0' + h
return bytearray.fromhex(h)
class PublicKeyParseError(Exception): class PublicKeyParseError(Exception):
def __init__(self, text): def __init__(self, text):
self.text = text self.text = text
...@@ -87,6 +102,30 @@ class PublicKey(object): ...@@ -87,6 +102,30 @@ class PublicKey(object):
out += '---- END SSH2 PUBLIC KEY ----' out += '---- END SSH2 PUBLIC KEY ----'
return out return out
def format_pem(self):
if self.algorithm != 'ssh-rsa':
raise TypeError("key is not a RSA key")
from pyasn1.codec.der import encoder as der_encoder
from pyasn1.type import univ
keydata = self.keydata
parts = []
while keydata:
dlen = struct.unpack('>I', keydata[:4])[0]
data, keydata = keydata[4:4+dlen], keydata[4+dlen:]
parts.append(data)
e = bytes2int(parts[1])
n = bytes2int(parts[2])
pkcs1_seq = univ.Sequence()
pkcs1_seq.setComponentByPosition(0, univ.Integer(n))
pkcs1_seq.setComponentByPosition(1, univ.Integer(e))
der = der_encoder.encode(pkcs1_seq)
out = (
'-----BEGIN RSA PUBLIC KEY-----\n' +
wrap(base64.b64encode(der), 64) + '\n' +
'-----END RSA PUBLIC KEY-----'
)
return out
def pubkey_parse_openssh(text): def pubkey_parse_openssh(text):
fields = text.split(None, 2) fields = text.split(None, 2)
if len(fields) < 2: if len(fields) < 2:
...@@ -129,6 +168,34 @@ def pubkey_parse_rfc4716(text): ...@@ -129,6 +168,34 @@ def pubkey_parse_rfc4716(text):
except TypeError: except TypeError:
raise PublicKeyParseError(text) raise PublicKeyParseError(text)
def pubkey_parse_pem(text):
from pyasn1.codec.der import decoder as der_decoder
lines = text.splitlines()
if not (
lines[0] == '-----BEGIN RSA PUBLIC KEY-----'
and lines[-1] == '-----END RSA PUBLIC KEY-----'
):
raise PublicKeyParseError(text)
der = base64.b64decode(''.join(lines[1:-1]).encode('ascii'))
pkcs1_seq = der_decoder.decode(der)
n_val = pkcs1_seq[0][0]
e_val = pkcs1_seq[0][1]
n = int2bytes(n_val)
e = int2bytes(e_val)
if n[0] & 0x80: n = b'\x00' + n
if e[0] & 0x80: e = b'\x00' + e
algorithm = 'ssh-rsa'.encode('ascii')
keydata = (
struct.pack('>I', len(algorithm)) +
algorithm +
struct.pack('>I', len(e)) +
e +
struct.pack('>I', len(n)) +
n
)
b64key = base64.b64encode(keydata).decode('ascii')
return PublicKey(b64key)
def pubkey_parse(text): def pubkey_parse(text):
lines = text.splitlines() lines = text.splitlines()
...@@ -138,6 +205,9 @@ def pubkey_parse(text): ...@@ -138,6 +205,9 @@ def pubkey_parse(text):
if lines[0] == '---- BEGIN SSH2 PUBLIC KEY ----': if lines[0] == '---- BEGIN SSH2 PUBLIC KEY ----':
return pubkey_parse_rfc4716(text) return pubkey_parse_rfc4716(text)
if lines[0] == '-----BEGIN RSA PUBLIC KEY-----':
return pubkey_parse_pem(text)
raise PublicKeyParseError(text) raise PublicKeyParseError(text)
def lookup_all(url): def lookup_all(url):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment