Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2# This program is distributed in the hope that it will be useful, but WITHOUT 

3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

5# more details. 

6# 

7# You should have received a copy of the GNU General Public License version 3 

8# along with this program; if not, write to the Free Software Foundation, Inc., 51 

9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

10# 

11# (c) 2015-2016 Valentin Samir 

12"""Some authentication classes for the CAS""" 

13from django.conf import settings 

14from django.contrib.auth import get_user_model 

15from django.utils import timezone 

16from django.db import connections, DatabaseError 

17 

18import warnings 

19from datetime import timedelta 

20from six.moves import range 

21try: # pragma: no cover 

22 import MySQLdb 

23 import MySQLdb.cursors 

24except ImportError: 

25 MySQLdb = None 

26 

27 

28try: # pragma: no cover 

29 import ldap3 

30 import ldap3.core.exceptions 

31except ImportError: 

32 ldap3 = None 

33 

34from .models import FederatedUser, UserAttributes 

35from .utils import check_password, dictfetchall 

36 

37 

38class AuthUser(object): 

39 """ 

40 Authentication base class 

41 

42 :param unicode username: A username, stored in the :attr:`username` class attribute. 

43 """ 

44 

45 #: username used to instanciate the current object 

46 username = None 

47 

48 def __init__(self, username): 

49 self.username = username 

50 

51 def test_password(self, password): 

52 """ 

53 Tests ``password`` against the user-supplied password. 

54 

55 :raises NotImplementedError: always. The method need to be implemented by subclasses 

56 """ 

57 raise NotImplementedError() 

58 

59 def attributs(self): 

60 """ 

61 The user attributes. 

62 

63 raises NotImplementedError: always. The method need to be implemented by subclasses 

64 """ 

65 raise NotImplementedError() 

66 

67 

68class DummyAuthUser(AuthUser): # pragma: no cover 

69 """ 

70 A Dummy authentication class. Authentication always fails 

71 

72 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

73 class attribute. There is no valid value for this attribute here. 

74 """ 

75 

76 def test_password(self, password): 

77 """ 

78 Tests ``password`` against the user-supplied password. 

79 

80 :param unicode password: a clear text password as submited by the user. 

81 :return: always ``False`` 

82 :rtype: bool 

83 """ 

84 return False 

85 

86 def attributs(self): 

87 """ 

88 The user attributes. 

89 

90 :return: en empty :class:`dict`. 

91 :rtype: dict 

92 """ 

93 return {} 

94 

95 

96class TestAuthUser(AuthUser): 

97 """ 

98 A test authentication class only working for one unique user. 

99 

100 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

101 class attribute. The uniq valid value is ``settings.CAS_TEST_USER``. 

102 """ 

103 

104 def test_password(self, password): 

105 """ 

106 Tests ``password`` against the user-supplied password. 

107 

108 :param unicode password: a clear text password as submited by the user. 

109 :return: ``True`` if :attr:`username<AuthUser.username>` is valid and 

110 ``password`` is equal to ``settings.CAS_TEST_PASSWORD``, ``False`` otherwise. 

111 :rtype: bool 

112 """ 

113 return self.username == settings.CAS_TEST_USER and password == settings.CAS_TEST_PASSWORD 

114 

115 def attributs(self): 

116 """ 

117 The user attributes. 

118 

119 :return: the ``settings.CAS_TEST_ATTRIBUTES`` :class:`dict` if 

120 :attr:`username<AuthUser.username>` is valid, an empty :class:`dict` otherwise. 

121 :rtype: dict 

122 """ 

123 if self.username == settings.CAS_TEST_USER: 

124 return settings.CAS_TEST_ATTRIBUTES 

125 else: # pragma: no cover (should not happen) 

126 return {} 

127 

128 

129class DBAuthUser(AuthUser): # pragma: no cover 

130 """base class for databate based auth classes""" 

131 #: DB user attributes as a :class:`dict` if the username is found in the database. 

132 user = None 

133 

134 def attributs(self): 

135 """ 

136 The user attributes. 

137 

138 :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode` 

139 or :class:`list` of :func:`unicode`. If the user do not exists, the returned 

140 :class:`dict` is empty. 

141 :rtype: dict 

142 """ 

143 if self.user: 

144 return self.user 

145 else: 

146 return {} 

147 

148 

149class MysqlAuthUser(DBAuthUser): # pragma: no cover 

150 """ 

151 DEPRECATED, use :class:`SqlAuthUser` instead. 

152 

153 A mysql authentication class: authenticate user against a mysql database 

154 

155 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

156 class attribute. Valid value are fetched from the MySQL database set with 

157 ``settings.CAS_SQL_*`` settings parameters using the query 

158 ``settings.CAS_SQL_USER_QUERY``. 

159 """ 

