Commit 05244d65 by Scott Duckworth

Merge branch 'release/2.3.0'

parents bff6b020 3010e7d9
......@@ -21,16 +21,29 @@ should be a string containing options accepted by sshd, with ``{username}``
being replaced with the username of the user associated with the incoming
public key.
django-sshkey can also help you keep track of when a key was last used.
``SSHKEY_AUTHORIZED_KEYS_OPTIONS`` also replaces ``{key_id}`` with the key's
id. The command that is run can then notify django-sshkey that the key was used
by issuing a HTTP POST to the lookup URL, placing the key_id in the request
body.
For instance::
SSHKEY_AUTHORIZED_KEYS_OPTIONS = 'command="my-command {username}",no-pty'
SSHKEY_AUTHORIZED_KEYS_OPTIONS = 'command="my-command {username} {key_id}",no-pty'
in settings.py will cause keys produced by the below commands to look similar
to::
command="my-command fred",no-pty ssh-rsa AAAAB3NzaC1yc2E...
command="my-command fred 15",no-pty ssh-rsa AAAAB3NzaC1yc2E...
sshd would then verify the key is correct and run ``my-command``.
``my-command`` would then know that this is fred and that he is using key 15,
and could tell django-sshkey to update the last_used field of that key by
running the equivalent of this command::
curl -d 15 http://localhost:8000/sshkey/lookup
assuming the key ``AAAAB3NzaC1yc2E...`` is owned by fred.
Your URL may vary depending upon your configuration.
URL Configuration
-----------------
......@@ -57,6 +70,52 @@ mapping.
and only the systems that need to run the lookup commands should have access
to it.
Settings
--------
``SSHKEY_AUTHORIZED_KEYS_OPTIONS``
String, optional. Defines the SSH options that will be prepended to each
public key. ``{username}`` will be replaced by the username; ``{key_id}``
will be replaced by the key's id. New in version 2.3.
``SSHKEY_ALLOW_EDIT``
Boolean, defaults to ``False``. Whether or not editing keys is allowed.
Note that no email will be sent in any case when a key is edited, hence the
reason that editing keys is disabled by default. New in version 2.3.
``SSHKEY_EMAIL_ADD_KEY``
Boolean, defaults to ``True``. Whether or not an email should be sent to the
user when a new key is added to their account. New in version 2.3.
``SSHKEY_EMAIL_ADD_KEY_SUBJECT``
String, defaults to ``"A new key was added to your account"``. The subject of
the email that gets sent out when a new key is added. New in version 2.3.
``SSHKEY_FROM_EMAIL``
String, defaults to ``DEFAULT_FROM_EMAIL``. New in version 2.3.
``SSHKEY_SEND_HTML_EMAIL``
Boolean, defaults to ``False``. Whether or not multipart HTML emails should
be sent. New in version 2.3.
Templates
---------
Example templates are available in the ``templates.example`` directory.
``sshkey/userkey_list.html``
Used when listing a user's keys.
``sshkey/userkey_detail.html``
Used when adding or editing a user's keys.
``sshkey/add_key.txt``
The plain text body of the email sent when a new key is added. New in version
2.3.
``sshkey/add_key.html``
The HTML body of the email sent when a new key is added. New in version 2.3.
Tying OpenSSH to django-sshkey
==============================
......@@ -83,6 +142,39 @@ slower. To use the variants, replace ``lookup`` with ``pylookup``. For
example, use ``django-sshkey-pylookup-all`` instead of
``django-sshkey-lookup-all``.
Using ``django-sshkey-lookup``
------------------------------
::
Usage: django-sshkey-lookup -a URL
django-sshkey-lookup -u URL USERNAME
django-sshkey-lookup -f URL FINGERPRINT
django-sshkey-lookup URL [USERNAME]
This program has different modes of operation:
``-a``
Print all public keys.
``-u``
Print all public keys owned by the specified user.
``-f``
Print all public keys matching the specified fingerprint.
Default
Compatibility mode. If the username parameter is given then print all public
keys owned by the specified user; otherwise perform the same functionality as
``django-sshkey-lookup-by-fingerprint`` (see below).
All modes expect that the lookup URL be specified as the first non-option
parameter.
This command is compatible with the old script ``lookup.sh`` but was renamed
to have a less ambiguous name when installed system-wide. A symlink is left in
its place for backwards compatibility.
Using ``django-sshkey-lookup-all``
----------------------------------
......@@ -151,20 +243,6 @@ This program:
* is ideal if you want all Django users to access SSH via a shared system user
account and be identified by their SSH public key.
Using ``django-sshkey-lookup``
------------------------------
``Usage: django-sshkey-lookup URL [USERNAME]``
This program is a wrapper around the previous two commands. The first
parameter is placed in the ``SSHKEY_LOOKUP_URL`` environment variable. If the
second parameter is present then ``django-sshkey-lookup-by-username`` is
executed; otherwise ``django-sshkey-lookup-by-fingerprint`` is executed.
This command is compatible with the old script ``lookup.sh`` but was renamed
to have a less ambiguous name when installed system-wide. A symlink is left in
its place for backwards compatibility.
.. _OpenSSH: http://www.openssh.com/
.. _openssh-akcenv: https://github.com/ScottDuckworth/openssh-akcenv
.. _openssh-stdinkey: https://github.com/ScottDuckworth/openssh-stdinkey
......@@ -17,7 +17,9 @@ The following table maps django-sshkey version to migration labels:
+---------+---------------+-------+------------------------------------------+
| 1.1 | sshkey | 0002 | |
+---------+---------------+-------+------------------------------------------+
| 2.0+ | django_sshkey | 0001 | See Upgrading from 1.1.x to 2.x below |
| 2.0-2.2 | django_sshkey | 0001 | See Upgrading from 1.1.x to 2.x below |
+---------+---------------+-------+------------------------------------------+
| 2.3 | django_sshkey | 0002 | |
+---------+---------------+-------+------------------------------------------+
To upgrade, install the new version of django-sshkey and then migrate your
......
......@@ -27,14 +27,70 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
usage() {
echo "Usage: $0 -a URL"
echo " $0 -u URL USERNAME"
echo " $0 -f URL FINGERPRINT"
echo " $0 URL [USERNAME]"
}
mode=x
while getopts ':hafu' opt; do
case $opt in
h)
usage
exit 0
;;
a) mode=a ;;
f) mode=f ;;
u) mode=u ;;
[?])
exec 1>&2
echo "Invalid option: -$OPTARG"
usage
exit 1
;;
esac
done
shift $(($OPTIND-1))
if [ $# -eq 0 ]; then
echo "Usage: $0 URL [USERNAME]" >&2
usage >&2
exit 1
fi
SSHKEY_LOOKUP_URL="$1"
export SSHKEY_LOOKUP_URL
if [ $# -eq 1 ]; then
url="$1"
case $mode in
a)
query=
;;
f)
if [ $# -lt 2 ]; then
usage >&2
exit 1
fi
query="fingerprint=$2"
;;
u)
if [ $# -lt 2 ]; then
usage >&2
exit 1
fi
query="username=$2"
;;
x)
if [ $# -eq 1 ]; then
SSHKEY_LOOKUP_URL="${url}"
export SSHKEY_LOOKUP_URL
exec `dirname $0`/django-sshkey-lookup-by-fingerprint
else
query="username=$2"
fi
;;
esac
if type curl >/dev/null 2>&1; then
exec curl -s -G "${url}" --data-urlencode "${query}"
else
exec `dirname $0`/django-sshkey-lookup-by-username "$2"
exec wget -q -O - "${url}?${query}"
fi
......@@ -26,4 +26,4 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
__version__ = '2.2.0'
__version__ = '2.3.0'
......@@ -37,6 +37,7 @@ class UserKeyAdmin(admin.ModelAdmin):
'fingerprint',
'created',
'last_modified',
'last_used',
]
search_fields = [
'user__username',
......@@ -45,6 +46,7 @@ class UserKeyAdmin(admin.ModelAdmin):
'fingerprint',
'created',
'last_modified',
'last_used',
]
admin.site.register(UserKey, UserKeyAdmin)
......@@ -33,3 +33,14 @@ class UserKeyForm(forms.ModelForm):
class Meta:
model = UserKey
fields = ['name', 'key']
widgets = {
'name': forms.TextInput(attrs={
'size': 50,
'placeholder': "username@hostname, or leave blank to use key comment",
}),
'key': forms.Textarea(attrs={
'cols': 72,
'rows': 15,
'placeholder': "Paste in the contents of your public key file here",
}),
}
# encoding: utf-8
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding field 'UserKey.last_used'
db.add_column('sshkey_userkey', 'last_used', self.gf('django.db.models.fields.DateTimeField')(null=True), keep_default=False)
def backwards(self, orm):
# Deleting field 'UserKey.last_used'
db.delete_column('sshkey_userkey', 'last_used')
models = {
'auth.group': {
'Meta': {'object_name': 'Group'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
},
'auth.permission': {
'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
},
'auth.user': {
'Meta': {'object_name': 'User'},
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
},
'contenttypes.contenttype': {
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
},
'django_sshkey.userkey': {
'Meta': {'unique_together': "[('user', 'name')]", 'object_name': 'UserKey', 'db_table': "'sshkey_userkey'"},
'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'null': 'True', 'blank': 'True'}),
'fingerprint': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '47', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'key': ('django.db.models.fields.TextField', [], {'max_length': '2000'}),
'last_modified': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'null': 'True', 'blank': 'True'}),
'last_used': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'blank': 'True'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']"})
}
}
complete_apps = ['django_sshkey']
......@@ -26,10 +26,15 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
from django.db import models
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django_sshkey.util import sshkey_re, sshkey_fingerprint
from django.db.models.signals import pre_save
from django.dispatch import receiver
from django_sshkey.util import PublicKeyParseError, pubkey_parse
from django_sshkey import settings
class UserKey(models.Model):
user = models.ForeignKey(User, db_index=True)
......@@ -37,7 +42,8 @@ class UserKey(models.Model):
key = models.TextField(max_length=2000)
fingerprint = models.CharField(max_length=47, blank=True, db_index=True)
created = models.DateTimeField(auto_now_add=True, null=True)
last_modified = models.DateTimeField(auto_now=True, null=True)
last_modified = models.DateTimeField(null=True)
last_used = models.DateTimeField(null=True)
class Meta:
db_table = 'sshkey_userkey'
......@@ -53,19 +59,16 @@ class UserKey(models.Model):
self.key = self.key.strip()
def clean(self):
m = sshkey_re.match(self.key)
errmsg = 'Key is not a valid SSH protocol 2 base64-encoded key'
if not m:
raise ValidationError(errmsg)
try:
self.fingerprint = sshkey_fingerprint(m.group('b64key'))
except TypeError:
raise ValidationError(errmsg)
pubkey = pubkey_parse(self.key)
except PublicKeyParseError as e:
raise ValidationError(str(e))
self.key = pubkey.format_openssh()
self.fingerprint = pubkey.fingerprint()
if not self.name:
comment = m.group('comment')
if not comment:
if not pubkey.comment:
raise ValidationError('Name or key comment required')
self.name = comment
self.name = pubkey.comment
def validate_unique(self, exclude=None):
if self.pk is None:
......@@ -86,3 +89,48 @@ class UserKey(models.Model):
raise ValidationError({'key': [message]})
except type(self).DoesNotExist:
pass
def export(self, format='RFC4716'):
pubkey = pubkey_parse(self.key)
f = format.upper()
if f == 'RFC4716':
return pubkey.format_rfc4716()
if f == 'PEM':
return pubkey.format_pem()
raise ValueError("Invalid format")
def save(self, *args, **kwargs):
if kwargs.pop('update_last_modified', True):
self.last_modified = datetime.datetime.now()
super(UserKey, self).save(*args, **kwargs)
def touch(self):
self.last_used = datetime.datetime.now()
self.save(update_last_modified=False)
@receiver(pre_save, sender=UserKey)
def send_email_add_key(sender, instance, **kwargs):
if not settings.SSHKEY_EMAIL_ADD_KEY or instance.pk:
return
from django.template.loader import render_to_string
from django.core.mail import EmailMultiAlternatives
from django.core.urlresolvers import reverse
context_dict = {
'key': instance,
'subject': settings.SSHKEY_EMAIL_ADD_KEY_SUBJECT,
}
request = getattr(instance, 'request', None)
if request:
context_dict['request'] = request
context_dict['userkey_list_uri'] = request.build_absolute_uri(reverse('django_sshkey.views.userkey_list'))
text_content = render_to_string('sshkey/add_key.txt', context_dict)
msg = EmailMultiAlternatives(
settings.SSHKEY_EMAIL_ADD_KEY_SUBJECT,
text_content,
settings.SSHKEY_FROM_EMAIL,
[instance.user.email],
)
if settings.SSHKEY_SEND_HTML_EMAIL:
html_content = render_to_string('sshkey/add_key.html', context_dict)
msg.attach_alternative(html_content, 'text/html')
msg.send()
......@@ -29,13 +29,10 @@
from django.conf import settings
SSHKEY_AUTHORIZED_KEYS_OPTIONS = getattr(settings, 'SSHKEY_AUTHORIZED_KEYS_OPTIONS', None)
SSHKEY_AUTHORIZED_KEYS_COMMAND = getattr(settings, 'SSHKEY_AUTHORIZED_KEYS_COMMAND', None)
if SSHKEY_AUTHORIZED_KEYS_COMMAND is not None:
import warnings
with warnings.catch_warnings():
import warnings
warnings.simplefilter('default', DeprecationWarning)
warnings.warn(
'SSHKEY_AUTHORIZED_KEYS_COMMAND has been deprecated; '
'use SSHKEY_AUTHORIZED_KEYS_OPTIONS instead.',
DeprecationWarning)
SSHKEY_ALLOW_EDIT = getattr(settings, 'SSHKEY_ALLOW_EDIT', False)
SSHKEY_EMAIL_ADD_KEY = getattr(settings, 'SSHKEY_EMAIL_ADD_KEY', True)
SSHKEY_EMAIL_ADD_KEY_SUBJECT = getattr(settings, 'SSHKEY_EMAIL_ADD_KEY_SUBJECT',
"A new public key was added to your account"
)
SSHKEY_FROM_EMAIL = getattr(settings, 'SSHKEY_FROM_EMAIL', settings.DEFAULT_FROM_EMAIL)
SSHKEY_SEND_HTML_EMAIL = getattr(settings, 'SSHKEY_SEND_HTML_EMAIL', False)
<html>
<body>
<p>{{ key.user.first_name }},</p>
<p>The following SSH public key was added to your account
{% if request.META.SERVER_NAME %}
on {{ request.META.SERVER_NAME }}
{% endif %}
{% if request.META.REMOTE_ADDR %}
from {{ request.META.REMOTE_ADDR }}{% if request.META.REMOTE_HOST %}
({{ request.META.REMOTE_HOST }}){% endif %}{% endif %}:</p>
<p>
Name: {{ key.name }}<br/>
Fingerprint: {{ key.fingerprint }}
</p>
<p><b>If you believe this key was added in error then you should
<a href="{{ userkey_list_uri }}">click here</a> and delete the key.</b></p>
</body>
</html>
{{ key.user.first_name }},
The following SSH public key was added to your account{% if request.META.SERVER_NAME %} on {{ request.META.SERVER_NAME }}{% endif %}{% if request.META.REMOTE_ADDR %}
from {{ request.META.REMOTE_ADDR }}{% if request.META.REMOTE_HOST %} ({{ request.META.REMOTE_HOST }}){% endif %}{% endif %}:
Name: {{ key.name }}
Fingerprint: {{ key.fingerprint }}
If you believe this key was added in error then you should go to
{{ userkey_list_uri }} and delete the key.
<h1>My Keys</h1>
{% if messages %}
<ul class="messages">
{% for message in messages %}
<li{% if message.tags %} class="{{ message.tags }}"{% endif %}>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
<p><a href="{% url django_sshkey.views.userkey_add %}">Add Key</a></p>
<table>
<tr>
<th>Key</th>
<th>Fingerprint</th>
<th>Created</th>
{% if allow_edit %}
<th>Last Modified</th>
{% endif %}
<th>Last Used</th>
<th></th>
</tr>
{% for userkey in userkey_list %}
<tr>
<td>{{ userkey.name }}</td>
<td>{{ userkey.fingerprint }}</td>
<td>{{ userkey.created }}</td>
<td>{{ userkey.last_modified }}</td>
<td><a href="{% url django_sshkey.views.userkey_edit userkey.pk %}">Edit</a></td>
<td><a href="{% url django_sshkey.views.userkey_delete userkey.pk %}">Delete</a></td>
<td>{{ userkey.created|default:"unknown" }}</td>
{% if allow_edit %}
<td>{{ userkey.last_modified|default:"unknown" }}</td>
{% endif %}
<td>{{ userkey.last_used|default:"never" }}</td>
<td>
{% if allow_edit %}
<a href="{% url django_sshkey.views.userkey_edit userkey.pk %}">Edit</a>
{% endif %}
<a href="{% url django_sshkey.views.userkey_delete userkey.pk %}">Delete</a>
</td>
</tr>
{% endfor %}
</table>
......@@ -50,6 +50,16 @@ def ssh_keygen(type=None, passphrase='', comment=None, file=None):
cmd += ['-f', file]
subprocess.check_call(cmd)
def ssh_key_export(input_path, output_path, format='RFC4716'):
cmd = ['ssh-keygen', '-e', '-m', format, '-f', input_path]
with open(output_path, 'wb') as f:
subprocess.check_call(cmd, stdout=f)
def ssh_key_import(input_path, output_path, format='RFC4716'):
cmd = ['ssh-keygen', '-i', '-m', format, '-f', input_path]
with open(output_path, 'wb') as f:
subprocess.check_call(cmd, stdout=f)
def ssh_fingerprint(pubkey_path):
cmd = ['ssh-keygen', '-lf', pubkey_path]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
......@@ -169,6 +179,21 @@ class UserKeyCreationTestCase(BaseTestCase):
key.save()
self.assertEqual(key.fingerprint, fingerprint)
def test_touch(self):
import datetime
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_path+'.pub').read(),
)
key.full_clean()
key.save()
self.assertIsNone(key.last_used)
key.touch()
key.save()
self.assertIsInstance(key.last_used, datetime.datetime)
key.touch()
def test_same_name_same_user(self):
key1 = UserKey(
user = self.user1,
......@@ -230,44 +255,146 @@ class UserKeyCreationTestCase(BaseTestCase):
)
self.assertRaises(ValidationError, key2.full_clean)
class RFC4716TestCase(BaseTestCase):
@classmethod
def setUpClass(cls):
super(RFC4716TestCase, cls).setUpClass()
cls.user1 = User.objects.create(username='user1')
# key1 has a comment
cls.key1_path = os.path.join(cls.key_dir, 'key1')
cls.key1_rfc4716_path = os.path.join(cls.key_dir, 'key1.rfc4716')
ssh_keygen(comment='comment', file=cls.key1_path)
ssh_key_export(cls.key1_path, cls.key1_rfc4716_path, 'RFC4716')
# key2 does not have a comment
cls.key2_path = os.path.join(cls.key_dir, 'key2')
cls.key2_rfc4716_path = os.path.join(cls.key_dir, 'key2.rfc4716')
ssh_keygen(comment='', file=cls.key2_path)
ssh_key_export(cls.key2_path, cls.key2_rfc4716_path, 'RFC4716')
@classmethod
def tearDownClass(cls):
User.objects.all().delete()
super(RFC4716TestCase, cls).tearDownClass()
def tearDown(self):
UserKey.objects.all().delete()
def test_import_with_comment(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_rfc4716_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2])
def test_import_without_comment(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key2_rfc4716_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.key.split()[:2], open(self.key2_path+'.pub').read().split()[:2])
def test_export(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_path+'.pub').read(),
)
key.full_clean()
key.save()
export_path = os.path.join(self.key_dir, 'export')
import_path = os.path.join(self.key_dir, 'import')
with open(export_path, 'w') as f:
f.write(key.export('RFC4716'))
ssh_key_import(export_path, import_path, 'RFC4716')
self.assertEqual(open(import_path).read().split()[:2], open(self.key1_path+'.pub').read().split()[:2])
class PemTestCase(BaseTestCase):
@classmethod
def setUpClass(cls):
super(PemTestCase, cls).setUpClass()
cls.user1 = User.objects.create(username='user1')
cls.key1_path = os.path.join(cls.key_dir, 'key1')
cls.key1_pem_path = os.path.join(cls.key_dir, 'key1.pem')
ssh_keygen(comment='', file=cls.key1_path)
ssh_key_export(cls.key1_path, cls.key1_pem_path, 'PEM')
@classmethod
def tearDownClass(cls):
User.objects.all().delete()
super(PemTestCase, cls).tearDownClass()
def tearDown(self):
UserKey.objects.all().delete()
def test_import(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_pem_path).read(),
)
key.full_clean()
key.save()
self.assertEqual(key.key.split()[:2], open(self.key1_path+'.pub').read().split()[:2])
def test_export(self):
key = UserKey(
user = self.user1,
name = 'name',
key = open(self.key1_path+'.pub').read(),
)
key.full_clean()
key.save()
export_path = os.path.join(self.key_dir, 'export')
import_path = os.path.join(self.key_dir, 'import')
with open(export_path, 'w') as f:
f.write(key.export('PEM'))
ssh_key_import(export_path, import_path, 'PEM')
self.assertEqual(open(import_path).read().split()[:2], open(self.key1_path+'.pub').read().split()[:2])
class UserKeyLookupTestCase(BaseTestCase):
@classmethod
def setUpClass(cls):
super(UserKeyLookupTestCase, cls).setUpClass()
cls.original_options = settings.SSHKEY_AUTHORIZED_KEYS_OPTIONS
settings.SSHKEY_AUTHORIZED_KEYS_OPTIONS = 'command="{username}"'
settings.SSHKEY_AUTHORIZED_KEYS_OPTIONS = 'command="{username} {key_id}"'
cls.user1 = User.objects.create(username='user1')
cls.user2 = User.objects.create(username='user2')
cls.key1_path = os.path.join(cls.key_dir, 'key1')
ssh_keygen(file=cls.key1_path)
key1 = UserKey(
cls.key1 = UserKey(
user = cls.user1,
name = 'key1',
key = open(cls.key1_path+'.pub').read(),
)
key1.full_clean()
key1.save()
cls.key1.full_clean()
cls.key1.save()
cls.key2_path = os.path.join(cls.key_dir, 'key2')
ssh_keygen(file=cls.key2_path)
key2 = UserKey(
cls.key2 = UserKey(
user = cls.user1,
name = 'key2',
key = open(cls.key2_path+'.pub').read(),
)
key2.full_clean()
key2.save()
cls.key2.full_clean()
cls.key2.save()
cls.key3_path = os.path.join(cls.key_dir, 'key3')
ssh_keygen(file=cls.key3_path)
key3 = UserKey(
cls.key3 = UserKey(
user = cls.user2,
name = 'key3',
key = open(cls.key3_path+'.pub').read(),
)
key3.full_clean()
key3.save()
cls.key3.full_clean()
cls.key3.save()
cls.key4_path = os.path.join(cls.key_dir, 'key4')
ssh_keygen(file=cls.key4_path)
......@@ -287,9 +414,9 @@ class UserKeyLookupTestCase(BaseTestCase):
self.assertEqual(response['Content-Type'], 'text/plain')
actual_content = set(response.content.strip().split('\n'))
correct_content = set((
'command="user1" ' + open(self.key1_path + '.pub').read().strip(),
'command="user1" ' + open(self.key2_path + '.pub').read().strip(),
'command="user2" ' + open(self.key3_path + '.pub').read().strip(),
'command="user1 %s" %s' % (self.key1.id, open(self.key1_path + '.pub').read().strip()),
'command="user1 %s" %s' % (self.key2.id, open(self.key2_path + '.pub').read().strip()),
'command="user2 %s" %s' % (self.key3.id, open(self.key3_path + '.pub').read().strip()),
))
self.assertEqual(actual_content, correct_content)
......@@ -304,7 +431,7 @@ class UserKeyLookupTestCase(BaseTestCase):
username = self.user1.username
actual_content = set(response.content.strip().split('\n'))
correct_content = set((
'command="user1" ' + open(self.key1_path + '.pub').read().strip(),
'command="user1 %s" %s' % (self.key1.id, open(self.key1_path + '.pub').read().strip()),
))
self.assertEqual(actual_content, correct_content)
......@@ -319,7 +446,7 @@ class UserKeyLookupTestCase(BaseTestCase):
body = open(self.key1_path + '.pub').read().strip()
actual_content = set(response.content.strip().split('\n'))
correct_content = set((
'command="user2" ' + open(self.key3_path + '.pub').read().strip(),
'command="user2 %s" %s' % (self.key3.id, open(self.key3_path + '.pub').read().strip()),
))
self.assertEqual(actual_content, correct_content)
......@@ -334,8 +461,8 @@ class UserKeyLookupTestCase(BaseTestCase):
body = open(self.key1_path + '.pub').read().strip()
actual_content = set(response.content.strip().split('\n'))
correct_content = set((
'command="user1" ' + open(self.key1_path + '.pub').read().strip(),
'command="user1" ' + open(self.key2_path + '.pub').read().strip(),
'command="user1 %s" %s' % (self.key1.id, open(self.key1_path + '.pub').read().strip()),
'command="user1 %s" %s' % (self.key2.id, open(self.key2_path + '.pub').read().strip()),
))
self.assertEqual(actual_content, correct_content)
......
......@@ -26,18 +26,184 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
import base64
import binascii
import struct
SSHKEY_LOOKUP_URL_DEFAULT = 'http://localhost:8000/sshkey/lookup'
sshkey_re = re.compile(r'(?P<type>[\w-]+)\s+(?P<b64key>\S+)(?:\s+(?P<comment>\S.+))?$')
def wrap(text, width, wrap_end=None):
n = 0
t = ''
if wrap_end is None:
while n < len(text):
m = n + width
t += text[n:m]
if len(text) <= m:
return t
t += '\n'
n = m
else:
while n < len(text):
m = n + width
if len(text) <= m:
return t + text[n:m]
m -= len(wrap_end)
t += text[n:m] + wrap_end + '\n'
n = m
return t
def bytes2int(b):
h = binascii.hexlify(b)
return int(h, 16)
def int2bytes(i):
h = '%x' % i
if len(h) & 1:
h = '0' + h
return bytearray.fromhex(h)
class PublicKeyParseError(Exception):
def __init__(self, text):
self.text = text
def sshkey_fingerprint(b64key):
import base64
def __str__(self):
return "Unrecognized public key format"
class PublicKey(object):
def __init__(self, b64key, comment=None):
self.b64key = b64key
self.comment = comment
keydata = base64.b64decode(b64key.encode('ascii'))
self.keydata = keydata
self.parts = []
while keydata:
dlen = struct.unpack('>I', keydata[:4])[0]
data, keydata = keydata[4:4+dlen], keydata[4+dlen:]
self.parts.append(data)
self.algorithm = self.parts[0]
def fingerprint(self):
import hashlib
key = base64.b64decode(b64key)
fp_plain = hashlib.md5(key).hexdigest()
return ':'.join(a+b for a,b in zip(fp_plain[::2], fp_plain[1::2]))
fp = hashlib.md5(self.keydata).hexdigest()
return ':'.join(a+b for a,b in zip(fp[::2], fp[1::2]))
def format_openssh(self):
out = self.algorithm + ' ' + self.b64key
if self.comment:
out += ' ' + self.comment
return out
def format_rfc4716(self):
out = '---- BEGIN SSH2 PUBLIC KEY ----\n'
if self.comment:
comment = 'Comment: "%s"' % self.comment
out += wrap(comment, 72, '\\') + '\n'
out += wrap(self.b64key, 72) + '\n'
out += '---- END SSH2 PUBLIC KEY ----'
return out
def format_pem(self):
if self.algorithm != 'ssh-rsa' and len(self.parts) == 3:
raise TypeError("key is not a valid RSA key")
from pyasn1.codec.der import encoder as der_encoder
from pyasn1.type import univ
e = bytes2int(self.parts[1])
n = bytes2int(self.parts[2])
pkcs1_seq = univ.Sequence()
pkcs1_seq.setComponentByPosition(0, univ.Integer(n))
pkcs1_seq.setComponentByPosition(1, univ.Integer(e))
der = der_encoder.encode(pkcs1_seq)
out = (
'-----BEGIN RSA PUBLIC KEY-----\n' +
wrap(base64.b64encode(der), 64) + '\n' +
'-----END RSA PUBLIC KEY-----'
)
return out
def pubkey_parse_openssh(text):
fields = text.split(None, 2)
if len(fields) < 2:
raise PublicKeyParseError(text)
try:
if len(fields) == 2:
key = PublicKey(fields[1])
else:
key = PublicKey(fields[1], fields[2])
except TypeError:
raise PublicKeyParseError(text)
if fields[0] != key.algorithm:
raise PublicKeyParseError(text)
return key
def pubkey_parse_rfc4716(text):
lines = text.splitlines()
if not (
lines[0] == '---- BEGIN SSH2 PUBLIC KEY ----'
and lines[-1] == '---- END SSH2 PUBLIC KEY ----'
):
raise PublicKeyParseError(text)
lines = lines[1:-1]
b64key = ''
headers = {}
while lines:
line = lines.pop(0)
if ':' in line:
while line[-1] == '\\':
line = line[:-1] + lines.pop(0)
k,v = line.split(':', 1)
headers[k.lower()] = v.lstrip()
else:
b64key += line
comment = headers.get('comment')
if comment and comment[0] in ('"', "'") and comment[0] == comment[-1]:
comment = comment[1:-1]
try:
return PublicKey(b64key, comment)
except TypeError:
raise PublicKeyParseError(text)
def pubkey_parse_pem(text):
from pyasn1.codec.der import decoder as der_decoder
lines = text.splitlines()
if not (
lines[0] == '-----BEGIN RSA PUBLIC KEY-----'
and lines[-1] == '-----END RSA PUBLIC KEY-----'
):
raise PublicKeyParseError(text)
der = base64.b64decode(''.join(lines[1:-1]).encode('ascii'))
pkcs1_seq = der_decoder.decode(der)
n_val = pkcs1_seq[0][0]
e_val = pkcs1_seq[0][1]
n = int2bytes(n_val)
e = int2bytes(e_val)
if n[0] & 0x80: n = b'\x00' + n
if e[0] & 0x80: e = b'\x00' + e
algorithm = 'ssh-rsa'.encode('ascii')
keydata = (
struct.pack('>I', len(algorithm)) +
algorithm +
struct.pack('>I', len(e)) +
e +
struct.pack('>I', len(n)) +
n
)
b64key = base64.b64encode(keydata).decode('ascii')
return PublicKey(b64key)
def pubkey_parse(text):
lines = text.splitlines()
if len(lines) == 1:
return pubkey_parse_openssh(text)
if lines[0] == '---- BEGIN SSH2 PUBLIC KEY ----':
return pubkey_parse_rfc4716(text)
if lines[0] == '-----BEGIN RSA PUBLIC KEY-----':
return pubkey_parse_pem(text)
raise PublicKeyParseError(text)
def lookup_all(url):
import urllib
......@@ -87,28 +253,62 @@ def lookup_by_fingerprint_main():
"Error: cannot retrieve fingerprint from environment or stdin\n"
)
sys.exit(1)
m = sshkey_re.match(key)
if not m:
sys.stderr.write(
"Error: cannot parse SSH protocol 2 base64-encoded key"
)
try:
pubkey = pubkey_parse(key)
except PublicKeyParseError as e:
sys.stderr.write("Error: " + str(e))
sys.exit(1)
fingerprint = sshkey_fingerprint(m.group('b64key'))
fingerprint = pubkey.fingerprint()
url = getenv('SSHKEY_LOOKUP_URL', SSHKEY_LOOKUP_URL_DEFAULT)
for key in lookup_by_fingerprint(url, fingerprint):
sys.stdout.write(key)
def lookup_main():
import sys
import getopt
from os import environ
if len(sys.argv) < 2:
sys.stderr.write('Usage: %s URL [USERNAME]\n' % sys.argv[0])
usage = (
"Usage: {prog} -a URL\n"
" {prog} -u URL USERNAME\n"
" {prog} -f URL FINGERPRINT\n"
" {prog} URL [USERNAME]\n"
).format(prog=sys.argv[0])
try:
opts, args = getopt.getopt(sys.argv[1:], 'hafu')
except getopt.GetoptError as e:
sys.stderr.write("Error: %s\n" % str(e))
sys.stderr.write(usage)
sys.exit(1)
url = sys.argv[1]
if len(sys.argv) == 2:
mode = 'x'
for o, a in opts:
if o == '-h':
sys.stdout.write(usage)
sys.exit(0)
else:
mode = o[1]
if len(args) == 0:
sys.stderr.write(usage)
sys.exit(1)
url = args[0]
if mode == 'a':
response = lookup_all(url)
elif mode == 'f':
if len(args) < 2:
sys.stderr.write(usage)
sys.exit(1)
response = lookup_by_fingerprint(url, args[1])
elif mode == 'u':
if len(args) < 2:
sys.stderr.write(usage)
sys.exit(1)
response = lookup_by_username(url, args[1])
else:
if len(args) == 1:
environ['SSHKEY_LOOKUP_URL'] = url
lookup_by_fingerprint_main()
return lookup_by_fingerprint_main()
else:
username = sys.argv[2]
for key in lookup_by_username(url, username):
response = lookup_by_username(url, args[1])
for key in response:
sys.stdout.write(key)
......@@ -28,6 +28,7 @@
from django.http import HttpResponse, HttpResponseRedirect
from django.views.decorators.http import require_http_methods, require_GET
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import get_object_or_404, render_to_response
from django.template import RequestContext
from django.contrib import messages
......@@ -39,8 +40,14 @@ from django_sshkey import settings
from django_sshkey.models import UserKey
from django_sshkey.forms import UserKeyForm
@require_GET
@require_http_methods(['GET', 'POST'])
@csrf_exempt
def lookup(request):
if request.method == 'POST':
payload = request.read()
key = UserKey.objects.get(id=int(payload))
key.touch()
return HttpResponse(str(key.last_used), mimetype='text/plain')
try:
fingerprint = request.GET['fingerprint']
keys = UserKey.objects.filter(fingerprint=fingerprint)
......@@ -54,13 +61,9 @@ def lookup(request):
for key in keys:
if settings.SSHKEY_AUTHORIZED_KEYS_OPTIONS:
options = settings.SSHKEY_AUTHORIZED_KEYS_OPTIONS.format(
username=key.user.username) + ' '
elif settings.SSHKEY_AUTHORIZED_KEYS_COMMAND:
options = 'command="%s" ' % (
settings.SSHKEY_AUTHORIZED_KEYS_COMMAND
.format(username=key.user.username)
.replace('"', r'\"')
)
username=key.user.username,
key_id=key.id,
) + ' '
else:
options = ''
response += options + key.key + '\n'
......@@ -72,7 +75,7 @@ def userkey_list(request):
userkey_list = UserKey.objects.filter(user=request.user)
return render_to_response(
'sshkey/userkey_list.html',
{ 'userkey_list': userkey_list },
{ 'userkey_list': userkey_list, 'allow_edit': settings.SSHKEY_ALLOW_EDIT },
context_instance = RequestContext(request),
)
......@@ -81,6 +84,7 @@ def userkey_list(request):
def userkey_add(request):
if request.method == 'POST':
userkey = UserKey(user=request.user)
userkey.request = request
form = UserKeyForm(request.POST, instance=userkey)
if form.is_valid():
form.save()
......@@ -88,7 +92,7 @@ def userkey_add(request):
url = request.GET.get('next', default_redirect)
if not is_safe_url(url=url, host=request.get_host()):
url = default_redirect
message = 'SSH key %s was saved.' % userkey.name
message = 'SSH public key %s was added.' % userkey.name
messages.success(request, message, fail_silently=True)
return HttpResponseRedirect(url)
else:
......@@ -102,6 +106,8 @@ def userkey_add(request):
@login_required
@require_http_methods(['GET', 'POST'])
def userkey_edit(request, pk):
if not settings.SSHKEY_ALLOW_EDIT:
raise PermissionDenied
userkey = get_object_or_404(UserKey, pk=pk)
if userkey.user != request.user:
raise PermissionDenied
......@@ -113,7 +119,7 @@ def userkey_edit(request, pk):
url = request.GET.get('next', default_redirect)
if not is_safe_url(url=url, host=request.get_host()):
url = default_redirect
message = 'SSH key %s was saved.' % userkey.name
message = 'SSH public key %s was saved.' % userkey.name
messages.success(request, message, fail_silently=True)
return HttpResponseRedirect(url)
else:
......@@ -131,6 +137,6 @@ def userkey_delete(request, pk):
if userkey.user != request.user:
raise PermissionDenied
userkey.delete()
message = 'SSH key %s was deleted.' % userkey.name
message = 'SSH public key %s was deleted.' % userkey.name
messages.success(request, message, fail_silently=True)
return HttpResponseRedirect(reverse('django_sshkey.views.userkey_list'))
# Django settings for testproject project.
import getpass
import socket
DEBUG = True
TEMPLATE_DEBUG = DEBUG
DEFAULT_FROM_EMAIL = '%s@%s' % (getpass.getuser(), socket.gethostname())
ADMINS = (
# ('Your Name', 'your_email@example.com'),
)
......
../sshkey/templates.example
\ No newline at end of file
../django_sshkey/templates.example
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment