Coverage for cas_server/utils.py: 82%
314 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:39 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:39 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2025 Valentin Samir
12"""Some util function for the app"""
13from .default_settings import settings
15from django.http import HttpResponseRedirect, HttpResponse
16from django.contrib import messages
17from django.contrib.messages import constants as DEFAULT_MESSAGE_LEVELS
18from django.core.serializers.json import DjangoJSONEncoder
19from django.utils import timezone
20from django.core.exceptions import ValidationError
21try:
22 from django.urls import reverse
23 from django.utils.translation import gettext_lazy as _
24except ImportError:
25 from django.core.urlresolvers import reverse
26 from django.utils.translation import ugettext_lazy as _
28import re
29import random
30import string
31import json
32import hashlib
33import base64
34import requests
35import time
36import logging
37import binascii
38# The crypt module is deprecated and will be removed in version 3.13
39try:
40 import crypt
41except ImportError:
42 crypt = None
44from importlib import import_module
45from datetime import datetime, timedelta, timezone as tz
46from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
48from . import VERSION
50#: logger facility
51logger = logging.getLogger(__name__)
def json_encode(obj):
    """
    Encode a python object to json

    The :class:`DjangoJSONEncoder` instance is created on first use and then kept as the
    ``encoder`` attribute of this function, so it is built only once.

    :param obj: The python object to serialize. The encoder is configured with
        ``default=str``.
    :return: The JSON representation of ``obj``
    """
    encoder = getattr(json_encode, "encoder", None)
    if encoder is None:
        # Create the encoder outside of any ``except AttributeError`` handler: an
        # AttributeError raised while encoding ``obj`` must propagate to the caller instead
        # of being mistaken for a missing encoder (which used to recurse without bound).
        encoder = json_encode.encoder = DjangoJSONEncoder(default=str)
    return encoder.encode(obj)
def context(params):
    """
    Add the variables shared by the templates to the rendering context

    The keys ``settings`` and ``message_levels`` are always set. When
    ``settings.CAS_NEW_VERSION_HTML_WARNING`` is true, ``VERSION``, ``LAST_VERSION`` and
    ``upgrade_available`` are added. When ``settings.CAS_INFO_MESSAGES_ORDER`` is not empty,
    ``CAS_INFO_RENDER`` is set to the list of info messages to display.

    :param dict params: The context dictionary used to render templates (updated in place).
    :return: The same ``params`` dictionary, once completed.
    :rtype: dict
    """
    params["settings"] = settings
    params["message_levels"] = DEFAULT_MESSAGE_LEVELS

    if settings.CAS_NEW_VERSION_HTML_WARNING:
        latest = last_version()
        params["VERSION"] = VERSION
        params["LAST_VERSION"] = latest
        # no known last version means no upgrade to advertise
        params["upgrade_available"] = (
            latest is not None and decode_version(VERSION) < decode_version(latest)
        )

    if not settings.CAS_INFO_MESSAGES_ORDER:
        return params

    params["CAS_INFO_RENDER"] = []
    for msg_name in settings.CAS_INFO_MESSAGES_ORDER:
        if msg_name not in settings.CAS_INFO_MESSAGES:
            continue
        if not isinstance(settings.CAS_INFO_MESSAGES[msg_name], dict):
            continue
        # work on a copy so the configured message is never modified
        msg = settings.CAS_INFO_MESSAGES[msg_name].copy()
        if "message" not in msg:
            continue
        msg["name"] = msg_name
        # "info" is the default box type
        msg.setdefault("type", "info")
        # boxes can be discarded unless stated otherwise
        msg.setdefault("discardable", True)
        # the hash is computed from the message converted with str() and from the box type
        hashed_content = str(msg["message"]).encode("utf-8") + msg["type"].encode("utf-8")
        msg["hash"] = hashlib.md5(hashed_content).hexdigest()
        params["CAS_INFO_RENDER"].append(msg)
    return params
