Coverage for cas_server/views.py: 99%
596 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-13 11:30 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-13 11:30 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2016 Valentin Samir
12"""views for the app"""
13from .default_settings import settings, SessionStore
15from django.shortcuts import render, redirect
16from django.http import HttpResponse, HttpResponseRedirect
17from django.contrib import messages
18from django.utils.decorators import method_decorator
19from django.utils import timezone
20from django.views.decorators.csrf import csrf_exempt
21from django.middleware.csrf import CsrfViewMiddleware
22from django.views.generic import View
23try:
24 from django.utils.encoding import python_2_unicode_compatible
25 from django.utils.translation import ugettext as _
26except ImportError:
27 def python_2_unicode_compatible(func):
28 """
29 We use Django >= 3.0 with Python >= 3.4, we don't need Python 2 compatibility.
30 """
31 return func
32 from django.utils.translation import gettext as _
33from django.utils.safestring import mark_safe
34try:
35 from django.urls import reverse
36except ImportError:
37 from django.core.urlresolvers import reverse
39import re
40import logging
41import pprint
42import requests
43from lxml import etree
44from datetime import timedelta
46import cas_server.utils as utils
47import cas_server.forms as forms
48import cas_server.models as models
50from .utils import json_response
51from .models import Ticket, ServiceTicket, ProxyTicket, ProxyGrantingTicket
52from .models import ServicePattern, FederatedIendityProvider, FederatedUser
53from .federate import CASFederateValidateUser
55logger = logging.getLogger(__name__)
58class LogoutMixin(object):
59 """destroy CAS session utils"""
61 def logout(self, all_session=False):
62 """
63 effectively destroy a CAS session
65 :param boolean all_session: If ``True`` destroy all the user sessions, otherwise
66 destroy the current user session.
67 :return: The number of destroyed sessions
68 :rtype: int
69 """
70 # initialize the counter of the number of destroyed sesisons
71 session_nb = 0
72 # save the current user username before flushing the session
73 username = self.request.session.get("username")
74 if username:
75 if all_session:
76 logger.info("Logging out user %s from all sessions." % username)
77 else:
78 logger.info("Logging out user %s." % username)
79 users = []
80 # try to get the user from the current session
81 try:
82 users.append(
83 models.User.objects.get(
84 username=username,
85 session_key=self.request.session.session_key
86 )
87 )
88 except models.User.DoesNotExist:
89 # if user not found in database, flush the session anyway
90 self.request.session.flush()
92 # If all_session is set, search all of the user sessions
93 if all_session:
94 users.extend(
95 models.User.objects.filter(
96 username=username
97 ).exclude(
98 session_key=self.request.session.session_key
99 )
100 )
102 # Iterate over all user sessions that have to be logged out
103 for user in users:
104 # get the user session
105 session = SessionStore(session_key=user.session_key)
106 # flush the session
107 session.flush()
108 # send SLO requests
109 user.logout(self.request)
110 # delete the user
111 user.delete()
112 # increment the destroyed session counter
113 session_nb += 1
114 if username:
115 logger.info("User %s logged out" % username)
116 return session_nb
119class CsrfExemptView(View):
120 """base class for csrf exempt class views"""
122 @method_decorator(csrf_exempt) # csrf is disabled for allowing SLO requests reception
123 def dispatch(self, request, *args, **kwargs):
124 """
125 dispatch different http request to the methods of the same name
127 :param django.http.HttpRequest request: The current request object
128 """
129 return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)
132class LogoutView(View, LogoutMixin):
133 """destroy CAS session (logout) view"""
135 #: current :class:`django.http.HttpRequest` object
136 request = None
137 #: service GET parameter
138 service = None
139 #: url GET paramet
140 url = None
141 #: ``True`` if the HTTP_X_AJAX http header is sent and ``settings.CAS_ENABLE_AJAX_AUTH``
142 #: is ``True``, ``False`` otherwise.
143 ajax = None
145 def init_get(self, request):
146 """
147 Initialize the :class:`LogoutView` attributes on GET request
149 :param django.http.HttpRequest request: The current request object
150 """
151 self.request = request
152 self.service = request.GET.get('service')
153 self.url = request.GET.get('url')
154 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
156 @staticmethod
157 def delete_cookies(response):
158 if settings.CAS_REMOVE_DJANGO_SESSION_COOKIE_ON_LOGOUT: 158 ↛ 159line 158 didn't jump to line 159, because the condition on line 158 was never true
159 response.delete_cookie(settings.SESSION_COOKIE_NAME)
160 if settings.CAS_REMOVE_DJANGO_CSRF_COOKIE_ON_LOGOUT: 160 ↛ 161line 160 didn't jump to line 161, because the condition on line 160 was never true
161 response.delete_cookie(settings.CSRF_COOKIE_NAME)
162 if settings.CAS_REMOVE_DJANGO_LANGUAGE_COOKIE_ON_LOGOUT: 162 ↛ 163line 162 didn't jump to line 163, because the condition on line 162 was never true
163 response.delete_cookie(settings.LANGUAGE_COOKIE_NAME)
164 return response
166 def get(self, request, *args, **kwargs):
167 """
168 method called on GET request on this view
170 :param django.http.HttpRequest request: The current request object
171 """
172 logger.info("logout requested")
173 # initialize the class attributes
174 self.init_get(request)
175 # if CAS federation mode is enable, bakup the provider before flushing the sessions
176 if settings.CAS_FEDERATE:
177 try:
178 user = FederatedUser.get_from_federated_username(
179 self.request.session.get("username")
180 )
181 auth = CASFederateValidateUser(user.provider, service_url="")
182 except FederatedUser.DoesNotExist:
183 auth = None
184 session_nb = self.logout(self.request.GET.get("all"))
185 # if CAS federation mode is enable, redirect to user CAS logout page, appending the
186 # current querystring
187 if settings.CAS_FEDERATE:
188 if auth is not None:
189 params = utils.copy_params(request.GET, ignore={"forget_provider"})
190 url = auth.get_logout_url()
191 response = HttpResponseRedirect(utils.update_url(url, params))
192 if request.GET.get("forget_provider"):
193 response.delete_cookie("remember_provider")
194 return self.delete_cookies(response)
195 # if service is set, redirect to service after logout
196 if self.service:
197 list(messages.get_messages(request)) # clean messages before leaving the django app
198 return self.delete_cookies(HttpResponseRedirect(self.service))
199 # if service is not set but url is set, redirect to url after logout
200 elif self.url:
201 list(messages.get_messages(request)) # clean messages before leaving the django app
202 return self.delete_cookies(HttpResponseRedirect(self.url))
203 else:
204 # build logout message depending of the number of sessions the user logs out
205 if session_nb == 1:
206 logout_msg = mark_safe(_(
207 "<h3>Logout successful</h3>"
208 "You have successfully logged out from the Central Authentication Service. "
209 "For security reasons, close your web browser."
210 ))
211 elif session_nb > 1:
212 logout_msg = mark_safe(_(
213 "<h3>Logout successful</h3>"
214 "You have successfully logged out from %d sessions of the Central "
215 "Authentication Service. "
216 "For security reasons, close your web browser."
217 ) % session_nb)
218 else:
219 logout_msg = mark_safe(_(
220 "<h3>Logout successful</h3>"
221 "You were already logged out from the Central Authentication Service. "
222 "For security reasons, close your web browser."
223 ))
225 # depending of settings, redirect to the login page with a logout message or display
226 # the logout page. The default is to display tge logout page.
227 if settings.CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT:
228 messages.add_message(request, messages.SUCCESS, logout_msg)
229 if self.ajax:
230 url = reverse("cas_server:login")
231 data = {
232 'status': 'success',
233 'detail': 'logout',
234 'url': url,
235 'session_nb': session_nb
236 }
237 return self.delete_cookies(json_response(request, data))
238 else:
239 return self.delete_cookies(redirect("cas_server:login"))
240 else:
241 if self.ajax:
242 data = {'status': 'success', 'detail': 'logout', 'session_nb': session_nb}
243 return self.delete_cookies(json_response(request, data))
244 else:
245 return self.delete_cookies(render(
246 request,
247 settings.CAS_LOGOUT_TEMPLATE,
248 utils.context({'logout_msg': logout_msg})
249 ))
252class FederateAuth(CsrfExemptView):
253 """
254 view to authenticated user against a backend CAS then CAS_FEDERATE is True
256 csrf is disabled for allowing SLO requests reception.
257 """
259 #: current URL used as service URL by the CAS client
260 service_url = None
262 def get_cas_client(self, request, provider, renew=False):
263 """
264 return a CAS client object matching provider
266 :param django.http.HttpRequest request: The current request object
267 :param cas_server.models.FederatedIendityProvider provider: the user identity provider
268 :return: The user CAS client object
269 :rtype: :class:`federate.CASFederateValidateUser
270 <cas_server.federate.CASFederateValidateUser>`
271 """
272 # compute the current url, ignoring ticket dans provider GET parameters
273 service_url = utils.get_current_url(request, {"ticket", "provider"})
274 self.service_url = service_url
275 return CASFederateValidateUser(provider, service_url, renew=renew)
277 def post(self, request, provider=None, *args, **kwargs):
278 """
279 method called on POST request
281 :param django.http.HttpRequest request: The current request object
282 :param unicode provider: Optional parameter. The user provider suffix.
283 """
284 # if settings.CAS_FEDERATE is not True redirect to the login page
285 if not settings.CAS_FEDERATE:
286 logger.warning("CAS_FEDERATE is False, set it to True to use federation")
287 return redirect("cas_server:login")
288 # POST with a provider suffix, this is probably an SLO request. csrf is disabled for
289 # allowing SLO requests reception
290 try:
291 provider = FederatedIendityProvider.objects.get(suffix=provider)
292 auth = self.get_cas_client(request, provider)
293 try:
294 auth.clean_sessions(request.POST['logoutRequest'])
295 except (KeyError, AttributeError):
296 pass
297 return HttpResponse("ok")
298 # else, a User is trying to log in using an identity provider
299 except FederatedIendityProvider.DoesNotExist:
300 # Manually checking for csrf to protect the code below
301 reason = CsrfViewMiddleware(lambda request: HttpResponse()) \
302 .process_view(request, None, (), {})
303 if reason is not None: # pragma: no cover (csrf checks are disabled during tests)
304 return reason # Failed the test, stop here.
305 form = forms.FederateSelect(request.POST)
306 if form.is_valid():
307 params = utils.copy_params(
308 request.POST,
309 ignore={"provider", "csrfmiddlewaretoken", "ticket", "lt"}
310 )
311 if params.get("renew") == "False":
312 del params["renew"]
313 url = utils.reverse_params(
314 "cas_server:federateAuth",
315 kwargs=dict(provider=form.cleaned_data["provider"].suffix),
316 params=params
317 )
318 return HttpResponseRedirect(url)
319 else:
320 return redirect("cas_server:login")
322 def get(self, request, provider=None):
323 """
324 method called on GET request
326 :param django.http.HttpRequestself. request: The current request object
327 :param unicode provider: Optional parameter. The user provider suffix.
328 """
329 # if settings.CAS_FEDERATE is not True redirect to the login page
330 if not settings.CAS_FEDERATE:
331 logger.warning("CAS_FEDERATE is False, set it to True to use federation")
332 return redirect("cas_server:login")
333 renew = bool(request.GET.get('renew') and request.GET['renew'] != "False")
334 # Is the user is already authenticated, no need to request authentication to the user
335 # identity provider.
336 if self.request.session.get("authenticated") and not renew:
337 logger.warning("User already authenticated, dropping federated authentication request")
338 return redirect("cas_server:login")
339 try:
340 # get the identity provider from its suffix
341 provider = FederatedIendityProvider.objects.get(suffix=provider)
342 # get a CAS client for the user identity provider
343 auth = self.get_cas_client(request, provider, renew)
344 # if no ticket submited, redirect to the identity provider CAS login page
345 if 'ticket' not in request.GET:
346 logger.info("Trying to authenticate %s again" % auth.provider.server_url)
347 return HttpResponseRedirect(auth.get_login_url())
348 else:
349 ticket = request.GET['ticket']
350 try:
351 # if the ticket validation succeed
352 if auth.verify_ticket(ticket):
353 logger.info(
354 "Got a valid ticket for %s from %s" % (
355 auth.username,
356 auth.provider.server_url
357 )
358 )
359 params = utils.copy_params(request.GET, ignore={"ticket", "remember"})
360 request.session["federate_username"] = auth.federated_username
361 request.session["federate_ticket"] = ticket
362 auth.register_slo(
363 auth.federated_username,
364 request.session.session_key,
365 ticket
366 )
367 # redirect to the the login page for the user to become authenticated
368 # thanks to the `federate_username` and `federate_ticket` session parameters
369 url = utils.reverse_params("cas_server:login", params)
370 response = HttpResponseRedirect(url)
371 # If the user has checked "remember my identity provider" store it in a
372 # cookie
373 if request.GET.get("remember"):
374 max_age = settings.CAS_FEDERATE_REMEMBER_TIMEOUT
375 utils.set_cookie(
376 response,
377 "remember_provider",
378 provider.suffix,
379 max_age
380 )
381 return response
382 # else redirect to the identity provider CAS login page
383 else:
384 logger.info(
385 (
386 "Got an invalid ticket %s from %s for service %s. "
387 "Retrying authentication"
388 ) % (
389 ticket,
390 auth.provider.server_url,
391 self.service_url
392 )
393 )
394 return HttpResponseRedirect(auth.get_login_url())
395 # both xml.etree.ElementTree and lxml.etree exceptions inherit from SyntaxError
396 except SyntaxError as error:
397 messages.add_message(
398 request,
399 messages.ERROR,
400 _(
401 u"Invalid response from your identity provider CAS upon "
402 u"ticket %(ticket)s validation: %(error)r"
403 ) % {'ticket': ticket, 'error': error}
404 )
405 response = redirect("cas_server:login")
406 response.delete_cookie("remember_provider")
407 return response
408 except FederatedIendityProvider.DoesNotExist:
409 logger.warning("Identity provider suffix %s not found" % provider)
410 # if the identity provider is not found, redirect to the login page
411 return redirect("cas_server:login")
414class LoginView(View, LogoutMixin):
415 """credential requestor / acceptor"""
417 # pylint: disable=too-many-instance-attributes
418 # Nine is reasonable in this case.
420 #: The current :class:`models.User<cas_server.models.User>` object
421 user = None
422 #: The form to display to the user
423 form = None
425 #: current :class:`django.http.HttpRequest` object
426 request = None
427 #: service GET/POST parameter
428 service = None
429 #: ``True`` if renew GET/POST parameter is present and not "False"
430 renew = None
431 #: the warn GET/POST parameter
432 warn = None
433 #: the gateway GET/POST parameter
434 gateway = None
435 #: the method GET/POST parameter
436 method = None
438 #: ``True`` if the HTTP_X_AJAX http header is sent and ``settings.CAS_ENABLE_AJAX_AUTH``
439 #: is ``True``, ``False`` otherwise.
440 ajax = None
442 #: ``True`` if the user has just authenticated
443 renewed = False
444 #: ``True`` if renew GET/POST parameter is present and not "False"
445 warned = False
447 #: The :class:`FederateAuth` transmited username (only used if ``settings.CAS_FEDERATE``
448 #: is ``True``)
449 username = None
450 #: The :class:`FederateAuth` transmited ticket (only used if ``settings.CAS_FEDERATE`` is
451 #: ``True``)
452 ticket = None
454 INVALID_LOGIN_TICKET = 1
455 USER_LOGIN_OK = 2
456 USER_LOGIN_FAILURE = 3
457 USER_ALREADY_LOGGED = 4
458 USER_AUTHENTICATED = 5
459 USER_NOT_AUTHENTICATED = 6
461 def init_post(self, request):
462 """
463 Initialize POST received parameters
465 :param django.http.HttpRequest request: The current request object
466 """
467 self.request = request
468 self.service = request.POST.get('service')
469 self.renew = bool(request.POST.get('renew') and request.POST['renew'] != "False")
470 self.gateway = request.POST.get('gateway')
471 self.method = request.POST.get('method')
472 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
473 if request.POST.get('warned') and request.POST['warned'] != "False":
474 self.warned = True
475 self.warn = request.POST.get('warn')
476 if settings.CAS_FEDERATE:
477 self.username = request.POST.get('username')
478 # in federated mode, the valdated indentity provider CAS ticket is used as password
479 self.ticket = request.POST.get('password')
481 def gen_lt(self):
482 """Generate a new LoginTicket and add it to the list of valid LT for the user"""
483 self.request.session['lt'] = self.request.session.get('lt', []) + [utils.gen_lt()]
484 if len(self.request.session['lt']) > 100:
485 self.request.session['lt'] = self.request.session['lt'][-100:]
487 def check_lt(self):
488 """
489 Check is the POSTed LoginTicket is valid, if yes invalide it
491 :return: ``True`` if the LoginTicket is valid, ``False`` otherwise
492 :rtype: bool
493 """
494 # save LT for later check
495 lt_valid = self.request.session.get('lt', [])
496 lt_send = self.request.POST.get('lt')
497 # generate a new LT (by posting the LT has been consumed)
498 self.gen_lt()
499 # check if send LT is valid
500 if lt_send not in lt_valid:
501 return False
502 else:
503 self.request.session['lt'].remove(lt_send)
504 # we need to redo the affectation for django to detect that the list has changed
505 # and for its new value to be store in the session
506 self.request.session['lt'] = self.request.session['lt']
507 return True
509 def post(self, request, *args, **kwargs):
510 """
511 method called on POST request on this view
513 :param django.http.HttpRequest request: The current request object
514 """
515 # initialize class parameters
516 self.init_post(request)
517 # process the POST request
518 ret = self.process_post()
519 if ret == self.INVALID_LOGIN_TICKET:
520 messages.add_message(
521 self.request,
522 messages.ERROR,
523 _(u"Invalid login ticket, please try to log in again")
524 )
525 elif ret == self.USER_LOGIN_OK:
526 # On successful login, update the :class:`models.User<cas_server.models.User>` ``date``
527 # attribute by saving it. (``auto_now=True``)
528 self.user = models.User.objects.get_or_create(
529 username=self.request.session['username'],
530 session_key=self.request.session.session_key
531 )[0]
532 self.user.last_login = timezone.now()
533 self.user.save()
534 elif ret == self.USER_LOGIN_FAILURE: # bad user login
535 if settings.CAS_FEDERATE:
536 self.ticket = None
537 self.username = None
538 self.init_form()
539 # preserve valid LoginTickets from session flush
540 lt = self.request.session.get('lt', [])
541 # On login failure, flush the session
542 self.logout()
543 # restore valid LoginTickets
544 self.request.session['lt'] = lt
545 elif ret == self.USER_ALREADY_LOGGED:
546 pass
547 else: # pragma: no cover (should no happen)
548 raise EnvironmentError("invalid output for LoginView.process_post")
549 # call the GET/POST common part
550 response = self.common()
551 if self.warn:
552 utils.set_cookie(
553 response,
554 "warn",
555 "on",
556 10 * 365 * 24 * 3600
557 )
558 else:
559 response.delete_cookie("warn")
560 return response
562 def process_post(self):
563 """
564 Analyse the POST request:
566 * check that the LoginTicket is valid
567 * check that the user sumited credentials are valid
569 :return:
570 * :attr:`INVALID_LOGIN_TICKET` if the POSTed LoginTicket is not valid
571 * :attr:`USER_ALREADY_LOGGED` if the user is already logged and do no request
572 reauthentication.
573 * :attr:`USER_LOGIN_FAILURE` if the user is not logged or request for
574 reauthentication and his credentials are not valid
575 * :attr:`USER_LOGIN_OK` if the user is not logged or request for
576 reauthentication and his credentials are valid
577 :rtype: int
578 """
579 if not self.check_lt():
580 self.init_form(self.request.POST)
581 logger.warning("Received an invalid login ticket")
582 return self.INVALID_LOGIN_TICKET
583 elif not self.request.session.get("authenticated") or self.renew:
584 # authentication request receive, initialize the form to use
585 self.init_form(self.request.POST)
586 if self.form.is_valid():
587 self.request.session.set_expiry(0)
588 self.request.session["username"] = self.form.cleaned_data['username']
589 self.request.session["warn"] = True if self.form.cleaned_data.get("warn") else False
590 self.request.session["authenticated"] = True
591 self.renewed = True
592 self.warned = True
593 logger.info("User %s successfully authenticated" % self.request.session["username"])
594 return self.USER_LOGIN_OK
595 else:
596 logger.warning("A login attempt failed")
597 return self.USER_LOGIN_FAILURE
598 else:
599 logger.warning("Received a login attempt for an already-active user")
600 return self.USER_ALREADY_LOGGED
602 def init_get(self, request):
603 """
604 Initialize GET received parameters
606 :param django.http.HttpRequest request: The current request object
607 """
608 self.request = request
609 self.service = request.GET.get('service')
610 self.renew = bool(request.GET.get('renew') and request.GET['renew'] != "False")
611 self.gateway = request.GET.get('gateway')
612 self.method = request.GET.get('method')
613 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
614 self.warn = request.GET.get('warn')
615 if settings.CAS_FEDERATE:
616 # here username and ticket are fetch from the session after a redirection from
617 # FederateAuth.get
618 self.username = request.session.get("federate_username")
619 self.ticket = request.session.get("federate_ticket")
620 if self.username:
621 del request.session["federate_username"]
622 if self.ticket:
623 del request.session["federate_ticket"]
625 def get(self, request, *args, **kwargs):
626 """
627 method called on GET request on this view
629 :param django.http.HttpRequest request: The current request object
630 """
631 # initialize class parameters
632 self.init_get(request)
633 # process the GET request
634 self.process_get()
635 # call the GET/POST common part
636 return self.common()
638 def process_get(self):
639 """
640 Analyse the GET request
642 :return:
643 * :attr:`USER_NOT_AUTHENTICATED` if the user is not authenticated or is requesting
644 for authentication renewal
645 * :attr:`USER_AUTHENTICATED` if the user is authenticated and is not requesting
646 for authentication renewal
647 :rtype: int
648 """
649 # generate a new LT
650 self.gen_lt()
651 if not self.request.session.get("authenticated") or self.renew:
652 # authentication will be needed, initialize the form to use
653 self.init_form()
654 return self.USER_NOT_AUTHENTICATED
655 return self.USER_AUTHENTICATED
657 def init_form(self, values=None):
658 """
659 Initialization of the good form depending of POST and GET parameters
661 :param django.http.QueryDict values: A POST or GET QueryDict
662 """
663 if values:
664 values = values.copy()
665 values['lt'] = self.request.session['lt'][-1]
666 form_initial = {
667 'service': self.service,
668 'method': self.method,
669 'warn': (
670 self.warn or self.request.session.get("warn") or self.request.COOKIES.get('warn')
671 ),
672 'lt': self.request.session['lt'][-1],
673 'renew': self.renew
674 }
675 if settings.CAS_FEDERATE:
676 if self.username and self.ticket:
677 form_initial['username'] = self.username
678 form_initial['password'] = self.ticket
679 form_initial['ticket'] = self.ticket
680 self.form = forms.FederateUserCredential(
681 values,
682 initial=form_initial
683 )
684 else:
685 self.form = forms.FederateSelect(values, initial=form_initial)
686 else:
687 self.form = forms.UserCredential(
688 values,
689 initial=form_initial
690 )
692 def service_login(self):
693 """
694 Perform login against a service
696 :return:
697 * The rendering of the ``settings.CAS_WARN_TEMPLATE`` if the user asked to be
698 warned before ticket emission and has not yep been warned.
699 * The redirection to the service URL with a ticket GET parameter
700 * The redirection to the service URL without a ticket if ticket generation failed
701 and the :attr:`gateway` attribute is set
702 * The rendering of the ``settings.CAS_LOGGED_TEMPLATE`` template with some error
703 messages if the ticket generation failed (e.g: user not allowed).
704 :rtype: django.http.HttpResponse
705 """
706 try:
707 # is the service allowed
708 service_pattern = ServicePattern.validate(self.service)
709 # is the current user allowed on this service
710 service_pattern.check_user(self.user)
711 # if the user has asked to be warned before any login to a service
712 if self.request.session.get("warn", True) and not self.warned:
713 messages.add_message(
714 self.request,
715 messages.WARNING,
716 _(u"Authentication has been required by service %(name)s (%(url)s)") %
717 {'name': service_pattern.name, 'url': self.service}
718 )
719 if self.ajax:
720 data = {"status": "error", "detail": "confirmation needed"}
721 return json_response(self.request, data)
722 else:
723 warn_form = forms.WarnForm(initial={
724 'service': self.service,
725 'renew': self.renew,
726 'gateway': self.gateway,
727 'method': self.method,
728 'warned': True,
729 'lt': self.request.session['lt'][-1]
730 })
731 return render(
732 self.request,
733 settings.CAS_WARN_TEMPLATE,
734 utils.context({'form': warn_form})
735 )
736 else:
737 # redirect, using method ?
738 list(messages.get_messages(self.request)) # clean messages before leaving django
739 redirect_url = self.user.get_service_url(
740 self.service,
741 service_pattern,
742 renew=self.renewed
743 )
744 if not self.ajax:
745 return HttpResponseRedirect(redirect_url)
746 else:
747 data = {"status": "success", "detail": "auth", "url": redirect_url}
748 return json_response(self.request, data)
749 except ServicePattern.DoesNotExist:
750 error = 1
751 messages.add_message(
752 self.request,
753 messages.ERROR,
754 _(u'Service %(url)s not allowed.') % {'url': self.service}
755 )
756 except models.BadUsername:
757 error = 2
758 messages.add_message(
759 self.request,
760 messages.ERROR,
761 _(u"Username not allowed")
762 )
763 except models.BadFilter:
764 error = 3
765 messages.add_message(
766 self.request,
767 messages.ERROR,
768 _(u"User characteristics not allowed")
769 )
770 except models.UserFieldNotDefined:
771 error = 4
772 messages.add_message(
773 self.request,
774 messages.ERROR,
775 _(u"The attribute %(field)s is needed to use"
776 u" that service") % {'field': service_pattern.user_field}
777 )
779 # if gateway is set and auth failed redirect to the service without authentication
780 if self.gateway and not self.ajax:
781 list(messages.get_messages(self.request)) # clean messages before leaving django
782 return HttpResponseRedirect(self.service)
784 if not self.ajax:
785 return render(
786 self.request,
787 settings.CAS_LOGGED_TEMPLATE,
788 utils.context({'session': self.request.session})
789 )
790 else:
791 data = {"status": "error", "detail": "auth", "code": error}
792 return json_response(self.request, data)
794 def authenticated(self):
795 """
796 Processing authenticated users
798 :return:
799 * The returned value of :meth:`service_login` if :attr:`service` is defined
800 * The rendering of ``settings.CAS_LOGGED_TEMPLATE`` otherwise
801 :rtype: django.http.HttpResponse
802 """
803 # Try to get the current :class:`models.User<cas_server.models.User>` object for the current
804 # session
805 try:
806 self.user = models.User.objects.get(
807 username=self.request.session.get("username"),
808 session_key=self.request.session.session_key
809 )
810 # if not found, flush the session and redirect to the login page
811 except models.User.DoesNotExist:
812 logger.warning(
813 "User %s seems authenticated but is not found in the database." % (
814 self.request.session.get("username"),
815 )
816 )
817 self.logout()
818 if self.ajax:
819 data = {
820 "status": "error",
821 "detail": "login required",
822 "url": utils.reverse_params("cas_server:login", params=self.request.GET)
823 }
824 return json_response(self.request, data)
825 else:
826 return utils.redirect_params("cas_server:login", params=self.request.GET)
828 # if login against a service
829 if self.service:
830 return self.service_login()
831 # else display the logged template
832 else:
833 if self.ajax:
834 data = {"status": "success", "detail": "logged"}
835 return json_response(self.request, data)
836 else:
837 return render(
838 self.request,
839 settings.CAS_LOGGED_TEMPLATE,
840 utils.context({'session': self.request.session})
841 )
843 def not_authenticated(self):
844 """
845 Processing non authenticated users
847 :return:
848 * The rendering of ``settings.CAS_LOGIN_TEMPLATE`` with various messages
849 depending of GET/POST parameters
850 * The redirection to :class:`FederateAuth` if ``settings.CAS_FEDERATE`` is ``True``
851 and the "remember my identity provider" cookie is found
852 :rtype: django.http.HttpResponse
853 """
854 if self.service:
855 try:
856 service_pattern = ServicePattern.validate(self.service)
857 if self.gateway and not self.ajax:
858 # clean messages before leaving django
859 list(messages.get_messages(self.request))
860 return HttpResponseRedirect(self.service)
862 if settings.CAS_SHOW_SERVICE_MESSAGES:
863 if self.request.session.get("authenticated") and self.renew:
864 messages.add_message(
865 self.request,
866 messages.WARNING,
867 _(u"Authentication renewal required by service %(name)s (%(url)s).") %
868 {'name': service_pattern.name, 'url': self.service}
869 )
870 else:
871 messages.add_message(
872 self.request,
873 messages.WARNING,
874 _(u"Authentication required by service %(name)s (%(url)s).") %
875 {'name': service_pattern.name, 'url': self.service}
876 )
877 except ServicePattern.DoesNotExist:
878 if settings.CAS_SHOW_SERVICE_MESSAGES:
879 messages.add_message(
880 self.request,
881 messages.ERROR,
882 _(u'Service %s not allowed') % self.service
883 )
884 if self.ajax:
885 data = {
886 "status": "error",
887 "detail": "login required",
888 "url": utils.reverse_params("cas_server:login", params=self.request.GET)
889 }
890 return json_response(self.request, data)
891 else:
892 if settings.CAS_FEDERATE:
893 if self.username and self.ticket:
894 return render(
895 self.request,
896 settings.CAS_LOGIN_TEMPLATE,
897 utils.context({
898 'form': self.form,
899 'auto_submit': True,
900 'post_url': reverse("cas_server:login")
901 })
902 )
903 else:
904 if (
905 self.request.COOKIES.get('remember_provider') and
906 FederatedIendityProvider.objects.filter(
907 suffix=self.request.COOKIES['remember_provider']
908 )
909 ):
910 params = utils.copy_params(self.request.GET)
911 url = utils.reverse_params(
912 "cas_server:federateAuth",
913 params=params,
914 kwargs=dict(provider=self.request.COOKIES['remember_provider'])
915 )
916 return HttpResponseRedirect(url)
917 else:
918 # if user is authenticated and auth renewal is requested, redirect directly
919 # to the user identity provider
920 if self.renew and self.request.session.get("authenticated"):
921 try:
922 user = FederatedUser.get_from_federated_username(
923 self.request.session.get("username")
924 )
925 params = utils.copy_params(self.request.GET)
926 url = utils.reverse_params(
927 "cas_server:federateAuth",
928 params=params,
929 kwargs=dict(provider=user.provider.suffix)
930 )
931 return HttpResponseRedirect(url)
932 # Should normally not happen: if the user is logged, it exists in the
933 # database.
934 except FederatedUser.DoesNotExist: # pragma: no cover
935 pass
936 return render(
937 self.request,
938 settings.CAS_LOGIN_TEMPLATE,
939 utils.context({
940 'form': self.form,
941 'post_url': reverse("cas_server:federateAuth")
942 })
943 )
944 else:
945 return render(
946 self.request,
947 settings.CAS_LOGIN_TEMPLATE,
948 utils.context({'form': self.form})
949 )
951 def common(self):
952 """
953 Common part execute uppon GET and POST request
955 :return:
956 * The returned value of :meth:`authenticated` if the user is authenticated and
957 not requesting for authentication or if the authentication has just been renewed
958 * The returned value of :meth:`not_authenticated` otherwise
959 :rtype: django.http.HttpResponse
960 """
961 # if authenticated and successfully renewed authentication if needed
962 if self.request.session.get("authenticated") and (not self.renew or self.renewed):
963 return self.authenticated()
964 else:
965 return self.not_authenticated()
968class Auth(CsrfExemptView):
969 """
970 A simple view to validate username/password/service tuple
972 csrf is disable as it is intended to be used by programs. Security is assured by a shared
973 secret between the programs dans django-cas-server.
974 """
976 @staticmethod
977 def post(request):
978 """
979 method called on POST request on this view
981 :param django.http.HttpRequest request: The current request object
982 :return: ``HttpResponse(u"yes\\n")`` if the POSTed tuple (username, password, service)
983 if valid (i.e. (username, password) is valid dans username is allowed on service).
984 ``HttpResponse(u"no\\n…")`` otherwise, with possibly an error message on the second
985 line.
986 :rtype: django.http.HttpResponse
987 """
988 username = request.POST.get('username')
989 password = request.POST.get('password')
990 service = request.POST.get('service')
991 secret = request.POST.get('secret')
993 if not settings.CAS_AUTH_SHARED_SECRET:
994 return HttpResponse(
995 "no\nplease set CAS_AUTH_SHARED_SECRET",
996 content_type="text/plain; charset=utf-8"
997 )
998 if secret != settings.CAS_AUTH_SHARED_SECRET:
999 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1000 if not username or not password or not service:
1001 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1002 form = forms.UserCredential(
1003 request.POST,
1004 initial={
1005 'service': service,
1006 'method': 'POST',
1007 'warn': False
1008 }
1009 )
1010 if form.is_valid():
1011 try:
1012 user = models.User.objects.get_or_create(
1013 username=form.cleaned_data['username'],
1014 session_key=request.session.session_key
1015 )[0]
1016 user.save()
1017 # is the service allowed
1018 service_pattern = ServicePattern.validate(service)
1019 # is the current user allowed on this service
1020 service_pattern.check_user(user)
1021 if not request.session.get("authenticated"):
1022 user.delete()
1023 return HttpResponse(u"yes\n", content_type="text/plain; charset=utf-8")
1024 except (ServicePattern.DoesNotExist, models.ServicePatternException):
1025 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1026 else:
1027 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1030class Validate(View):
1031 """service ticket validation"""
1032 @staticmethod
1033 def get(request):
1034 """
1035 method called on GET request on this view
1037 :param django.http.HttpRequest request: The current request object
1038 :return:
1039 * ``HttpResponse("yes\\nusername")`` if submited (service, ticket) is valid
1040 * else ``HttpResponse("no\\n")``
1041 :rtype: django.http.HttpResponse
1042 """
1043 # store wanted GET parameters
1044 service = request.GET.get('service')
1045 ticket = request.GET.get('ticket')
1046 renew = True if request.GET.get('renew') else False
1047 # service and ticket parameters are mandatory
1048 if service and ticket:
1049 try:
1050 # search for the ticket, associated at service that is not yet validated but is
1051 # still valid
1052 ticket = ServiceTicket.get(ticket, renew, service)
1053 logger.info(
1054 "Validate: Service ticket %s validated, user %s authenticated on service %s" % (
1055 ticket.value,
1056 ticket.user.username,
1057 ticket.service
1058 )
1059 )
1060 return HttpResponse(
1061 u"yes\n%s\n" % ticket.username(),
1062 content_type="text/plain; charset=utf-8"
1063 )
1064 except ServiceTicket.DoesNotExist:
1065 logger.warning(
1066 (
1067 "Validate: Service ticket %s not found or "
1068 "already validated, auth to %s failed"
1069 ) % (
1070 ticket,
1071 service
1072 )
1073 )
1074 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1075 else:
1076 logger.warning("Validate: service or ticket missing")
1077 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1080@python_2_unicode_compatible
1081class ValidationBaseError(Exception):
1082 """Base class for both saml and cas validation error"""
1084 #: The error code
1085 code = None
1086 #: The error message
1087 msg = None
1089 def __init__(self, code, msg=""):
1090 self.code = code
1091 self.msg = msg
1092 super(ValidationBaseError, self).__init__(code)
1094 def __str__(self):
1095 return u"%s" % self.msg
1097 def render(self, request):
1098 """
1099 render the error template for the exception
1101 :param django.http.HttpRequest request: The current request object:
1102 :return: the rendered ``cas_server/serviceValidateError.xml`` template
1103 :rtype: django.http.HttpResponse
1104 """
1105 return render(
1106 request,
1107 self.template,
1108 self.context(), content_type="text/xml; charset=utf-8"
1109 )
1112class ValidateError(ValidationBaseError):
1113 """handle service validation error"""
1115 #: template to be render for the error
1116 template = "cas_server/serviceValidateError.xml"
1118 def context(self):
1119 """
1120 content to use to render :attr:`template`
1122 :return: A dictionary to contextualize :attr:`template`
1123 :rtype: dict
1124 """
1125 return {'code': self.code, 'msg': self.msg}
1128class ValidateService(View):
1129 """service ticket validation [CAS 2.0] and [CAS 3.0]"""
1130 #: Current :class:`django.http.HttpRequest` object
1131 request = None
1132 #: The service GET parameter
1133 service = None
1134 #: the ticket GET parameter
1135 ticket = None
1136 #: the pgtUrl GET parameter
1137 pgt_url = None
1138 #: the renew GET parameter
1139 renew = None
1140 #: specify if ProxyTicket are allowed by the view. Hence we user the same view for
1141 #: ``/serviceValidate`` and ``/proxyValidate`` juste changing the parameter.
1142 allow_proxy_ticket = False
1144 def get(self, request):
1145 """
1146 method called on GET request on this view
1148 :param django.http.HttpRequest request: The current request object:
1149 :return: The rendering of ``cas_server/serviceValidate.xml`` if no errors is raised,
1150 the rendering or ``cas_server/serviceValidateError.xml`` otherwise.
1151 :rtype: django.http.HttpResponse
1152 """
1153 # define the class parameters
1154 self.request = request
1155 self.service = request.GET.get('service')
1156 self.ticket = request.GET.get('ticket')
1157 self.pgt_url = request.GET.get('pgtUrl')
1158 self.renew = True if request.GET.get('renew') else False
1160 # service and ticket parameter are mandatory
1161 if not self.service or not self.ticket:
1162 logger.warning("ValidateService: missing ticket or service")
1163 return ValidateError(
1164 u'INVALID_REQUEST',
1165 u"you must specify a service and a ticket"
1166 ).render(request)
1167 else:
1168 try:
1169 # search the ticket in the database
1170 self.ticket, proxies = self.process_ticket()
1171 # prepare template rendering context
1172 params = {
1173 'username': self.ticket.username(),
1174 'attributes': self.ticket.attributs_flat(),
1175 'proxies': proxies,
1176 'auth_date': self.ticket.user.last_login.replace(microsecond=0).isoformat(),
1177 'is_new_login': 'true' if self.ticket.renew else 'false'
1178 }
1179 # if pgtUrl is set, require https or localhost
1180 if self.pgt_url and (
1181 self.pgt_url.startswith("https://") or
1182 re.match(r"^http://(127\.0\.0\.1|localhost)(:[0-9]+)?(/.*)?$", self.pgt_url)
1183 ):
1184 return self.process_pgturl(params)
1185 else:
1186 logger.info(
1187 "ValidateService: ticket %s validated for user %s on service %s." % (
1188 self.ticket.value,
1189 self.ticket.user.username,
1190 self.ticket.service
1191 )
1192 )
1193 logger.debug(
1194 "ValidateService: User attributs are:\n%s" % (
1195 pprint.pformat(self.ticket.attributs),
1196 )
1197 )
1198 return render(
1199 request,
1200 "cas_server/serviceValidate.xml",
1201 params,
1202 content_type="text/xml; charset=utf-8"
1203 )
1204 except ValidateError as error:
1205 logger.warning(
1206 "ValidateService: validation error: %s %s" % (error.code, error.msg)
1207 )
1208 return error.render(request)
1210 def process_ticket(self):
1211 """
1212 fetch the ticket against the database and check its validity
1214 :raises ValidateError: if the ticket is not found or not valid, potentially for that
1215 service
1216 :returns: A couple (ticket, proxies list)
1217 :rtype: :obj:`tuple`
1218 """
1219 try:
1220 proxies = []
1221 if self.allow_proxy_ticket:
1222 ticket = models.Ticket.get(self.ticket, self.renew)
1223 else:
1224 ticket = models.ServiceTicket.get(self.ticket, self.renew)
1225 try:
1226 for prox in ticket.proxies.all():
1227 proxies.append(prox.url)
1228 except AttributeError:
1229 pass
1230 if ticket.service != self.service:
1231 raise ValidateError(u'INVALID_SERVICE', self.service)
1232 return ticket, proxies
1233 except Ticket.DoesNotExist:
1234 raise ValidateError(u'INVALID_TICKET', self.ticket)
1235 except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
1236 raise ValidateError(u'INVALID_TICKET', 'ticket not found')
1238 def process_pgturl(self, params):
1239 """
1240 Handle PGT request
1242 :param dict params: A template context dict
1243 :raises ValidateError: if pgtUrl is invalid or if TLS validation of the pgtUrl fails
1244 :return: The rendering of ``cas_server/serviceValidate.xml``, using ``params``
1245 :rtype: django.http.HttpResponse
1246 """
1247 try:
1248 pattern = ServicePattern.validate(self.pgt_url)
1249 if pattern.proxy_callback:
1250 proxyid = utils.gen_pgtiou()
1251 pticket = ProxyGrantingTicket.objects.create(
1252 user=self.ticket.user,
1253 service=self.pgt_url,
1254 service_pattern=pattern,
1255 single_log_out=pattern.single_log_out
1256 )
1257 url = utils.update_url(self.pgt_url, {'pgtIou': proxyid, 'pgtId': pticket.value})
1258 try:
1259 ret = requests.get(url, verify=settings.CAS_PROXY_CA_CERTIFICATE_PATH)
1260 if ret.status_code == 200:
1261 params['proxyGrantingTicket'] = proxyid
1262 else:
1263 pticket.delete()
1264 logger.info(
1265 (
1266 "ValidateService: ticket %s validated for user %s on service %s. "
1267 "Proxy Granting Ticket transmited to %s."
1268 ) % (
1269 self.ticket.value,
1270 self.ticket.user.username,
1271 self.ticket.service,
1272 self.pgt_url
1273 )
1274 )
1275 logger.debug(
1276 "ValidateService: User attributs are:\n%s" % (
1277 pprint.pformat(self.ticket.attributs),
1278 )
1279 )
1280 return render(
1281 self.request,
1282 "cas_server/serviceValidate.xml",
1283 params,
1284 content_type="text/xml; charset=utf-8"
1285 )
1286 except requests.exceptions.RequestException as error:
1287 error = utils.unpack_nested_exception(error)
1288 raise ValidateError(
1289 u'INVALID_PROXY_CALLBACK',
1290 u"%s: %s" % (type(error), str(error))
1291 )
1292 else:
1293 raise ValidateError(
1294 u'INVALID_PROXY_CALLBACK',
1295 u"callback url not allowed by configuration"
1296 )
1297 except ServicePattern.DoesNotExist:
1298 raise ValidateError(
1299 u'INVALID_PROXY_CALLBACK',
1300 u'callback url not allowed by configuration'
1301 )
1304class Proxy(View):
1305 """proxy ticket service"""
1307 #: Current :class:`django.http.HttpRequest` object
1308 request = None
1309 #: A ProxyGrantingTicket from the pgt GET parameter
1310 pgt = None
1311 #: the targetService GET parameter
1312 target_service = None
1314 def get(self, request):
1315 """
1316 method called on GET request on this view
1318 :param django.http.HttpRequest request: The current request object:
1319 :return: The returned value of :meth:`process_proxy` if no error is raised,
1320 else the rendering of ``cas_server/serviceValidateError.xml``.
1321 :rtype: django.http.HttpResponse
1322 """
1323 self.request = request
1324 self.pgt = request.GET.get('pgt')
1325 self.target_service = request.GET.get('targetService')
1326 try:
1327 # pgt and targetService parameters are mandatory
1328 if self.pgt and self.target_service:
1329 return self.process_proxy()
1330 else:
1331 raise ValidateError(
1332 u'INVALID_REQUEST',
1333 u"you must specify and pgt and targetService"
1334 )
1335 except ValidateError as error:
1336 logger.warning("Proxy: validation error: %s %s" % (error.code, error.msg))
1337 return error.render(request)
1339 def process_proxy(self):
1340 """
1341 handle PT request
1343 :raises ValidateError: if the PGT is not found, or the target service not allowed or
1344 the user not allowed on the tardet service.
1345 :return: The rendering of ``cas_server/proxy.xml``
1346 :rtype: django.http.HttpResponse
1347 """
1348 try:
1349 # is the target service allowed
1350 pattern = ServicePattern.validate(self.target_service)
1351 # to get a proxy ticket require that the service allow it
1352 if not pattern.proxy:
1353 raise ValidateError(
1354 u'UNAUTHORIZED_SERVICE',
1355 u'the service %s does not allow proxy tickets' % self.target_service
1356 )
1357 # is the proxy granting ticket valid
1358 ticket = ProxyGrantingTicket.get(self.pgt)
1359 # is the pgt user allowed on the target service
1360 pattern.check_user(ticket.user)
1361 pticket = ticket.user.get_ticket(
1362 ProxyTicket,
1363 self.target_service,
1364 pattern,
1365 renew=False
1366 )
1367 models.Proxy.objects.create(proxy_ticket=pticket, url=ticket.service)
1368 logger.info(
1369 "Proxy ticket created for user %s on service %s." % (
1370 ticket.user.username,
1371 self.target_service
1372 )
1373 )
1374 return render(
1375 self.request,
1376 "cas_server/proxy.xml",
1377 {'ticket': pticket.value},
1378 content_type="text/xml; charset=utf-8"
1379 )
1380 except (Ticket.DoesNotExist, ProxyGrantingTicket.DoesNotExist):
1381 raise ValidateError(u'INVALID_TICKET', u'PGT %s not found' % self.pgt)
1382 except ServicePattern.DoesNotExist:
1383 raise ValidateError(u'UNAUTHORIZED_SERVICE', self.target_service)
1384 except (models.BadUsername, models.BadFilter, models.UserFieldNotDefined):
1385 raise ValidateError(
1386 u'UNAUTHORIZED_USER',
1387 u'User %s not allowed on %s' % (ticket.user.username, self.target_service)
1388 )
1391class SamlValidateError(ValidationBaseError):
1392 """handle saml validation error"""
1394 #: template to be render for the error
1395 template = "cas_server/samlValidateError.xml"
1397 def context(self):
1398 """
1399 :return: A dictionary to contextualize :attr:`template`
1400 :rtype: dict
1401 """
1402 return {
1403 'code': self.code,
1404 'msg': self.msg,
1405 'IssueInstant': timezone.now().isoformat(),
1406 'ResponseID': utils.gen_saml_id()
1407 }
1410class SamlValidate(CsrfExemptView):
1411 """SAML ticket validation"""
1412 request = None
1413 target = None
1414 ticket = None
1415 root = None
1417 def post(self, request, *args, **kwargs):
1418 """
1419 method called on POST request on this view
1421 :param django.http.HttpRequest request: The current request object
1422 :return: the rendering of ``cas_server/samlValidate.xml`` if no error is raised,
1423 else the rendering of ``cas_server/samlValidateError.xml``.
1424 :rtype: django.http.HttpResponse
1425 """
1426 self.request = request
1427 self.target = request.GET.get('TARGET')
1428 self.root = etree.fromstring(request.body)
1429 try:
1430 self.ticket = self.process_ticket()
1431 expire_instant = (self.ticket.creation +
1432 timedelta(seconds=self.ticket.VALIDITY)).isoformat()
1433 params = {
1434 'IssueInstant': timezone.now().isoformat(),
1435 'expireInstant': expire_instant,
1436 'Recipient': self.target,
1437 'ResponseID': utils.gen_saml_id(),
1438 'username': self.ticket.username(),
1439 'attributes': self.ticket.attributs_flat(),
1440 'auth_date': self.ticket.user.last_login.replace(microsecond=0).isoformat(),
1441 'is_new_login': 'true' if self.ticket.renew else 'false'
1443 }
1444 logger.info(
1445 "SamlValidate: ticket %s validated for user %s on service %s." % (
1446 self.ticket.value,
1447 self.ticket.user.username,
1448 self.ticket.service
1449 )
1450 )
1451 logger.debug(
1452 "SamlValidate: User attributes are:\n%s" % pprint.pformat(self.ticket.attributs)
1453 )
1455 return render(
1456 request,
1457 "cas_server/samlValidate.xml",
1458 params,
1459 content_type="text/xml; charset=utf-8"
1460 )
1461 except SamlValidateError as error:
1462 logger.warning("SamlValidate: validation error: %s %s" % (error.code, error.msg))
1463 return error.render(request)
1465 def process_ticket(self):
1466 """
1467 validate ticket from SAML XML body
1469 :raises: SamlValidateError: if the ticket is not found or not valid, or if we fail
1470 to parse the posted XML.
1471 :return: a ticket object
1472 :rtype: :class:`models.Ticket<cas_server.models.Ticket>`
1473 """
1474 try:
1475 auth_req = self.root.getchildren()[1].getchildren()[0]
1476 ticket = auth_req.getchildren()[0].text
1477 ticket = models.Ticket.get(ticket)
1478 if ticket.service != self.target:
1479 raise SamlValidateError(
1480 u'AuthnFailed',
1481 u'TARGET %s does not match ticket service' % self.target
1482 )
1483 return ticket
1484 except (IndexError, KeyError):
1485 raise SamlValidateError(u'VersionMismatch')
1486 except Ticket.DoesNotExist:
1487 raise SamlValidateError(
1488 u'AuthnFailed',
1489 u'ticket %s should begin with PT- or ST-' % ticket
1490 )
1491 except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
1492 raise SamlValidateError(u'AuthnFailed', u'ticket %s not found' % ticket)