aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2013-08-25 11:46:08 +0200
committerMichał Górny <mgorny@gentoo.org>2013-08-25 22:45:29 +0200
commit25a1e361e499590fac0c28cb6e366da6935871c7 (patch)
tree4e26d4dc3e76dd7ab5b72ba08f43358e5479298b
parentStore SSH handler list in settings. (diff)
downloadidentity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.tar.gz
identity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.tar.bz2
identity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.zip
Add tests for SSH handlers.
-rw-r--r--okupy/common/ssh.py3
-rw-r--r--okupy/tests/unit/test_auth.py40
-rw-r--r--okupy/tests/unit/test_ssh.py189
3 files changed, 218 insertions, 14 deletions
diff --git a/okupy/common/ssh.py b/okupy/common/ssh.py
index 5969b19..1c4a49a 100644
--- a/okupy/common/ssh.py
+++ b/okupy/common/ssh.py
@@ -31,7 +31,8 @@ class SSHServer(paramiko.ServerInterface):
return 'publickey'
def check_auth_publickey(self, username, key):
- # for some reason, this is called twice...
+ # for some reason, this is called twice... therefore, we need
+ # to cache the result since token will be revoked on first use
if self._message:
return paramiko.AUTH_SUCCESSFUL
diff --git a/okupy/tests/unit/test_auth.py b/okupy/tests/unit/test_auth.py
index 97c7ddb..7c445f6 100644
--- a/okupy/tests/unit/test_auth.py
+++ b/okupy/tests/unit/test_auth.py
@@ -4,21 +4,17 @@ from mockldap import MockLdap
from django.conf import settings
from django.contrib.auth import authenticate
+from django.test import TestCase
from .. import vars
-from ...common.test_helpers import OkupyTestCase, set_request
+from ...common.test_helpers import ldap_users, set_request
import base64
import paramiko
-def get_ssh_key(person, number=0):
- keystr = person['sshPublicKey'][number]
- return base64.b64decode(keystr.split()[1])
-
-
-class AuthUnitTests(OkupyTestCase):
+class AuthSSLUnitTests(TestCase):
@classmethod
def setUpClass(cls):
cls.mockldap = MockLdap(vars.DIRECTORY)
@@ -70,21 +66,39 @@ class AuthUnitTests(OkupyTestCase):
u = authenticate(request=request)
self.assertIs(u, None)
+
+class AuthSSHUnitTests(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.mockldap = MockLdap(vars.DIRECTORY)
+
+ def setUp(self):
+ self.mockldap.start()
+ self.ldapobject = self.mockldap[settings.AUTH_LDAP_SERVER_URI]
+
+ def tearDown(self):
+ self.mockldap.stop()
+
+ @staticmethod
+ def get_ssh_key(person, number=0):
+ keystr = person['sshPublicKey'][number]
+ return base64.b64decode(keystr.split()[1])
+
def test_valid_rsa_ssh_key_authenticates_alice(self):
- alice = vars.DIRECTORY['uid=alice,ou=people,o=test']
- key = paramiko.RSAKey(data=get_ssh_key(alice))
+ dn, alice = ldap_users('alice')
+ key = paramiko.RSAKey(data=self.get_ssh_key(alice))
u = authenticate(ssh_key=key)
self.assertEqual(u.username, alice['uid'][0])
def test_valid_dss_ssh_key_authenticates_bob(self):
- bob = vars.DIRECTORY['uid=bob,ou=people,o=test']
- key = paramiko.DSSKey(data=get_ssh_key(bob, 1))
+ dn, bob = ldap_users('bob')
+ key = paramiko.DSSKey(data=self.get_ssh_key(bob, 1))
u = authenticate(ssh_key=key)
self.assertEqual(u.username, bob['uid'][0])
def test_valid_rsa_key_with_comment_authenticates_bob(self):
- bob = vars.DIRECTORY['uid=bob,ou=people,o=test']
- key = paramiko.RSAKey(data=get_ssh_key(bob))
+ dn, bob = ldap_users('bob')
+ key = paramiko.RSAKey(data=self.get_ssh_key(bob))
u = authenticate(ssh_key=key)
self.assertEqual(u.username, bob['uid'][0])
diff --git a/okupy/tests/unit/test_ssh.py b/okupy/tests/unit/test_ssh.py
new file mode 100644
index 0000000..915be6a
--- /dev/null
+++ b/okupy/tests/unit/test_ssh.py
@@ -0,0 +1,189 @@
+# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
+
+from django.conf import settings
+from django.test import TestCase
+from django.test.utils import override_settings
+
+import base64
+import socket
+
+import paramiko
+
+from ..vars import TEST_SSH_KEY_FOR_NO_USER
+from ...common.ssh import ssh_handler, SSHServer
+from ...common.exceptions import OkupyError
+
+
+@override_settings(SSH_HANDLERS={})
+class SSHUnitTests(TestCase):
+ def setUp(self):
+ self._key = paramiko.RSAKey(
+ data=base64.b64decode(TEST_SSH_KEY_FOR_NO_USER))
+ self._server = SSHServer()
+
+ def test_ssh_handler_decorator_works(self):
+ @ssh_handler
+ def test(key):
+ pass
+
+ self.assertEqual(settings.SSH_HANDLERS.get('test'), test)
+
+ def test_noarg_handler_works(self):
+ @ssh_handler
+ def noarg(key):
+ return 'yay'
+
+ self.assertEqual(
+ self._server.check_auth_publickey('noarg', self._key),
+ paramiko.AUTH_SUCCESSFUL)
+
+ def test_failure_is_propagated_properly(self):
+ @ssh_handler
+ def failing(key):
+ return None
+
+ self.assertEqual(
+ self._server.check_auth_publickey('failing', self._key),
+ paramiko.AUTH_FAILED)
+
+ def test_argument_splitting_works(self):
+ @ssh_handler
+ def twoarg(a, b, key):
+ if a == '1' and b == '2':
+ return 'yay'
+ else:
+ return None
+
+ self.assertEqual(
+ self._server.check_auth_publickey('twoarg+1+2', self._key),
+ paramiko.AUTH_SUCCESSFUL)
+
+ def test_default_arguments_work(self):
+ @ssh_handler
+ def oneortwoarg(a, b='3', key=None):
+ if not key:
+ raise ValueError('key must not be None')
+ if a == '1' and b == '3':
+ return 'yay'
+ else:
+ return None
+
+ self.assertEqual(
+ self._server.check_auth_publickey('oneortwoarg+1', self._key),
+ paramiko.AUTH_SUCCESSFUL)
+
+ def test_wrong_command_returns_failure(self):
+ @ssh_handler
+ def somehandler(key):
+ return 'er?'
+
+ self.assertEqual(
+ self._server.check_auth_publickey('otherhandler', self._key),
+ paramiko.AUTH_FAILED)
+
+ def test_missing_arguments_return_failure(self):
+ @ssh_handler
+ def onearg(arg, key):
+ return 'er?'
+
+ self.assertEqual(
+ self._server.check_auth_publickey('onearg', self._key),
+ paramiko.AUTH_FAILED)
+
+ def test_too_many_arguments_return_failure(self):
+ @ssh_handler
+ def onearg(arg, key):
+ return 'er?'
+
+ self.assertEqual(
+ self._server.check_auth_publickey('onearg+1+2', self._key),
+ paramiko.AUTH_FAILED)
+
+ def test_typeerror_is_propagated_properly(self):
+ @ssh_handler
+ def onearg(key):
+ raise TypeError
+
+ self.assertRaises(TypeError,
+ self._server.check_auth_publickey, 'onearg', self._key)
+
+ def test_result_caching_works(self):
+ class Cache(object):
+ def __init__(self):
+ self.first = True
+
+ def __call__(self, key):
+ if self.first:
+ self.first = False
+ return 'yay'
+ else:
+ return None
+
+ cache = Cache()
+ @ssh_handler
+ def cached(key):
+ return cache(key)
+
+ if (self._server.check_auth_publickey('cached', self._key)
+ != paramiko.AUTH_SUCCESSFUL):
+ raise OkupyError('Test prerequisite failed')
+ self.assertEqual(
+ self._server.check_auth_publickey('cached', self._key),
+ paramiko.AUTH_SUCCESSFUL)
+
+ def test_message_is_printed_to_exec_request(self):
+ @ssh_handler
+ def noarg(key):
+ return 'test-message'
+
+ if (self._server.check_auth_publickey('noarg', self._key)
+ != paramiko.AUTH_SUCCESSFUL):
+ raise OkupyError('Test prerequisite failed')
+
+ s1, s2 = socket.socketpair()
+ self.assertTrue(self._server.check_channel_exec_request(s1, ':'))
+ self.assertEqual(s2.makefile().read().rstrip(), 'test-message')
+
+ def test_message_is_printed_to_shell_request(self):
+ @ssh_handler
+ def noarg(key):
+ return 'test-message'
+
+ if (self._server.check_auth_publickey('noarg', self._key)
+ != paramiko.AUTH_SUCCESSFUL):
+ raise OkupyError('Test prerequisite failed')
+
+ s1, s2 = socket.socketpair()
+ self.assertTrue(self._server.check_channel_shell_request(s1))
+ self.assertEqual(s2.makefile().read().rstrip(), 'test-message')
+
+ def test_cache_is_invalidated_after_channel_request(self):
+ class Cache(object):
+ def __init__(self):
+ self.first = True
+
+ def __call__(self, key):
+ if self.first:
+ self.first = False
+ return 'test-message'
+ else:
+ return None
+
+ cache = Cache()
+ @ssh_handler
+ def cached(key):
+ return cache(key)
+
+ if (self._server.check_auth_publickey('cached', self._key)
+ != paramiko.AUTH_SUCCESSFUL):
+ raise OkupyError('Test prerequisite failed')
+
+ s1, s2 = socket.socketpair()
+ if not self._server.check_channel_shell_request(s1):
+ raise OkupyError('Test prerequisite failed')
+ if s2.makefile().read().rstrip() != 'test-message':
+ raise OkupyError('Test prerequisite failed')
+
+ self.assertEqual(
+ self._server.check_auth_publickey('cached', self._key),
+ paramiko.AUTH_FAILED)