Coverage for cas_server/utils.py: 94%

309 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-18 11:22 +0000

1# -*- coding: utf-8 -*- 

2# This program is distributed in the hope that it will be useful, but WITHOUT 

3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

5# more details. 

6# 

7# You should have received a copy of the GNU General Public License version 3 

8# along with this program; if not, write to the Free Software Foundation, Inc., 51 

9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

10# 

11# (c) 2015-2016 Valentin Samir 

12"""Some utility functions for the app"""

13from .default_settings import settings 

14 

15from django.http import HttpResponseRedirect, HttpResponse 

16from django.contrib import messages 

17from django.contrib.messages import constants as DEFAULT_MESSAGE_LEVELS 

18from django.core.serializers.json import DjangoJSONEncoder 

19from django.utils import timezone 

20from django.core.exceptions import ValidationError 

21try: 

22 from django.urls import reverse 

23 from django.utils.translation import gettext_lazy as _ 

24except ImportError: 

25 from django.core.urlresolvers import reverse 

26 from django.utils.translation import ugettext_lazy as _ 

27 

28import re 

29import random 

30import string 

31import json 

32import hashlib 

33import base64 

34import six 

35import requests 

36import time 

37import logging 

38import binascii 

39# The crypt module is deprecated and will be removed in version 3.13 

40try: 

41 import crypt 

42except ImportError: 

43 crypt = None 

44 

45from importlib import import_module 

46from datetime import datetime, timedelta 

47from six.moves.urllib.parse import urlparse, urlunparse, parse_qsl, urlencode 

48 

49from . import VERSION 

50 

51#: logger facility 

52logger = logging.getLogger(__name__) 

53 

54 

def json_encode(obj):
    """
    Encode a python object to json

    The encoder is created on first use and cached on the function itself as
    ``json_encode.encoder``. It is built with ``default=six.text_type``, so values
    the encoder cannot serialize natively are handed to :obj:`six.text_type`.

    :param obj: The python object to serialize
    :return: ``obj`` serialized in json
    :rtype: str
    """
    encoder = getattr(json_encode, "encoder", None)
    if encoder is None:
        # The cache is tested explicitly instead of catching ``AttributeError``
        # around the whole call: an ``AttributeError`` raised while encoding ``obj``
        # must reach the caller and not be mistaken for a missing cache (which
        # previously led to an unbounded recursion).
        encoder = json_encode.encoder = DjangoJSONEncoder(default=six.text_type)
    return encoder.encode(obj)

62 

63 

def context(params):
    """
    Enrich a template context with the values shared by all the templates

    The keys ``settings`` and ``message_levels`` are always set. When
    ``settings.CAS_NEW_VERSION_HTML_WARNING`` is enabled, ``VERSION``, ``LAST_VERSION`` and
    ``upgrade_available`` are set too. When ``settings.CAS_INFO_MESSAGES_ORDER`` is not empty,
    ``CAS_INFO_RENDER`` is set to the list of the information messages to render.

    :param dict params: The context dictionary used to render templates.
    :return: The ``params`` dictionary, updated in place.
    :rtype: dict
    """
    params["settings"] = settings
    params["message_levels"] = DEFAULT_MESSAGE_LEVELS

    if settings.CAS_NEW_VERSION_HTML_WARNING:
        latest = last_version()
        params["VERSION"] = VERSION
        params["LAST_VERSION"] = latest
        # no upgrade can be advertised when the last version is unknown
        params["upgrade_available"] = (
            latest is not None and decode_version(VERSION) < decode_version(latest)
        )

    if settings.CAS_INFO_MESSAGES_ORDER:
        rendered = params["CAS_INFO_RENDER"] = []
        for msg_name in settings.CAS_INFO_MESSAGES_ORDER:
            # silently skip names that are unknown or not configured with a dict
            if msg_name not in settings.CAS_INFO_MESSAGES:
                continue
            configured = settings.CAS_INFO_MESSAGES[msg_name]
            if not isinstance(configured, dict):
                continue
            # work on a copy so the settings are never modified
            msg = configured.copy()
            if "message" not in msg:
                continue
            msg["name"] = msg_name
            # use info as default box type
            msg.setdefault("type", "info")
            # make the box discardable by default
            msg.setdefault("discardable", True)
            # the message is rendered to text before hashing, so the hash depends
            # on the rendering language
            to_hash = six.text_type(msg["message"]).encode("utf-8") + msg["type"].encode("utf-8")
            msg["hash"] = hashlib.md5(to_hash).hexdigest()
            rendered.append(msg)
    return params

106 

107 

def json_response(request, data):
    """
    Serialize ``data`` in json and return it to the user within an HttpResponse

    The pending django messages of ``request`` are added to ``data`` (in place) under the
    key ``messages``, as a list of dictionaries with the keys ``message`` and ``level``.

    :param django.http.HttpRequest request: The request object used to generate this response.
    :param dict data: The python dictionary to return as a json
    :return: The content of ``data`` serialized in json
    :rtype: django.http.HttpResponse
    """
    data["messages"] = [
        {'message': msg.message, 'level': msg.level_tag}
        for msg in messages.get_messages(request)
    ]
    body = json.dumps(data)
    return HttpResponse(body, content_type="application/json")

121 

122 

def import_attr(path):
    """
    transform a python dotted path to the attr

    :param path: A dotted path to a python object or a python object
    :type path: :obj:`unicode` or :obj:`str` or anything
    :return: The python object pointed by the dotted path or the python object unchanged
    :raises ValueError: if ``path`` is a string that contains no dot
    :raises ImportError: if the module part of ``path`` cannot be imported
    :raises AttributeError: if the module has no attribute named like the last part of ``path``
    """
    # if we got a byte string, decode it to unicode (normally it should only contain ascii)
    if isinstance(path, six.binary_type):
        path = path.decode("utf-8")
    # if path is not an unicode, return it unchanged (may be it is already the attribute to import)
    if not isinstance(path, six.text_type):
        return path
    if u"." not in path:
        # the exception was previously built but never raised, so the caller got an
        # unrelated unpacking error from the ``rsplit`` below instead of this message
        raise ValueError("%r should be of the form `module.attr` and we just got `attr`" % path)
    module, attr = path.rsplit(u'.', 1)
    try:
        return getattr(import_module(module), attr)
    except ImportError:
        raise ImportError("Module %r not found" % module)
    except AttributeError:
        raise AttributeError("Module %r has not attribut %r" % (module, attr))

146 

147 

def redirect_params(url_name, params=None):
    """
    Build a redirection to ``url_name`` with ``params`` as querystring

    The ``?`` separator is always appended to the reversed URL, even if ``params`` is empty.

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :return: A redirection to the URL with name ``url_name`` with ``params`` as querystring.
    :rtype: django.http.HttpResponseRedirect
    """
    url = reverse(url_name)
    query_string = urlencode(params or {})
    target = url + "?%s" % query_string
    return HttpResponseRedirect(target)

161 

162 

def reverse_params(url_name, params=None, **kwargs):
    """
    Reverse the URL named ``url_name`` and append ``params`` to it as querystring

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :param **kwargs: additional parameters passed to ``reverse`` to compute the URL
    :return: The reversed URL of ``url_name``, followed by ``?`` and the encoded ``params``
        only if ``params`` is not empty
    :rtype: unicode
    """
    url = reverse(url_name, **kwargs)
    query_string = urlencode(params or {})
    if not query_string:
        return url
    return u"%s?%s" % (url, query_string)

181 

182 

def copy_params(get_or_post_params, ignore=None):
    """
    Copy a :class:`django.http.QueryDict` in a :obj:`dict`, leaving out the keys found in
    ``ignore`` and the keys whose value is empty (falsy)

    :param django.http.QueryDict get_or_post_params: A GET or POST
        :class:`QueryDict<django.http.QueryDict>`
    :param set ignore: An optional set of keys to ignore during the copy
    :return: A copy of get_or_post_params
    :rtype: dict
    """
    ignored = set() if ignore is None else ignore
    return {
        key: get_or_post_params[key]
        for key in get_or_post_params
        if key not in ignored and get_or_post_params[key]
    }

200 

201 

