Coverage for cas_server/utils.py: 82%

314 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-18 09:47 +0000

1# -*- coding: utf-8 -*- 

2# This program is distributed in the hope that it will be useful, but WITHOUT 

3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

5# more details. 

6# 

7# You should have received a copy of the GNU General Public License version 3 

8# along with this program; if not, write to the Free Software Foundation, Inc., 51 

9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

10# 

11# (c) 2015-2025 Valentin Samir 

12"""Some util function for the app""" 

13from .default_settings import settings 

14 

15from django.http import HttpResponseRedirect, HttpResponse 

16from django.contrib import messages 

17from django.contrib.messages import constants as DEFAULT_MESSAGE_LEVELS 

18from django.core.serializers.json import DjangoJSONEncoder 

19from django.utils import timezone 

20from django.core.exceptions import ValidationError 

21try: 

22 from django.urls import reverse 

23 from django.utils.translation import gettext_lazy as _ 

24except ImportError: 

25 from django.core.urlresolvers import reverse 

26 from django.utils.translation import ugettext_lazy as _ 

27 

28import re 

29import random 

30import string 

31import json 

32import hashlib 

33import base64 

34import requests 

35import time 

36import logging 

37import binascii 

38# The crypt module is deprecated and will be removed in version 3.13 

39try: 

40 import crypt 

41except ImportError: 

42 crypt = None 

43 

44from importlib import import_module 

45from datetime import datetime, timedelta, timezone as tz 

46from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode 

47 

48from . import VERSION 

49 

50#: logger facility 

51logger = logging.getLogger(__name__) 

52 

53 

def json_encode(obj):
    """
    Encode a python object to json

    The encoder is created on first use and kept as the ``encoder`` attribute of this
    function, so later calls reuse the same instance.

    :param obj: The python object to serialize
    :return: The json serialization of ``obj``
    :rtype: str
    """
    # Look the cached encoder up explicitly instead of catching AttributeError around the
    # whole call: an AttributeError raised while serializing ``obj`` must reach the caller
    # and not be mistaken for "encoder not created yet" (which made the function recurse).
    encoder = getattr(json_encode, "encoder", None)
    if encoder is None:
        # objects the encoder does not know how to serialize are rendered with ``str``
        encoder = json_encode.encoder = DjangoJSONEncoder(default=str)
    return encoder.encode(obj)

61 

62 

def context(params):
    """
    Add the variables shared by every template to the rendering context

    ``params`` is updated in place and returned. The keys ``settings`` and
    ``message_levels`` are always set. ``VERSION``, ``LAST_VERSION`` and
    ``upgrade_available`` are set when ``settings.CAS_NEW_VERSION_HTML_WARNING`` is true.
    ``CAS_INFO_RENDER`` is set when ``settings.CAS_INFO_MESSAGES_ORDER`` is not empty.

    :param dict params: The context dictionary used to render templates.
    :return: The same ``params`` dictionary, completed as described above.
    :rtype: dict
    """
    params["settings"] = settings
    params["message_levels"] = DEFAULT_MESSAGE_LEVELS

    if settings.CAS_NEW_VERSION_HTML_WARNING:
        latest = last_version()
        params["VERSION"] = VERSION
        params["LAST_VERSION"] = latest
        # without a known last version, no upgrade can be advertised
        params["upgrade_available"] = (
            latest is not None and decode_version(VERSION) < decode_version(latest)
        )

    if settings.CAS_INFO_MESSAGES_ORDER:
        rendered = params["CAS_INFO_RENDER"] = []
        for box_name in settings.CAS_INFO_MESSAGES_ORDER:
            if box_name not in settings.CAS_INFO_MESSAGES:
                continue
            configured = settings.CAS_INFO_MESSAGES[box_name]
            if not isinstance(configured, dict):
                continue
            # work on a copy so the configured message is left untouched
            box = configured.copy()
            if "message" not in box:
                continue
            box["name"] = box_name
            # boxes are of type "info" unless configured otherwise
            box["type"] = box.get("type", "info")
            # boxes are discardable unless configured otherwise
            box["discardable"] = box.get("discardable", True)
            # the message is converted with ``str`` before hashing, so the hash follows
            # the rendered text of the message
            fingerprint = str(box["message"]).encode("utf-8") + box["type"].encode("utf-8")
            box["hash"] = hashlib.md5(fingerprint).hexdigest()
            rendered.append(box)
    return params

