Commit 0bac8180 by Scott Duckworth

add support for RFC4716 public keys

Putty and some commercial SSH implementations use the RFC4716 format, or SSH2
compatible public key format, for their public keys instead of the OpenSSH
public key format.

This change allows RFC4716 public keys to be entered into the key field.  The
key will be translated to OpenSSH format before being stored.
parent 60c54a8f
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
from django.db import models from django.db import models
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django_sshkey.util import sshkey_re, sshkey_fingerprint from django_sshkey.util import SSHKeyFormatError, key_parse
class UserKey(models.Model): class UserKey(models.Model):
user = models.ForeignKey(User, db_index=True) user = models.ForeignKey(User, db_index=True)
...@@ -53,16 +53,15 @@ class UserKey(models.Model): ...@@ -53,16 +53,15 @@ class UserKey(models.Model):
self.key = self.key.strip() self.key = self.key.strip()
def clean(self): def clean(self):
m = sshkey_re.match(self.key)
errmsg = 'Key is not a valid SSH protocol 2 base64-encoded key'
if not m:
raise ValidationError(errmsg)
try: try:
self.fingerprint = sshkey_fingerprint(m.group('b64key')) type, b64key, comment, self.fingerprint = key_parse(self.key)
except TypeError: if comment:
raise ValidationError(errmsg) self.key = "%s %s %s" % (type.decode(), b64key.decode(), comment)
else:
self.key = "%s %s" % (type.decode(), b64key.decode())
except SSHKeyFormatError as e:
raise ValidationError(str(e))
if not self.name: if not self.name:
comment = m.group('comment')
if not comment: if not comment:
raise ValidationError('Name or key comment required') raise ValidationError('Name or key comment required')
self.name = comment self.name = comment
......
...@@ -50,6 +50,11 @@ def ssh_keygen(type=None, passphrase='', comment=None, file=None): ...@@ -50,6 +50,11 @@ def ssh_keygen(type=None, passphrase='', comment=None, file=None):
cmd += ['-f', file] cmd += ['-f', file]
subprocess.check_call(cmd) subprocess.check_call(cmd)
def ssh_key_export(input_file, output_file, format='RFC4716'):
cmd = ['ssh-keygen', '-e', '-m', format, '-f', input_file]
with open(output_file, 'wb') as f:
subprocess.check_call(cmd, stdout=f)
def ssh_fingerprint(pubkey_path): def ssh_fingerprint(pubkey_path):
cmd = ['ssh-keygen', '-lf', pubkey_path] cmd = ['ssh-keygen', '-lf', pubkey_path]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE) p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
...@@ -230,6 +235,52 @@ class UserKeyCreationTestCase(BaseTestCase): ...@@ -230,6 +235,52 @@ class UserKeyCreationTestCase(BaseTestCase):
) )
self.assertRaises(ValidationError, key2.full_clean) self.assertRaises(ValidationError, key2.full_clean)
class RFC4716TestCase(BaseTestCase):
@classmethod
def setUpClass(cls):
super(RFC4716TestCase, cls).setUpClass()
cls.user1 = User.objects.create(username='user1')
# key1 has a comment
cls.key1_path = os.path.join(cls.key_dir, 'key1')
cls.key1_rfc4716_path = os.path.join(cls.key_dir, 'key1.rfc4716')
ssh_keygen(comment='comment', file=cls.key1_path)
ssh_key_export(cls.key1_path, cls.key1_rfc4716_path, 'RFC4716')
# key2 does not have a comment
cls.key2_path = os.path.join(cls.key_dir, 'key2')
cls.key2_rfc4716_path = os.path.join(cls.key_dir, 'key2.rfc4716')
ssh_keygen(comment='', file=cls.key2_path)
ssh_key_export(cls.key2_path, cls.key2_rfc4716_path, 'RFC4716')
@classmethod
def tearDownClass(cls):
User.objects.all().delete()
super(RFC4716TestCase, cls).tearDownClass()
def tearDown(self):
UserKey.objects.all().delete()
def test_with_comment(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_rfc4716_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.name, 'name')
self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2])
def test_without_comment(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key2_rfc4716_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.name, 'name')
self.assertEqual(key.key.split()[:2], open(self.key2_path+'.pub').read().split()[:2])
class UserKeyLookupTestCase(BaseTestCase): class UserKeyLookupTestCase(BaseTestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
......
...@@ -26,18 +26,72 @@ ...@@ -26,18 +26,72 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
SSHKEY_LOOKUP_URL_DEFAULT = 'http://localhost:8000/sshkey/lookup' SSHKEY_LOOKUP_URL_DEFAULT = 'http://localhost:8000/sshkey/lookup'
sshkey_re = re.compile(r'(?P<type>[\w-]+)\s+(?P<b64key>\S+)(?:\s+(?P<comment>\S.+))?$') class SSHKeyFormatError(Exception):
def __init__(self, text):
self.text = text
def __str__(self):
return "Unrecognized public key format"
def sshkey_fingerprint(b64key): def key_parse(text):
import base64 import base64
import hashlib import hashlib
key = base64.b64decode(b64key) import struct
fp_plain = hashlib.md5(key).hexdigest() lines = text.splitlines()
return ':'.join(a+b for a,b in zip(fp_plain[::2], fp_plain[1::2]))
# OpenSSH public key
if len(lines) == 1 and text.startswith(b'ssh-'):
fields = text.split(None, 2)
if len(fields) < 2:
raise SSHKeyFormatError(text)
type = fields[0]
b64key = fields[1]
comment = None
if len(fields) == 3:
comment = fields[2]
try:
key = base64.b64decode(b64key)
except TypeError:
raise SSHKeyFormatError(text)
# SSH2 public key
elif (
lines[0] == b'---- BEGIN SSH2 PUBLIC KEY ----'
and lines[-1] == b'---- END SSH2 PUBLIC KEY ----'
):
b64key = b''
headers = {}
lines = lines[1:-1]
while lines:
line = lines.pop(0)
if b':' in line:
while line[-1] == b'\\':
line = line[:-1] + lines.pop(0)
k,v = line.split(b':', 1)
headers[k.lower().decode('ascii')] = v.lstrip().decode('utf-8')
else:
b64key += line
comment = headers.get('comment')
if comment and comment[0] in ('"', "'") and comment[0] == comment[-1]:
comment = comment[1:-1]
try:
key = base64.b64decode(b64key)
except TypeError:
raise SSHKeyFormatError(text)
if len(key) < 4:
raise SSHKeyFormatError(text)
n = struct.unpack('>I', key[:4])
type = key[4:4+n[0]]
# unrecognized format
else:
raise SSHKeyFormatError(text)
fp = hashlib.md5(key).hexdigest()
fp = ':'.join(a+b for a,b in zip(fp[::2], fp[1::2]))
return (type, b64key, comment, fp)
def lookup_all(url): def lookup_all(url):
import urllib import urllib
...@@ -87,13 +141,11 @@ def lookup_by_fingerprint_main(): ...@@ -87,13 +141,11 @@ def lookup_by_fingerprint_main():
"Error: cannot retrieve fingerprint from environment or stdin\n" "Error: cannot retrieve fingerprint from environment or stdin\n"
) )
sys.exit(1) sys.exit(1)
m = sshkey_re.match(key) try:
if not m: type, b64key, comment, fingerprint = key_parse(key)
sys.stderr.write( except SSHKeyFormatError as e:
"Error: cannot parse SSH protocol 2 base64-encoded key" sys.stderr.write("Error: " + str(e))
)
sys.exit(1) sys.exit(1)
fingerprint = sshkey_fingerprint(m.group('b64key'))
url = getenv('SSHKEY_LOOKUP_URL', SSHKEY_LOOKUP_URL_DEFAULT) url = getenv('SSHKEY_LOOKUP_URL', SSHKEY_LOOKUP_URL_DEFAULT)
for key in lookup_by_fingerprint(url, fingerprint): for key in lookup_by_fingerprint(url, fingerprint):
sys.stdout.write(key) sys.stdout.write(key)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment