Coverage for cas_server/federate.py: 100%
53 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-18 11:22 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-18 11:22 +0000
1# -*- coding: utf-8 -*-
2# This program is distributed in the hope that it will be useful, but WITHOUT
3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
5# more details.
6#
7# You should have received a copy of the GNU General Public License version 3
8# along with this program; if not, write to the Free Software Foundation, Inc., 51
9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
10#
11# (c) 2016 Valentin Samir
12"""federated mode helper classes"""
13from .default_settings import SessionStore
14from django.db import IntegrityError
16from .cas import CASClient
17from .models import FederatedUser, FederateSLO, User
19import logging
20from six.moves import urllib
#: module-level logger, named after this module through ``__name__``
logger = logging.getLogger(__name__)
class CASFederateValidateUser(object):
    """
    CAS client used to authenticate the user against a CAS provider.

    :param cas_server.models.FederatedIendityProvider provider: The provider to use for
        authenticating the user.
    :param unicode service_url: The service url to transmit to the ``provider``.
    :param bool renew: Forwarded unchanged to the underlying CAS client as its ``renew``
        option. Defaults to ``False``.
    """
    #: the provider returned username
    username = None
    #: the provider returned attributes. This class-level dict is only a default value:
    #: :meth:`verify_ticket` rebinds ``self.attributs`` on the instance and never mutates
    #: this shared object in place.
    attributs = {}
    #: the CAS client instance
    client = None
    #: the provider returned username with the provider suffix appended
    federated_username = None
    #: the identity provider
    provider = None

    def __init__(self, provider, service_url, renew=False):
        self.provider = provider
        self.client = CASClient(
            service_url=service_url,
            version=provider.cas_protocol_version,
            server_url=provider.server_url,
            renew=renew,
        )

    def get_login_url(self):
        """
        :return: the CAS provider login url
        :rtype: unicode
        """
        return self.client.get_login_url()

    def get_logout_url(self, redirect_url=None):
        """
        :param redirect_url: The url to redirect to after logout from the provider, if provided.
        :type redirect_url: :obj:`unicode` or :obj:`NoneType<types.NoneType>`
        :return: the CAS provider logout url
        :rtype: unicode
        """
        return self.client.get_logout_url(redirect_url)

    def verify_ticket(self, ticket):
        """
        Test ``ticket`` against the CAS provider. If it is valid, create or update a
        :class:`FederatedUser<cas_server.models.FederatedUser>` matching the username and
        attributes returned by the provider, and record them on this instance
        (``username``, ``attributs``, ``federated_username``).

        :param unicode ticket: The ticket to validate against the provider CAS
        :return: ``True`` if the validation succeeded, else ``False`` (including when the
            request to the provider raised ``URLError``).
        :rtype: bool
        """
        try:
            # only the first two items of the client answer are used here
            username, attributs = self.client.verify_ticket(ticket)[:2]
        except urllib.error.URLError:
            # the validation request itself failed: report an unsuccessful validation
            return False
        if username is None:
            return False
        if attributs is None:
            attributs = {}
        # expose the provider suffix alongside the provider returned attributes
        attributs["provider"] = self.provider.suffix
        self.username = username
        self.attributs = attributs
        user = FederatedUser.objects.update_or_create(
            username=username,
            provider=self.provider,
            defaults=dict(attributs=attributs, ticket=ticket)
        )[0]
        # NOTE(review): update_or_create presumably already persists the object, which would
        # make this save redundant; kept to preserve the original behavior -- confirm.
        user.save()
        self.federated_username = user.federated_username
        return True

    @staticmethod
    def register_slo(username, session_key, ticket):
        """
        Associate a ``ticket`` with a (``username``, ``session_key``) for processing a later
        SLO request, by creating a :class:`cas_server.models.FederateSLO` object.

        :param unicode username: A logged user username, with the ``@`` component.
        :param unicode session_key: A logged user session_key matching ``username``.
        :param unicode ticket: A ticket used to authenticate ``username`` for the session
            ``session_key``.
        """
        try:
            FederateSLO.objects.create(
                username=username,
                session_key=session_key,
                ticket=ticket
            )
        except IntegrityError:  # pragma: no cover (ignore if the FederateSLO already exists)
            pass

    def clean_sessions(self, logout_request):
        """
        Process a SLO request: search for ticket values in ``logout_request``. For each
        ticket value matching a :class:`cas_server.models.FederateSLO`, disconnect the
        corresponding user: flush its session, log out and delete the matching
        :class:`cas_server.models.User` if any, then delete the ``FederateSLO`` itself.

        :param unicode logout_request: An XML document containing one or more Single Log Out
            requests.
        """
        try:
            # a falsy answer from the client is treated as "no SLO request found"
            slos = self.client.get_saml_slos(logout_request) or []
        except NameError:  # pragma: no cover (should not happen)
            slos = []
        for slo in slos:
            for federate_slo in FederateSLO.objects.filter(ticket=slo.text):
                # arguments follow the placeholders order: ticket first, then username
                logger.info(
                    "Got an SLO requests for ticket %s, logging out user %s",
                    federate_slo.ticket,
                    federate_slo.username,
                )
                session = SessionStore(session_key=federate_slo.session_key)
                session.flush()
                try:
                    user = User.objects.get(
                        username=federate_slo.username,
                        session_key=federate_slo.session_key
                    )
                    user.logout()
                    user.delete()
                except User.DoesNotExist:  # pragma: no cover (should not happen)
                    pass
                federate_slo.delete()