105 

106 

def json_response(request, data):
    """
    Serialize ``data`` to json and return it in an HttpResponse

    The pending messages of ``request`` are stored under the ``messages`` key of ``data``
    (which is modified in place) before the serialization.

    :param django.http.HttpRequest request: The request object used to generate this response.
    :param dict data: The python dictionary to return as json
    :return: The content of ``data`` serialized in json, with content type
        ``application/json``
    :rtype: django.http.HttpResponse
    """
    data["messages"] = [
        {'message': pending.message, 'level': pending.level_tag}
        for pending in messages.get_messages(request)
    ]
    body = json.dumps(data)
    return HttpResponse(body, content_type="application/json")

120 

121 

def import_attr(path):
    """
    Transform a python dotted path to the attribute it designates

    :param path: A dotted path to a python object or a python object
    :type path: :obj:`bytes` or :obj:`str` or anything
    :return: The python object pointed by the dotted path, or ``path`` unchanged if it is
        neither :obj:`bytes` nor :obj:`str`
    :raises ValueError: if ``path`` is a string that contains no dot
    :raises ImportError: if the module part of ``path`` cannot be imported
    :raises AttributeError: if the module has no attribute named like the last part of
        ``path``
    """
    # bytes are decoded to text first (a dotted path normally only contains ascii)
    if isinstance(path, bytes):
        path = path.decode("utf-8")
    # anything that is not a string is returned unchanged (it may already be the
    # attribute to import)
    if not isinstance(path, str):
        return path
    if u"." not in path:
        # the error used to be built but never raised, so the caller only saw the
        # unrelated unpacking error of the ``rsplit`` below
        raise ValueError("%r should be of the form `module.attr` and we just got `attr`" % path)
    module, attr = path.rsplit(u'.', 1)
    try:
        return getattr(import_module(module), attr)
    except ImportError as error:
        raise ImportError("Module %r not found" % module) from error
    except AttributeError as error:
        raise AttributeError("Module %r has not attribut %r" % (module, attr)) from error

145 

146 

def redirect_params(url_name, params=None):
    """
    Build a redirection to ``url_name`` with ``params`` as querystring

    The ``?`` separator is always appended, even when ``params`` is empty.

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :return: A redirection to the URL with name ``url_name`` with ``params`` as querystring.
    :rtype: django.http.HttpResponseRedirect
    """
    base = reverse(url_name)
    querystring = urlencode(params or {})
    location = base + "?%s" % querystring
    return HttpResponseRedirect(location)

160 

161 

def reverse_params(url_name, params=None, **kwargs):
    """
    Compute the reverse url of ``url_name`` and append ``params`` to it as querystring

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :param **kwargs: additional parameters passed to ``reverse`` to compute the URL
    :return: The reversed URL of ``url_name``, followed by ``?`` and the encoded ``params``
        when the encoded querystring is not empty
    :rtype: unicode
    """
    base = reverse(url_name, **kwargs)
    querystring = urlencode(params or {})
    if not querystring:
        return base
    return u"%s?%s" % (base, querystring)

180 

181 

def copy_params(get_or_post_params, ignore=None):
    """
    Copy a :class:`django.http.QueryDict` to a :obj:`dict`, skipping keys in ``ignore``

    Keys whose value is falsy (an empty string for instance) are skipped too.

    :param django.http.QueryDict get_or_post_params: A GET or POST
        :class:`QueryDict<django.http.QueryDict>`
    :param set ignore: An optional set of keys to ignore during the copy
    :return: A copy of ``get_or_post_params`` without the ignored and the empty keys
    :rtype: dict
    """
    skipped = set() if ignore is None else ignore
    return {
        name: get_or_post_params[name]
        for name in get_or_post_params
        if name not in skipped and get_or_post_params[name]
    }

199 

200 

