Coverage for cas_server/auth.py: 100%
38 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:47 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:47 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2025 Valentin Samir
12"""Some authentication classes for the CAS"""
13from django.conf import settings
14from django.contrib.auth import get_user_model
15from django.utils import timezone
16from django.db import connections, DatabaseError
18import warnings
19from datetime import timedelta
20try: # pragma: no cover
21 import MySQLdb
22 import MySQLdb.cursors
23except ImportError:
24 MySQLdb = None
27try: # pragma: no cover
28 import ldap3
29 import ldap3.core.exceptions
30except ImportError:
31 ldap3 = None
33from .models import FederatedUser, UserAttributes
34from .utils import check_password, dictfetchall
class AuthUser(object):
    """
    Authentication base class.

    It only stores the username; the two methods below must be overridden by
    the concrete authentication backends.

    :param unicode username: A username, stored in the :attr:`username` attribute.
    """

    #: username used to instantiate the current object
    username = None

    def __init__(self, username):
        self.username = username

    def test_password(self, password):
        """
        Check ``password`` for the current user.

        :param unicode password: a clear text password as submitted by the user.
        :raises NotImplementedError: always. Subclasses must implement this method.
        """
        raise NotImplementedError()

    def attributs(self):
        """
        Return the attributes of the current user.

        :raises NotImplementedError: always. Subclasses must implement this method.
        """
        raise NotImplementedError()
class DummyAuthUser(AuthUser):  # pragma: no cover
    """
    A dummy authentication class for which authentication always fails.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. No value of this attribute can authenticate here.
    """

    def test_password(self, password):
        """
        Reject every password.

        :param unicode password: a clear text password as submitted by the user.
        :return: always ``False``
        :rtype: bool
        """
        return False

    def attributs(self):
        """
        Return the user attributes, of which there are none.

        :return: an empty :class:`dict`.
        :rtype: dict
        """
        return dict()
class TestAuthUser(AuthUser):
    """
    A test authentication class that only works for one single user.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. The only valid value is ``settings.CAS_TEST_USER``.
    """

    def test_password(self, password):
        """
        Check ``password`` against the configured test credentials.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` equals
            ``settings.CAS_TEST_USER`` and ``password`` equals
            ``settings.CAS_TEST_PASSWORD``, ``False`` otherwise.
        :rtype: bool
        """
        # The password setting is only consulted once the username matched.
        if self.username != settings.CAS_TEST_USER:
            return False
        return password == settings.CAS_TEST_PASSWORD

    def attributs(self):
        """
        Return the attributes of the test user.

        :return: the ``settings.CAS_TEST_ATTRIBUTES`` :class:`dict` if
            :attr:`username<AuthUser.username>` equals ``settings.CAS_TEST_USER``,
            an empty :class:`dict` otherwise.
        :rtype: dict
        """
        if self.username != settings.CAS_TEST_USER:  # pragma: no cover (should not happen)
            return {}
        return settings.CAS_TEST_ATTRIBUTES
class DBAuthUser(AuthUser):  # pragma: no cover
    """Base class for database based authentication classes."""

    #: DB user attributes as a :class:`dict` if the username is found in the database.
    user = None

    def attributs(self):
        """
        Return the attributes of the current user.

        :return: :attr:`user` when it is set and non-empty, an empty :class:`dict`
            otherwise (for example when the user does not exist).
        :rtype: dict
        """
        return self.user if self.user else {}
