Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2016 Valentin Samir
12"""Some authentication classes for the CAS"""
13from django.conf import settings
14from django.contrib.auth import get_user_model
15from django.utils import timezone
16from django.db import connections, DatabaseError
18import warnings
19from datetime import timedelta
20from six.moves import range
21try: # pragma: no cover
22 import MySQLdb
23 import MySQLdb.cursors
24except ImportError:
25 MySQLdb = None
28try: # pragma: no cover
29 import ldap3
30 import ldap3.core.exceptions
31except ImportError:
32 ldap3 = None
34from .models import FederatedUser, UserAttributes
35from .utils import check_password, dictfetchall
class AuthUser(object):
    """
    Base class shared by every authentication backend.

    :param unicode username: The username to authenticate, kept in the :attr:`username`
        attribute.
    """

    #: username the current object was instantiated with
    username = None

    def __init__(self, username):
        self.username = username

    def test_password(self, password):
        """
        Check ``password`` against the credentials known for :attr:`username`.

        :param unicode password: a clear text password as submitted by the user.
        :raises NotImplementedError: always. Subclasses must override this method.
        """
        raise NotImplementedError()

    def attributs(self):
        """
        The user attributes.

        :raises NotImplementedError: always. Subclasses must override this method.
        """
        raise NotImplementedError()
class DummyAuthUser(AuthUser):  # pragma: no cover
    """
    Placeholder authentication backend that rejects every login attempt.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. No value is ever considered valid by this backend.
    """

    def test_password(self, password):
        """
        Reject the supplied password, whatever it is.

        :param unicode password: a clear text password as submitted by the user.
        :return: always ``False``
        :rtype: bool
        """
        return False

    def attributs(self):
        """
        The user attributes.

        :return: an empty :class:`dict`.
        :rtype: dict
        """
        return {}
class TestAuthUser(AuthUser):
    """
    Test authentication backend that knows exactly one user.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. The only valid value is ``settings.CAS_TEST_USER``.
    """

    def test_password(self, password):
        """
        Check ``password`` against the configured test credentials.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` equals
            ``settings.CAS_TEST_USER`` and ``password`` equals
            ``settings.CAS_TEST_PASSWORD``, ``False`` otherwise.
        :rtype: bool
        """
        if self.username != settings.CAS_TEST_USER:
            return False
        return password == settings.CAS_TEST_PASSWORD

    def attributs(self):
        """
        The user attributes.

        :return: the ``settings.CAS_TEST_ATTRIBUTES`` :class:`dict` if
            :attr:`username<AuthUser.username>` is the test user, an empty :class:`dict`
            otherwise.
        :rtype: dict
        """
        if self.username != settings.CAS_TEST_USER:  # pragma: no cover (should not happen)
            return {}
        return settings.CAS_TEST_ATTRIBUTES
class DBAuthUser(AuthUser):  # pragma: no cover
    """Base class for database backed authentication classes"""

    #: DB user attributes as a :class:`dict` if the username is found in the database.
    user = None

    def attributs(self):
        """
        The user attributes.

        :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
            or :class:`list` of :func:`unicode`. If the user does not exist, the returned
            :class:`dict` is empty.
        :rtype: dict
        """
        # a missing (``None``) or empty user record both yield a fresh empty dict
        return self.user or {}
class MysqlAuthUser(DBAuthUser):  # pragma: no cover
    """
    DEPRECATED, use :class:`SqlAuthUser` instead.

    A mysql authentication class: authenticate user against a mysql database

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid value are fetched from the MySQL database set with
        ``settings.CAS_SQL_*`` settings parameters using the query
        ``settings.CAS_SQL_USER_QUERY``.
    :raises RuntimeError: if the ``MySQLdb`` module is not installed.
    """

    def __init__(self, username):
        warnings.warn(
            (
                "MysqlAuthUser authentication class is deprecated: "
                "use cas_server.auth.SqlAuthUser instead"
            ),
            UserWarning
        )
        # This check must come before anything dereferences ``MySQLdb``: the module is
        # ``None`` when the import failed, and building the config below would then die
        # with an AttributeError instead of this explicit error.
        if not MySQLdb:
            raise RuntimeError("Please install MySQLdb before using the MysqlAuthUser backend")
        # see the connect function at
        # http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes
        # for possible mysql config parameters.
        mysql_config = {
            "user": settings.CAS_SQL_USERNAME,
            "passwd": settings.CAS_SQL_PASSWORD,
            "db": settings.CAS_SQL_DBNAME,
            "host": settings.CAS_SQL_HOST,
            "charset": settings.CAS_SQL_DBCHARSET,
            "cursorclass": MySQLdb.cursors.DictCursor
        }
        row = None
        conn = MySQLdb.connect(**mysql_config)
        # always release the cursor and the connection: one connection is opened per
        # instance and nothing else holds a reference to it afterwards
        try:
            curs = conn.cursor()
            try:
                # only an unambiguous match (exactly one row) identifies the user
                if curs.execute(settings.CAS_SQL_USER_QUERY, (username,)) == 1:
                    row = curs.fetchone()
            finally:
                curs.close()
        finally:
            conn.close()
        if row is not None:
            self.user = row
            # use the username as spelled in the database
            super(MysqlAuthUser, self).__init__(self.user['username'])
        else:
            super(MysqlAuthUser, self).__init__(username)

    def test_password(self, password):
        """
        Tests ``password`` against the password stored in the database for the user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if self.user:
            return check_password(
                settings.CAS_SQL_PASSWORD_CHECK,
                password,
                self.user["password"],
                settings.CAS_SQL_DBCHARSET
            )
        else:
            return False
class SqlAuthUser(DBAuthUser):  # pragma: no cover
    """
    A SQL authentication class: authenticate user against a SQL database. The SQL database
    must be configured in settings.py as ``settings.DATABASES['cas_server']``.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are fetched from the ``cas_server`` database using
        the query ``settings.CAS_SQL_USER_QUERY``.
    :raises RuntimeError: if no ``cas_server`` database connection is configured.
    """

    def __init__(self, username):
        if "cas_server" not in connections:
            raise RuntimeError("Please configure the 'cas_server' database in settings.DATABASES")
        # up to three attempts: the connection is closed after a database error so that
        # the next attempt starts from a fresh one
        for attempt in range(3):
            try:
                with connections["cas_server"].cursor() as cursor:
                    cursor.execute(settings.CAS_SQL_USER_QUERY, (username,))
                    rows = dictfetchall(cursor)
                    if len(rows) != 1:
                        # unknown or ambiguous username: keep it as supplied
                        super(SqlAuthUser, self).__init__(username)
                    else:
                        self.user = rows[0]
                        super(SqlAuthUser, self).__init__(self.user['username'])
                break
            except DatabaseError:
                connections["cas_server"].close()
                # the third failure is reported to the caller
                if attempt == 2:
                    raise

    def test_password(self, password):
        """
        Check ``password`` against the password stored in the database for the user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if not self.user:
            return False
        return check_password(
            settings.CAS_SQL_PASSWORD_CHECK,
            password,
            self.user["password"],
            settings.CAS_SQL_PASSWORD_CHARSET
        )