160 

161 def __init__(self, username): 

162 warnings.warn( 

163 ( 

164 "MysqlAuthUser authentication class is deprecated: " 

165 "use cas_server.auth.SqlAuthUser instead" 

166 ), 

167 UserWarning 

168 ) 

169 # see the connect function at 

170 # http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes 

171 # for possible mysql config parameters. 

172 mysql_config = { 

173 "user": settings.CAS_SQL_USERNAME, 

174 "passwd": settings.CAS_SQL_PASSWORD, 

175 "db": settings.CAS_SQL_DBNAME, 

176 "host": settings.CAS_SQL_HOST, 

177 "charset": settings.CAS_SQL_DBCHARSET, 

178 "cursorclass": MySQLdb.cursors.DictCursor 

179 } 

180 if not MySQLdb: 

181 raise RuntimeError("Please install MySQLdb before using the MysqlAuthUser backend") 

182 conn = MySQLdb.connect(**mysql_config) 

183 curs = conn.cursor() 

184 if curs.execute(settings.CAS_SQL_USER_QUERY, (username,)) == 1: 

185 self.user = curs.fetchone() 

186 super(MysqlAuthUser, self).__init__(self.user['username']) 

187 else: 

188 super(MysqlAuthUser, self).__init__(username) 

189 

190 def test_password(self, password): 

191 """ 

192 Tests ``password`` against the user-supplied password. 

193 

194 :param unicode password: a clear text password as submited by the user. 

195 :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is 

196 correct, ``False`` otherwise. 

197 :rtype: bool 

198 """ 

199 if self.user: 

200 return check_password( 

201 settings.CAS_SQL_PASSWORD_CHECK, 

202 password, 

203 self.user["password"], 

204 settings.CAS_SQL_DBCHARSET 

205 ) 

206 else: 

207 return False 

208 

209 

210class SqlAuthUser(DBAuthUser): # pragma: no cover 

211 """ 

212 A SQL authentication class: authenticate user against a SQL database. The SQL database 

213 must be configures in settings.py as ``settings.DATABASES['cas_server']``. 

214 

215 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

216 class attribute. Valid value are fetched from the MySQL database set with 

217 ``settings.CAS_SQL_*`` settings parameters using the query 

218 ``settings.CAS_SQL_USER_QUERY``. 

219 """ 

220 

221 def __init__(self, username): 

222 if "cas_server" not in connections: 

223 raise RuntimeError("Please configure the 'cas_server' database in settings.DATABASES") 

224 for retry_nb in range(3): 

225 try: 

226 with connections["cas_server"].cursor() as curs: 

227 curs.execute(settings.CAS_SQL_USER_QUERY, (username,)) 

228 results = dictfetchall(curs) 

229 if len(results) == 1: 

230 self.user = results[0] 

231 super(SqlAuthUser, self).__init__(self.user['username']) 

232 else: 

233 super(SqlAuthUser, self).__init__(username) 

234 break 

235 except DatabaseError: 

236 connections["cas_server"].close() 

237 if retry_nb == 2: 

238 raise 

239 

240 def test_password(self, password): 

241 """ 

242 Tests ``password`` against the user-supplied password. 

243 

244 :param unicode password: a clear text password as submited by the user. 

245 :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is 

246 correct, ``False`` otherwise. 

247 :rtype: bool 

248 """ 

249 if self.user: 

250 return check_password( 

251 settings.CAS_SQL_PASSWORD_CHECK, 

252 password, 

253 self.user["password"], 

254 settings.CAS_SQL_PASSWORD_CHARSET 

255 ) 

256 else: 

257 return False 

258 

259 

260class LdapAuthUser(DBAuthUser): # pragma: no cover 

261 """ 

262 A ldap authentication class: authenticate user against a ldap database 

263 

264 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

265 class attribute. Valid value are fetched from the ldap database set with 

266 ``settings.CAS_LDAP_*`` settings parameters. 

267 """ 

268 

269 _conn = None 

270 

271 @classmethod 

272 def get_conn(cls): 

273 """Return a connection object to the ldap database""" 

274 conn = cls._conn 

275 if conn is None or conn.closed: 

276 conn = ldap3.Connection( 

277 settings.CAS_LDAP_SERVER, 

278 settings.CAS_LDAP_USER, 

279 settings.CAS_LDAP_PASSWORD, 

280 client_strategy="RESTARTABLE", 

281 auto_bind=True 

282 ) 

283 cls._conn = conn 

284 return conn 