def set_cookie(response, key, value, max_age):
    """
    Set the cookie ``key`` on ``response`` with value ``value`` valid for ``max_age`` seconds

    The cookie domain and its secure flag are taken from ``settings.SESSION_COOKIE_DOMAIN``
    and ``settings.SESSION_COOKIE_SECURE``.

    :param django.http.HttpResponse response: a django response where to set the cookie
    :param unicode key: the cookie key
    :param unicode value: the cookie value
    :param int max_age: the maximum validity age of the cookie
    """
    # absolute expiration date matching ``max_age``, computed from the current UTC time
    expiration = datetime.utcnow() + timedelta(seconds=max_age)
    expires = expiration.strftime("%a, %d-%b-%Y %H:%M:%S GMT")
    # a falsy SESSION_COOKIE_SECURE is passed as None
    secure = settings.SESSION_COOKIE_SECURE or None
    response.set_cookie(
        key,
        value,
        max_age=max_age,
        expires=expires,
        domain=settings.SESSION_COOKIE_DOMAIN,
        secure=secure
    )

223 

224 

def get_current_url(request, ignore_params=None):
    """
    Rebuild the http(s) URL of the current page from a django request, possibly leaving out
    some GET parameters

    :param django.http.HttpRequest request: The current request object.
    :param set ignore_params: An optional set of GET parameters to ignore
    :return: The URL of the current page, possibly omitting some parameters from
        ``ignore_params`` in the querystring.
    :rtype: unicode
    """
    ignored = set() if ignore_params is None else ignore_params
    scheme = u'https' if request.is_secure() else u"http"
    service_url = u"%s://%s%s" % (scheme, request.get_host(), request.path)
    # the querystring is only added if some parameters remain after filtering
    remaining = copy_params(request.GET, ignored) if request.GET else None
    if remaining:
        service_url += u"?%s" % urlencode(remaining)
    return service_url

244 

245 

def update_url(url, params):
    """
    Set the parameters from ``params`` in the querystring of ``url``

    Parameters already present in the querystring are overwritten by the ones of ``params``,
    and the resulting querystring is sorted.

    :param url: An URL possibly with a querystring
    :type url: :obj:`unicode` or :obj:`str`
    :param dict params: A dictionary of parameters for updating the url querystring
    :return: The URL with an updated querystring
    :rtype: unicode
    """
    def as_text(data):
        return data.decode('utf-8') if isinstance(data, bytes) else data

    def as_bytes(data):
        return data if isinstance(data, bytes) else data.encode('utf-8')

    # the url and the parameters are handled as text on python 3 and as bytes on python 2
    convert = as_text if six.PY3 else as_bytes
    url = convert(url)
    params = {convert(key): convert(value) for (key, value) in params.items()}

    parts = list(urlparse(url))
    # index 4 of the parsed url is the querystring
    query = dict(parse_qsl(parts[4], keep_blank_values=True))
    query.update(params)
    # sort the parameters to make their order deterministic
    parts[4] = urlencode(sorted(query.items()))
    updated_url = urlunparse(parts)

    if isinstance(updated_url, bytes):
        updated_url = updated_url.decode('utf-8')
    return updated_url

288 

289 

def unpack_nested_exception(error):
    """
    Follow the exceptions embedded in the arguments of ``error`` down to the innermost one

    At each level, the first argument that is itself an exception is followed.

    :param error: A python exception with possible exception embedded within
    :return: A python exception with no exception embedded within
    """
    while True:
        nested = next((arg for arg in error.args if isinstance(arg, Exception)), None)
        if nested is None:
            # no argument of ``error`` is an exception: this is the innermost one
            return error
        error = nested

308 

309 

def _gen_ticket(prefix=None, lg=settings.CAS_TICKET_LEN):
    """
    Generate a ticket with prefix ``prefix`` and length ``lg``

    The random part is made of ASCII letters and digits drawn from the operating system
    random source.

    :param unicode prefix: An optional prefix (probably ST, PT, PGT or PGTIOU)
    :param int lg: The length of the generated ticket (with the prefix)
    :return: A randomly generated ticket: ``prefix``, a dash and a random part for a total
        length of ``lg`` if ``prefix`` is given, else a random string of length ``lg - 1``
    :rtype: unicode
    """
    # Tickets are used as authentication tokens, so they must not be predictable:
    # use the OS random source instead of the default Mersenne Twister generator.
    secure_random = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    # one character is always reserved for the dash, even without prefix
    random_length = lg - len(prefix or "") - 1
    random_part = u''.join(secure_random.choice(alphabet) for _unused in range(random_length))
    if prefix is not None:
        return u'%s-%s' % (prefix, random_part)
    else:
        return random_part