def json_response(request, data):
    """
    Serialize ``data`` to json and wrap it in an HttpResponse

    The pending django messages of ``request`` are stored under the key ``messages`` of
    ``data`` (which is modified in place) before the serialization.

    :param django.http.HttpRequest request: The request object used to generate this response.
    :param dict data: The python dictionary to return as a json
    :return: The content of ``data`` serialized in json
    :rtype: django.http.HttpResponse
    """
    data["messages"] = [
        {'message': pending.message, 'level': pending.level_tag}
        for pending in messages.get_messages(request)
    ]
    body = json.dumps(data)
    return HttpResponse(body, content_type="application/json")
def import_attr(path):
    """
    Transform a python dotted path to the attribute it designates

    :param path: A dotted path to a python object or a python object
    :type path: :obj:`str` or :obj:`bytes` or anything
    :return: The python object pointed by the dotted path or the python object unchanged
    :raises ValueError: if ``path`` is a string without any dot
    :raises ImportError: if the module part of ``path`` cannot be imported
    :raises AttributeError: if the module has no attribute named as the last part of ``path``
    """
    # if we got bytes, decode them (normally they should only contain ascii)
    if isinstance(path, bytes):
        path = path.decode("utf-8")
    # anything that is not a string is returned unchanged (it may already be the attribute)
    if not isinstance(path, str):
        return path
    if "." not in path:
        # the exception has to be raised: merely instantiating it lets the unpacking below
        # fail with an unrelated "not enough values to unpack" message
        raise ValueError("%r should be of the form `module.attr` and we just got `attr`" % path)
    module, attr = path.rsplit(".", 1)
    try:
        return getattr(import_module(module), attr)
    except ImportError as error:
        raise ImportError("Module %r not found" % module) from error
    except AttributeError as error:
        raise AttributeError("Module %r has not attribut %r" % (module, attr)) from error
def redirect_params(url_name, params=None):
    """
    Build a redirection to ``url_name`` with ``params`` as querystring

    The ``?`` separator is always appended, even when ``params`` is empty.

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :return: A redirection to the URL with name ``url_name`` with ``params`` as querystring.
    :rtype: django.http.HttpResponseRedirect
    """
    target = reverse(url_name)
    querystring = urlencode(params or {})
    return HttpResponseRedirect(target + "?" + querystring)
def reverse_params(url_name, params=None, **kwargs):
    """
    Reverse ``url_name`` and append ``params`` to the result as querystring

    :param unicode url_name: a URL pattern name
    :param params: Some parameter to append to the reversed URL
    :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
    :param **kwargs: additional parameters passed to ``reverse`` to compute the URL
    :return: The reversed URL of ``url_name``, followed by ``?`` and the querystring only
        when ``params`` encodes to a non empty querystring
    :rtype: unicode
    """
    base_url = reverse(url_name, **kwargs)
    querystring = urlencode(params or {})
    if not querystring:
        return base_url
    return u"%s?%s" % (base_url, querystring)
def copy_params(get_or_post_params, ignore=None):
    """
    Build a plain :obj:`dict` from a :class:`django.http.QueryDict`

    Keys listed in ``ignore`` and keys whose value is falsy (an empty string for instance)
    are left out of the copy.

    :param django.http.QueryDict get_or_post_params: A GET or POST
        :class:`QueryDict<django.http.QueryDict>`
    :param set ignore: An optional set of keys to ignore during the copy
    :return: A filtered copy of ``get_or_post_params``
    :rtype: dict
    """
    skipped = set() if ignore is None else ignore
    return {
        name: get_or_post_params[name]
        for name in get_or_post_params
        if name not in skipped and get_or_post_params[name]
    }
def set_cookie(response, key, value, max_age):
    """
    Set the cookie ``key`` to ``value`` on ``response``, valid for ``max_age`` seconds

    The cookie domain and its secure flag are read from ``settings.SESSION_COOKIE_DOMAIN``
    and ``settings.SESSION_COOKIE_SECURE``.

    :param django.http.HttpResponse response: a django response where to set the cookie
    :param unicode key: the cookie key
    :param unicode value: the cookie value
    :param int max_age: the maximum validity age of the cookie, in seconds
    """
    # expiration date: now (UTC) + max_age
    expiration = datetime.now(tz.utc) + timedelta(seconds=max_age)
    response.set_cookie(
        key,
        value,
        max_age=max_age,
        expires=expiration.strftime("%a, %d-%b-%Y %H:%M:%S GMT"),
        domain=settings.SESSION_COOKIE_DOMAIN,
        secure=settings.SESSION_COOKIE_SECURE or None,
    )
def get_current_url(request, ignore_params=None):
    """
    Rebuild the URL of the current page from a django request

    :param django.http.HttpRequest request: The current request object.
    :param set ignore_params: An optional set of GET parameters to ignore
    :return: The URL of the current page (scheme, host and path), followed by the GET
        parameters kept by :func:`copy_params` when there are any.
    :rtype: unicode
    """
    ignored = set() if ignore_params is None else ignore_params
    scheme = u"https" if request.is_secure() else u"http"
    pieces = [u"%s://%s%s" % (scheme, request.get_host(), request.path)]
    if request.GET:
        kept = copy_params(request.GET, ignored)
        if kept:
            pieces.append(u"?%s" % urlencode(kept))
    return u"".join(pieces)
def update_url(url, params):
    """
    Update the query string of ``url`` with ``params``

    Parameters already present in the query string of ``url`` are overwritten by the ones
    from ``params``. The resulting parameters are sorted so that the output is deterministic.

    :param url: An URL possibly with a querystring
    :type url: :obj:`str` or :obj:`bytes`
    :param dict params: A dictionary of parameters for updating the url querystring. Keys and
        values given as :obj:`bytes` are decoded as utf-8.
    :return: The URL with an updated querystring
    :rtype: unicode
    """
    def to_unicode(data):
        """Decode ``data`` as utf-8 if it is bytes, else return it unchanged"""
        return data.decode('utf-8') if isinstance(data, bytes) else data

    url = to_unicode(url)
    params = {to_unicode(key): to_unicode(value) for (key, value) in params.items()}

    url_parts = list(urlparse(url))
    # index 4 of the parsed URL is the query string
    query = dict(parse_qsl(url_parts[4], keep_blank_values=True))
    query.update(params)
    # make the params order deterministic
    url_parts[4] = urlencode(sorted(query.items()))
    url = urlunparse(url_parts)

    # historical guard kept so that the returned value is always text
    if isinstance(url, bytes):
        url = url.decode('utf-8')
    return url
def unpack_nested_exception(error):
    """
    Follow the exceptions embedded in the arguments of ``error`` down to the innermost one

    At each step, the first element of ``error.args`` that is an :class:`Exception`
    replaces ``error``; the search stops once ``error.args`` holds no exception.

    :param error: A python exception with possible exception embedded within
    :return: A python exception with no exception embedded within
    """
    while True:
        embedded = next(
            (arg for arg in error.args if isinstance(arg, Exception)),
            None
        )
        if embedded is None:
            return error
        error = embedded
def _gen_ticket(prefix=None, lg=settings.CAS_TICKET_LEN):
    """
    Generate a ticket with prefix ``prefix`` and length ``lg``

    The random part is made of ascii letters and digits. With a prefix, the ticket is
    ``prefix``, a dash and the random part, for a total of ``lg`` characters. Without a
    prefix, only the random part is returned and it counts ``lg - 1`` characters.

    :param unicode prefix: An optional prefix (probably ST, PT, PGT or PGTIOU)
    :param int lg: The length of the generated ticket (with the prefix)
    :return: A randomly generated ticket
    :rtype: unicode
    """
    alphabet = string.ascii_letters + string.digits
    # Tickets are used as credentials: draw them from the operating system entropy source
    # rather than from the default pseudo-random generator of the random module.
    generator = random.SystemRandom()
    random_part = u''.join(
        generator.choice(alphabet) for _ in range(lg - len(prefix or "") - 1)
    )
    if prefix is None:
        return random_part
    return u'%s-%s' % (prefix, random_part)
def gen_lt():
    """
    Generate a Login Ticket

    :return: A ticket generated by :func:`_gen_ticket` from
        ``settings.CAS_LOGIN_TICKET_PREFIX`` and ``settings.CAS_LT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_LOGIN_TICKET_PREFIX,
        lg=settings.CAS_LT_LEN
    )
def gen_st():
    """
    Generate a Service Ticket

    :return: A ticket generated by :func:`_gen_ticket` from
        ``settings.CAS_SERVICE_TICKET_PREFIX`` and ``settings.CAS_ST_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_SERVICE_TICKET_PREFIX,
        lg=settings.CAS_ST_LEN
    )
def gen_pt():
    """
    Generate a Proxy Ticket

    :return: A ticket generated by :func:`_gen_ticket` from
        ``settings.CAS_PROXY_TICKET_PREFIX`` and ``settings.CAS_PT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_TICKET_PREFIX,
        lg=settings.CAS_PT_LEN
    )
def gen_pgt():
    """
    Generate a Proxy Granting Ticket

    :return: A ticket generated by :func:`_gen_ticket` from
        ``settings.CAS_PROXY_GRANTING_TICKET_PREFIX`` and ``settings.CAS_PGT_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_PREFIX,
        lg=settings.CAS_PGT_LEN
    )
def gen_pgtiou():
    """
    Generate a Proxy Granting Ticket IOU

    :return: A ticket generated by :func:`_gen_ticket` from
        ``settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX`` and ``settings.CAS_PGTIOU_LEN``
    :rtype: unicode
    """
    return _gen_ticket(
        prefix=settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX,
        lg=settings.CAS_PGTIOU_LEN
    )
def gen_saml_id():
    """
    Generate a saml id

    :return: A random id generated by :func:`_gen_ticket` called without prefix and with
        its default length
    :rtype: unicode
    """
    saml_id = _gen_ticket()
    return saml_id
def get_tuple(nuplet, index, default=None):
    """
    Read an element of a tuple, falling back on a default value

    :param tuple nuplet: A tuple (or ``None``)
    :param int index: An index
    :param default: An optional default value
    :return: ``nuplet[index]`` if defined, else ``default`` (possibly ``None``). ``default``
        is also returned when ``nuplet`` is ``None``.
    """
    if nuplet is not None:
        try:
            return nuplet[index]
        except IndexError:
            # index out of range: fall through to the default value
            pass
    return default
def crypt_salt_is_valid(salt):
    """
    Tell whether ``salt`` can be used as a crypt salt

    Salts starting with ``$`` are checked by hashing the empty string with them; other salts
    of two or more characters are accepted without calling crypt.

    :param str salt: a password salt
    :return: ``True`` if ``salt`` is a valid crypt salt on this system, ``False`` otherwise
        (including when the crypt module is not available)
    :rtype: bool
    """
    # without the crypt module, or with less than two characters, no salt can be valid
    if crypt is None or len(salt) < 2:
        return False
    # salts not starting with $ need no further check
    if salt[0] != '$':
        return True
    # "$$..." is rejected, and a second $ is required after the leading one
    if salt[1] == '$' or '$' not in salt[1:]:
        return False
    try:
        hashed = crypt.crypt("", salt)
    except OSError:
        return False
    # the salt is accepted only if the produced hash is not empty and has a $ after its
    # first character
    return bool(hashed) and '$' in hashed[1:]
