Coverage for cas_server/auth.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-18 09:47 +0000

1# -*- coding: utf-8 -*- 

2# This program is distributed in the hope that it will be useful, but WITHOUT 

3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

5# more details. 

6# 

7# You should have received a copy of the GNU General Public License version 3 

8# along with this program; if not, write to the Free Software Foundation, Inc., 51 

9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

10# 

11# (c) 2015-2025 Valentin Samir 

12"""Some authentication classes for the CAS""" 

13from django.conf import settings 

14from django.contrib.auth import get_user_model 

15from django.utils import timezone 

16from django.db import connections, DatabaseError 

17 

18import warnings 

19from datetime import timedelta 

20try: # pragma: no cover 

21 import MySQLdb 

22 import MySQLdb.cursors 

23except ImportError: 

24 MySQLdb = None 

25 

26 

27try: # pragma: no cover 

28 import ldap3 

29 import ldap3.core.exceptions 

30except ImportError: 

31 ldap3 = None 

32 

33from .models import FederatedUser, UserAttributes 

34from .utils import check_password, dictfetchall 

35 

36 

class AuthUser(object):
    """
        Common interface of every authentication backend.

        Subclasses are expected to override :meth:`test_password` and :meth:`attributs`.

        :param unicode username: The username this object is built for. It is stored on the
            instance as :attr:`username`.
    """

    #: The username the object was instantiated with (``None`` on the class itself).
    username = None

    def __init__(self, username):
        self.username = username

    def test_password(self, password):
        """
            Check ``password`` for the current user.

            :param unicode password: the password to check.
            :raises NotImplementedError: always. Subclasses must provide the implementation.
        """
        raise NotImplementedError()

    def attributs(self):
        """
            Return the attributes of the current user.

            :raises NotImplementedError: always. Subclasses must provide the implementation.
        """
        raise NotImplementedError()

65 

66 

class DummyAuthUser(AuthUser):  # pragma: no cover
    """
        A placeholder authentication backend that rejects every login attempt.

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. No value is ever considered valid by this backend.
    """

    def test_password(self, password):
        """
            Check ``password`` for the current user.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``False``, whatever the password is.
            :rtype: bool
        """
        return False

    def attributs(self):
        """
            Return the attributes of the current user.

            :return: an empty :class:`dict`, as this backend knows no user.
            :rtype: dict
        """
        return {}

93 

94 

class TestAuthUser(AuthUser):
    """
        An authentication backend for tests, knowing exactly one user.

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. The only valid value is ``settings.CAS_TEST_USER``.
    """

    def test_password(self, password):
        """
            Check ``password`` for the current user.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``True`` if :attr:`username<AuthUser.username>` equals
                ``settings.CAS_TEST_USER`` and ``password`` equals
                ``settings.CAS_TEST_PASSWORD``, ``False`` otherwise.
            :rtype: bool
        """
        is_test_user = self.username == settings.CAS_TEST_USER
        # the password is only compared once the username matched
        return is_test_user and password == settings.CAS_TEST_PASSWORD

    def attributs(self):
        """
            Return the attributes of the current user.

            :return: ``settings.CAS_TEST_ATTRIBUTES`` if :attr:`username<AuthUser.username>`
                equals ``settings.CAS_TEST_USER``, an empty :class:`dict` otherwise.
            :rtype: dict
        """
        if self.username == settings.CAS_TEST_USER:
            return settings.CAS_TEST_ATTRIBUTES
        return {}  # pragma: no cover (should not happen)

126 

127 

class DBAuthUser(AuthUser):  # pragma: no cover
    """Shared base for the database backed authentication classes"""

    #: The user attributes as a :class:`dict` once the username has been found in the
    #: database, ``None`` until then.
    user = None

    def attributs(self):
        """
            Return the attributes of the current user.

            :return: the content of :attr:`user` when it is set and not empty, an empty
                :class:`dict` otherwise.
            :rtype: dict
        """
        # a missing (None) or empty user record both yield a fresh empty dict
        return self.user or {}

