Coverage for cas_server/federate.py: 100%

53 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-18 09:41 +0000

1# -*- coding: utf-8 -*- 

2# This program is distributed in the hope that it will be useful, but WITHOUT 

3# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

4# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

5# more details. 

6# 

7# You should have received a copy of the GNU General Public License version 3 

8# along with this program; if not, write to the Free Software Foundation, Inc., 51 

9# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

10# 

11# (c) 2016 Valentin Samir 

12"""federated mode helper classes""" 

13from .default_settings import SessionStore 

14from django.db import IntegrityError 

15 

16from .cas import CASClient 

17from .models import FederatedUser, FederateSLO, User 

18 

19import logging 

20from six.moves import urllib 

21 

22#: logger facility 

23logger = logging.getLogger(__name__) 

24 

25 

26class CASFederateValidateUser(object): 

27 """ 

28 Class CAS client used to authenticate the user again a CAS provider 

29 

30 :param cas_server.models.FederatedIendityProvider provider: The provider to use for 

31 authenticate the user. 

32 :param unicode service_url: The service url to transmit to the ``provider``. 

33 """ 

34 #: the provider returned username 

35 username = None 

36 #: the provider returned attributes 

37 attributs = {} 

38 #: the CAS client instance 

39 client = None 

40 #: the provider returned username this the provider suffix appended 

41 federated_username = None 

42 #: the identity provider 

43 provider = None 

44 

45 def __init__(self, provider, service_url, renew=False): 

46 self.provider = provider 

47 self.client = CASClient( 

48 service_url=service_url, 

49 version=provider.cas_protocol_version, 

50 server_url=provider.server_url, 

51 renew=renew, 

52 ) 

53 

54 def get_login_url(self): 

55 """ 

56 :return: the CAS provider login url 

57 :rtype: unicode 

58 """ 

59 return self.client.get_login_url() 

60 

61 def get_logout_url(self, redirect_url=None): 

62 """ 

63 :param redirect_url: The url to redirect to after logout from the provider, if provided. 

64 :type redirect_url: :obj:`unicode` or :obj:`NoneType<types.NoneType>` 

65 :return: the CAS provider logout url 

66 :rtype: unicode 

67 """ 

68 return self.client.get_logout_url(redirect_url) 

69 

70 def verify_ticket(self, ticket): 

71 """ 

72 test ``ticket`` against the CAS provider, if valid, create a 

73 :class:`FederatedUser<cas_server.models.FederatedUser>` matching provider returned 

74 username and attributes. 

75 

76 :param unicode ticket: The ticket to validate against the provider CAS 

77 :return: ``True`` if the validation succeed, else ``False``. 

78 :rtype: bool 

79 """ 

80 try: 

81 username, attributs = self.client.verify_ticket(ticket)[:2] 

82 except urllib.error.URLError: 

83 return False 

84 if username is not None: 

85 if attributs is None: 

86 attributs = {} 

87 attributs["provider"] = self.provider.suffix 

88 self.username = username 

89 self.attributs = attributs 

90 user = FederatedUser.objects.update_or_create( 

91 username=username, 

92 provider=self.provider, 

93 defaults=dict(attributs=attributs, ticket=ticket) 

94 )[0] 

95 user.save() 

96 self.federated_username = user.federated_username 

97 return True 

98 else: 

99 return False 

100 

101 @staticmethod 

102 def register_slo(username, session_key, ticket): 

103 """ 

104 association a ``ticket`` with a (``username``, ``session_key``) for processing later SLO 

105 request by creating a :class:`cas_server.models.FederateSLO` object. 

106 

107 :param unicode username: A logged user username, with the ``@`` component. 

108 :param unicode session_key: A logged user session_key matching ``username``. 

109 :param unicode ticket: A ticket used to authentication ``username`` for the session 

110 ``session_key``. 

111 """ 

112 try: 

113 FederateSLO.objects.create( 

114 username=username, 

115 session_key=session_key, 

116 ticket=ticket 

117 ) 

118 except IntegrityError: # pragma: no cover (ignore if the FederateSLO already exists) 

119 pass 

120 

121 def clean_sessions(self, logout_request): 

122 """ 

123 process a SLO request: Search for ticket values in ``logout_request``. For each 

124 ticket value matching a :class:`cas_server.models.FederateSLO`, disconnect the 

125 corresponding user. 

126 

127 :param unicode logout_request: An XML document contening one or more Single Log Out 

128 requests. 

129 """ 

130 try: 

131 slos = self.client.get_saml_slos(logout_request) or [] 

132 except NameError: # pragma: no cover (should not happen) 

133 slos = [] 

134 for slo in slos: 

135 for federate_slo in FederateSLO.objects.filter(ticket=slo.text): 

136 logger.info( 

137 "Got an SLO requests for ticket %s, logging out user %s" % ( 

138 federate_slo.username, 

139 federate_slo.ticket 

140 ) 

141 ) 

142 session = SessionStore(session_key=federate_slo.session_key) 

143 session.flush() 

144 try: 

145 user = User.objects.get( 

146 username=federate_slo.username, 

147 session_key=federate_slo.session_key 

148 ) 

149 user.logout() 

150 user.delete() 

151 except User.DoesNotExist: # pragma: no cover (should not happen) 

152 pass 

153 federate_slo.delete()