class LdapHashUserPassword(object):
    """
    Class to deal with hashed password as defined at
    https://datatracker.ietf.org/doc/html/draft-stroeder-hashed-userpassword-values
    """

    #: valid schemes that require a salt
    schemes_salt = {b"{SMD5}", b"{SSHA}", b"{SSHA256}", b"{SSHA384}", b"{SSHA512}", b"{CRYPT}"}
    #: valid schemes that require no salt
    schemes_nosalt = {b"{MD5}", b"{SHA}", b"{SHA256}", b"{SHA384}", b"{SHA512}"}

    #: map between scheme and hash function ({CRYPT} is handled by the crypt module)
    _schemes_to_hash = {
        b"{SMD5}": hashlib.md5,
        b"{MD5}": hashlib.md5,
        b"{SSHA}": hashlib.sha1,
        b"{SHA}": hashlib.sha1,
        b"{SSHA256}": hashlib.sha256,
        b"{SHA256}": hashlib.sha256,
        b"{SSHA384}": hashlib.sha384,
        b"{SHA384}": hashlib.sha384,
        b"{SSHA512}": hashlib.sha512,
        b"{SHA512}": hashlib.sha512
    }

    #: map between salted scheme and digest length in bytes (the salt follows the digest)
    _schemes_to_len = {
        b"{SMD5}": 16,
        b"{SSHA}": 20,
        b"{SSHA256}": 32,
        b"{SSHA384}": 48,
        b"{SSHA512}": 64,
    }

    class BadScheme(ValueError):
        """
        Error raised when the hash scheme is not in
        :attr:`LdapHashUserPassword.schemes_salt` + :attr:`LdapHashUserPassword.schemes_nosalt`
        """
        pass

    class BadHash(ValueError):
        """Error raised when the hash is malformed or too short"""
        pass

    class BadSalt(ValueError):
        """Error raised when, with the scheme ``{CRYPT}``, the salt is invalid"""
        pass

    @classmethod
    def _raise_bad_scheme(cls, scheme, valid, msg):
        """
        Raise :attr:`BadScheme` error for ``scheme``, possible valid scheme are
        in ``valid``, the error message is ``msg``

        :param bytes scheme: A bad scheme
        :param valid: An iterable of valid schemes (as bytes)
        :param str msg: The error template message, with two ``%`` placeholders: the bad
            scheme and the sorted, comma separated, valid schemes
        :raises LdapHashUserPassword.BadScheme: always
        """
        valid_schemes = sorted(s.decode() for s in valid)
        raise cls.BadScheme(msg % (scheme, u", ".join(valid_schemes)))

    @classmethod
    def _test_scheme(cls, scheme):
        """
        Test if a scheme is valid or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` is not a valid scheme
        """
        if scheme not in cls.schemes_salt and scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt | cls.schemes_nosalt,
                "The scheme %r is not valid. Valide schemes are %s."
            )

    @classmethod
    def _test_scheme_salt(cls, scheme):
        """
        Test if the scheme needs a salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires no salt
        """
        if scheme not in cls.schemes_salt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_salt,
                "The scheme %r is only valid without a salt. Valide schemes with salt are %s."
            )

    @classmethod
    def _test_scheme_nosalt(cls, scheme):
        """
        Test if the scheme needs no salt or raise BadScheme

        :param bytes scheme: A scheme
        :raises BadScheme: if ``scheme`` requires a salt
        """
        if scheme not in cls.schemes_nosalt:
            cls._raise_bad_scheme(
                scheme,
                cls.schemes_nosalt,
                "The scheme %r is only valid with a salt. Valide schemes without salt are %s."
            )

    @classmethod
    def hash(cls, scheme, password, salt=None, charset="utf8"):
        """
        Hash ``password`` with ``scheme`` using ``salt``.
        These three variables being encoded in ``charset``.

        For the hashlib based schemes the result is the upper-cased scheme followed by the
        base64 encoding of ``digest(password + salt) + salt``. For ``{CRYPT}`` it is the
        scheme followed by the output of the system crypt function.

        :param bytes scheme: A valid scheme (case insensitive)
        :param bytes password: A byte string to hash using ``scheme``
        :param bytes salt: An optional salt to use if ``scheme`` requires any
        :param str charset: The encoding of ``scheme``, ``password`` and ``salt``
        :return: The hashed password encoded with ``charset``
        :rtype: bytes
        :raises BadScheme: if ``scheme`` is unknown, does not match the presence or absence of
            ``salt``, or is ``{CRYPT}`` while crypt is not available
        :raises BadSalt: if, with ``{CRYPT}``, the salt is not supported by the system
        """
        scheme = scheme.upper()
        cls._test_scheme(scheme)
        if salt is None or salt == b"":
            salt = b""
            cls._test_scheme_nosalt(scheme)
        else:
            cls._test_scheme_salt(scheme)
        hash_function = cls._schemes_to_hash.get(scheme)
        if hash_function is not None:
            return scheme + base64.b64encode(hash_function(password + salt).digest() + salt)
        # the only valid scheme without a hashlib function is {CRYPT}
        if crypt is None:
            raise cls.BadScheme("Crypt is not available on the system")
        password = password.decode(charset)
        salt = salt.decode(charset)
        if not crypt_salt_is_valid(salt):
            raise cls.BadSalt("System crypt implementation do not support the salt %r" % salt)
        hashed_password = crypt.crypt(password, salt)
        hashed_password = hashed_password.encode(charset)
        return scheme + hashed_password

    @classmethod
    def get_scheme(cls, hashed_passord):
        """
        Return the scheme of ``hashed_passord`` or raise :attr:`BadHash`

        :param bytes hashed_passord: A hashed password
        :return: The scheme used by the hashed password, upper-cased and enclosed with ``{ }``
        :rtype: bytes
        :raises BadHash: if ``hashed_passord`` does not start with ``{`` (this includes the
            empty value) or contains no ``}``
        """
        # slicing instead of indexing: an empty value must raise BadHash, not IndexError
        if hashed_passord[:1] != b'{' or b'}' not in hashed_passord:
            raise cls.BadHash("%r should start with the scheme enclosed with { }" % hashed_passord)
        scheme = hashed_passord.split(b'}', 1)[0]
        return scheme.upper() + b"}"

    @classmethod
    def get_salt(cls, hashed_passord):
        """
        Return the salt of ``hashed_passord`` possibly empty

        :param bytes hashed_passord: A hashed password
        :return: The salt used by the hashed password (empty if no salt is used)
        :rtype: bytes
        :raises BadHash: if no scheme is found within ``hashed_passord``, if the base64 part
            cannot be decoded or if the hashed password is too short for the scheme found.
        :raises BadScheme: if the scheme found is not a valid one
        """
        scheme = cls.get_scheme(hashed_passord)
        cls._test_scheme(scheme)
        if scheme in cls.schemes_nosalt:
            return b""
        elif scheme == b'{CRYPT}':
            if b'$' in hashed_passord:
                # keep everything before the third $ then strip the scheme
                return b'$'.join(hashed_passord.split(b'$', 3)[:-1])[len(scheme):]
            # no $ at all: everything after the scheme is returned
            return hashed_passord.split(b'}', 1)[-1]
        else:
            try:
                hashed_passord = base64.b64decode(hashed_passord[len(scheme):])
            except (TypeError, binascii.Error) as error:
                raise cls.BadHash("Bad base64: %s" % error) from error
            if len(hashed_passord) < cls._schemes_to_len[scheme]:
                raise cls.BadHash("Hash too short for the scheme %s" % scheme)
            # the salt is what follows the fixed length digest
            return hashed_passord[cls._schemes_to_len[scheme]:]