328 

329 

def gen_lt():
    """
    Generate a Login Ticket

    :return: A ticket generated with the prefix ``settings.CAS_LOGIN_TICKET_PREFIX`` and the
        length ``settings.CAS_LT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_LOGIN_TICKET_PREFIX, lg=settings.CAS_LT_LEN)

339 

340 

def gen_st():
    """
    Generate a Service Ticket

    :return: A ticket generated with the prefix ``settings.CAS_SERVICE_TICKET_PREFIX`` and
        the length ``settings.CAS_ST_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_SERVICE_TICKET_PREFIX, lg=settings.CAS_ST_LEN)

350 

351 

def gen_pt():
    """
    Generate a Proxy Ticket

    :return: A ticket generated with the prefix ``settings.CAS_PROXY_TICKET_PREFIX`` and the
        length ``settings.CAS_PT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(prefix=settings.CAS_PROXY_TICKET_PREFIX, lg=settings.CAS_PT_LEN)

361 

362 

def gen_pgt():
    """
    Generate a Proxy Granting Ticket

    :return: A ticket generated with the prefix ``settings.CAS_PROXY_GRANTING_TICKET_PREFIX``
        and the length ``settings.CAS_PGT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_PREFIX,
        lg=settings.CAS_PGT_LEN
    )

372 

373 