146 

147 

class MysqlAuthUser(DBAuthUser):  # pragma: no cover
    """
        DEPRECATED, use :class:`SqlAuthUser` instead.

        A mysql authentication class: authenticate user against a mysql database

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. Valid value are fetched from the MySQL database set with
            ``settings.CAS_SQL_*`` settings parameters using the query
            ``settings.CAS_SQL_USER_QUERY``.
        :raises RuntimeError: if the ``MySQLdb`` module could not be imported.
    """

    def __init__(self, username):
        warnings.warn(
            (
                "MysqlAuthUser authentication class is deprecated: "
                "use cas_server.auth.SqlAuthUser instead"
            ),
            UserWarning
        )
        # The driver check must come before anything dereferences ``MySQLdb``: when the
        # import failed ``MySQLdb`` is ``None`` and ``MySQLdb.cursors`` would raise an
        # AttributeError instead of the explicit RuntimeError below.
        if not MySQLdb:
            raise RuntimeError("Please install MySQLdb before using the MysqlAuthUser backend")
        # see the connect function at
        # http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes
        # for possible mysql config parameters.
        mysql_config = {
            "user": settings.CAS_SQL_USERNAME,
            "passwd": settings.CAS_SQL_PASSWORD,
            "db": settings.CAS_SQL_DBNAME,
            "host": settings.CAS_SQL_HOST,
            "charset": settings.CAS_SQL_DBCHARSET,
            "cursorclass": MySQLdb.cursors.DictCursor
        }
        # username finally stored on the object: the one from the database when the query
        # matches exactly one row, the submitted one otherwise.
        resolved_username = username
        conn = MySQLdb.connect(**mysql_config)
        try:
            curs = conn.cursor()
            try:
                # execute() returns the number of matched rows
                if curs.execute(settings.CAS_SQL_USER_QUERY, (username,)) == 1:
                    self.user = curs.fetchone()
                    resolved_username = self.user['username']
            finally:
                curs.close()
        finally:
            # the connection is only needed for this single lookup: do not leak it
            conn.close()
        super(MysqlAuthUser, self).__init__(resolved_username)

    def test_password(self, password):
        """
            Tests ``password`` against the password stored for the user.

            The check is delegated to ``check_password`` using the method
            ``settings.CAS_SQL_PASSWORD_CHECK``, the ``"password"`` field of :attr:`user`
            and the charset ``settings.CAS_SQL_DBCHARSET``.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
                correct, ``False`` otherwise.
            :rtype: bool
        """
        if self.user:
            return check_password(
                settings.CAS_SQL_PASSWORD_CHECK,
                password,
                self.user["password"],
                settings.CAS_SQL_DBCHARSET
            )
        else:
            return False

207 

208 

class SqlAuthUser(DBAuthUser):  # pragma: no cover
    """
        A SQL authentication class: users are looked up in a SQL database, which must be
        configured in settings.py as ``settings.DATABASES['cas_server']``.

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. Valid values are fetched from the ``cas_server`` database
            connection using the query ``settings.CAS_SQL_USER_QUERY``.
        :raises RuntimeError: if no ``cas_server`` database connection is configured.
    """

    def __init__(self, username):
        if "cas_server" not in connections:
            raise RuntimeError("Please configure the 'cas_server' database in settings.DATABASES")
        max_tries = 3
        # on a database error the connection is closed and the lookup is tried again; the
        # error raised by the last try is propagated to the caller
        for attempt in range(1, max_tries + 1):
            try:
                with connections["cas_server"].cursor() as cursor:
                    cursor.execute(settings.CAS_SQL_USER_QUERY, (username,))
                    rows = dictfetchall(cursor)
                    if len(rows) == 1:
                        # exactly one match: keep the row and use the username it holds
                        self.user = rows[0]
                        effective_username = self.user['username']
                    else:
                        effective_username = username
                    super(SqlAuthUser, self).__init__(effective_username)
            except DatabaseError:
                connections["cas_server"].close()
                if attempt == max_tries:
                    raise
            else:
                break

    def test_password(self, password):
        """
            Check ``password`` against the password stored for the user.

            The check is delegated to ``check_password`` using the method
            ``settings.CAS_SQL_PASSWORD_CHECK``, the ``"password"`` field of :attr:`user`
            and the charset ``settings.CAS_SQL_PASSWORD_CHARSET``.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password``
                is correct, ``False`` otherwise.
            :rtype: bool
        """
        if not self.user:
            return False
        return check_password(
            settings.CAS_SQL_PASSWORD_CHECK,
            password,
            self.user["password"],
            settings.CAS_SQL_PASSWORD_CHARSET
        )