class MysqlAuthUser(DBAuthUser):  # pragma: no cover
    """
    DEPRECATED, use :class:`SqlAuthUser` instead.

    A mysql authentication class: authenticate user against a mysql database

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are fetched from the MySQL database set with
        ``settings.CAS_SQL_*`` settings parameters using the query
        ``settings.CAS_SQL_USER_QUERY``.
    :raises RuntimeError: if the ``MySQLdb`` module could not be imported.
    """

    def __init__(self, username):
        warnings.warn(
            (
                "MysqlAuthUser authentication class is deprecated: "
                "use cas_server.auth.SqlAuthUser instead"
            ),
            UserWarning
        )
        # Check for the driver *before* building the configuration: the config
        # dereferences ``MySQLdb.cursors`` and would otherwise fail with an
        # AttributeError on ``None`` instead of this explicit error.
        if not MySQLdb:
            raise RuntimeError("Please install MySQLdb before using the MysqlAuthUser backend")
        # see the connect function at
        # http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes
        # for possible mysql config parameters.
        mysql_config = {
            "user": settings.CAS_SQL_USERNAME,
            "passwd": settings.CAS_SQL_PASSWORD,
            "db": settings.CAS_SQL_DBNAME,
            "host": settings.CAS_SQL_HOST,
            "charset": settings.CAS_SQL_DBCHARSET,
            "cursorclass": MySQLdb.cursors.DictCursor
        }
        row = None
        conn = MySQLdb.connect(**mysql_config)
        # A new connection is opened for each instance: always release the
        # cursor and the connection, even if the query fails.
        try:
            curs = conn.cursor()
            try:
                # the user is only considered found if exactly one row matches
                if curs.execute(settings.CAS_SQL_USER_QUERY, (username,)) == 1:
                    row = curs.fetchone()
            finally:
                curs.close()
        finally:
            conn.close()
        if row is not None:
            self.user = row
            # use the username as stored in the database
            super(MysqlAuthUser, self).__init__(self.user['username'])
        else:
            super(MysqlAuthUser, self).__init__(username)

    def test_password(self, password):
        """
        Check ``password`` against the password stored for the user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if self.user:
            return check_password(
                settings.CAS_SQL_PASSWORD_CHECK,
                password,
                self.user["password"],
                settings.CAS_SQL_DBCHARSET
            )
        else:
            return False
class SqlAuthUser(DBAuthUser):  # pragma: no cover
    """
    A SQL authentication class: authenticate user against a SQL database. The SQL database
    must be configured in settings.py as ``settings.DATABASES['cas_server']``.

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are fetched from the ``cas_server`` database
        using the query ``settings.CAS_SQL_USER_QUERY``.
    :raises RuntimeError: if no ``cas_server`` database connection is configured.
    """

    def __init__(self, username):
        if "cas_server" not in connections:
            raise RuntimeError("Please configure the 'cas_server' database in settings.DATABASES")
        last_attempt = 2
        for attempt in range(last_attempt + 1):
            try:
                with connections["cas_server"].cursor() as cursor:
                    cursor.execute(settings.CAS_SQL_USER_QUERY, (username,))
                    rows = dictfetchall(cursor)
            except DatabaseError:
                # drop the connection before the next attempt; give up after the
                # third failure by re-raising the error
                connections["cas_server"].close()
                if attempt == last_attempt:
                    raise
            else:
                # the user is only considered found if exactly one row matches
                if len(rows) == 1:
                    self.user = rows[0]
                    # use the username as stored in the database
                    super(SqlAuthUser, self).__init__(self.user['username'])
                else:
                    super(SqlAuthUser, self).__init__(username)
                break

    def test_password(self, password):
        """
        Check ``password`` against the password stored for the user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if not self.user:
            return False
        return check_password(
            settings.CAS_SQL_PASSWORD_CHECK,
            password,
            self.user["password"],
            settings.CAS_SQL_PASSWORD_CHARSET
        )