def set_cookie(response, key, value, max_age):
    """
    Set the cookie ``key`` on ``response`` with value ``value``, valid for ``max_age`` seconds

    The cookie domain is ``settings.SESSION_COOKIE_DOMAIN`` and the ``secure`` flag is
    ``settings.SESSION_COOKIE_SECURE`` (``None`` when that setting is falsy).

    :param django.http.HttpResponse response: a django response where to set the cookie
    :param unicode key: the cookie key
    :param unicode value: the cookie value
    :param int max_age: the maximum validity age of the cookie, in seconds
    """
    # expiration date: now (UTC) plus ``max_age`` seconds, formatted as a cookie date
    expiration = datetime.now(tz.utc) + timedelta(seconds=max_age)
    response.set_cookie(
        key,
        value,
        max_age=max_age,
        expires=expiration.strftime("%a, %d-%b-%Y %H:%M:%S GMT"),
        domain=settings.SESSION_COOKIE_DOMAIN,
        secure=settings.SESSION_COOKIE_SECURE or None
    )

222 

223 

def get_current_url(request, ignore_params=None):
    """
    Return the URL of the current request, possibly omitting some GET parameters

    :param django.http.HttpRequest request: The current request object.
    :param set ignore_params: An optional set of GET parameters to ignore
    :return: The URL of the current page (``https`` if the request is secure, ``http``
        otherwise), followed by the GET parameters kept by :func:`copy_params`, if any.
    :rtype: unicode
    """
    excluded = set() if ignore_params is None else ignore_params
    scheme = u'https' if request.is_secure() else u"http"
    current = u"%s://%s%s" % (scheme, request.get_host(), request.path)
    if not request.GET:
        return current
    kept = copy_params(request.GET, excluded)
    if kept:
        current += u"?%s" % urlencode(kept)
    return current

243 

244 

def update_url(url, params):
    """
    Update the querystring of ``url`` using ``params``

    The existing querystring is parsed (blank values are kept), updated with ``params``,
    and written back with its parameters sorted so the result is deterministic. As the
    querystring goes through a :obj:`dict`, a key repeated in ``url`` only keeps its last
    value.

    :param url: A URL possibly with a querystring
    :type url: :obj:`unicode` or :obj:`str`
    :param dict params: A dictionary of parameters for updating the url querystring.
        Keys and values given as bytes are decoded as utf-8.
    :return: The URL with an updated querystring
    :rtype: unicode
    """
    def to_unicode(data):
        # bytes are decoded as utf-8, anything else is returned unchanged
        return data.decode('utf-8') if isinstance(data, bytes) else data

    url = to_unicode(url)
    params = {to_unicode(key): to_unicode(value) for (key, value) in params.items()}

    url_parts = list(urlparse(url))
    # index 4 of the parsed URL is the querystring
    query = dict(parse_qsl(url_parts[4], keep_blank_values=True))
    query.update(params)
    # make the params order deterministic
    url_parts[4] = urlencode(sorted(query.items()))
    return urlunparse(url_parts)

283 

284 

def unpack_nested_exception(error):
    """
    Unwrap exceptions stacked in the arguments of other exceptions

    While ``error.args`` contains an exception, ``error`` is replaced by the first such
    argument and the search starts again on it.

    :param error: A python exception with possible exception embedded within
    :return: A python exception with no exception embedded within its ``args``
    """
    while True:
        nested = next(
            (argument for argument in error.args if isinstance(argument, Exception)),
            None
        )
        if nested is None:
            return error
        error = nested

303 

304 

def _gen_ticket(prefix=None, lg=settings.CAS_TICKET_LEN):
    """
    Generate a ticket with prefix ``prefix`` and length ``lg``

    The random part is made of ascii letters and digits. With a prefix, the ticket is
    ``prefix``, a dash, then enough random characters to reach ``lg`` characters in total.
    Without prefix, only the random part is returned and it counts ``lg - 1`` characters.

    :param unicode prefix: An optional prefix (probably ST, PT, PGT or PGTIOU)
    :param int lg: The length of the generated ticket (with the prefix)
    :return: A randomly generated ticket
    :rtype: unicode
    """
    # Tickets are secrets: draw them from the operating system's random source rather
    # than from the default pseudo random generator, whose output can be predicted.
    system_random = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    random_part = u''.join(
        system_random.choice(alphabet) for _ in range(lg - len(prefix or "") - 1)
    )
    if prefix is not None:
        return u'%s-%s' % (prefix, random_part)
    else:
        return random_part

323 

324 