class LdapAuthUser(DBAuthUser):  # pragma: no cover
    """
    A ldap authentication class: authenticate user against a ldap database

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid value are fetched from the ldap database set with
        ``settings.CAS_LDAP_*`` settings parameters.
    :raises RuntimeError: if the ``ldap3`` module is not installed.
    :raises ValueError: if ``settings.CAS_LDAP_BASE_DN`` is not set.
    """

    #: connection to the ldap server, stored on the class and opened lazily
    #: by :meth:`get_conn`
    _conn = None

    @classmethod
    def get_conn(cls):
        """Return a connection object to the ldap database"""
        conn = cls._conn
        if conn is None or conn.closed:
            conn = ldap3.Connection(
                settings.CAS_LDAP_SERVER,
                settings.CAS_LDAP_USER,
                settings.CAS_LDAP_PASSWORD,
                client_strategy="RESTARTABLE",
                auto_bind=True
            )
            cls._conn = conn
        return conn

    @staticmethod
    def _entry_to_dict(entry):
        """
        Convert a ldap3 search result entry to a :class:`dict` of its attributes.

        The entry distinguished name is stored under the ``"dn"`` key.

        :param entry: one element of ``conn.entries`` after a successful search.
        :return: the entry attributes plus its ``"dn"``.
        :rtype: dict
        """
        # try the new ldap3>=2 API
        try:
            attributes = entry.entry_attributes_as_dict
            # store the user dn
            attributes["dn"] = entry.entry_dn
        # fallback to ldap3<2 API
        except (
            ldap3.core.exceptions.LDAPKeyError,  # ldap3<1 exception
            ldap3.core.exceptions.LDAPAttributeError  # ldap3<2 exception
        ):
            attributes = entry.entry_get_attributes_dict()
            # store the user dn
            attributes["dn"] = entry.entry_get_dn()
        return attributes

    def __init__(self, username):
        if not ldap3:
            raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend")
        if not settings.CAS_LDAP_BASE_DN:
            raise ValueError(
                "You must define CAS_LDAP_BASE_DN for using the ldap authentication backend"
            )
        # in case we got disconnected from the database, retry to connect 2 times
        for retry_nb in range(3):
            try:
                conn = self.get_conn()
                if conn.search(
                    settings.CAS_LDAP_BASE_DN,
                    settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(username),
                    attributes=ldap3.ALL_ATTRIBUTES
                ) and len(conn.entries) == 1:
                    user = self._entry_to_dict(conn.entries[0])
                    if user.get(settings.CAS_LDAP_USERNAME_ATTR):
                        self.user = user
                        # use the username as spelled in the directory
                        super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0])
                    else:
                        super(LdapAuthUser, self).__init__(username)
                else:
                    super(LdapAuthUser, self).__init__(username)
                break
            except ldap3.core.exceptions.LDAPCommunicationError:
                # the third failure is reported to the caller
                if retry_nb == 2:
                    raise

    def test_password(self, password):
        """
        Tests ``password`` against the ldap database.

        With ``settings.CAS_LDAP_PASSWORD_CHECK == "bind"`` the password is checked by
        binding to the server as the user; on success the user attributes are fetched with
        that connection and cached in a
        :class:`UserAttributes<cas_server.models.UserAttributes>` object. Otherwise the
        password is compared with the ``settings.CAS_LDAP_PASSWORD_ATTR`` attribute of the
        user entry.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if self.user and settings.CAS_LDAP_PASSWORD_CHECK == "bind":
            # A simple bind with an empty password is an *unauthenticated* bind
            # (RFC 4513, section 5.1.2) and proves nothing about the user. ldap3 rejects
            # it with an exception that is not handled below, so refuse it explicitly
            # instead of letting the error escape.
            if not password:
                return False
            try:
                conn = ldap3.Connection(
                    settings.CAS_LDAP_SERVER,
                    self.user["dn"],
                    password,
                    auto_bind=True
                )
                try:
                    # fetch the user attribute
                    if conn.search(
                        settings.CAS_LDAP_BASE_DN,
                        settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(self.username),
                        attributes=ldap3.ALL_ATTRIBUTES
                    ) and len(conn.entries) == 1:
                        attributes = self._entry_to_dict(conn.entries[0])
                        # cache the attributes locally as we wont have access to the user
                        # password later.
                        user = UserAttributes.objects.get_or_create(username=self.username)[0]
                        user.attributs = attributes
                        user.save()
                finally:
                    conn.unbind()
                return True
            except (
                ldap3.core.exceptions.LDAPBindError,
                ldap3.core.exceptions.LDAPCommunicationError
            ):
                return False
        elif self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
            return check_password(
                settings.CAS_LDAP_PASSWORD_CHECK,
                password,
                self.user[settings.CAS_LDAP_PASSWORD_ATTR][0],
                settings.CAS_LDAP_PASSWORD_CHARSET
            )
        else:
            return False

    def attributs(self):
        """
        The user attributes.

        :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
            or :class:`list` of :func:`unicode`. If the user does not exist, the returned
            :class:`dict` is empty.
        :rtype: dict
        :raises NotImplementedError: if the password check method in `CAS_LDAP_PASSWORD_CHECK`
            does not allow fetching the attributes without the user credentials.
        """
        if settings.CAS_LDAP_PASSWORD_CHECK == "bind":
            raise NotImplementedError()
        else:
            return super(LdapAuthUser, self).attributs()
class DjangoAuthUser(AuthUser):  # pragma: no cover
    """
    A django auth class: authenticate user against django internal users

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are usernames of django internal users.
    """

    #: a django user object if the username is found. The user model is retrieved
    #: using :func:`django.contrib.auth.get_user_model`.
    user = None

    def __init__(self, username):
        user_model = get_user_model()
        try:
            self.user = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            # unknown username: :attr:`user` keeps its ``None`` class default
            pass
        super(DjangoAuthUser, self).__init__(username)

    def test_password(self, password):
        """
        Check ``password`` against the password of the django user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`user` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if not self.user:
            return False
        return self.user.check_password(password)

    def attributs(self):
        """
        The user attributes, defined as the fields on the :attr:`user` object.

        :return: a :class:`dict` with the :attr:`user` object fields. The
            ``user_permissions`` and ``groups`` relations, when present, are unfolded to
            lists. If the user does not exist, the returned :class:`dict` is empty.
        :rtype: dict
        """
        if not self.user:
            return {}

        # _meta.get_fields() is from the new documented _meta interface in django 1.8
        try:
            names = [
                field.attname for field in self.user._meta.get_fields()
                if hasattr(field, "attname")
            ]
        # backward compatibility with django 1.7
        except AttributeError:  # pragma: no cover (only used by django 1.7)
            names = self.user._meta.get_all_field_names()

        values = {}
        for name in names:
            values[name] = getattr(self.user, name)

        # unfold user_permissions many to many relation into
        # ("<model module>.<model class name>", codename) pairs
        if 'user_permissions' in values:
            permissions = []
            for perm in values['user_permissions'].filter():
                model = perm.content_type.model_class()
                permissions.append(
                    (u"%s.%s" % (model.__module__, model.__name__), perm.codename)
                )
            values['user_permissions'] = permissions

        # unfold group many to many relation into the list of group names
        if 'groups' in values:
            values['groups'] = [group.name for group in values['groups'].filter()]

        return values