def check_password(method, password, hashed_password, charset):
    """
    Check that ``password`` match `hashed_password` using ``method``,
    assuming the encoding is ``charset``.

    All the comparisons are done with :func:`hmac.compare_digest` so that their duration
    does not depend on the position of the first differing character.

    :param str method: one of ``"crypt"``, ``"ldap"``, ``"hex_md5"``, ``"hex_sha1"``,
        ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``, ``"hex_sha512"``, ``"plain"``
    :param password: The user inputed password
    :type password: :obj:`str` or :obj:`bytes`
    :param hashed_password: The hashed password as stored in the database
    :type hashed_password: :obj:`str` or :obj:`bytes`
    :param str charset: The used char encoding (also used internally, so it must be valid for
        the charset used by ``password`` when it was initially hashed)
    :return: True if ``password`` match ``hashed_password`` using ``method``,
        ``False`` otherwise
    :rtype: bool
    :raises ValueError: if ``method`` is unknown, or with the method ``"crypt"`` if crypt is
        not available or does not support the salt of ``hashed_password``
    """
    # local import: hmac is only needed here
    import hmac

    if not isinstance(password, bytes):
        password = password.encode(charset)
    if not isinstance(hashed_password, bytes):
        hashed_password = hashed_password.encode(charset)
    if method == "plain":
        return hmac.compare_digest(password, hashed_password)
    elif method == "crypt":
        if crypt is None:
            raise ValueError("Crypt is not available on the system")
        if hashed_password.startswith(b'$'):
            # modular format: the salt is everything before the third $
            salt = b'$'.join(hashed_password.split(b'$', 3)[:-1])
        elif hashed_password.startswith(b'_'):  # pragma: no cover old BSD format not supported
            salt = hashed_password[:9]
        else:
            salt = hashed_password[:2]
        password = password.decode(charset)
        salt = salt.decode(charset)
        hashed_password = hashed_password.decode(charset)
        if not crypt_salt_is_valid(salt):
            raise ValueError("System crypt implementation do not support the salt %r" % salt)
        crypted_password = crypt.crypt(password, salt)
        if crypted_password is None:
            return False
        # compare_digest only accepts ascii text, so compare the utf-8 encoded forms
        return hmac.compare_digest(
            crypted_password.encode("utf-8"),
            hashed_password.encode("utf-8")
        )
    elif method == "ldap":
        scheme = LdapHashUserPassword.get_scheme(hashed_password)
        salt = LdapHashUserPassword.get_salt(hashed_password)
        computed = LdapHashUserPassword.hash(scheme, password, salt, charset=charset)
        # get_scheme() upper-cases the scheme, so compare only what follows it: otherwise a
        # stored hash written with a lower-case scheme could never match
        return hmac.compare_digest(computed[len(scheme):], hashed_password[len(scheme):])
    elif (
        method.startswith("hex_") and
        method[4:] in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}
    ):
        hex_digest = getattr(hashlib, method[4:])(password).hexdigest().encode("ascii")
        # the stored hexadecimal digest may be upper-cased
        return hmac.compare_digest(hex_digest, hashed_password.lower())
    else:
        raise ValueError("Unknown password method check %r" % method)