def gen_lt():
    """
    Generate a Login Ticket

    :return: The result of :func:`_gen_ticket` called with the prefix
        ``settings.CAS_LOGIN_TICKET_PREFIX`` and the length ``settings.CAS_LT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_LOGIN_TICKET_PREFIX, lg=settings.CAS_LT_LEN)

334 

335 

def gen_st():
    """
    Generate a Service Ticket

    :return: The result of :func:`_gen_ticket` called with the prefix
        ``settings.CAS_SERVICE_TICKET_PREFIX`` and the length ``settings.CAS_ST_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_SERVICE_TICKET_PREFIX, lg=settings.CAS_ST_LEN)

345 

346 

def gen_pt():
    """
    Generate a Proxy Ticket

    :return: The result of :func:`_gen_ticket` called with the prefix
        ``settings.CAS_PROXY_TICKET_PREFIX`` and the length ``settings.CAS_PT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_PROXY_TICKET_PREFIX, lg=settings.CAS_PT_LEN)

356 

357 

def gen_pgt():
    """
    Generate a Proxy Granting Ticket

    :return: The result of :func:`_gen_ticket` called with the prefix
        ``settings.CAS_PROXY_GRANTING_TICKET_PREFIX`` and the length
        ``settings.CAS_PGT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_PREFIX,
        lg=settings.CAS_PGT_LEN
    )

367 

368 