class CASFederateAuth(AuthUser):
    """
    Authentication class used when CAS_FEDERATE is True

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are usernames of
        :class:`FederatedUser<cas_server.models.FederatedUser>` objects.
        :class:`FederatedUser<cas_server.models.FederatedUser>` objects are created on CAS
        backends successful ticket validation.
    """

    #: a :class:`FederatedUser<cas_server.models.FederatedUser>` object if ``username`` is found.
    user = None

    def __init__(self, username):
        try:
            self.user = FederatedUser.get_from_federated_username(username)
            # use the username as spelled on the federated user record
            super(CASFederateAuth, self).__init__(self.user.federated_username)
        except FederatedUser.DoesNotExist:
            super(CASFederateAuth, self).__init__(username)

    def test_password(self, ticket):
        """
        Check ``ticket`` against the ticket stored on the federated user.

        :param unicode ticket: The CAS ticket just used to validate the user authentication
            against its CAS backend. It plays the role of the password for this backend.
        :return: ``True`` if :attr:`user` is valid, ``ticket`` equals the ticket stored on
            it, and the user was last updated less than ``settings.CAS_TICKET_VALIDITY``
            seconds ago. ``False`` otherwise.
        :rtype: bool
        """
        if not (self.user and self.user.ticket):
            return False
        if ticket != self.user.ticket:
            return False
        # the ticket only counts while the user record is recent enough
        oldest_accepted = timezone.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)
        return self.user.last_update > oldest_accepted

    def attributs(self):
        """
        The user attributes, as returned by the CAS backend.

        :return: :obj:`FederatedUser.attributs<cas_server.models.FederatedUser.attributs>`.
            If the user does not exist, the returned :class:`dict` is empty.
        :rtype: dict
        """
        if self.user:
            return self.user.attributs
        # pragma: no cover (should not happen)
        return {}