class LdapAuthUser(DBAuthUser):  # pragma: no cover
    """
    A ldap authentication class: authenticate user against a ldap database

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are fetched from the ldap database set with
        ``settings.CAS_LDAP_*`` settings parameters.
    :raises RuntimeError: if the ``ldap3`` module could not be imported.
    :raises ValueError: if ``settings.CAS_LDAP_BASE_DN`` is not set.
    """

    #: connection shared by all instances, (re)created on demand by :meth:`get_conn`
    _conn = None

    @classmethod
    def get_conn(cls):
        """Return a connection object to the ldap database"""
        conn = cls._conn
        if conn is None or conn.closed:
            conn = ldap3.Connection(
                settings.CAS_LDAP_SERVER,
                settings.CAS_LDAP_USER,
                settings.CAS_LDAP_PASSWORD,
                client_strategy="RESTARTABLE",
                auto_bind=True
            )
            cls._conn = conn
        return conn

    @staticmethod
    def get_ldap_user_query(username):
        """
        Build the ldap search filter for ``username`` from ``settings.CAS_LDAP_USER_QUERY``.

        The username is escaped with ``ldap3.utils.conv.escape_bytes`` before being
        substituted. Both the deprecated positional ``%s`` placeholder and the named
        ``%(username)s`` placeholder are supported.

        :param unicode username: the username to search for.
        :return: the search filter with the escaped username substituted.
        """
        if '%s' in settings.CAS_LDAP_USER_QUERY:
            warnings.warn(
                r'Using %s in your CAS_LDAP_USER_QUERY is deprecated. '
                r'Please upgrade your config to use %(username)s instead',
                DeprecationWarning,
                stacklevel=0
            )
            return settings.CAS_LDAP_USER_QUERY % (ldap3.utils.conv.escape_bytes(username), )
        else:
            return settings.CAS_LDAP_USER_QUERY % {
                'username': ldap3.utils.conv.escape_bytes(username)
            }

    @staticmethod
    def _entry_to_dict(entry):
        """Convert a ldap3 search result entry to a dict of attributes plus its ``dn``."""
        # try the new ldap3>=2 API
        try:
            attributes = entry.entry_attributes_as_dict
            # store the user dn
            attributes["dn"] = entry.entry_dn
        # fallback to ldap3<2 API
        except (
            ldap3.core.exceptions.LDAPKeyError,  # ldap3<1 exception
            ldap3.core.exceptions.LDAPAttributeError  # ldap3<2 exception
        ):
            attributes = entry.entry_get_attributes_dict()
            # store the user dn
            attributes["dn"] = entry.entry_get_dn()
        return attributes

    def __init__(self, username):
        if not ldap3:
            raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend")
        if not settings.CAS_LDAP_BASE_DN:
            raise ValueError(
                "You must define CAS_LDAP_BASE_DN for using the ldap authentication backend"
            )
        # in case we got disconnected from the database, retry to connect 2 times
        for retry_nb in range(3):
            try:
                conn = self.get_conn()
                # the user is only considered found if exactly one entry matches
                if conn.search(
                    settings.CAS_LDAP_BASE_DN,
                    self.get_ldap_user_query(username),
                    attributes=ldap3.ALL_ATTRIBUTES
                ) and len(conn.entries) == 1:
                    user = self._entry_to_dict(conn.entries[0])
                    if user.get(settings.CAS_LDAP_USERNAME_ATTR):
                        self.user = user
                        # use the username as stored in the directory
                        super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0])
                    else:
                        super(LdapAuthUser, self).__init__(username)
                else:
                    super(LdapAuthUser, self).__init__(username)
                break
            except ldap3.core.exceptions.LDAPCommunicationError:
                if retry_nb == 2:
                    raise

    def test_password(self, password):
        """
        Check ``password`` for the current user.

        When ``settings.CAS_LDAP_PASSWORD_CHECK`` is ``"bind"`` the password is verified
        by binding to the ldap server as the user; on success the user attributes readable
        through that bind are cached in a
        :class:`UserAttributes<cas_server.models.UserAttributes>` object. Otherwise the
        password is compared with the ``settings.CAS_LDAP_PASSWORD_ATTR`` attribute of
        the user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if not self.user:
            return False
        if settings.CAS_LDAP_PASSWORD_CHECK == "bind":
            # Never attempt a bind with an empty password: a simple bind with a DN
            # and no password is an *unauthenticated* bind (RFC 4513, section 5.1.2)
            # and must not be taken as a proof of identity.
            if not password:
                return False
            try:
                conn = ldap3.Connection(
                    settings.CAS_LDAP_SERVER,
                    self.user["dn"],
                    password,
                    auto_bind=True
                )
                try:
                    # fetch the user attributes
                    if conn.search(
                        settings.CAS_LDAP_BASE_DN,
                        self.get_ldap_user_query(self.username),
                        attributes=ldap3.ALL_ATTRIBUTES
                    ) and len(conn.entries) == 1:
                        attributes = self._entry_to_dict(conn.entries[0])
                        # cache the attributes locally as we won't have access to the
                        # user password later.
                        user = UserAttributes.objects.get_or_create(username=self.username)[0]
                        user.attributs = attributes
                        user.save()
                finally:
                    conn.unbind()
                return True
            except (
                ldap3.core.exceptions.LDAPBindError,
                ldap3.core.exceptions.LDAPCommunicationError
            ):
                return False
        if self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
            return check_password(
                settings.CAS_LDAP_PASSWORD_CHECK,
                password,
                self.user[settings.CAS_LDAP_PASSWORD_ATTR][0],
                settings.CAS_LDAP_PASSWORD_CHARSET
            )
        return False

    def attributs(self):
        """
        The user attributes.

        :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
            or :class:`list` of :func:`unicode`. If the user does not exist, the returned
            :class:`dict` is empty.
        :rtype: dict
        """
        if settings.CAS_LDAP_PASSWORD_CHECK == "bind" and settings.CAS_LDAP_ATTRS_VIEW == 1:
            # attributes cached by test_password() after a successful bind as the user
            user = UserAttributes.objects.get(username=self.username)
            return user.attributs
        # In every other case expose the attributes fetched in __init__, and an
        # empty dict (never ``None``) when the user was not found.
        return super(LdapAuthUser, self).attributs()
class DjangoAuthUser(AuthUser):  # pragma: no cover
    """
    A django auth class: authenticate user against django internal users

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are usernames of django internal users.
    """
    #: a django user object if the username is found. The user model is retrieved
    #: using :func:`django.contrib.auth.get_user_model`.
    user = None

    def __init__(self, username):
        user_model = get_user_model()
        try:
            self.user = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            # unknown user: keep ``user`` set to None
            pass
        super(DjangoAuthUser, self).__init__(username)

    def test_password(self, password):
        """
        Check ``password`` using the ``check_password`` method of the django user.

        :param unicode password: a clear text password as submitted by the user.
        :return: ``True`` if :attr:`user` is valid and ``password`` is
            correct, ``False`` otherwise.
        :rtype: bool
        """
        if not self.user:
            return False
        return self.user.check_password(password)

    def attributs(self):
        """
        The user attributes, defined as the fields on the :attr:`user` object.

        The ``user_permissions`` and ``groups`` relations, when present, are unfolded
        to plain lists.

        :return: a :class:`dict` with the :attr:`user` object fields.
            If the user does not exist, the returned :class:`dict` is empty.
        :rtype: dict
        """
        if not self.user:
            return {}

        # _meta.get_fields() is from the new documented _meta interface in django 1.8
        try:
            field_names = [
                field.attname for field in self.user._meta.get_fields()
                if hasattr(field, "attname")
            ]
        # backward compatibility with django 1.7
        except AttributeError:  # pragma: no cover (only used by django 1.7)
            field_names = self.user._meta.get_all_field_names()
        attr = {name: getattr(self.user, name) for name in field_names}

        # unfold user_permissions many to many relation as a list of
        # ("<model module>.<model class name>", "<codename>") tuples
        if 'user_permissions' in attr:
            permissions = []
            for perm in attr['user_permissions'].filter():
                model = perm.content_type.model_class()
                permissions.append(
                    (u"%s.%s" % (model.__module__, model.__name__), perm.codename)
                )
            attr['user_permissions'] = permissions

        # unfold group many to many relation as a list of group names
        if 'groups' in attr:
            attr['groups'] = [group.name for group in attr['groups'].filter()]

        return attr
class CASFederateAuth(AuthUser):
    """
    Authentication class used when CAS_FEDERATE is True

    :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
        class attribute. Valid values are usernames of
        :class:`FederatedUser<cas_server.models.FederatedUser>` objects.
    """
    #: a :class:`FederatedUser<cas_server.models.FederatedUser>` object if ``username`` is found.
    user = None

    def __init__(self, username):
        try:
            self.user = FederatedUser.get_from_federated_username(username)
            # use the username as stored on the federated user
            resolved_username = self.user.federated_username
        except FederatedUser.DoesNotExist:
            resolved_username = username
        super(CASFederateAuth, self).__init__(resolved_username)

    def test_password(self, ticket):
        """
        Check ``ticket`` against the ticket stored on the federated user.

        :param unicode ticket: The CAS ticket submitted in place of a password.
        :return: ``True`` if :attr:`user` is set and has a ticket, ``ticket`` equals
            that ticket, and the user ``last_update`` is less than
            ``settings.CAS_TICKET_VALIDITY`` seconds old. ``False`` otherwise.
        :rtype: bool
        """
        federated = self.user
        if not (federated and federated.ticket):
            return False
        if ticket != federated.ticket:
            return False
        # the ticket is only accepted for a limited time after the last update
        validity = timedelta(seconds=settings.CAS_TICKET_VALIDITY)
        return federated.last_update > (timezone.now() - validity)

    def attributs(self):
        """
        The user attributes, read from the ``attributs`` attribute of :attr:`user`.

        :return: :obj:`FederatedUser.attributs<cas_server.models.FederatedUser.attributs>`.
            If the user does not exist, the returned :class:`dict` is empty.
        :rtype: dict
        """
        if self.user:
            return self.user.attributs
        return {}  # pragma: no cover (should not happen)