def gen_pgtiou():
    """
    Generate a Proxy Granting Ticket IOU

    :return: A ticket generated with the prefix
        ``settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX`` and the length
        ``settings.CAS_PGTIOU_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX,
        lg=settings.CAS_PGTIOU_LEN
    )

383 

384 

def gen_saml_id():
    """
    Generate a saml id

    :return: A random id, generated without prefix and with the default ticket length
        (``settings.CAS_TICKET_LEN``)
    :rtype: unicode
    """
    return _gen_ticket(prefix=None)

393 

394 

def get_tuple(nuplet, index, default=None):
    """
    Safely read one element of a tuple

    :param tuple nuplet: A tuple, or ``None``
    :param int index: An index
    :param default: An optional default value
    :return: ``nuplet[index]`` if defined, else ``default`` (possibly ``None``)
    """
    if nuplet is not None:
        try:
            return nuplet[index]
        except IndexError:
            # index out of range: fall back to the default value
            pass
    return default

408 

409 

def crypt_salt_is_valid(salt):
    """
    Validate a salt as crypt salt

    A salt shorter than 2 characters is never valid. A salt that does not start with ``$``
    is accepted as is. A salt starting with ``$`` must contain a second ``$`` (not right
    after the first one) and must be accepted by the system crypt implementation.

    :param str salt: a password salt
    :return: ``True`` if ``salt`` is a valid crypt salt on this system, ``False`` otherwise
        (always ``False`` if the crypt module is not available)
    :rtype: bool
    """
    if crypt is None:
        return False
    if len(salt) < 2:
        return False
    if salt[0] != '$':
        return True
    if salt[1] == '$' or '$' not in salt[1:]:
        return False
    # try the salt against the system implementation with an empty password
    try:
        hashed = crypt.crypt("", salt)
    except OSError:
        return False
    return bool(hashed) and '$' in hashed[1:]

440 

441 

class LdapHashUserPassword(object):
    """
    Class to deal with hashed password as defined at
    https://tools.ietf.org/id/draft-stroeder-hashed-userpassword-values-01.html

    All the methods are class methods working on byte strings.
    """

    #: valid schemes that require a salt
    schemes_salt = {b"{SMD5}", b"{SSHA}", b"{SSHA256}", b"{SSHA384}", b"{SSHA512}", b"{CRYPT}"}
    #: valid schemes that require no salt
    schemes_nosalt = {b"{MD5}", b"{SHA}", b"{SHA256}", b"{SHA384}", b"{SHA512}"}

    #: map between scheme and hash function
    _schemes_to_hash = {
        b"{SMD5}": hashlib.md5,
        b"{MD5}": hashlib.md5,
        b"{SSHA}": hashlib.sha1,
        b"{SHA}": hashlib.sha1,
        b"{SSHA256}": hashlib.sha256,
        b"{SHA256}": hashlib.sha256,
        b"{SSHA384}": hashlib.sha384,
        b"{SHA384}": hashlib.sha384,
        b"{SSHA512}": hashlib.sha512,
        b"{SHA512}": hashlib.sha512
    }

    #: map between scheme and hash length
    _schemes_to_len = {
        b"{SMD5}": 16,
        b"{SSHA}": 20,
        b"{SSHA256}": 32,
        b"{SSHA384}": 48,
        b"{SSHA512}": 64,
    }

    class BadScheme(ValueError):
        """
        Error raised when the hash scheme is not in
        :attr:`LdapHashUserPassword.schemes_salt` + :attr:`LdapHashUserPassword.schemes_nosalt`
        """
        pass

    class BadHash(ValueError):
        """Error raised when the hash is malformed or too short"""
        pass

    class BadSalt(ValueError):
        """Error raised when, with the scheme ``{CRYPT}``, the salt is invalid"""
        pass

    @classmethod
    def _raise_bad_scheme(cls, scheme, valid, msg):
        """
        Raise :attr:`BadScheme` error for ``scheme``, possible valid scheme are
        in ``valid``, the error message is ``msg``

        :param bytes scheme: A bad scheme
        :param list valid: A list of valid schemes
        :param str msg: The error template message, formatted with ``scheme`` and the sorted,
            comma separated valid schemes
        :raises LdapHashUserPassword.BadScheme: always
        """
        valid_schemes = sorted(s.decode() for s in valid)
        raise cls.BadScheme(msg % (scheme, u", ".join(valid_schemes)))

    @classmethod
    def _test_scheme(cls, scheme):
        """
        Test if a scheme is valid or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` is not a valid scheme
        """
        if scheme not in cls.schemes_salt and scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt | cls.schemes_nosalt,
                "The scheme %r is not valid. Valide schemes are %s."
            )

    @classmethod
    def _test_scheme_salt(cls, scheme):
        """
        Test if the scheme needs a salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires no salt
        """
        if scheme not in cls.schemes_salt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt,
                "The scheme %r is only valid without a salt. Valide schemes with salt are %s."
            )

    @classmethod
    def _test_scheme_nosalt(cls, scheme):
        """
        Test if the scheme needs no salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires a salt
        """
        if scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_nosalt,
                "The scheme %r is only valid with a salt. Valide schemes without salt are %s."
            )

    @classmethod
    def hash(cls, scheme, password, salt=None, charset="utf8"):
        """
        Hash ``password`` with ``scheme`` using ``salt``.
        These three variables being encoded in ``charset``.

        :param bytes scheme: A valid scheme (case insensitive)
        :param bytes password: A byte string to hash using ``scheme``
        :param bytes salt: An optional salt to use if ``scheme`` requires any
        :param str charset: The encoding of ``scheme``, ``password`` and ``salt``
        :return: The hashed password encoded with ``charset``, prefixed by the upper-cased
            scheme
        :rtype: bytes
        :raises BadScheme: if ``scheme`` is unknown, if it does not match the presence or
            absence of ``salt``, or if it is ``{CRYPT}`` and crypt is not available
        :raises BadSalt: if, with the scheme ``{CRYPT}``, the salt is not supported
        """
        scheme = scheme.upper()
        cls._test_scheme(scheme)
        if salt is None or salt == b"":
            salt = b""
            cls._test_scheme_nosalt(scheme)
        else:
            cls._test_scheme_salt(scheme)
        # Look the hash function up explicitly rather than catching ``KeyError`` around
        # the whole computation, so an unrelated ``KeyError`` cannot be misread.
        hash_function = cls._schemes_to_hash.get(scheme)
        if hash_function is not None:
            return scheme + base64.b64encode(hash_function(password + salt).digest() + salt)
        # {CRYPT} is the only valid scheme without an entry in _schemes_to_hash
        if crypt is None:
            raise cls.BadScheme("Crypt is not available on the system")
        if six.PY3:
            password = password.decode(charset)
            salt = salt.decode(charset)
        if not crypt_salt_is_valid(salt):
            raise cls.BadSalt("System crypt implementation do not support the salt %r" % salt)
        hashed_password = crypt.crypt(password, salt)
        if six.PY3:
            hashed_password = hashed_password.encode(charset)
        return scheme + hashed_password

    @classmethod
    def get_scheme(cls, hashed_passord):
        """
        Return the scheme of ``hashed_passord`` or raise :attr:`BadHash`

        :param bytes hashed_passord: A hashed password
        :return: The scheme used by the hashed password, upper-cased and enclosed with ``{ }``
        :rtype: bytes
        :raises BadHash: if ``hashed_passord`` does not start with ``{`` (this includes an
            empty value) or contains no ``}``
        """
        # A one byte slice works the same on python 2 and 3 and, unlike indexing, does
        # not fail with IndexError on an empty value.
        if hashed_passord[:1] != b'{' or b'}' not in hashed_passord:
            raise cls.BadHash("%r should start with the scheme enclosed with { }" % hashed_passord)
        scheme = hashed_passord.split(b'}', 1)[0]
        scheme = scheme.upper() + b"}"
        return scheme

    @classmethod
    def get_salt(cls, hashed_passord):
        """
        Return the salt of ``hashed_passord`` possibly empty

        :param bytes hashed_passord: A hashed password
        :return: The salt used by the hashed password (empty if no salt is used)
        :rtype: bytes
        :raises BadHash: if no valid scheme is found within ``hashed_passord`` or if the
            hashed password is too short for the scheme found.
        :raises BadScheme: if the scheme found is not a valid scheme
        """
        scheme = cls.get_scheme(hashed_passord)
        cls._test_scheme(scheme)
        if scheme in cls.schemes_nosalt:
            return b""
        elif scheme == b'{CRYPT}':
            if b'$' in hashed_passord:
                # keep what stands before the last of at most 3 ``$`` separated
                # fields, then strip the scheme
                return b'$'.join(hashed_passord.split(b'$', 3)[:-1])[len(scheme):]
            return hashed_passord.split(b'}', 1)[-1]
        else:
            try:
                hashed_passord = base64.b64decode(hashed_passord[len(scheme):])
            except (TypeError, binascii.Error) as error:
                raise cls.BadHash("Bad base64: %s" % error)
            if len(hashed_passord) < cls._schemes_to_len[scheme]:
                raise cls.BadHash("Hash too short for the scheme %s" % scheme)
            # the salt is what follows the fixed length digest
            return hashed_passord[cls._schemes_to_len[scheme]:]

631 

632 

def check_password(method, password, hashed_password, charset):
    """
    Check that ``password`` match `hashed_password` using ``method``,
    assuming the encoding is ``charset``.

    :param str method: one of ``"crypt"``, ``"ldap"``, ``"hex_md5"``, ``"hex_sha1"``,
        ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``, ``"hex_sha512"``, ``"plain"``
    :param password: The user inputed password
    :type password: :obj:`str` or :obj:`unicode`
    :param hashed_password: The hashed password as stored in the database
    :type hashed_password: :obj:`str` or :obj:`unicode`
    :param str charset: The used char encoding (also used internally, so it must be valid for
        the charset used by ``password`` when it was initially hashed)
    :return: True if ``password`` match ``hashed_password`` using ``method``,
        ``False`` otherwise
    :rtype: bool
    :raises ValueError: if ``method`` is unknown, or with the method ``"crypt"`` if crypt is
        not available or does not support the salt of ``hashed_password``
    """
    # local import: the constant time comparison is only needed by this function
    import hmac

    def same(left, right):
        """Compare two strings without leaking where they differ through timing"""
        if left is None or right is None:
            # e.g. a crypt implementation returning no hash at all
            return False
        # compare_digest needs bytes to accept non ascii content
        if not isinstance(left, six.binary_type):
            left = left.encode("utf-8")
        if not isinstance(right, six.binary_type):
            right = right.encode("utf-8")
        return hmac.compare_digest(left, right)

    if not isinstance(password, six.binary_type):
        password = password.encode(charset)
    if not isinstance(hashed_password, six.binary_type):
        hashed_password = hashed_password.encode(charset)
    if method == "plain":
        return same(password, hashed_password)
    elif method == "crypt":
        if crypt is None:
            raise ValueError("Crypt is not available on the system")
        # extract the salt from the stored hash
        if hashed_password.startswith(b'$'):
            salt = b'$'.join(hashed_password.split(b'$', 3)[:-1])
        elif hashed_password.startswith(b'_'):  # pragma: no cover old BSD format not supported
            salt = hashed_password[:9]
        else:
            salt = hashed_password[:2]
        if six.PY3:
            password = password.decode(charset)
            salt = salt.decode(charset)
            hashed_password = hashed_password.decode(charset)
        if not crypt_salt_is_valid(salt):
            raise ValueError("System crypt implementation do not support the salt %r" % salt)
        crypted_password = crypt.crypt(password, salt)
        return same(crypted_password, hashed_password)
    elif method == "ldap":
        scheme = LdapHashUserPassword.get_scheme(hashed_password)
        salt = LdapHashUserPassword.get_salt(hashed_password)
        # ``get_scheme`` returns the scheme upper-cased and ``hash`` prefixes its result with
        # it: normalize the stored value the same way, or a hash stored with a lower case
        # scheme could never match.
        expected = scheme + hashed_password[len(scheme):]
        computed = LdapHashUserPassword.hash(scheme, password, salt, charset=charset)
        return same(computed, expected)
    elif (
        method.startswith("hex_") and
        method[4:] in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}
    ):
        computed = getattr(hashlib, method[4:])(password).hexdigest().encode("ascii")
        # stored hexadecimal digests are compared case insensitively
        return same(computed, hashed_password.lower())
    else:
        raise ValueError("Unknown password method check %r" % method)

687 

688 

def decode_version(version):
    """
    Turn a dotted version string (see http://semver.org/) into a tuple of int, so that
    versions can be compared.
    It only works with purely numeric components, hence not with pre release versions.

    :param unicode version: A dotted version
    :return: A tuple of int
    :rtype: tuple
    """
    components = version.split('.')
    return tuple(map(int, components))

699 

700 

def last_version():
    """
    Fetch the last version from pypi and return it. On successful fetch from pypi, the response
    is cached 24h, on error, it is cached 10 min.

    The cache is stored on the function itself as ``last_version._cache``, a tuple
    ``(fetch time, version, success)``.

    :return: the last django-cas-server version, or the last version previously known if the
        fetch failed (``None`` if no version was ever fetched)
    :rtype: unicode
    """
    try:
        last_update, version, success = last_version._cache
    except AttributeError:
        # nothing cached yet
        last_update, version, success = 0, None, False
    cache_delta = 24 * 3600 if success else 600
    if (time.time() - last_update) < cache_delta:
        return version
    try:
        # This is called while rendering pages: never wait indefinitely for pypi.
        # A timeout is a RequestException, so it is handled as a failed fetch below.
        req = requests.get(settings.CAS_NEW_VERSION_JSON_URL, timeout=10)
        data = json.loads(req.text)
        version = data["info"]["version"]
    except (
        KeyError,
        ValueError,
        requests.exceptions.RequestException
    ) as error:  # pragma: no cover (should not happen unless pypi is not available)
        logger.error("Unable to fetch %s: %s", settings.CAS_NEW_VERSION_JSON_URL, error)
        last_version._cache = (time.time(), version, False)
        # return the same value as the following cached calls will, instead of an
        # implicit None
        return version
    last_version._cache = (time.time(), version, True)
    return version

734 

735 

def dictfetchall(cursor):
    """
    Return all the remaining rows of a cursor as dictionaries

    :param cursor: A database cursor on which a query has been executed
    :return: A list with one dictionary per row, mapping the column names to the row values
    :rtype: list
    """
    # the first item of each column description is the column name
    names = [column[0] for column in cursor.description]
    rows = []
    for values in cursor.fetchall():
        rows.append(dict(zip(names, values)))
    return rows

743 

744 

def logout_request(ticket):
    """
    Forge a SLO logout request

    The request gets a freshly generated saml id and the current time as issue instant.

    :param unicode ticket: A ticket value, used as session index
    :return: A SLO XML body request
    :rtype: unicode
    """
    template = u"""<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
 ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
</samlp:LogoutRequest>"""
    values = {
        'id': gen_saml_id(),
        'datetime': timezone.now().isoformat(),
        'ticket': ticket
    }
    return template % values

762 

763 

def regexpr_validator(value):
    """
    Validator checking that ``value`` compiles as a regular expression

    :param unicode value: A regular expression to test
    :raises ValidationError: if ``value`` is not a valid regular expression
    """
    try:
        re.compile(value)
    except re.error:
        # report the compilation failure as a (translatable) validation error
        message = _('"%(value)s" is not a valid regular expression')
        raise ValidationError(message, params={'value': value})