def decode_version(version):
    """
    Decode a version string following version semantic http://semver.org/ into a tuple of
    int. It will work as long as we do not use pre release versions.

    :param unicode version: A dotted version
    :return: A tuple of int, one per dot separated component of ``version``
    :rtype: tuple
    """
    components = version.split('.')
    return tuple(map(int, components))
def last_version():
    """
    Fetch the last version from ``settings.CAS_NEW_VERSION_JSON_URL`` and return it.
    On successful fetch, the response is cached 24h, on error, it is cached 10 min.

    The cache is the ``_cache`` attribute of this function: a tuple made of the time of the
    last fetch, the version and a success flag.

    :return: the last django-cas-server version, or the previously known one (possibly
        ``None``) when it cannot be fetched
    :rtype: unicode
    """
    last_update, version, success = getattr(last_version, "_cache", (0, None, False))
    cache_delta = 24 * 3600 if success else 600
    if (time.time() - last_update) < cache_delta:
        return version
    try:
        # This function is called while rendering pages: never wait forever for the remote
        # server to answer.
        req = requests.get(settings.CAS_NEW_VERSION_JSON_URL, timeout=10)
        data = json.loads(req.text)
        version = data["info"]["version"]
    except (
        KeyError,
        TypeError,
        ValueError,
        requests.exceptions.RequestException
    ) as error:  # pragma: no cover (should not happen unless pypi is not available)
        logger.error("Unable to fetch %s: %s", settings.CAS_NEW_VERSION_JSON_URL, error)
        last_version._cache = (time.time(), version, False)
        # return what has just been cached, as the calls served from the cache will do
        return version
    last_version._cache = (time.time(), version, True)
    return version
def dictfetchall(cursor):
    """
    Return all rows from a django cursor as a list of dict

    :param cursor: A cursor on which a query has been executed
    :return: One dict per row returned by ``cursor.fetchall()``, mapping the column names
        (first item of each entry of ``cursor.description``) to the row values
    :rtype: list
    """
    column_names = [column[0] for column in cursor.description]
    rows = []
    for values in cursor.fetchall():
        rows.append(dict(zip(column_names, values)))
    return rows
def logout_request(ticket):
    """
    Forge a SLO logout request

    The request ID comes from :func:`gen_saml_id` and the issue instant is the current
    time in ISO format.

    :param unicode ticket: A ticket value, used as session index
    :return: A SLO XML body request
    :rtype: unicode
    """
    substitutions = {
        'id': gen_saml_id(),
        'datetime': timezone.now().isoformat(),
        'ticket': ticket
    }
    template = u"""<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
 ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
</samlp:LogoutRequest>"""
    return template % substitutions
def regexpr_validator(value):
    """
    Validate that ``value`` can be compiled as a regular expression

    :param unicode value: A regular expression to test
    :raises ValidationError: if ``value`` is not a valid regular expression
    """
    try:
        re.compile(value)
    except re.error:
        message = _('"%(value)s" is not a valid regular expression')
        raise ValidationError(message, params={'value': value})