257 

258 

class LdapAuthUser(DBAuthUser):  # pragma: no cover
    """
        A ldap authentication class: authenticate user against a ldap database

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. Valid value are fetched from the ldap database set with
            ``settings.CAS_LDAP_*`` settings parameters.
        :raises RuntimeError: if the ``ldap3`` module could not be imported.
        :raises ValueError: if ``settings.CAS_LDAP_BASE_DN`` is not set.
    """

    #: Connection to the ldap database, stored on the class by :meth:`get_conn` and reused
    #: as long as it is not closed.
    _conn = None

    @classmethod
    def get_conn(cls):
        """
            Return a connection object to the ldap database.

            The connection is bound using ``settings.CAS_LDAP_USER`` and
            ``settings.CAS_LDAP_PASSWORD``. A new one is only created if none is stored on
            the class yet or if the stored one is closed.
        """
        conn = cls._conn
        if conn is None or conn.closed:
            conn = ldap3.Connection(
                settings.CAS_LDAP_SERVER,
                settings.CAS_LDAP_USER,
                settings.CAS_LDAP_PASSWORD,
                client_strategy="RESTARTABLE",
                auto_bind=True
            )
            cls._conn = conn
        return conn

    @staticmethod
    def get_ldap_user_query(username):
        """
            Build the ldap search filter for ``username`` from ``settings.CAS_LDAP_USER_QUERY``.

            The username is escaped with ``ldap3.utils.conv.escape_bytes`` before being
            interpolated. The deprecated positional ``%s`` placeholder is still supported
            but emits a :class:`DeprecationWarning`; ``%(username)s`` should be used instead.

            :param unicode username: the username to search for.
            :return: the search filter with the escaped username interpolated.
        """
        if '%s' in settings.CAS_LDAP_USER_QUERY:
            warnings.warn(
                r'Using %s in your CAS_LDAP_USER_QUERY is deprecated. '
                r'Please upgrade your config to use %(username)s instead',
                DeprecationWarning,
                stacklevel=0
            )
            return settings.CAS_LDAP_USER_QUERY % (ldap3.utils.conv.escape_bytes(username), )
        else:
            return settings.CAS_LDAP_USER_QUERY % {
                'username': ldap3.utils.conv.escape_bytes(username)
            }

    @staticmethod
    def _entry_to_dict(entry):
        """
            Convert a ldap3 search result entry to a :class:`dict` of its attributes, with
            the entry DN stored under the ``"dn"`` key.

            :param entry: an element of the ``entries`` list of a ldap3 connection.
            :return: the entry attributes, plus the ``"dn"`` key.
            :rtype: dict
        """
        # try the ldap3>=2 API
        try:
            attributes = entry.entry_attributes_as_dict
            # store the user dn
            attributes["dn"] = entry.entry_dn
        # fallback to ldap3<2 API
        except (
            ldap3.core.exceptions.LDAPKeyError,  # ldap3<1 exception
            ldap3.core.exceptions.LDAPAttributeError  # ldap3<2 exception
        ):
            attributes = entry.entry_get_attributes_dict()
            # store the user dn
            attributes["dn"] = entry.entry_get_dn()
        return attributes

    def __init__(self, username):
        if not ldap3:
            raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend")
        if not settings.CAS_LDAP_BASE_DN:
            raise ValueError(
                "You must define CAS_LDAP_BASE_DN for using the ldap authentication backend"
            )
        # in case we got disconnected from the database, retry to connect 2 times
        for retry_nb in range(3):
            try:
                conn = self.get_conn()
                if conn.search(
                    settings.CAS_LDAP_BASE_DN,
                    self.get_ldap_user_query(username),
                    attributes=ldap3.ALL_ATTRIBUTES
                ) and len(conn.entries) == 1:
                    user = self._entry_to_dict(conn.entries[0])
                    if user.get(settings.CAS_LDAP_USERNAME_ATTR):
                        self.user = user
                        # use the username as spelled in the ldap database
                        super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0])
                    else:
                        super(LdapAuthUser, self).__init__(username)
                else:
                    super(LdapAuthUser, self).__init__(username)
                break
            except ldap3.core.exceptions.LDAPCommunicationError:
                # the error of the last try is propagated to the caller
                if retry_nb == 2:
                    raise

    def test_password(self, password):
        """
            Tests ``password`` against the user-supplied password.

            If ``settings.CAS_LDAP_PASSWORD_CHECK`` is ``"bind"``, a new ldap connection is
            opened, binding with the user dn and ``password``. On success the user attributes
            are fetched through that connection and stored in a
            :class:`UserAttributes<cas_server.models.UserAttributes>` object. Otherwise the
            check is delegated to ``check_password`` using the attribute
            ``settings.CAS_LDAP_PASSWORD_ATTR`` of :attr:`user`.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
                correct, ``False`` otherwise.
            :rtype: bool
        """
        if self.user and settings.CAS_LDAP_PASSWORD_CHECK == "bind":
            try:
                conn = ldap3.Connection(
                    settings.CAS_LDAP_SERVER,
                    self.user["dn"],
                    password,
                    auto_bind=True
                )
                try:
                    # fetch the user attribute
                    if conn.search(
                        settings.CAS_LDAP_BASE_DN,
                        self.get_ldap_user_query(self.username),
                        attributes=ldap3.ALL_ATTRIBUTES
                    ) and len(conn.entries) == 1:
                        attributes = self._entry_to_dict(conn.entries[0])
                        # cache the attributes locally as we wont have access to the user
                        # password later.
                        user = UserAttributes.objects.get_or_create(username=self.username)[0]
                        user.attributs = attributes
                        user.save()
                finally:
                    conn.unbind()
                # the bind succeeded: the password is valid
                return True
            except (
                ldap3.core.exceptions.LDAPBindError,
                ldap3.core.exceptions.LDAPCommunicationError
            ):
                return False
        elif self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
            return check_password(
                settings.CAS_LDAP_PASSWORD_CHECK,
                password,
                self.user[settings.CAS_LDAP_PASSWORD_ATTR][0],
                settings.CAS_LDAP_PASSWORD_CHARSET
            )
        else:
            return False

    def attributs(self):
        """
            The user attributes.

            If ``settings.CAS_LDAP_PASSWORD_CHECK`` is ``"bind"`` and
            ``settings.CAS_LDAP_ATTRS_VIEW`` is ``1``, the attributes are read from the
            :class:`UserAttributes<cas_server.models.UserAttributes>` object stored for the
            user.

            :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
                or :class:`list` of :func:`unicode`. If the user do not exists, the returned
                :class:`dict` is empty.
            :rtype: dict
        """
        if settings.CAS_LDAP_PASSWORD_CHECK == "bind":
            if settings.CAS_LDAP_ATTRS_VIEW == 1:
                user = UserAttributes.objects.get(username=self.username)
                return user.attributs
            else:
                # self.user stays None when the username was not found: honour the
                # documented contract and return an empty dict instead of None
                return self.user or {}
        else:
            return super(LdapAuthUser, self).attributs()

