Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

# -*- coding: utf-8 -*- 

# This program is distributed in the hope that it will be useful, but WITHOUT 

# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

# more details. 

# 

# You should have received a copy of the GNU General Public License version 3 

# along with this program; if not, write to the Free Software Foundation, Inc., 51 

# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

# 

# (c) 2016 Valentin Samir 

"""federated mode helper classes""" 

from .default_settings import SessionStore 

from django.db import IntegrityError 

 

from .cas import CASClient 

from .models import FederatedUser, FederateSLO, User 

 

import logging 

from six.moves import urllib 

 

#: logger facility 

logger = logging.getLogger(__name__) 

 

 

class CASFederateValidateUser(object): 

""" 

Class CAS client used to authenticate the user again a CAS provider 

 

:param cas_server.models.FederatedIendityProvider provider: The provider to use for 

authenticate the user. 

:param unicode service_url: The service url to transmit to the ``provider``. 

""" 

#: the provider returned username 

username = None 

#: the provider returned attributes 

attributs = {} 

#: the CAS client instance 

client = None 

#: the provider returned username this the provider suffix appended 

federated_username = None 

#: the identity provider 

provider = None 

 

def __init__(self, provider, service_url, renew=False): 

self.provider = provider 

self.client = CASClient( 

service_url=service_url, 

version=provider.cas_protocol_version, 

server_url=provider.server_url, 

renew=renew, 

) 

 

def get_login_url(self): 

""" 

:return: the CAS provider login url 

:rtype: unicode 

""" 

return self.client.get_login_url() 

 

def get_logout_url(self, redirect_url=None): 

""" 

:param redirect_url: The url to redirect to after logout from the provider, if provided. 

:type redirect_url: :obj:`unicode` or :obj:`NoneType<types.NoneType>` 

:return: the CAS provider logout url 

:rtype: unicode 

""" 

return self.client.get_logout_url(redirect_url) 

 

def verify_ticket(self, ticket): 

""" 

test ``ticket`` against the CAS provider, if valid, create a 

:class:`FederatedUser<cas_server.models.FederatedUser>` matching provider returned 

username and attributes. 

 

:param unicode ticket: The ticket to validate against the provider CAS 

:return: ``True`` if the validation succeed, else ``False``. 

:rtype: bool 

""" 

try: 

username, attributs = self.client.verify_ticket(ticket)[:2] 

except urllib.error.URLError: 

return False 

if username is not None: 

if attributs is None: 

attributs = {} 

attributs["provider"] = self.provider.suffix 

self.username = username 

self.attributs = attributs 

user = FederatedUser.objects.update_or_create( 

username=username, 

provider=self.provider, 

defaults=dict(attributs=attributs, ticket=ticket) 

)[0] 

user.save() 

self.federated_username = user.federated_username 

return True 

else: 

return False 

 

@staticmethod 

def register_slo(username, session_key, ticket): 

""" 

association a ``ticket`` with a (``username``, ``session_key``) for processing later SLO 

request by creating a :class:`cas_server.models.FederateSLO` object. 

 

:param unicode username: A logged user username, with the ``@`` component. 

:param unicode session_key: A logged user session_key matching ``username``. 

:param unicode ticket: A ticket used to authentication ``username`` for the session 

``session_key``. 

""" 

try: 

FederateSLO.objects.create( 

username=username, 

session_key=session_key, 

ticket=ticket 

) 

except IntegrityError: # pragma: no cover (ignore if the FederateSLO already exists) 

pass 

 

def clean_sessions(self, logout_request): 

""" 

process a SLO request: Search for ticket values in ``logout_request``. For each 

ticket value matching a :class:`cas_server.models.FederateSLO`, disconnect the 

corresponding user. 

 

:param unicode logout_request: An XML document contening one or more Single Log Out 

requests. 

""" 

try: 

slos = self.client.get_saml_slos(logout_request) or [] 

except NameError: # pragma: no cover (should not happen) 

slos = [] 

for slo in slos: 

for federate_slo in FederateSLO.objects.filter(ticket=slo.text): 

logger.info( 

"Got an SLO requests for ticket %s, logging out user %s" % ( 

federate_slo.username, 

federate_slo.ticket 

) 

) 

session = SessionStore(session_key=federate_slo.session_key) 

session.flush() 

try: 

user = User.objects.get( 

username=federate_slo.username, 

session_key=federate_slo.session_key 

) 

user.logout() 

user.delete() 

except User.DoesNotExist: # pragma: no cover (should not happen) 

pass 

federate_slo.delete()