285 

286 def __init__(self, username): 

287 if not ldap3: 

288 raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend") 

289 if not settings.CAS_LDAP_BASE_DN: 

290 raise ValueError( 

291 "You must define CAS_LDAP_BASE_DN for using the ldap authentication backend" 

292 ) 

293 # in case we got deconnected from the database, retry to connect 2 times 

294 for retry_nb in range(3): 

295 try: 

296 conn = self.get_conn() 

297 if conn.search( 

298 settings.CAS_LDAP_BASE_DN, 

299 settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(username), 

300 attributes=ldap3.ALL_ATTRIBUTES 

301 ) and len(conn.entries) == 1: 

302 # try the new ldap3>=2 API 

303 try: 

304 user = conn.entries[0].entry_attributes_as_dict 

305 # store the user dn 

306 user["dn"] = conn.entries[0].entry_dn 

307 # fallback to ldap3<2 API 

308 except ( 

309 ldap3.core.exceptions.LDAPKeyError, # ldap3<1 exception 

310 ldap3.core.exceptions.LDAPAttributeError # ldap3<2 exception 

311 ): 

312 user = conn.entries[0].entry_get_attributes_dict() 

313 # store the user dn 

314 user["dn"] = conn.entries[0].entry_get_dn() 

315 if user.get(settings.CAS_LDAP_USERNAME_ATTR): 

316 self.user = user 

317 super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0]) 

318 else: 

319 super(LdapAuthUser, self).__init__(username) 

320 else: 

321 super(LdapAuthUser, self).__init__(username) 

322 break 

323 except ldap3.core.exceptions.LDAPCommunicationError: 

324 if retry_nb == 2: 

325 raise 

326 

327 def test_password(self, password): 

328 """ 

329 Tests ``password`` against the user-supplied password. 

330 

331 :param unicode password: a clear text password as submited by the user. 

332 :return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is 

333 correct, ``False`` otherwise. 

334 :rtype: bool 

335 """ 

336 if self.user and settings.CAS_LDAP_PASSWORD_CHECK == "bind": 

337 try: 

338 conn = ldap3.Connection( 

339 settings.CAS_LDAP_SERVER, 

340 self.user["dn"], 

341 password, 

342 auto_bind=True 

343 ) 

344 try: 

345 # fetch the user attribute 

346 if conn.search( 

347 settings.CAS_LDAP_BASE_DN, 

348 settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(self.username), 

349 attributes=ldap3.ALL_ATTRIBUTES 

350 ) and len(conn.entries) == 1: 

351 # try the ldap3>=2 API 

352 try: 

353 attributes = conn.entries[0].entry_attributes_as_dict 

354 # store the user dn 

355 attributes["dn"] = conn.entries[0].entry_dn 

356 # fallback to ldap<2 API 

357 except ( 

358 ldap3.core.exceptions.LDAPKeyError, # ldap3<1 exception 

359 ldap3.core.exceptions.LDAPAttributeError # ldap3<2 exception 

360 ): 

361 attributes = conn.entries[0].entry_get_attributes_dict() 

362 attributes["dn"] = conn.entries[0].entry_get_dn() 

363 # cache the attributes locally as we wont have access to the user password 

364 # later. 

365 user = UserAttributes.objects.get_or_create(username=self.username)[0] 

366 user.attributs = attributes 

367 user.save() 

368 finally: 

369 conn.unbind() 

370 return True 

371 except ( 

372 ldap3.core.exceptions.LDAPBindError, 

373 ldap3.core.exceptions.LDAPCommunicationError 

374 ): 

375 return False 

376 elif self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR): 

377 return check_password( 

378 settings.CAS_LDAP_PASSWORD_CHECK, 

379 password, 

380 self.user[settings.CAS_LDAP_PASSWORD_ATTR][0], 

381 settings.CAS_LDAP_PASSWORD_CHARSET 

382 ) 

383 else: 

384 return False 

385 

386 def attributs(self): 

387 """ 

388 The user attributes. 

389 

390 :return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode` 

391 or :class:`list` of :func:`unicode`. If the user do not exists, the returned 

392 :class:`dict` is empty. 

393 :rtype: dict 

394 :raises NotImplementedError: if the password check method in `CAS_LDAP_PASSWORD_CHECK` 

395 do not allow to fetch the attributes without the user credentials. 

396 """ 

397 if settings.CAS_LDAP_PASSWORD_CHECK == "bind": 

398 raise NotImplementedError() 

399 else: 

400 return super(LdapAuthUser, self).attributs() 

401 

402 

403class DjangoAuthUser(AuthUser): # pragma: no cover 