417 

418 

class DjangoAuthUser(AuthUser):  # pragma: no cover
    """
        A django authentication class: users are the ones of the django user model.

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. Valid values are the usernames of existing django users.
    """

    #: The django user object matching the username, ``None`` if there is none. The user
    #: model is the one returned by :func:`django.contrib.auth.get_user_model`.
    user = None

    def __init__(self, username):
        user_model = get_user_model()
        try:
            self.user = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            # unknown user: :attr:`user` keeps its ``None`` class default
            pass
        super(DjangoAuthUser, self).__init__(username)

    def test_password(self, password):
        """
            Check ``password`` using the ``check_password`` method of :attr:`user`.

            :param unicode password: a clear text password as submitted by the user.
            :return: ``True`` if :attr:`user` is set and ``password`` is correct,
                ``False`` otherwise.
            :rtype: bool
        """
        if not self.user:
            return False
        return self.user.check_password(password)

    def attributs(self):
        """
            Return the user attributes, built from the fields of the :attr:`user` object.

            The ``user_permissions`` relation is unfolded to a list of
            ``(u"<model module>.<model name>", <codename>)`` tuples and the ``groups``
            relation to the list of the group names.

            :return: a :class:`dict` mapping the field names of :attr:`user` to their values.
                If the user does not exist, the returned :class:`dict` is empty.
            :rtype: dict
        """
        if not self.user:
            return {}

        # _meta.get_fields() is from the new documented _meta interface in django 1.8
        try:
            field_names = []
            for field in self.user._meta.get_fields():
                if hasattr(field, "attname"):
                    field_names.append(field.attname)
        # backward compatibility with django 1.7
        except AttributeError:  # pragma: no cover (only used by django 1.7)
            field_names = self.user._meta.get_all_field_names()

        attr = dict((name, getattr(self.user, name)) for name in field_names)

        # unfold user_permissions many to many relation
        if 'user_permissions' in attr:
            permissions = []
            for perm in attr['user_permissions'].filter():
                model = perm.content_type.model_class()
                permissions.append(
                    (u"%s.%s" % (model.__module__, model.__name__), perm.codename)
                )
            attr['user_permissions'] = permissions

        # unfold group many to many relation
        if 'groups' in attr:
            attr['groups'] = [grp.name for grp in attr['groups'].filter()]

        return attr

