Coverage for cas_server/views.py: 94%
635 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:39 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-18 09:39 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2015-2025 Valentin Samir
12"""views for the app"""
13from .default_settings import settings, SessionStore
15from django.shortcuts import render, redirect
16from django.http import HttpResponse, HttpResponseRedirect
17from django.contrib import messages
18from django.utils.decorators import method_decorator
19from django.utils import timezone
20from django.views.decorators.csrf import csrf_exempt
21from django.middleware.csrf import CsrfViewMiddleware
22from django.views.generic import View
23try:
24 from django.utils.encoding import python_2_unicode_compatible
25 from django.utils.translation import ugettext as _
26except ImportError:
27 def python_2_unicode_compatible(func):
28 """
29 We use Django >= 3.0 with Python >= 3.4, we don't need Python 2 compatibility.
30 """
31 return func
32 from django.utils.translation import gettext as _
33from django.utils.safestring import mark_safe
34try:
35 from django.urls import reverse
36except ImportError:
37 from django.core.urlresolvers import reverse
39import re
40import base64
41import logging
42import pprint
43import requests
44from lxml import etree
45from datetime import timedelta
47if settings.CAS_AUTH_GSSAPI_ENABLE: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 try:
49 import gssapi
50 except ImportError:
51 raise RuntimeError("Please install gssapi before using the Kerberos auth")
53import cas_server.utils as utils
54import cas_server.models as models
56from .utils import json_response, import_attr
57from .models import Ticket, ServiceTicket, ProxyTicket, ProxyGrantingTicket
58from .models import ServicePattern, FederatedIendityProvider, FederatedUser
59from .federate import CASFederateValidateUser
61logger = logging.getLogger(__name__)
64class LogoutMixin(object):
65 """destroy CAS session utils"""
67 def logout(self, all_session=False):
68 """
69 effectively destroy a CAS session
71 :param boolean all_session: If ``True`` destroy all the user sessions, otherwise
72 destroy the current user session.
73 :return: The number of destroyed sessions
74 :rtype: int
75 """
76 # initialize the counter of the number of destroyed sesisons
77 session_nb = 0
78 # save the current user username before flushing the session
79 username = self.request.session.get("username")
80 if username:
81 if all_session:
82 logger.info("Logging out user %s from all sessions." % username)
83 else:
84 logger.info("Logging out user %s." % username)
85 users = []
86 # try to get the user from the current session
87 try:
88 users.append(
89 models.User.objects.get(
90 username=username,
91 session_key=self.request.session.session_key
92 )
93 )
94 except models.User.DoesNotExist:
95 # if user not found in database, flush the session anyway
96 self.request.session.flush()
98 # If all_session is set, search all of the user sessions
99 if all_session:
100 users.extend(
101 models.User.objects.filter(
102 username=username
103 ).exclude(
104 session_key=self.request.session.session_key
105 )
106 )
108 # Iterate over all user sessions that have to be logged out
109 for user in users:
110 # get the user session
111 session = SessionStore(session_key=user.session_key)
112 # flush the session
113 session.flush()
114 # send SLO requests
115 user.logout(self.request)
116 # delete the user
117 user.delete()
118 # increment the destroyed session counter
119 session_nb += 1
120 if username:
121 logger.info("User %s logged out" % username)
122 return session_nb
125class CsrfExemptView(View):
126 """base class for csrf exempt class views"""
128 @method_decorator(csrf_exempt) # csrf is disabled for allowing SLO requests reception
129 def dispatch(self, request, *args, **kwargs):
130 """
131 dispatch different http request to the methods of the same name
133 :param django.http.HttpRequest request: The current request object
134 """
135 return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)
138class LogoutView(View, LogoutMixin):
139 """destroy CAS session (logout) view"""
141 #: current :class:`django.http.HttpRequest` object
142 request = None
143 #: service GET parameter
144 service = None
145 #: url GET paramet
146 url = None
147 #: ``True`` if the HTTP_X_AJAX http header is sent and ``settings.CAS_ENABLE_AJAX_AUTH``
148 #: is ``True``, ``False`` otherwise.
149 ajax = None
151 def init_get(self, request):
152 """
153 Initialize the :class:`LogoutView` attributes on GET request
155 :param django.http.HttpRequest request: The current request object
156 """
157 self.request = request
158 self.service = request.GET.get('service')
159 self.url = request.GET.get('url')
160 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
162 @staticmethod
163 def delete_cookies(response):
164 if settings.CAS_REMOVE_DJANGO_SESSION_COOKIE_ON_LOGOUT: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 response.delete_cookie(settings.SESSION_COOKIE_NAME)
166 if settings.CAS_REMOVE_DJANGO_CSRF_COOKIE_ON_LOGOUT: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 response.delete_cookie(settings.CSRF_COOKIE_NAME)
168 if settings.CAS_REMOVE_DJANGO_LANGUAGE_COOKIE_ON_LOGOUT: 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 response.delete_cookie(settings.LANGUAGE_COOKIE_NAME)
170 return response
172 def get(self, request, *args, **kwargs):
173 """
174 method called on GET request on this view
176 :param django.http.HttpRequest request: The current request object
177 """
178 logger.info("logout requested")
179 # initialize the class attributes
180 self.init_get(request)
181 # if CAS federation mode is enable, bakup the provider before flushing the sessions
182 if settings.CAS_FEDERATE:
183 try:
184 user = FederatedUser.get_from_federated_username(
185 self.request.session.get("username")
186 )
187 auth = CASFederateValidateUser(user.provider, service_url="")
188 except FederatedUser.DoesNotExist:
189 auth = None
190 session_nb = self.logout(self.request.GET.get("all"))
191 # if CAS federation mode is enable, redirect to user CAS logout page, appending the
192 # current querystring
193 if settings.CAS_FEDERATE:
194 if auth is not None:
195 params = utils.copy_params(request.GET, ignore={"forget_provider"})
196 url = auth.get_logout_url()
197 response = HttpResponseRedirect(utils.update_url(url, params))
198 if request.GET.get("forget_provider"):
199 response.delete_cookie("remember_provider")
200 return self.delete_cookies(response)
201 # if service is set, redirect to service after logout
202 if self.service:
203 list(messages.get_messages(request)) # clean messages before leaving the django app
204 return self.delete_cookies(HttpResponseRedirect(self.service))
205 # if service is not set but url is set, redirect to url after logout
206 elif self.url:
207 list(messages.get_messages(request)) # clean messages before leaving the django app
208 return self.delete_cookies(HttpResponseRedirect(self.url))
209 else:
210 # build logout message depending of the number of sessions the user logs out
211 if session_nb == 1:
212 logout_msg = mark_safe(_(
213 "<h3>Logout successful</h3>"
214 "You have successfully logged out from the Central Authentication Service. "
215 "For security reasons, close your web browser."
216 ))
217 elif session_nb > 1:
218 logout_msg = mark_safe(_(
219 "<h3>Logout successful</h3>"
220 "You have successfully logged out from %d sessions of the Central "
221 "Authentication Service. "
222 "For security reasons, close your web browser."
223 ) % session_nb)
224 else:
225 logout_msg = mark_safe(_(
226 "<h3>Logout successful</h3>"
227 "You were already logged out from the Central Authentication Service. "
228 "For security reasons, close your web browser."
229 ))
231 # depending of settings, redirect to the login page with a logout message or display
232 # the logout page. The default is to display tge logout page.
233 if settings.CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT:
234 messages.add_message(request, messages.SUCCESS, logout_msg)
235 if self.ajax:
236 url = reverse("cas_server:login")
237 data = {
238 'status': 'success',
239 'detail': 'logout',
240 'url': url,
241 'session_nb': session_nb
242 }
243 return self.delete_cookies(json_response(request, data))
244 else:
245 return self.delete_cookies(redirect("cas_server:login"))
246 else:
247 if self.ajax:
248 data = {'status': 'success', 'detail': 'logout', 'session_nb': session_nb}
249 return self.delete_cookies(json_response(request, data))
250 else:
251 return self.delete_cookies(render(
252 request,
253 settings.CAS_LOGOUT_TEMPLATE,
254 utils.context({'logout_msg': logout_msg})
255 ))
258class FederateAuth(CsrfExemptView):
259 """
260 view to authenticated user against a backend CAS then CAS_FEDERATE is True
262 csrf is disabled for allowing SLO requests reception.
263 """
265 #: current URL used as service URL by the CAS client
266 service_url = None
268 def get_cas_client(self, request, provider, renew=False):
269 """
270 return a CAS client object matching provider
272 :param django.http.HttpRequest request: The current request object
273 :param cas_server.models.FederatedIendityProvider provider: the user identity provider
274 :return: The user CAS client object
275 :rtype: :class:`federate.CASFederateValidateUser
276 <cas_server.federate.CASFederateValidateUser>`
277 """
278 # compute the current url, ignoring ticket dans provider GET parameters
279 service_url = utils.get_current_url(request, {"ticket", "provider"})
280 self.service_url = service_url
281 return CASFederateValidateUser(provider, service_url, renew=renew)
283 def post(self, request, provider=None, *args, **kwargs):
284 """
285 method called on POST request
287 :param django.http.HttpRequest request: The current request object
288 :param unicode provider: Optional parameter. The user provider suffix.
289 """
290 # if settings.CAS_FEDERATE is not True redirect to the login page
291 if not settings.CAS_FEDERATE:
292 logger.warning("CAS_FEDERATE is False, set it to True to use federation")
293 return redirect("cas_server:login")
294 # POST with a provider suffix, this is probably an SLO request. csrf is disabled for
295 # allowing SLO requests reception
296 try:
297 provider = FederatedIendityProvider.objects.get(suffix=provider)
298 auth = self.get_cas_client(request, provider)
299 try:
300 auth.clean_sessions(request.POST['logoutRequest'])
301 except (KeyError, AttributeError):
302 pass
303 return HttpResponse("ok")
304 # else, a User is trying to log in using an identity provider
305 except FederatedIendityProvider.DoesNotExist:
306 # Manually checking for csrf to protect the code below
307 reason = CsrfViewMiddleware(lambda request: HttpResponse()) \
308 .process_view(request, None, (), {})
309 if reason is not None: # pragma: no cover (csrf checks are disabled during tests)
310 return reason # Failed the test, stop here.
311 form = import_attr(settings.CAS_FEDERATE_SELECT_FORM)(request.POST)
312 if form.is_valid():
313 params = utils.copy_params(
314 request.POST,
315 ignore={"provider", "csrfmiddlewaretoken", "ticket", "lt"}
316 )
317 if params.get("renew") == "False":
318 del params["renew"]
319 url = utils.reverse_params(
320 "cas_server:federateAuth",
321 kwargs=dict(provider=form.cleaned_data["provider"].suffix),
322 params=params
323 )
324 return HttpResponseRedirect(url)
325 else:
326 return redirect("cas_server:login")
328 def get(self, request, provider=None):
329 """
330 method called on GET request
332 :param django.http.HttpRequestself. request: The current request object
333 :param unicode provider: Optional parameter. The user provider suffix.
334 """
335 # if settings.CAS_FEDERATE is not True redirect to the login page
336 if not settings.CAS_FEDERATE:
337 logger.warning("CAS_FEDERATE is False, set it to True to use federation")
338 return redirect("cas_server:login")
339 renew = bool(request.GET.get('renew') and request.GET['renew'] != "False")
340 # Is the user is already authenticated, no need to request authentication to the user
341 # identity provider.
342 if self.request.session.get("authenticated") and not renew:
343 logger.warning("User already authenticated, dropping federated authentication request")
344 return redirect("cas_server:login")
345 try:
346 # get the identity provider from its suffix
347 provider = FederatedIendityProvider.objects.get(suffix=provider)
348 # get a CAS client for the user identity provider
349 auth = self.get_cas_client(request, provider, renew)
350 # if no ticket submited, redirect to the identity provider CAS login page
351 if 'ticket' not in request.GET:
352 logger.info("Trying to authenticate %s again" % auth.provider.server_url)
353 return HttpResponseRedirect(auth.get_login_url())
354 else:
355 ticket = request.GET['ticket']
356 try:
357 # if the ticket validation succeed
358 if auth.verify_ticket(ticket):
359 logger.info(
360 "Got a valid ticket for %s from %s" % (
361 auth.username,
362 auth.provider.server_url
363 )
364 )
365 params = utils.copy_params(request.GET, ignore={"ticket", "remember"})
366 request.session["federate_username"] = auth.federated_username
367 request.session["federate_ticket"] = ticket
368 auth.register_slo(
369 auth.federated_username,
370 request.session.session_key,
371 ticket
372 )
373 # redirect to the the login page for the user to become authenticated
374 # thanks to the `federate_username` and `federate_ticket` session parameters
375 url = utils.reverse_params("cas_server:login", params)
376 response = HttpResponseRedirect(url)
377 # If the user has checked "remember my identity provider" store it in a
378 # cookie
379 if request.GET.get("remember"):
380 max_age = settings.CAS_FEDERATE_REMEMBER_TIMEOUT
381 utils.set_cookie(
382 response,
383 "remember_provider",
384 provider.suffix,
385 max_age
386 )
387 return response
388 # else redirect to the identity provider CAS login page
389 else:
390 logger.info(
391 (
392 "Got an invalid ticket %s from %s for service %s. "
393 "Retrying authentication"
394 ) % (
395 ticket,
396 auth.provider.server_url,
397 self.service_url
398 )
399 )
400 return HttpResponseRedirect(auth.get_login_url())
401 # both xml.etree.ElementTree and lxml.etree exceptions inherit from SyntaxError
402 except SyntaxError as error:
403 messages.add_message(
404 request,
405 messages.ERROR,
406 _(
407 u"Invalid response from your identity provider CAS upon "
408 u"ticket %(ticket)s validation: %(error)r"
409 ) % {'ticket': ticket, 'error': error}
410 )
411 response = redirect("cas_server:login")
412 response.delete_cookie("remember_provider")
413 return response
414 except FederatedIendityProvider.DoesNotExist:
415 logger.warning("Identity provider suffix %s not found" % provider)
416 # if the identity provider is not found, redirect to the login page
417 return redirect("cas_server:login")
420class LoginView(View, LogoutMixin):
421 """credential requestor / acceptor"""
423 # pylint: disable=too-many-instance-attributes
424 # Nine is reasonable in this case.
426 #: The current :class:`models.User<cas_server.models.User>` object
427 user = None
428 #: The form to display to the user
429 form = None
431 #: current :class:`django.http.HttpRequest` object
432 request = None
433 #: service GET/POST parameter
434 service = None
435 #: ``True`` if renew GET/POST parameter is present and not "False"
436 renew = None
437 #: the warn GET/POST parameter
438 warn = None
439 #: the gateway GET/POST parameter
440 gateway = None
441 #: the method GET/POST parameter
442 method = None
444 #: ``True`` if the HTTP_X_AJAX http header is sent and ``settings.CAS_ENABLE_AJAX_AUTH``
445 #: is ``True``, ``False`` otherwise.
446 ajax = None
448 #: ``True`` if the user has just authenticated
449 renewed = False
450 #: ``True`` if renew GET/POST parameter is present and not "False"
451 warned = False
453 #: The :class:`FederateAuth` transmited username (only used if ``settings.CAS_FEDERATE``
454 #: is ``True``)
455 username = None
456 #: The :class:`FederateAuth` transmited ticket (only used if ``settings.CAS_FEDERATE`` is
457 #: ``True``)
458 ticket = None
460 INVALID_LOGIN_TICKET = 1
461 USER_LOGIN_OK = 2
462 USER_LOGIN_FAILURE = 3
463 USER_ALREADY_LOGGED = 4
464 USER_AUTHENTICATED = 5
465 USER_NOT_AUTHENTICATED = 6
467 def init_post(self, request):
468 """
469 Initialize POST received parameters
471 :param django.http.HttpRequest request: The current request object
472 """
473 self.request = request
474 self.service = request.POST.get('service')
475 self.renew = bool(request.POST.get('renew') and request.POST['renew'] != "False")
476 self.gateway = request.POST.get('gateway')
477 self.method = request.POST.get('method')
478 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
479 if request.POST.get('warned') and request.POST['warned'] != "False":
480 self.warned = True
481 self.warn = request.POST.get('warn')
482 if settings.CAS_FEDERATE:
483 self.username = request.POST.get('username')
484 # in federated mode, the valdated indentity provider CAS ticket is used as password
485 self.ticket = request.POST.get('password')
487 def gen_lt(self):
488 """Generate a new LoginTicket and add it to the list of valid LT for the user"""
489 self.request.session['lt'] = self.request.session.get('lt', []) + [utils.gen_lt()]
490 if len(self.request.session['lt']) > 100:
491 self.request.session['lt'] = self.request.session['lt'][-100:]
493 def check_lt(self):
494 """
495 Check is the POSTed LoginTicket is valid, if yes invalide it
497 :return: ``True`` if the LoginTicket is valid, ``False`` otherwise
498 :rtype: bool
499 """
500 # save LT for later check
501 lt_valid = self.request.session.get('lt', [])
502 lt_send = self.request.POST.get('lt')
503 # generate a new LT (by posting the LT has been consumed)
504 self.gen_lt()
505 # check if send LT is valid
506 if lt_send not in lt_valid:
507 return False
508 else:
509 self.request.session['lt'].remove(lt_send)
510 # we need to redo the affectation for django to detect that the list has changed
511 # and for its new value to be store in the session
512 self.request.session['lt'] = self.request.session['lt']
513 return True
515 def post(self, request, *args, **kwargs):
516 """
517 method called on POST request on this view
519 :param django.http.HttpRequest request: The current request object
520 """
521 # initialize class parameters
522 self.init_post(request)
523 # process the POST request
524 ret = self.process_post()
525 if ret == self.INVALID_LOGIN_TICKET:
526 messages.add_message(
527 self.request,
528 messages.ERROR,
529 _(u"Invalid login ticket, please try to log in again")
530 )
531 elif ret == self.USER_LOGIN_OK:
532 pass
533 elif ret == self.USER_LOGIN_FAILURE: # bad user login
534 if settings.CAS_FEDERATE:
535 self.ticket = None
536 self.username = None
537 self.init_form()
538 # preserve valid LoginTickets from session flush
539 lt = self.request.session.get('lt', [])
540 # On login failure, flush the session
541 self.logout()
542 # restore valid LoginTickets
543 self.request.session['lt'] = lt
544 elif ret == self.USER_ALREADY_LOGGED:
545 pass
546 else: # pragma: no cover (should no happen)
547 raise EnvironmentError("invalid output for LoginView.process_post")
548 # call the GET/POST common part
549 response = self.common()
550 if self.warn:
551 utils.set_cookie(
552 response,
553 "warn",
554 "on",
555 10 * 365 * 24 * 3600
556 )
557 else:
558 response.delete_cookie("warn")
559 return response
561 def user_login(self, username, warn, warned=True):
562 self.request.session.set_expiry(0)
563 self.request.session["username"] = username
564 self.request.session["warn"] = warn
565 self.request.session["authenticated"] = True
566 self.renewed = True
567 self.warned = warned
569 # On successful login, update the :class:`models.User<cas_server.models.User>` ``date``
570 # attribute by saving it. (``auto_now=True``)
571 self.user = models.User.objects.get_or_create(
572 username=self.request.session['username'],
573 session_key=self.request.session.session_key
574 )[0]
575 self.user.last_login = timezone.now()
576 self.user.save()
578 logger.info("User %s successfully authenticated" % username)
580 def process_post(self):
581 """
582 Analyse the POST request:
584 * check that the LoginTicket is valid
585 * check that the user sumited credentials are valid
587 :return:
588 * :attr:`INVALID_LOGIN_TICKET` if the POSTed LoginTicket is not valid
589 * :attr:`USER_ALREADY_LOGGED` if the user is already logged and do no request
590 reauthentication.
591 * :attr:`USER_LOGIN_FAILURE` if the user is not logged or request for
592 reauthentication and his credentials are not valid
593 * :attr:`USER_LOGIN_OK` if the user is not logged or request for
594 reauthentication and his credentials are valid
595 :rtype: int
596 """
597 if not self.check_lt():
598 self.init_form(self.request.POST)
599 logger.warning("Received an invalid login ticket")
600 return self.INVALID_LOGIN_TICKET
601 elif not self.request.session.get("authenticated") or self.renew:
602 # authentication request receive, initialize the form to use
603 self.init_form(self.request.POST)
604 if self.form.is_valid():
605 self.user_login(
606 self.form.cleaned_data['username'],
607 True if self.form.cleaned_data.get("warn") else False,
608 warned=True
609 )
610 return self.USER_LOGIN_OK
611 else:
612 logger.warning("A login attempt failed")
613 return self.USER_LOGIN_FAILURE
614 else:
615 logger.warning("Received a login attempt for an already-active user")
616 return self.USER_ALREADY_LOGGED
618 def init_get(self, request):
619 """
620 Initialize GET received parameters
622 :param django.http.HttpRequest request: The current request object
623 """
624 self.request = request
625 self.service = request.GET.get('service')
626 self.renew = bool(request.GET.get('renew') and request.GET['renew'] != "False")
627 self.gateway = request.GET.get('gateway')
628 self.method = request.GET.get('method')
629 self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
630 self.warn = request.GET.get('warn')
631 if settings.CAS_FEDERATE:
632 # here username and ticket are fetch from the session after a redirection from
633 # FederateAuth.get
634 self.username = request.session.get("federate_username")
635 self.ticket = request.session.get("federate_ticket")
636 if self.username:
637 del request.session["federate_username"]
638 if self.ticket:
639 del request.session["federate_ticket"]
641 def gssapi_auth(self):
642 self.request.session['GSSAPI'] = False
643 # read http header Authorization
644 auth_header = self.request.META.get('HTTP_AUTHORIZATION', None)
645 # if header is missing or do not match SPNEGO, start GSSAPI auth
646 if auth_header is None or not auth_header.startswith('Negotiate '):
647 # call the GET/POST common part
648 response = self.common()
649 response.status_code = 401
650 # demande la négo GSSAPI
651 response['WWW-Authenticate'] = 'Negotiate '
652 return response
653 else:
654 try:
655 try:
656 server_creds = gssapi.Credentials(
657 usage='accept',
658 name=gssapi.Name(settings.CAS_AUTH_GSSAPI_SERVICENAME)
659 )
660 except gssapi.exceptions.GSSError as error:
661 logger.error("Fail to create GSSAPI credentials objects: %s" % (error,))
662 server_creds = None
663 # decode and check auth header
664 tok = base64.b64decode(auth_header[len('Negotiate '):])
665 ctx = gssapi.SecurityContext(creds=server_creds, usage='accept')
666 server_tok = ctx.step(tok)
667 b64tok = base64.b64encode(server_tok)
668 # If GSSAPI auth is successful
669 if ctx.complete:
670 # retreive the user username (remove the realm part)
671 username = str(ctx.initiator_name).split('@', 1)[0]
672 self.user_login(
673 username,
674 warn=False,
675 warned=False
676 )
677 self.request.session['GSSAPI'] = True
678 # call the GET/POST common part
679 return self.common()
680 # sinon, ou la négo continue, ou l'utilisateur peut utiliser le form login/pass
681 else:
682 # call the GET/POST common part
683 response = self.common()
684 response.status_code = 401
685 response['WWW-Authenticate'] = 'Negotiate ' + b64tok.decode('ascii')
686 return response
687 except gssapi.exceptions.GSSError as error:
688 logger.error("Error during GSSAPI handshake: %s" % (error,))
689 # call the GET/POST common part
690 return self.common()
692 def get(self, request, *args, **kwargs):
693 """
694 method called on GET request on this view
696 :param django.http.HttpRequest request: The current request object
697 """
698 # initialize class parameters
699 self.init_get(request)
700 # process the GET request
701 self.process_get()
702 if ( 702 ↛ 707line 702 didn't jump to line 707 because the condition on line 702 was never true
703 not settings.CAS_FEDERATE and
704 settings.CAS_AUTH_GSSAPI_ENABLE and
705 not request.session.get("authenticated")
706 ):
707 return self.gssapi_auth()
708 else:
709 # call the GET/POST common part
710 return self.common()
712 def process_get(self):
713 """
714 Analyse the GET request
716 :return:
717 * :attr:`USER_NOT_AUTHENTICATED` if the user is not authenticated or is requesting
718 for authentication renewal
719 * :attr:`USER_AUTHENTICATED` if the user is authenticated and is not requesting
720 for authentication renewal
721 :rtype: int
722 """
723 # generate a new LT
724 self.gen_lt()
725 if not self.request.session.get("authenticated") or self.renew:
726 # authentication will be needed, initialize the form to use
727 self.init_form()
728 return self.USER_NOT_AUTHENTICATED
729 return self.USER_AUTHENTICATED
731 def init_form(self, values=None):
732 """
733 Initialization of the good form depending of POST and GET parameters
735 :param django.http.QueryDict values: A POST or GET QueryDict
736 """
737 if values:
738 values = values.copy()
739 values['lt'] = self.request.session['lt'][-1]
740 form_initial = {
741 'service': self.service,
742 'method': self.method,
743 'warn': (
744 self.warn or self.request.session.get("warn") or self.request.COOKIES.get('warn')
745 ),
746 'lt': self.request.session['lt'][-1],
747 'renew': self.renew
748 }
749 if settings.CAS_FEDERATE:
750 if self.username and self.ticket:
751 form_initial['username'] = self.username
752 form_initial['password'] = self.ticket
753 form_initial['ticket'] = self.ticket
754 self.form = import_attr(settings.CAS_FEDERATE_USER_CREDENTIAL_FORM)(
755 values,
756 initial=form_initial
757 )
758 else:
759 self.form = import_attr(
760 settings.CAS_FEDERATE_SELECT_FORM
761 )(values, initial=form_initial)
762 else:
763 self.form = import_attr(settings.CAS_USER_CREDENTIAL_FORM)(
764 values,
765 initial=form_initial
766 )
768 def service_login(self):
769 """
770 Perform login against a service
772 :return:
773 * The rendering of the ``settings.CAS_WARN_TEMPLATE`` if the user asked to be
774 warned before ticket emission and has not yep been warned.
775 * The redirection to the service URL with a ticket GET parameter
776 * The redirection to the service URL without a ticket if ticket generation failed
777 and the :attr:`gateway` attribute is set
778 * The rendering of the ``settings.CAS_LOGGED_TEMPLATE`` template with some error
779 messages if the ticket generation failed (e.g: user not allowed).
780 :rtype: django.http.HttpResponse
781 """
782 try:
783 # is the service allowed
784 service_pattern = ServicePattern.validate(self.service)
785 # is the current user allowed on this service
786 service_pattern.check_user(self.user)
787 # if the user has asked to be warned before any login to a service
788 if self.request.session.get("warn", True) and not self.warned:
789 messages.add_message(
790 self.request,
791 messages.WARNING,
792 _(u"Authentication has been required by service %(name)s (%(url)s)") %
793 {'name': service_pattern.name, 'url': self.service}
794 )
795 if self.ajax:
796 data = {"status": "error", "detail": "confirmation needed"}
797 return json_response(self.request, data)
798 else:
799 warn_form = import_attr(settings.CAS_WARN_FORM)(initial={
800 'service': self.service,
801 'renew': self.renew,
802 'gateway': self.gateway,
803 'method': self.method,
804 'warned': True,
805 'lt': self.request.session['lt'][-1]
806 })
807 return render(
808 self.request,
809 settings.CAS_WARN_TEMPLATE,
810 utils.context({'form': warn_form})
811 )
812 else:
813 # redirect, using method ?
814 list(messages.get_messages(self.request)) # clean messages before leaving django
815 redirect_url = self.user.get_service_url(
816 self.service,
817 service_pattern,
818 renew=self.renewed
819 )
820 if not self.ajax:
821 return HttpResponseRedirect(redirect_url)
822 else:
823 data = {"status": "success", "detail": "auth", "url": redirect_url}
824 return json_response(self.request, data)
825 except ServicePattern.DoesNotExist:
826 error = 1
827 messages.add_message(
828 self.request,
829 messages.ERROR,
830 _(u'Service %(url)s not allowed.') % {'url': self.service}
831 )
832 except models.BadUsername:
833 error = 2
834 messages.add_message(
835 self.request,
836 messages.ERROR,
837 _(u"Username not allowed")
838 )
839 except models.BadFilter:
840 error = 3
841 messages.add_message(
842 self.request,
843 messages.ERROR,
844 _(u"User characteristics not allowed")
845 )
846 except models.UserFieldNotDefined:
847 error = 4
848 messages.add_message(
849 self.request,
850 messages.ERROR,
851 _(u"The attribute %(field)s is needed to use"
852 u" that service") % {'field': service_pattern.user_field}
853 )
855 # if gateway is set and auth failed redirect to the service without authentication
856 if self.gateway and not self.ajax:
857 list(messages.get_messages(self.request)) # clean messages before leaving django
858 return HttpResponseRedirect(self.service)
860 if not self.ajax:
861 return render(
862 self.request,
863 settings.CAS_LOGGED_TEMPLATE,
864 utils.context({'session': self.request.session})
865 )
866 else:
867 data = {"status": "error", "detail": "auth", "code": error}
868 return json_response(self.request, data)
870 def authenticated(self):
871 """
872 Processing authenticated users
874 :return:
875 * The returned value of :meth:`service_login` if :attr:`service` is defined
876 * The rendering of ``settings.CAS_LOGGED_TEMPLATE`` otherwise
877 :rtype: django.http.HttpResponse
878 """
879 # Try to get the current :class:`models.User<cas_server.models.User>` object for the current
880 # session
881 try:
882 self.user = models.User.objects.get(
883 username=self.request.session.get("username"),
884 session_key=self.request.session.session_key
885 )
886 # if not found, flush the session and redirect to the login page
887 except models.User.DoesNotExist:
888 logger.warning(
889 "User %s seems authenticated but is not found in the database." % (
890 self.request.session.get("username"),
891 )
892 )
893 self.logout()
894 if self.ajax:
895 data = {
896 "status": "error",
897 "detail": "login required",
898 "url": utils.reverse_params("cas_server:login", params=self.request.GET)
899 }
900 return json_response(self.request, data)
901 else:
902 return utils.redirect_params("cas_server:login", params=self.request.GET)
904 # if login against a service
905 if self.service:
906 return self.service_login()
907 # else display the logged template
908 else:
909 if self.ajax:
910 data = {"status": "success", "detail": "logged"}
911 return json_response(self.request, data)
912 else:
913 return render(
914 self.request,
915 settings.CAS_LOGGED_TEMPLATE,
916 utils.context({'session': self.request.session})
917 )
919 def not_authenticated(self):
920 """
921 Processing non authenticated users
923 :return:
924 * The rendering of ``settings.CAS_LOGIN_TEMPLATE`` with various messages
925 depending of GET/POST parameters
926 * The redirection to :class:`FederateAuth` if ``settings.CAS_FEDERATE`` is ``True``
927 and the "remember my identity provider" cookie is found
928 :rtype: django.http.HttpResponse
929 """
930 if self.service:
931 try:
932 service_pattern = ServicePattern.validate(self.service)
933 if self.gateway and not self.ajax:
934 # clean messages before leaving django
935 list(messages.get_messages(self.request))
936 return HttpResponseRedirect(self.service)
938 if settings.CAS_SHOW_SERVICE_MESSAGES:
939 if self.request.session.get("authenticated") and self.renew:
940 messages.add_message(
941 self.request,
942 messages.WARNING,
943 _(u"Authentication renewal required by service %(name)s (%(url)s).") %
944 {'name': service_pattern.name, 'url': self.service}
945 )
946 else:
947 messages.add_message(
948 self.request,
949 messages.WARNING,
950 _(u"Authentication required by service %(name)s (%(url)s).") %
951 {'name': service_pattern.name, 'url': self.service}
952 )
953 except ServicePattern.DoesNotExist:
954 if settings.CAS_SHOW_SERVICE_MESSAGES:
955 messages.add_message(
956 self.request,
957 messages.ERROR,
958 _(u'Service %s not allowed') % self.service
959 )
960 if self.ajax:
961 data = {
962 "status": "error",
963 "detail": "login required",
964 "url": utils.reverse_params("cas_server:login", params=self.request.GET)
965 }
966 return json_response(self.request, data)
967 else:
968 if settings.CAS_FEDERATE:
969 if self.username and self.ticket:
970 return render(
971 self.request,
972 settings.CAS_LOGIN_TEMPLATE,
973 utils.context({
974 'form': self.form,
975 'auto_submit': True,
976 'post_url': reverse("cas_server:login")
977 })
978 )
979 else:
980 if (
981 self.request.COOKIES.get('remember_provider') and
982 FederatedIendityProvider.objects.filter(
983 suffix=self.request.COOKIES['remember_provider']
984 )
985 ):
986 params = utils.copy_params(self.request.GET)
987 url = utils.reverse_params(
988 "cas_server:federateAuth",
989 params=params,
990 kwargs=dict(provider=self.request.COOKIES['remember_provider'])
991 )
992 return HttpResponseRedirect(url)
993 else:
994 # if user is authenticated and auth renewal is requested, redirect directly
995 # to the user identity provider
996 if self.renew and self.request.session.get("authenticated"):
997 try:
998 user = FederatedUser.get_from_federated_username(
999 self.request.session.get("username")
1000 )
1001 params = utils.copy_params(self.request.GET)
1002 url = utils.reverse_params(
1003 "cas_server:federateAuth",
1004 params=params,
1005 kwargs=dict(provider=user.provider.suffix)
1006 )
1007 return HttpResponseRedirect(url)
1008 # Should normally not happen: if the user is logged, it exists in the
1009 # database.
1010 except FederatedUser.DoesNotExist: # pragma: no cover
1011 pass
1012 return render(
1013 self.request,
1014 settings.CAS_LOGIN_TEMPLATE,
1015 utils.context({
1016 'form': self.form,
1017 'post_url': reverse("cas_server:federateAuth")
1018 })
1019 )
1020 else:
1021 return render(
1022 self.request,
1023 settings.CAS_LOGIN_TEMPLATE,
1024 utils.context({'form': self.form})
1025 )
1027 def common(self):
1028 """
1029 Common part execute uppon GET and POST request
1031 :return:
1032 * The returned value of :meth:`authenticated` if the user is authenticated and
1033 not requesting for authentication or if the authentication has just been renewed
1034 * The returned value of :meth:`not_authenticated` otherwise
1035 :rtype: django.http.HttpResponse
1036 """
1037 # if authenticated and successfully renewed authentication if needed
1038 if self.request.session.get("authenticated") and (not self.renew or self.renewed):
1039 return self.authenticated()
1040 else:
1041 return self.not_authenticated()
1044class Auth(CsrfExemptView):
1045 """
1046 A simple view to validate username/password/service tuple
1048 csrf is disable as it is intended to be used by programs. Security is assured by a shared
1049 secret between the programs dans django-cas-server.
1050 """
1052 @staticmethod
1053 def post(request):
1054 """
1055 method called on POST request on this view
1057 :param django.http.HttpRequest request: The current request object
1058 :return: ``HttpResponse(u"yes\\n")`` if the POSTed tuple (username, password, service)
1059 if valid (i.e. (username, password) is valid dans username is allowed on service).
1060 ``HttpResponse(u"no\\n…")`` otherwise, with possibly an error message on the second
1061 line.
1062 :rtype: django.http.HttpResponse
1063 """
1064 username = request.POST.get('username')
1065 password = request.POST.get('password')
1066 service = request.POST.get('service')
1067 secret = request.POST.get('secret')
1069 if not settings.CAS_AUTH_SHARED_SECRET:
1070 return HttpResponse(
1071 "no\nplease set CAS_AUTH_SHARED_SECRET",
1072 content_type="text/plain; charset=utf-8"
1073 )
1074 if secret != settings.CAS_AUTH_SHARED_SECRET:
1075 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1076 if not username or not password or not service:
1077 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1078 form = import_attr(settings.CAS_USER_CREDENTIAL_FORM)(
1079 request.POST,
1080 initial={
1081 'service': service,
1082 'method': 'POST',
1083 'warn': False
1084 }
1085 )
1086 if form.is_valid():
1087 try:
1088 user = models.User.objects.get_or_create(
1089 username=form.cleaned_data['username'],
1090 session_key=request.session.session_key
1091 )[0]
1092 user.save()
1093 # is the service allowed
1094 service_pattern = ServicePattern.validate(service)
1095 # is the current user allowed on this service
1096 service_pattern.check_user(user)
1097 if not request.session.get("authenticated"):
1098 user.delete()
1099 return HttpResponse(u"yes\n", content_type="text/plain; charset=utf-8")
1100 except (ServicePattern.DoesNotExist, models.ServicePatternException):
1101 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1102 else:
1103 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1106class Validate(View):
1107 """service ticket validation"""
1108 @staticmethod
1109 def get(request):
1110 """
1111 method called on GET request on this view
1113 :param django.http.HttpRequest request: The current request object
1114 :return:
1115 * ``HttpResponse("yes\\nusername")`` if submited (service, ticket) is valid
1116 * else ``HttpResponse("no\\n")``
1117 :rtype: django.http.HttpResponse
1118 """
1119 # store wanted GET parameters
1120 service = request.GET.get('service')
1121 ticket = request.GET.get('ticket')
1122 renew = True if request.GET.get('renew') else False
1123 # service and ticket parameters are mandatory
1124 if service and ticket:
1125 try:
1126 # search for the ticket, associated at service that is not yet validated but is
1127 # still valid
1128 ticket = ServiceTicket.get(ticket, renew, service)
1129 logger.info(
1130 "Validate: Service ticket %s validated, user %s authenticated on service %s" % (
1131 ticket.value,
1132 ticket.user.username,
1133 ticket.service
1134 )
1135 )
1136 return HttpResponse(
1137 u"yes\n%s\n" % ticket.username(),
1138 content_type="text/plain; charset=utf-8"
1139 )
1140 except ServiceTicket.DoesNotExist:
1141 logger.warning(
1142 (
1143 "Validate: Service ticket %s not found or "
1144 "already validated, auth to %s failed"
1145 ) % (
1146 ticket,
1147 service
1148 )
1149 )
1150 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1151 else:
1152 logger.warning("Validate: service or ticket missing")
1153 return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
1156@python_2_unicode_compatible
1157class ValidationBaseError(Exception):
1158 """Base class for both saml and cas validation error"""
1160 #: The error code
1161 code = None
1162 #: The error message
1163 msg = None
1165 def __init__(self, code, msg=""):
1166 self.code = code
1167 self.msg = msg
1168 super(ValidationBaseError, self).__init__(code)
1170 def __str__(self):
1171 return u"%s" % self.msg
1173 def render(self, request):
1174 """
1175 render the error template for the exception
1177 :param django.http.HttpRequest request: The current request object:
1178 :return: the rendered ``cas_server/serviceValidateError.xml`` template
1179 :rtype: django.http.HttpResponse
1180 """
1181 return render(
1182 request,
1183 self.template,
1184 self.context(), content_type="text/xml; charset=utf-8"
1185 )
1188class ValidateError(ValidationBaseError):
1189 """handle service validation error"""
1191 #: template to be render for the error
1192 template = "cas_server/serviceValidateError.xml"
1194 def context(self):
1195 """
1196 content to use to render :attr:`template`
1198 :return: A dictionary to contextualize :attr:`template`
1199 :rtype: dict
1200 """
1201 return {'code': self.code, 'msg': self.msg}
1204class ValidateService(View):
1205 """service ticket validation [CAS 2.0] and [CAS 3.0]"""
1206 #: Current :class:`django.http.HttpRequest` object
1207 request = None
1208 #: The service GET parameter
1209 service = None
1210 #: the ticket GET parameter
1211 ticket = None
1212 #: the pgtUrl GET parameter
1213 pgt_url = None
1214 #: the renew GET parameter
1215 renew = None
1216 #: specify if ProxyTicket are allowed by the view. Hence we user the same view for
1217 #: ``/serviceValidate`` and ``/proxyValidate`` juste changing the parameter.
1218 allow_proxy_ticket = False
1220 def get(self, request):
1221 """
1222 method called on GET request on this view
1224 :param django.http.HttpRequest request: The current request object:
1225 :return: The rendering of ``cas_server/serviceValidate.xml`` if no errors is raised,
1226 the rendering or ``cas_server/serviceValidateError.xml`` otherwise.
1227 :rtype: django.http.HttpResponse
1228 """
1229 # define the class parameters
1230 self.request = request
1231 self.service = request.GET.get('service')
1232 self.ticket = request.GET.get('ticket')
1233 self.pgt_url = request.GET.get('pgtUrl')
1234 self.renew = True if request.GET.get('renew') else False
1236 # service and ticket parameter are mandatory
1237 if not self.service or not self.ticket:
1238 logger.warning("ValidateService: missing ticket or service")
1239 return ValidateError(
1240 u'INVALID_REQUEST',
1241 u"you must specify a service and a ticket"
1242 ).render(request)
1243 else:
1244 try:
1245 # search the ticket in the database
1246 self.ticket, proxies = self.process_ticket()
1247 # prepare template rendering context
1248 params = {
1249 'username': self.ticket.username(),
1250 'attributes': self.ticket.attributs_flat(),
1251 'proxies': proxies,
1252 'auth_date': self.ticket.user.last_login.replace(microsecond=0).isoformat(),
1253 'is_new_login': 'true' if self.ticket.renew else 'false'
1254 }
1255 # if pgtUrl is set, require https or localhost
1256 if self.pgt_url and (
1257 self.pgt_url.startswith("https://") or
1258 re.match(r"^http://(127\.0\.0\.1|localhost)(:[0-9]+)?(/.*)?$", self.pgt_url)
1259 ):
1260 return self.process_pgturl(params)
1261 else:
1262 logger.info(
1263 "ValidateService: ticket %s validated for user %s on service %s." % (
1264 self.ticket.value,
1265 self.ticket.user.username,
1266 self.ticket.service
1267 )
1268 )
1269 logger.debug(
1270 "ValidateService: User attributs are:\n%s" % (
1271 pprint.pformat(self.ticket.attributs),
1272 )
1273 )
1274 return render(
1275 request,
1276 "cas_server/serviceValidate.xml",
1277 params,
1278 content_type="text/xml; charset=utf-8"
1279 )
1280 except ValidateError as error:
1281 logger.warning(
1282 "ValidateService: validation error: %s %s" % (error.code, error.msg)
1283 )
1284 return error.render(request)
1286 def process_ticket(self):
1287 """
1288 fetch the ticket against the database and check its validity
1290 :raises ValidateError: if the ticket is not found or not valid, potentially for that
1291 service
1292 :returns: A couple (ticket, proxies list)
1293 :rtype: :obj:`tuple`
1294 """
1295 try:
1296 proxies = []
1297 if self.allow_proxy_ticket:
1298 ticket = models.Ticket.get(self.ticket, self.renew)
1299 else:
1300 ticket = models.ServiceTicket.get(self.ticket, self.renew)
1301 try:
1302 for prox in ticket.proxies.all():
1303 proxies.append(prox.url)
1304 except AttributeError:
1305 pass
1306 if ticket.service != self.service:
1307 raise ValidateError(u'INVALID_SERVICE', self.service)
1308 return ticket, proxies
1309 except Ticket.DoesNotExist:
1310 raise ValidateError(u'INVALID_TICKET', self.ticket)
1311 except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
1312 raise ValidateError(u'INVALID_TICKET', 'ticket not found')
1314 def process_pgturl(self, params):
1315 """
1316 Handle PGT request
1318 :param dict params: A template context dict
1319 :raises ValidateError: if pgtUrl is invalid or if TLS validation of the pgtUrl fails
1320 :return: The rendering of ``cas_server/serviceValidate.xml``, using ``params``
1321 :rtype: django.http.HttpResponse
1322 """
1323 try:
1324 pattern = ServicePattern.validate(self.pgt_url)
1325 if pattern.proxy_callback:
1326 proxyid = utils.gen_pgtiou()
1327 pticket = ProxyGrantingTicket.objects.create(
1328 user=self.ticket.user,
1329 service=self.pgt_url,
1330 service_pattern=pattern,
1331 single_log_out=pattern.single_log_out
1332 )
1333 url = utils.update_url(self.pgt_url, {'pgtIou': proxyid, 'pgtId': pticket.value})
1334 try:
1335 ret = requests.get(url, verify=settings.CAS_PROXY_CA_CERTIFICATE_PATH)
1336 if ret.status_code == 200:
1337 params['proxyGrantingTicket'] = proxyid
1338 else:
1339 pticket.delete()
1340 logger.info(
1341 (
1342 "ValidateService: ticket %s validated for user %s on service %s. "
1343 "Proxy Granting Ticket transmited to %s."
1344 ) % (
1345 self.ticket.value,
1346 self.ticket.user.username,
1347 self.ticket.service,
1348 self.pgt_url
1349 )
1350 )
1351 logger.debug(
1352 "ValidateService: User attributs are:\n%s" % (
1353 pprint.pformat(self.ticket.attributs),
1354 )
1355 )
1356 return render(
1357 self.request,
1358 "cas_server/serviceValidate.xml",
1359 params,
1360 content_type="text/xml; charset=utf-8"
1361 )
1362 except requests.exceptions.RequestException as error:
1363 error = utils.unpack_nested_exception(error)
1364 raise ValidateError(
1365 u'INVALID_PROXY_CALLBACK',
1366 u"%s: %s" % (type(error), str(error))
1367 )
1368 else:
1369 raise ValidateError(
1370 u'INVALID_PROXY_CALLBACK',
1371 u"callback url not allowed by configuration"
1372 )
1373 except ServicePattern.DoesNotExist:
1374 raise ValidateError(
1375 u'INVALID_PROXY_CALLBACK',
1376 u'callback url not allowed by configuration'
1377 )
1380class Proxy(View):
1381 """proxy ticket service"""
1383 #: Current :class:`django.http.HttpRequest` object
1384 request = None
1385 #: A ProxyGrantingTicket from the pgt GET parameter
1386 pgt = None
1387 #: the targetService GET parameter
1388 target_service = None
1390 def get(self, request):
1391 """
1392 method called on GET request on this view
1394 :param django.http.HttpRequest request: The current request object:
1395 :return: The returned value of :meth:`process_proxy` if no error is raised,
1396 else the rendering of ``cas_server/serviceValidateError.xml``.
1397 :rtype: django.http.HttpResponse
1398 """
1399 self.request = request
1400 self.pgt = request.GET.get('pgt')
1401 self.target_service = request.GET.get('targetService')
1402 try:
1403 # pgt and targetService parameters are mandatory
1404 if self.pgt and self.target_service:
1405 return self.process_proxy()
1406 else:
1407 raise ValidateError(
1408 u'INVALID_REQUEST',
1409 u"you must specify and pgt and targetService"
1410 )
1411 except ValidateError as error:
1412 logger.warning("Proxy: validation error: %s %s" % (error.code, error.msg))
1413 return error.render(request)
1415 def process_proxy(self):
1416 """
1417 handle PT request
1419 :raises ValidateError: if the PGT is not found, or the target service not allowed or
1420 the user not allowed on the tardet service.
1421 :return: The rendering of ``cas_server/proxy.xml``
1422 :rtype: django.http.HttpResponse
1423 """
1424 try:
1425 # is the target service allowed
1426 pattern = ServicePattern.validate(self.target_service)
1427 # to get a proxy ticket require that the service allow it
1428 if not pattern.proxy:
1429 raise ValidateError(
1430 u'UNAUTHORIZED_SERVICE',
1431 u'the service %s does not allow proxy tickets' % self.target_service
1432 )
1433 # is the proxy granting ticket valid
1434 ticket = ProxyGrantingTicket.get(self.pgt)
1435 # is the pgt user allowed on the target service
1436 pattern.check_user(ticket.user)
1437 pticket = ticket.user.get_ticket(
1438 ProxyTicket,
1439 self.target_service,
1440 pattern,
1441 renew=False
1442 )
1443 models.Proxy.objects.create(proxy_ticket=pticket, url=ticket.service)
1444 logger.info(
1445 "Proxy ticket created for user %s on service %s." % (
1446 ticket.user.username,
1447 self.target_service
1448 )
1449 )
1450 return render(
1451 self.request,
1452 "cas_server/proxy.xml",
1453 {'ticket': pticket.value},
1454 content_type="text/xml; charset=utf-8"
1455 )
1456 except (Ticket.DoesNotExist, ProxyGrantingTicket.DoesNotExist):
1457 raise ValidateError(u'INVALID_TICKET', u'PGT %s not found' % self.pgt)
1458 except ServicePattern.DoesNotExist:
1459 raise ValidateError(u'UNAUTHORIZED_SERVICE', self.target_service)
1460 except (models.BadUsername, models.BadFilter, models.UserFieldNotDefined):
1461 raise ValidateError(
1462 u'UNAUTHORIZED_USER',
1463 u'User %s not allowed on %s' % (ticket.user.username, self.target_service)
1464 )
1467class SamlValidateError(ValidationBaseError):
1468 """handle saml validation error"""
1470 #: template to be render for the error
1471 template = "cas_server/samlValidateError.xml"
1473 def context(self):
1474 """
1475 :return: A dictionary to contextualize :attr:`template`
1476 :rtype: dict
1477 """
1478 return {
1479 'code': self.code,
1480 'msg': self.msg,
1481 'IssueInstant': timezone.now().isoformat(),
1482 'ResponseID': utils.gen_saml_id()
1483 }
1486class SamlValidate(CsrfExemptView):
1487 """SAML ticket validation"""
1488 request = None
1489 target = None
1490 ticket = None
1491 root = None
1493 def post(self, request, *args, **kwargs):
1494 """
1495 method called on POST request on this view
1497 :param django.http.HttpRequest request: The current request object
1498 :return: the rendering of ``cas_server/samlValidate.xml`` if no error is raised,
1499 else the rendering of ``cas_server/samlValidateError.xml``.
1500 :rtype: django.http.HttpResponse
1501 """
1502 self.request = request
1503 self.target = request.GET.get('TARGET')
1504 self.root = etree.fromstring(request.body)
1505 try:
1506 self.ticket = self.process_ticket()
1507 expire_instant = (self.ticket.creation +
1508 timedelta(seconds=self.ticket.VALIDITY)).isoformat()
1509 params = {
1510 'IssueInstant': timezone.now().isoformat(),
1511 'expireInstant': expire_instant,
1512 'Recipient': self.target,
1513 'ResponseID': utils.gen_saml_id(),
1514 'username': self.ticket.username(),
1515 'attributes': self.ticket.attributs_flat(),
1516 'auth_date': self.ticket.user.last_login.replace(microsecond=0).isoformat(),
1517 'is_new_login': 'true' if self.ticket.renew else 'false'
1519 }
1520 logger.info(
1521 "SamlValidate: ticket %s validated for user %s on service %s." % (
1522 self.ticket.value,
1523 self.ticket.user.username,
1524 self.ticket.service
1525 )
1526 )
1527 logger.debug(
1528 "SamlValidate: User attributes are:\n%s" % pprint.pformat(self.ticket.attributs)
1529 )
1531 return render(
1532 request,
1533 "cas_server/samlValidate.xml",
1534 params,
1535 content_type="text/xml; charset=utf-8"
1536 )
1537 except SamlValidateError as error:
1538 logger.warning("SamlValidate: validation error: %s %s" % (error.code, error.msg))
1539 return error.render(request)
1541 def process_ticket(self):
1542 """
1543 validate ticket from SAML XML body
1545 :raises: SamlValidateError: if the ticket is not found or not valid, or if we fail
1546 to parse the posted XML.
1547 :return: a ticket object
1548 :rtype: :class:`models.Ticket<cas_server.models.Ticket>`
1549 """
1550 try:
1551 auth_req = self.root.getchildren()[1].getchildren()[0]
1552 ticket = auth_req.getchildren()[0].text
1553 ticket = models.Ticket.get(ticket)
1554 if ticket.service != self.target:
1555 raise SamlValidateError(
1556 u'AuthnFailed',
1557 u'TARGET %s does not match ticket service' % self.target
1558 )
1559 return ticket
1560 except (IndexError, KeyError):
1561 raise SamlValidateError(u'VersionMismatch')
1562 except Ticket.DoesNotExist:
1563 raise SamlValidateError(
1564 u'AuthnFailed',
1565 u'ticket %s should begin with PT- or ST-' % ticket
1566 )
1567 except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
1568 raise SamlValidateError(u'AuthnFailed', u'ticket %s not found' % ticket)