def gen_pgtiou():
    """
    Generate a Proxy Granting Ticket IOU

    :return: The result of :func:`_gen_ticket` called with the prefix
        ``settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX`` and the length
        ``settings.CAS_PGTIOU_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX,
        lg=settings.CAS_PGTIOU_LEN
    )

378 

379 

def gen_saml_id():
    """
    Generate a saml id

    :return: The result of :func:`_gen_ticket` called without prefix and with its default
        length
    :rtype: unicode
    """
    saml_id = _gen_ticket()
    return saml_id

388 

389 

def get_tuple(nuplet, index, default=None):
    """
    Read an element of a tuple, falling back to a default value

    :param tuple nuplet: A tuple, or ``None``
    :param int index: An index
    :param default: An optional default value
    :return: ``nuplet[index]`` if defined, else ``default`` (possibly ``None``).
        ``default`` is also returned when ``nuplet`` is ``None``.
    """
    if nuplet is not None:
        try:
            return nuplet[index]
        except IndexError:
            pass
    return default

403 

404 

def crypt_salt_is_valid(salt):
    """
    Validate a salt as crypt salt

    A salt is rejected when the ``crypt`` module is not available or when it has less
    than two characters. A salt that does not start with ``$`` is accepted as is. A salt
    starting with ``$`` must contain a second ``$`` after a non ``$`` character, and
    hashing the empty string with it must give a result that contains a ``$`` after its
    first character.

    :param str salt: a password salt
    :return: ``True`` if ``salt`` is a valid crypt salt on this system, ``False`` otherwise
    :rtype: bool
    """
    if crypt is None or len(salt) < 2:
        return False
    if salt[0] != '$':
        return True
    if salt[1] == '$' or '$' not in salt[1:]:
        return False
    # ask the system implementation whether it accepts this salt
    try:
        probe = crypt.crypt("", salt)
    except OSError:
        return False
    return bool(probe) and '$' in probe[1:]

435 

436 

class LdapHashUserPassword(object):
    """
    Class to deal with hashed password as defined at
    https://datatracker.ietf.org/doc/html/draft-stroeder-hashed-userpassword-values

    A hashed password is a byte string starting with the scheme enclosed with ``{ }``.
    For the digest based schemes, the scheme is followed by the base64 encoding of the
    digest of ``password + salt`` followed by the salt. For the ``{CRYPT}`` scheme, it is
    followed by the output of the system ``crypt`` function.
    """

    #: valid schemes that require a salt
    schemes_salt = {b"{SMD5}", b"{SSHA}", b"{SSHA256}", b"{SSHA384}", b"{SSHA512}", b"{CRYPT}"}
    #: valid schemes that require no salt
    schemes_nosalt = {b"{MD5}", b"{SHA}", b"{SHA256}", b"{SHA384}", b"{SHA512}"}

    #: map between scheme and hash function (``{CRYPT}`` is handled apart and has no entry)
    _schemes_to_hash = {
        b"{SMD5}": hashlib.md5,
        b"{MD5}": hashlib.md5,
        b"{SSHA}": hashlib.sha1,
        b"{SHA}": hashlib.sha1,
        b"{SSHA256}": hashlib.sha256,
        b"{SHA256}": hashlib.sha256,
        b"{SSHA384}": hashlib.sha384,
        b"{SHA384}": hashlib.sha384,
        b"{SSHA512}": hashlib.sha512,
        b"{SHA512}": hashlib.sha512
    }

    #: map between salted scheme and digest length in bytes (the salt starts after it)
    _schemes_to_len = {
        b"{SMD5}": 16,
        b"{SSHA}": 20,
        b"{SSHA256}": 32,
        b"{SSHA384}": 48,
        b"{SSHA512}": 64,
    }

    class BadScheme(ValueError):
        """
        Error raised when the hash scheme is not in
        :attr:`LdapHashUserPassword.schemes_salt` + :attr:`LdapHashUserPassword.schemes_nosalt`
        """
        pass

    class BadHash(ValueError):
        """Error raised when the hash is malformed or too short"""
        pass

    class BadSalt(ValueError):
        """Error raised when, with the scheme ``{CRYPT}``, the salt is invalid"""
        pass

    @classmethod
    def _raise_bad_scheme(cls, scheme, valid, msg):
        """
        Raise :attr:`BadScheme` error for ``scheme``, possible valid scheme are
        in ``valid``, the error message is ``msg``

        :param bytes scheme: A bad scheme
        :param valid: The valid schemes (an iterable of bytes)
        :param str msg: The error template message, with one placeholder for the scheme
            and one for the sorted, comma separated valid schemes
        :raises LdapHashUserPassword.BadScheme: always
        """
        valid_schemes = sorted(s.decode() for s in valid)
        raise cls.BadScheme(msg % (scheme, u", ".join(valid_schemes)))

    @classmethod
    def _test_scheme(cls, scheme):
        """
        Test if a scheme is valid or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` is not a valid scheme
        """
        if scheme not in cls.schemes_salt and scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt | cls.schemes_nosalt,
                "The scheme %r is not valid. Valide schemes are %s."
            )

    @classmethod
    def _test_scheme_salt(cls, scheme):
        """
        Test if the scheme needs a salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires no salt
        """
        if scheme not in cls.schemes_salt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt,
                "The scheme %r is only valid without a salt. Valide schemes with salt are %s."
            )

    @classmethod
    def _test_scheme_nosalt(cls, scheme):
        """
        Test if the scheme needs no salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires a salt
        """
        if scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_nosalt,
                "The scheme %r is only valid with a salt. Valide schemes without salt are %s."
            )

    @classmethod
    def hash(cls, scheme, password, salt=None, charset="utf8"):
        """
        Hash ``password`` with ``scheme`` using ``salt``.
        These three variables being encoded in ``charset``.

        :param bytes scheme: A valid scheme (case insensitive)
        :param bytes password: A byte string to hash using ``scheme``
        :param bytes salt: An optional salt to use if ``scheme`` requires any
        :param str charset: The encoding of ``scheme``, ``password`` and ``salt``
        :return: The upper-cased scheme followed by the hashed password
        :rtype: bytes
        :raises BadScheme: if ``scheme`` is not valid, if it does not match the presence
            or the absence of a salt, or if it is ``{CRYPT}`` and crypt is not available
        :raises BadSalt: if ``scheme`` is ``{CRYPT}`` and the salt is not valid
        """
        scheme = scheme.upper()
        cls._test_scheme(scheme)
        if salt is None or salt == b"":
            salt = b""
            cls._test_scheme_nosalt(scheme)
        else:
            cls._test_scheme_salt(scheme)
        try:
            return scheme + base64.b64encode(
                cls._schemes_to_hash[scheme](password + salt).digest() + salt
            )
        except KeyError:
            # the only valid scheme without entry in ``_schemes_to_hash`` is {CRYPT}
            if crypt is None:
                raise cls.BadScheme("Crypt is not available on the system")
            password = password.decode(charset)
            salt = salt.decode(charset)
            if not crypt_salt_is_valid(salt):
                raise cls.BadSalt("System crypt implementation do not support the salt %r" % salt)
            hashed_password = crypt.crypt(password, salt)
            hashed_password = hashed_password.encode(charset)
            return scheme + hashed_password

    @classmethod
    def get_scheme(cls, hashed_passord):
        """
        Return the scheme of ``hashed_passord`` or raise :attr:`BadHash`

        :param bytes hashed_passord: A hashed password
        :return: The scheme used by the hashed password, upper-cased and enclosed with
            ``{ }``
        :rtype: bytes
        :raises BadHash: if ``hashed_passord`` does not start with ``{`` or contains no
            ``}`` (an empty value included)
        """
        # slicing (and not indexing) so that an empty value is reported as a bad hash
        # instead of failing with an IndexError
        if hashed_passord[:1] != b'{' or b'}' not in hashed_passord:
            raise cls.BadHash("%r should start with the scheme enclosed with { }" % hashed_passord)
        scheme = hashed_passord.split(b'}', 1)[0]
        scheme = scheme.upper() + b"}"
        return scheme

    @classmethod
    def get_salt(cls, hashed_passord):
        """
        Return the salt of ``hashed_passord`` possibly empty

        :param bytes hashed_passord: A hashed password
        :return: The salt used by the hashed password (empty if no salt is used)
        :rtype: bytes
        :raises BadHash: if no valid scheme is found within ``hashed_passord`` or if the
            hashed password is too short for the scheme found.
        :raises BadScheme: if the scheme found is not a valid one
        """
        scheme = cls.get_scheme(hashed_passord)
        cls._test_scheme(scheme)
        if scheme in cls.schemes_nosalt:
            return b""
        elif scheme == b'{CRYPT}':
            if b'$' in hashed_passord:
                # keep what precedes the third ``$`` and drop the scheme
                return b'$'.join(hashed_passord.split(b'$', 3)[:-1])[len(scheme):]
            # no ``$``: everything after the scheme is returned
            return hashed_passord.split(b'}', 1)[-1]
        else:
            try:
                hashed_passord = base64.b64decode(hashed_passord[len(scheme):])
            except (TypeError, binascii.Error) as error:
                raise cls.BadHash("Bad base64: %s" % error)
            if len(hashed_passord) < cls._schemes_to_len[scheme]:
                raise cls.BadHash("Hash too short for the scheme %s" % scheme)
            # the salt is what follows the digest
            return hashed_passord[cls._schemes_to_len[scheme]:]

624 

625 

def check_password(method, password, hashed_password, charset):
    """
    Check that ``password`` match `hashed_password` using ``method``,
    assuming the encoding is ``charset``.

    :param str method: one of ``"crypt"``, ``"ldap"``, ``"hex_md5"``, ``"hex_sha1"``,
        ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``, ``"hex_sha512"``, ``"plain"``
    :param password: The user inputed password
    :type password: :obj:`str` or :obj:`bytes`
    :param hashed_password: The hashed password as stored in the database
    :type hashed_password: :obj:`str` or :obj:`bytes`
    :param str charset: The used char encoding (also used internally, so it must be valid
        for ``password`` and ``hashed_password``)
    :return: True if ``password`` match ``hashed_password`` using ``method``,
        ``False`` otherwise
    :rtype: bool
    :raises ValueError: if ``method`` is unknown, or with the method ``"crypt"`` if crypt
        is not available on the system or does not support the salt of ``hashed_password``
    """
    # local import: only this function needs the constant time comparison helper
    import hmac

    if not isinstance(password, bytes):
        password = password.encode(charset)
    if not isinstance(hashed_password, bytes):
        hashed_password = hashed_password.encode(charset)
    if method == "plain":
        # constant time comparison, so that the duration of the check does not depend on
        # how many leading bytes of the submitted password are right
        return hmac.compare_digest(password, hashed_password)
    elif method == "crypt":
        if crypt is None:
            raise ValueError("Crypt is not available on the system")
        if hashed_password.startswith(b'$'):
            salt = b'$'.join(hashed_password.split(b'$', 3)[:-1])
        elif hashed_password.startswith(b'_'):  # pragma: no cover old BSD format not supported
            salt = hashed_password[:9]
        else:
            salt = hashed_password[:2]
        password = password.decode(charset)
        salt = salt.decode(charset)
        hashed_password = hashed_password.decode(charset)
        if not crypt_salt_is_valid(salt):
            raise ValueError("System crypt implementation do not support the salt %r" % salt)
        crypted_password = crypt.crypt(password, salt)
        if crypted_password is None:
            # no hash was produced, so the password cannot match
            return False
        # both texts are encoded the same way to be compared as bytes in constant time
        return hmac.compare_digest(
            crypted_password.encode("utf-8"),
            hashed_password.encode("utf-8")
        )
    elif method == "ldap":
        scheme = LdapHashUserPassword.get_scheme(hashed_password)
        salt = LdapHashUserPassword.get_salt(hashed_password)
        computed = LdapHashUserPassword.hash(scheme, password, salt, charset=charset)
        # ``hash`` always returns the scheme upper-cased: normalize the scheme of the
        # stored value the same way, or a value stored as ``{ssha}...`` would never match
        expected = scheme + hashed_password[len(scheme):]
        return hmac.compare_digest(computed, expected)
    elif (
        method.startswith("hex_") and
        method[4:] in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}
    ):
        digest = getattr(hashlib, method[4:])(password).hexdigest().encode("ascii")
        # the stored hexadecimal digest is compared case insensitively
        return hmac.compare_digest(digest, hashed_password.lower())
    else:
        raise ValueError("Unknown password method check %r" % method)

679 

680 

def decode_version(version):
    """
    Decode a version string following version semantic http://semver.org/ into a tuple
    of int. It will work as long as we do not use pre release versions.

    :param unicode version: A dotted version
    :return: A tuple of int, one per dot separated component of ``version``
    :rtype: tuple
    """
    components = version.split('.')
    return tuple(map(int, components))

691 

692 

def last_version():
    """
    Fetch the last version from pypi and return it. On successful fetch from pypi, the
    response is cached 24h, on error, it is cached 10 min.

    The cache is kept as the ``_cache`` attribute of this function, as a tuple
    ``(fetch time, version, success)``.

    :return: the last django-cas-server version. When the fetch fails, the previously
        known version is returned (``None`` if no fetch ever succeeded).
    :rtype: unicode
    """
    try:
        last_update, version, success = last_version._cache
    except AttributeError:
        # nothing cached yet
        last_update, version, success = 0, None, False
    cache_delta = 24 * 3600 if success else 600
    if (time.time() - last_update) < cache_delta:
        return version
    try:
        # a timeout is required here: without it an unresponsive server would block the
        # caller for as long as the connection stays open
        req = requests.get(settings.CAS_NEW_VERSION_JSON_URL, timeout=10)
        data = json.loads(req.text)
        version = data["info"]["version"]
    except (
        KeyError,
        ValueError,
        requests.exceptions.RequestException
    ) as error:  # pragma: no cover (should not happen unless pypi is not available)
        logger.error(
            "Unable to fetch %s: %s" % (settings.CAS_NEW_VERSION_JSON_URL, error)
        )
        last_version._cache = (time.time(), version, False)
        # return what the cached calls of the next 10 minutes will return, instead of
        # an implicit None
        return version
    last_version._cache = (time.time(), version, True)
    return version

726 

727 

def dictfetchall(cursor):
    """
    Return all rows from a django cursor as a list of dict

    :param cursor: A cursor on which a query has been executed
    :return: One dictionary per row returned by ``cursor.fetchall()``, mapping each column
        name (the first item of each entry of ``cursor.description``) to its value
    :rtype: list
    """
    names = [column[0] for column in cursor.description]
    rows = []
    for values in cursor.fetchall():
        rows.append(dict(zip(names, values)))
    return rows

735 

736 

def logout_request(ticket):
    """
    Forge a SLO logout request

    The request carries a freshly generated saml id, the current date in ISO format and
    ``ticket`` as session index.

    :param unicode ticket: A ticket value
    :return: A SLO XML body request
    :rtype: unicode
    """
    template = u"""<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
 ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
</samlp:LogoutRequest>"""
    values = {
        'id': gen_saml_id(),
        'datetime': timezone.now().isoformat(),
        'ticket': ticket
    }
    return template % values

754 

755 

def regexpr_validator(value):
    """
    Check that ``value`` compiles as a regular expression

    :param unicode value: A regular expression to test
    :raises ValidationError: if ``value`` is not a valid regular expression
    """
    try:
        re.compile(value)
    except re.error:
        invalid = ValidationError(
            _('"%(value)s" is not a valid regular expression'),
            params={'value': value}
        )
        raise invalid