493 

494 

class CASFederateAuth(AuthUser):
    """
        Authentication class used when CAS_FEDERATE is True

        :param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
            class attribute. Valid values are the usernames of
            :class:`FederatedUser<cas_server.models.FederatedUser>` objects.
            :class:`FederatedUser<cas_server.models.FederatedUser>` objects are created on CAS
            backends successful ticket validation.
    """

    #: The :class:`FederatedUser<cas_server.models.FederatedUser>` object matching
    #: ``username``, ``None`` if there is none.
    user = None

    def __init__(self, username):
        try:
            self.user = FederatedUser.get_from_federated_username(username)
            # use the username as spelled by the federated user object
            effective_username = self.user.federated_username
        except FederatedUser.DoesNotExist:
            effective_username = username
        super(CASFederateAuth, self).__init__(effective_username)

    def test_password(self, ticket):
        """
            Check ``ticket`` against the ticket stored on :attr:`user`.

            :param unicode ticket: The CAS ticket just used to validate the user authentication
                against its CAS backend.
            :return: ``True`` if :attr:`user` is set, has a ticket equal to ``ticket`` and
                was last updated less than ``settings.CAS_TICKET_VALIDITY`` seconds ago,
                ``False`` otherwise.
            :rtype: bool
        """
        if not self.user or not self.user.ticket:
            return False
        if not (ticket == self.user.ticket):
            return False
        # the stored ticket is only accepted for CAS_TICKET_VALIDITY seconds
        oldest_accepted = timezone.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)
        return self.user.last_update > oldest_accepted

    def attributs(self):
        """
            Return the user attributes stored on :attr:`user`.

            :return: :obj:`FederatedUser.attributs<cas_server.models.FederatedUser.attributs>`.
                If the user does not exist, the returned :class:`dict` is empty.
            :rtype: dict
        """
        if self.user:
            return self.user.attributs
        return {}  # pragma: no cover (should not happen)