Coverage for cas_server/utils.py: 94%
309 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-18 11:22 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-18 11:22 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2016 Valentin Samir
12"""Some util function for the app"""
13from .default_settings import settings
15from django.http import HttpResponseRedirect, HttpResponse
16from django.contrib import messages
17from django.contrib.messages import constants as DEFAULT_MESSAGE_LEVELS
18from django.core.serializers.json import DjangoJSONEncoder
19from django.utils import timezone
20from django.core.exceptions import ValidationError
21try:
22 from django.urls import reverse
23 from django.utils.translation import gettext_lazy as _
24except ImportError:
25 from django.core.urlresolvers import reverse
26 from django.utils.translation import ugettext_lazy as _
28import re
29import random
30import string
31import json
32import hashlib
33import base64
34import six
35import requests
36import time
37import logging
38import binascii
39# The crypt module is deprecated and will be removed in version 3.13
40try:
41 import crypt
42except ImportError:
43 crypt = None
45from importlib import import_module
46from datetime import datetime, timedelta
47from six.moves.urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
49from . import VERSION
51#: logger facility
52logger = logging.getLogger(__name__)
55def json_encode(obj):
56 """Encode a python object to json"""
57 try:
58 return json_encode.encoder.encode(obj)
59 except AttributeError:
60 json_encode.encoder = DjangoJSONEncoder(default=six.text_type)
61 return json_encode(obj)
64def context(params):
65 """
66 Function that add somes variable to the context before template rendering
68 :param dict params: The context dictionary used to render templates.
69 :return: The ``params`` dictionary with the key ``settings`` set to
70 :obj:`django.conf.settings`.
71 :rtype: dict
72 """
73 params["settings"] = settings
74 params["message_levels"] = DEFAULT_MESSAGE_LEVELS
76 if settings.CAS_NEW_VERSION_HTML_WARNING:
77 LAST_VERSION = last_version()
78 params["VERSION"] = VERSION
79 params["LAST_VERSION"] = LAST_VERSION
80 if LAST_VERSION is not None:
81 params["upgrade_available"] = decode_version(VERSION) < decode_version(LAST_VERSION)
82 else:
83 params["upgrade_available"] = False
85 if settings.CAS_INFO_MESSAGES_ORDER:
86 params["CAS_INFO_RENDER"] = []
87 for msg_name in settings.CAS_INFO_MESSAGES_ORDER:
88 if msg_name in settings.CAS_INFO_MESSAGES:
89 if not isinstance(settings.CAS_INFO_MESSAGES[msg_name], dict):
90 continue
91 msg = settings.CAS_INFO_MESSAGES[msg_name].copy()
92 if "message" in msg:
93 msg["name"] = msg_name
94 # use info as default infox type
95 msg["type"] = msg.get("type", "info")
96 # make box discardable by default
97 msg["discardable"] = msg.get("discardable", True)
98 msg_hash = (
99 six.text_type(msg["message"]).encode("utf-8") +
100 msg["type"].encode("utf-8")
101 )
102 # hash depend of the rendering language
103 msg["hash"] = hashlib.md5(msg_hash).hexdigest()
104 params["CAS_INFO_RENDER"].append(msg)
105 return params
108def json_response(request, data):
109 """
110 Wrapper dumping `data` to a json and sending it to the user with an HttpResponse
112 :param django.http.HttpRequest request: The request object used to generate this response.
113 :param dict data: The python dictionnary to return as a json
114 :return: The content of ``data`` serialized in json
115 :rtype: django.http.HttpResponse
116 """
117 data["messages"] = []
118 for msg in messages.get_messages(request):
119 data["messages"].append({'message': msg.message, 'level': msg.level_tag})
120 return HttpResponse(json.dumps(data), content_type="application/json")
123def import_attr(path):
124 """
125 transform a python dotted path to the attr
127 :param path: A dotted path to a python object or a python object
128 :type path: :obj:`unicode` or :obj:`str` or anything
129 :return: The python object pointed by the dotted path or the python object unchanged
130 """
131 # if we got a str, decode it to unicode (normally it should only contain ascii)
132 if isinstance(path, six.binary_type): 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true
133 path = path.decode("utf-8")
134 # if path is not an unicode, return it unchanged (may be it is already the attribute to import)
135 if not isinstance(path, six.text_type):
136 return path
137 if u"." not in path:
138 ValueError("%r should be of the form `module.attr` and we just got `attr`" % path)
139 module, attr = path.rsplit(u'.', 1)
140 try:
141 return getattr(import_module(module), attr)
142 except ImportError:
143 raise ImportError("Module %r not found" % module)
144 except AttributeError:
145 raise AttributeError("Module %r has not attribut %r" % (module, attr))
148def redirect_params(url_name, params=None):
149 """
150 Redirect to ``url_name`` with ``params`` as querystring
152 :param unicode url_name: a URL pattern name
153 :param params: Some parameter to append to the reversed URL
154 :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
155 :return: A redirection to the URL with name ``url_name`` with ``params`` as querystring.
156 :rtype: django.http.HttpResponseRedirect
157 """
158 url = reverse(url_name)
159 params = urlencode(params if params else {})
160 return HttpResponseRedirect(url + "?%s" % params)
163def reverse_params(url_name, params=None, **kwargs):
164 """
165 compute the reverse url of ``url_name`` and add to it parameters from ``params``
166 as querystring
168 :param unicode url_name: a URL pattern name
169 :param params: Some parameter to append to the reversed URL
170 :type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
171 :param **kwargs: additional parameters needed to compure the reverse URL
172 :return: The computed reverse URL of ``url_name`` with possible querystring from ``params``
173 :rtype: unicode
174 """
175 url = reverse(url_name, **kwargs)
176 params = urlencode(params if params else {})
177 if params:
178 return u"%s?%s" % (url, params)
179 else:
180 return url
183def copy_params(get_or_post_params, ignore=None):
184 """
185 copy a :class:`django.http.QueryDict` in a :obj:`dict` ignoring keys in the set ``ignore``
187 :param django.http.QueryDict get_or_post_params: A GET or POST
188 :class:`QueryDict<django.http.QueryDict>`
189 :param set ignore: An optinal set of keys to ignore during the copy
190 :return: A copy of get_or_post_params
191 :rtype: dict
192 """
193 if ignore is None:
194 ignore = set()
195 params = {}
196 for key in get_or_post_params:
197 if key not in ignore and get_or_post_params[key]:
198 params[key] = get_or_post_params[key]
199 return params
202def set_cookie(response, key, value, max_age):
203 """
204 Set the cookie ``key`` on ``response`` with value ``value`` valid for ``max_age`` secondes
206 :param django.http.HttpResponse response: a django response where to set the cookie
207 :param unicode key: the cookie key
208 :param unicode value: the cookie value
209 :param int max_age: the maximum validity age of the cookie
210 """
211 expires = datetime.strftime(
212 datetime.utcnow() + timedelta(seconds=max_age),
213 "%a, %d-%b-%Y %H:%M:%S GMT"
214 )
215 response.set_cookie(
216 key,
217 value,
218 max_age=max_age,
219 expires=expires,
220 domain=settings.SESSION_COOKIE_DOMAIN,
221 secure=settings.SESSION_COOKIE_SECURE or None
222 )
225def get_current_url(request, ignore_params=None):
226 """
227 Giving a django request, return the current http url, possibly ignoring some GET parameters
229 :param django.http.HttpRequest request: The current request object.
230 :param set ignore_params: An optional set of GET parameters to ignore
231 :return: The URL of the current page, possibly omitting some parameters from
232 ``ignore_params`` in the querystring.
233 :rtype: unicode
234 """
235 if ignore_params is None:
236 ignore_params = set()
237 protocol = u'https' if request.is_secure() else u"http"
238 service_url = u"%s://%s%s" % (protocol, request.get_host(), request.path)
239 if request.GET:
240 params = copy_params(request.GET, ignore_params)
241 if params:
242 service_url += u"?%s" % urlencode(params)
243 return service_url
246def update_url(url, params):
247 """
248 update parameters using ``params`` in the ``url`` query string
250 :param url: An URL possibily with a querystring
251 :type url: :obj:`unicode` or :obj:`str`
252 :param dict params: A dictionary of parameters for updating the url querystring
253 :return: The URL with an updated querystring
254 :rtype: unicode
255 """
256 def to_unicode(data):
257 if isinstance(data, bytes):
258 return data.decode('utf-8')
259 else:
260 return data
262 def to_bytes(data):
263 if not isinstance(data, bytes):
264 return data.encode('utf-8')
265 else:
266 return data
268 if six.PY3:
269 url = to_unicode(url)
270 params = {to_unicode(key): to_unicode(value) for (key, value) in params.items()}
271 else:
272 url = to_bytes(url)
273 params = {to_bytes(key): to_bytes(value) for (key, value) in params.items()}
275 url_parts = list(urlparse(url))
276 query = dict(parse_qsl(url_parts[4], keep_blank_values=True))
277 query.update(params)
278 # make the params order deterministic
279 query = list(query.items())
280 query.sort()
281 url_query = urlencode(query)
282 url_parts[4] = url_query
283 url = urlunparse(url_parts)
285 if isinstance(url, bytes): 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 url = url.decode('utf-8')
287 return url
290def unpack_nested_exception(error):
291 """
292 If exception are stacked, return the first one
294 :param error: A python exception with possible exception embeded within
295 :return: A python exception with no exception embeded within
296 """
297 i = 0
298 while True:
299 if error.args[i:]:
300 if isinstance(error.args[i], Exception):
301 error = error.args[i]
302 i = 0
303 else:
304 i += 1
305 else:
306 break
307 return error
310def _gen_ticket(prefix=None, lg=settings.CAS_TICKET_LEN):
311 """
312 Generate a ticket with prefix ``prefix`` and length ``lg``
314 :param unicode prefix: An optional prefix (probably ST, PT, PGT or PGTIOU)
315 :param int lg: The length of the generated ticket (with the prefix)
316 :return: A randomlly generated ticket of length ``lg``
317 :rtype: unicode
318 """
319 random_part = u''.join(
320 random.choice(
321 string.ascii_letters + string.digits
322 ) for _ in range(lg - len(prefix or "") - 1)
323 )
324 if prefix is not None:
325 return u'%s-%s' % (prefix, random_part)
326 else:
327 return random_part
330def gen_lt():
331 """
332 Generate a Login Ticket
334 :return: A ticket with prefix ``settings.CAS_LOGIN_TICKET_PREFIX`` and length
335 ``settings.CAS_LT_LEN``
336 :rtype: unicode
337 """
338 return _gen_ticket(settings.CAS_LOGIN_TICKET_PREFIX, settings.CAS_LT_LEN)
341def gen_st():
342 """
343 Generate a Service Ticket
345 :return: A ticket with prefix ``settings.CAS_SERVICE_TICKET_PREFIX`` and length
346 ``settings.CAS_ST_LEN``
347 :rtype: unicode
348 """
349 return _gen_ticket(settings.CAS_SERVICE_TICKET_PREFIX, settings.CAS_ST_LEN)
352def gen_pt():
353 """
354 Generate a Proxy Ticket
356 :return: A ticket with prefix ``settings.CAS_PROXY_TICKET_PREFIX`` and length
357 ``settings.CAS_PT_LEN``
358 :rtype: unicode
359 """
360 return _gen_ticket(settings.CAS_PROXY_TICKET_PREFIX, settings.CAS_PT_LEN)
363def gen_pgt():
364 """
365 Generate a Proxy Granting Ticket
367 :return: A ticket with prefix ``settings.CAS_PROXY_GRANTING_TICKET_PREFIX`` and length
368 ``settings.CAS_PGT_LEN``
369 :rtype: unicode
370 """
371 return _gen_ticket(settings.CAS_PROXY_GRANTING_TICKET_PREFIX, settings.CAS_PGT_LEN)
374def gen_pgtiou():
375 """
376 Generate a Proxy Granting Ticket IOU
378 :return: A ticket with prefix ``settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX`` and length
379 ``settings.CAS_PGTIOU_LEN``
380 :rtype: unicode
381 """
382 return _gen_ticket(settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX, settings.CAS_PGTIOU_LEN)
385def gen_saml_id():
386 """
387 Generate an saml id
389 :return: A random id of length ``settings.CAS_TICKET_LEN``
390 :rtype: unicode
391 """
392 return _gen_ticket()
395def get_tuple(nuplet, index, default=None):
396 """
397 :param tuple nuplet: A tuple
398 :param int index: An index
399 :param default: An optional default value
400 :return: ``nuplet[index]`` if defined, else ``default`` (possibly ``None``)
401 """
402 if nuplet is None:
403 return default
404 try:
405 return nuplet[index]
406 except IndexError:
407 return default
410def crypt_salt_is_valid(salt):
411 """
412 Validate a salt as crypt salt
414 :param str salt: a password salt
415 :return: ``True`` if ``salt`` is a valid crypt salt on this system, ``False`` otherwise
416 :rtype: bool
417 """
418 if crypt is None: 418 ↛ 419line 418 didn't jump to line 419 because the condition on line 418 was never true
419 return False
420 if len(salt) < 2:
421 return False
422 else:
423 if salt[0] == '$':
424 if salt[1] == '$':
425 return False
426 else:
427 if '$' not in salt[1:]:
428 return False
429 else:
430 try:
431 hashed = crypt.crypt("", salt)
432 except OSError:
433 return False
434 if not hashed or '$' not in hashed[1:]:
435 return False
436 else:
437 return True
438 else:
439 return True
442class LdapHashUserPassword(object):
443 """
444 Class to deal with hashed password as defined at
445 https://tools.ietf.org/id/draft-stroeder-hashed-userpassword-values-01.html
446 """
448 #: valide schemes that require a salt
449 schemes_salt = {b"{SMD5}", b"{SSHA}", b"{SSHA256}", b"{SSHA384}", b"{SSHA512}", b"{CRYPT}"}
450 #: valide sschemes that require no slat
451 schemes_nosalt = {b"{MD5}", b"{SHA}", b"{SHA256}", b"{SHA384}", b"{SHA512}"}
453 #: map beetween scheme and hash function
454 _schemes_to_hash = {
455 b"{SMD5}": hashlib.md5,
456 b"{MD5}": hashlib.md5,
457 b"{SSHA}": hashlib.sha1,
458 b"{SHA}": hashlib.sha1,
459 b"{SSHA256}": hashlib.sha256,
460 b"{SHA256}": hashlib.sha256,
461 b"{SSHA384}": hashlib.sha384,
462 b"{SHA384}": hashlib.sha384,
463 b"{SSHA512}": hashlib.sha512,
464 b"{SHA512}": hashlib.sha512
465 }
467 #: map between scheme and hash length
468 _schemes_to_len = {
469 b"{SMD5}": 16,
470 b"{SSHA}": 20,
471 b"{SSHA256}": 32,
472 b"{SSHA384}": 48,
473 b"{SSHA512}": 64,
474 }
476 class BadScheme(ValueError):
477 """
478 Error raised then the hash scheme is not in
479 :attr:`LdapHashUserPassword.schemes_salt` + :attr:`LdapHashUserPassword.schemes_nosalt`
480 """
481 pass
483 class BadHash(ValueError):
484 """Error raised then the hash is too short"""
485 pass
487 class BadSalt(ValueError):
488 """Error raised then, with the scheme ``{CRYPT}``, the salt is invalid"""
489 pass
491 @classmethod
492 def _raise_bad_scheme(cls, scheme, valid, msg):
493 """
494 Raise :attr:`BadScheme` error for ``scheme``, possible valid scheme are
495 in ``valid``, the error message is ``msg``
497 :param bytes scheme: A bad scheme
498 :param list valid: A list a valid scheme
499 :param str msg: The error template message
500 :raises LdapHashUserPassword.BadScheme: always
501 """
502 valid_schemes = [s.decode() for s in valid]
503 valid_schemes.sort()
504 raise cls.BadScheme(msg % (scheme, u", ".join(valid_schemes)))
506 @classmethod
507 def _test_scheme(cls, scheme):
508 """
509 Test if a scheme is valide or raise BadScheme
511 :param bytes scheme: A scheme
512 :raises BadScheme: if ``scheme`` is not a valid scheme
513 """
514 if scheme not in cls.schemes_salt and scheme not in cls.schemes_nosalt:
515 cls._raise_bad_scheme(
516 scheme,
517 cls.schemes_salt | cls.schemes_nosalt,
518 "The scheme %r is not valid. Valide schemes are %s."
519 )
521 @classmethod
522 def _test_scheme_salt(cls, scheme):
523 """
524 Test if the scheme need a salt or raise BadScheme
526 :param bytes scheme: A scheme
527 :raises BadScheme: if ``scheme` require no salt
528 """
529 if scheme not in cls.schemes_salt:
530 cls._raise_bad_scheme(
531 scheme,
532 cls.schemes_salt,
533 "The scheme %r is only valid without a salt. Valide schemes with salt are %s."
534 )
536 @classmethod
537 def _test_scheme_nosalt(cls, scheme):
538 """
539 Test if the scheme need no salt or raise BadScheme
541 :param bytes scheme: A scheme
542 :raises BadScheme: if ``scheme` require a salt
543 """
544 if scheme not in cls.schemes_nosalt:
545 cls._raise_bad_scheme(
546 scheme,
547 cls.schemes_nosalt,
548 "The scheme %r is only valid with a salt. Valide schemes without salt are %s."
549 )
551 @classmethod
552 def hash(cls, scheme, password, salt=None, charset="utf8"):
553 """
554 Hash ``password`` with ``scheme`` using ``salt``.
555 This three variable beeing encoded in ``charset``.
557 :param bytes scheme: A valid scheme
558 :param bytes password: A byte string to hash using ``scheme``
559 :param bytes salt: An optional salt to use if ``scheme`` requires any
560 :param str charset: The encoding of ``scheme``, ``password`` and ``salt``
561 :return: The hashed password encoded with ``charset``
562 :rtype: bytes
563 """
564 scheme = scheme.upper()
565 cls._test_scheme(scheme)
566 if salt is None or salt == b"":
567 salt = b""
568 cls._test_scheme_nosalt(scheme)
569 else:
570 cls._test_scheme_salt(scheme)
571 try:
572 return scheme + base64.b64encode(
573 cls._schemes_to_hash[scheme](password + salt).digest() + salt
574 )
575 except KeyError:
576 if crypt is None:
577 raise cls.BadScheme("Crypt is not available on the system")
578 if six.PY3:
579 password = password.decode(charset)
580 salt = salt.decode(charset)
581 if not crypt_salt_is_valid(salt):
582 raise cls.BadSalt("System crypt implementation do not support the salt %r" % salt)
583 hashed_password = crypt.crypt(password, salt)
584 if six.PY3:
585 hashed_password = hashed_password.encode(charset)
586 return scheme + hashed_password
588 @classmethod
589 def get_scheme(cls, hashed_passord):
590 """
591 Return the scheme of ``hashed_passord`` or raise :attr:`BadHash`
593 :param bytes hashed_passord: A hashed password
594 :return: The scheme used by the hashed password
595 :rtype: bytes
596 :raises BadHash: if no valid scheme is found within ``hashed_passord``
597 """
598 if not hashed_passord[0] == b'{'[0] or b'}' not in hashed_passord:
599 raise cls.BadHash("%r should start with the scheme enclosed with { }" % hashed_passord)
600 scheme = hashed_passord.split(b'}', 1)[0]
601 scheme = scheme.upper() + b"}"
602 return scheme
604 @classmethod
605 def get_salt(cls, hashed_passord):
606 """
607 Return the salt of ``hashed_passord`` possibly empty
609 :param bytes hashed_passord: A hashed password
610 :return: The salt used by the hashed password (empty if no salt is used)
611 :rtype: bytes
612 :raises BadHash: if no valid scheme is found within ``hashed_passord`` or if the
613 hashed password is too short for the scheme found.
614 """
615 scheme = cls.get_scheme(hashed_passord)
616 cls._test_scheme(scheme)
617 if scheme in cls.schemes_nosalt:
618 return b""
619 elif scheme == b'{CRYPT}':
620 if b'$' in hashed_passord: 620 ↛ 622line 620 didn't jump to line 622 because the condition on line 620 was always true
621 return b'$'.join(hashed_passord.split(b'$', 3)[:-1])[len(scheme):]
622 return hashed_passord.split(b'}', 1)[-1]
623 else:
624 try:
625 hashed_passord = base64.b64decode(hashed_passord[len(scheme):])
626 except (TypeError, binascii.Error) as error:
627 raise cls.BadHash("Bad base64: %s" % error)
628 if len(hashed_passord) < cls._schemes_to_len[scheme]:
629 raise cls.BadHash("Hash too short for the scheme %s" % scheme)
630 return hashed_passord[cls._schemes_to_len[scheme]:]
633def check_password(method, password, hashed_password, charset):
634 """
635 Check that ``password`` match `hashed_password` using ``method``,
636 assuming the encoding is ``charset``.
638 :param str method: on of ``"crypt"``, ``"ldap"``, ``"hex_md5"``, ``"hex_sha1"``,
639 ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``, ``"hex_sha512"``, ``"plain"``
640 :param password: The user inputed password
641 :type password: :obj:`str` or :obj:`unicode`
642 :param hashed_password: The hashed password as stored in the database
643 :type hashed_password: :obj:`str` or :obj:`unicode`
644 :param str charset: The used char encoding (also used internally, so it must be valid for
645 the charset used by ``password`` when it was initially )
646 :return: True if ``password`` match ``hashed_password`` using ``method``,
647 ``False`` otherwise
648 :rtype: bool
649 """
650 if not isinstance(password, six.binary_type):
651 password = password.encode(charset)
652 if not isinstance(hashed_password, six.binary_type):
653 hashed_password = hashed_password.encode(charset)
654 if method == "plain":
655 return password == hashed_password
656 elif method == "crypt":
657 if crypt is None: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true
658 raise ValueError("Crypt is not available on the system")
659 if hashed_password.startswith(b'$'):
660 salt = b'$'.join(hashed_password.split(b'$', 3)[:-1])
661 elif hashed_password.startswith(b'_'): # pragma: no cover old BSD format not supported
662 salt = hashed_password[:9]
663 else:
664 salt = hashed_password[:2]
665 if six.PY3:
666 password = password.decode(charset)
667 salt = salt.decode(charset)
668 hashed_password = hashed_password.decode(charset)
669 if not crypt_salt_is_valid(salt):
670 raise ValueError("System crypt implementation do not support the salt %r" % salt)
671 crypted_password = crypt.crypt(password, salt)
672 return crypted_password == hashed_password
673 elif method == "ldap":
674 scheme = LdapHashUserPassword.get_scheme(hashed_password)
675 salt = LdapHashUserPassword.get_salt(hashed_password)
676 return LdapHashUserPassword.hash(scheme, password, salt, charset=charset) == hashed_password
677 elif (
678 method.startswith("hex_") and
679 method[4:] in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}
680 ):
681 return getattr(
682 hashlib,
683 method[4:]
684 )(password).hexdigest().encode("ascii") == hashed_password.lower()
685 else:
686 raise ValueError("Unknown password method check %r" % method)
689def decode_version(version):
690 """
691 decode a version string following version semantic http://semver.org/ input a tuple of int.
692 It will work as long as we do not use pre release versions.
694 :param unicode version: A dotted version
695 :return: A tuple a int
696 :rtype: tuple
697 """
698 return tuple(int(sub_version) for sub_version in version.split('.'))
701def last_version():
702 """
703 Fetch the last version from pypi and return it. On successful fetch from pypi, the response
704 is cached 24h, on error, it is cached 10 min.
706 :return: the last django-cas-server version
707 :rtype: unicode
708 """
709 try:
710 last_update, version, success = last_version._cache
711 except AttributeError:
712 last_update = 0
713 version = None
714 success = False
715 cache_delta = 24 * 3600 if success else 600
716 if (time.time() - last_update) < cache_delta:
717 return version
718 else:
719 try:
720 req = requests.get(settings.CAS_NEW_VERSION_JSON_URL)
721 data = json.loads(req.text)
722 version = data["info"]["version"]
723 last_version._cache = (time.time(), version, True)
724 return version
725 except (
726 KeyError,
727 ValueError,
728 requests.exceptions.RequestException
729 ) as error: # pragma: no cover (should not happen unless pypi is not available)
730 logger.error(
731 "Unable to fetch %s: %s" % (settings.CAS_NEW_VERSION_JSON_URL, error)
732 )
733 last_version._cache = (time.time(), version, False)
736def dictfetchall(cursor):
737 "Return all rows from a django cursor as a dict"
738 columns = [col[0] for col in cursor.description]
739 return [
740 dict(zip(columns, row))
741 for row in cursor.fetchall()
742 ]
745def logout_request(ticket):
746 """
747 Forge a SLO logout request
749 :param unicode ticket: A ticket value
750 :return: A SLO XML body request
751 :rtype: unicode
752 """
753 return u"""<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
754 ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
755<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
756<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
757</samlp:LogoutRequest>""" % {
758 'id': gen_saml_id(),
759 'datetime': timezone.now().isoformat(),
760 'ticket': ticket
761 }
764def regexpr_validator(value):
765 """
766 Test that ``value`` is a valid regular expression
768 :param unicode value: A regular expression to test
769 :raises ValidationError: if ``value`` is not a valid regular expression
770 """
771 try:
772 re.compile(value)
773 except re.error:
774 raise ValidationError(
775 _('"%(value)s" is not a valid regular expression'),
776 params={'value': value}
777 )