404 """ 

405 A django auth class: authenticate user against django internal users 

406 

407 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

408 class attribute. Valid value are usernames of django internal users. 

409 """ 

410 #: a django user object if the username is found. The user model is retreived 

411 #: using :func:`django.contrib.auth.get_user_model`. 

412 user = None 

413 

414 def __init__(self, username): 

415 User = get_user_model() 

416 try: 

417 self.user = User.objects.get(username=username) 

418 except User.DoesNotExist: 

419 pass 

420 super(DjangoAuthUser, self).__init__(username) 

421 

422 def test_password(self, password): 

423 """ 

424 Tests ``password`` against the user-supplied password. 

425 

426 :param unicode password: a clear text password as submited by the user. 

427 :return: ``True`` if :attr:`user` is valid and ``password`` is 

428 correct, ``False`` otherwise. 

429 :rtype: bool 

430 """ 

431 if self.user: 

432 return self.user.check_password(password) 

433 else: 

434 return False 

435 

436 def attributs(self): 

437 """ 

438 The user attributes, defined as the fields on the :attr:`user` object. 

439 

440 :return: a :class:`dict` with the :attr:`user` object fields. Attributes may be 

441 If the user do not exists, the returned :class:`dict` is empty. 

442 :rtype: dict 

443 """ 

444 if self.user: 

445 attr = {} 

446 # _meta.get_fields() is from the new documented _meta interface in django 1.8 

447 try: 

448 field_names = [ 

449 field.attname for field in self.user._meta.get_fields() 

450 if hasattr(field, "attname") 

451 ] 

452 # backward compatibility with django 1.7 

453 except AttributeError: # pragma: no cover (only used by django 1.7) 

454 field_names = self.user._meta.get_all_field_names() 

455 for name in field_names: 

456 attr[name] = getattr(self.user, name) 

457 

458 # unfold user_permissions many to many relation 

459 if 'user_permissions' in attr: 

460 attr['user_permissions'] = [ 

461 ( 

462 u"%s.%s" % ( 

463 perm.content_type.model_class().__module__, 

464 perm.content_type.model_class().__name__ 

465 ), 

466 perm.codename 

467 ) for perm in attr['user_permissions'].filter() 

468 ] 

469 

470 # unfold group many to many relation 

471 if 'groups' in attr: 

472 attr['groups'] = [group.name for group in attr['groups'].filter()] 

473 

474 return attr 

475 else: 

476 return {} 

477 

478 

479class CASFederateAuth(AuthUser): 

480 """ 

481 Authentication class used then CAS_FEDERATE is True 

482 

483 :param unicode username: A username, stored in the :attr:`username<AuthUser.username>` 

484 class attribute. Valid value are usernames of 

485 :class:`FederatedUser<cas_server.models.FederatedUser>` object. 

486 :class:`FederatedUser<cas_server.models.FederatedUser>` object are created on CAS 

487 backends successful ticket validation. 

488 """ 

489 #: a :class`FederatedUser<cas_server.models.FederatedUser>` object if ``username`` is found. 

490 user = None 

491 

492 def __init__(self, username): 

493 try: 

494 self.user = FederatedUser.get_from_federated_username(username) 

495 super(CASFederateAuth, self).__init__( 

496 self.user.federated_username 

497 ) 

498 except FederatedUser.DoesNotExist: 

499 super(CASFederateAuth, self).__init__(username) 

500 

501 def test_password(self, ticket): 

502 """ 

503 Tests ``password`` against the user-supplied password. 

504 

505 :param unicode password: The CAS tickets just used to validate the user authentication 

506 against its CAS backend. 

507 :return: ``True`` if :attr:`user` is valid and ``password`` is 

508 a ticket validated less than ``settings.CAS_TICKET_VALIDITY`` secondes and has not 

509 being previously used for authenticated this 

510 :class:`FederatedUser<cas_server.models.FederatedUser>`. ``False`` otherwise. 

511 :rtype: bool 

512 """ 

513 if not self.user or not self.user.ticket: 

514 return False 

515 else: 

516 return ( 

517 ticket == self.user.ticket and 

518 self.user.last_update > 

519 (timezone.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)) 

520 ) 

521 

522 def attributs(self): 

523 """ 

524 The user attributes, as returned by the CAS backend. 

525 

526 :return: :obj:`FederatedUser.attributs<cas_server.models.FederatedUser.attributs>`. 

527 If the user do not exists, the returned :class:`dict` is empty. 

528 :rtype: dict 

529 """ 

530 if not self.user: # pragma: no cover (should not happen) 

531 return {} 

532 else: 

533 return self.user.attributs