Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# -*- coding: utf-8 -*- # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for # more details. # # You should have received a copy of the GNU General Public License version 3 # along with this program; if not, write to the Free Software Foundation, Inc., 51 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # (c) 2015-2016 Valentin Samir
#: logger facility
""" Bases: :class:`django.db.models.Model`
A base class for models storing attributes as JSON """
#: The attributes json encoded
def attributs(self): """The attributes"""
def attributs(self, value): """attributs property setter"""
""" Bases: :class:`django.db.models.Model`
An identity provider for the federated mode """ #: Suffix append to backend CAS returned username: ``returned_username`` @ ``suffix``. #: it must be unique. max_length=30, unique=True, verbose_name=_(u"suffix"), help_text=_( u"Suffix append to backend CAS returned " u"username: ``returned_username`` @ ``suffix``." ) ) #: URL to the root of the CAS server application. If login page is #: https://cas.example.net/cas/login then :attr:`server_url` should be #: https://cas.example.net/cas/ #: Version of the CAS protocol to use when sending requests the the backend CAS. max_length=30, choices=[ ("1", "CAS 1.0"), ("2", "CAS 2.0"), ("3", "CAS 3.0"), ("CAS_2_SAML_1_0", "SAML 1.1") ], verbose_name=_(u"CAS protocol version"), help_text=_( u"Version of the CAS protocol to use when sending requests the the backend CAS." ), default="3" ) #: Name for this identity provider displayed on the login page. max_length=255, verbose_name=_(u"verbose name"), help_text=_(u"Name for this identity provider displayed on the login page.") ) #: Position of the identity provider on the login page. Identity provider are sorted using the #: (:attr:`pos`, :attr:`verbose_name`, :attr:`suffix`) attributes. default=100, verbose_name=_(u"position"), help_text=_( ( u"Position of the identity provider on the login page. " u"Identity provider are sorted using the " u"(position, verbose name, suffix) attributes." ) ) ) #: Display the provider on the login page. Beware that this do not disable the identity #: provider, it just hide it on the login page. User will always be able to log in using this #: provider by fetching ``/federate/suffix``. default=True, verbose_name=_(u"display"), help_text=_("Display the provider on the login page.") )
def __str__(self): return self.verbose_name
def build_username_from_suffix(username, suffix): """ Transform backend username into federated username using ``suffix``
:param unicode username: A CAS backend returned username :param unicode suffix: A suffix identifying the CAS backend :return: The federated username: ``username`` @ ``suffix``. :rtype: unicode """
""" Transform backend username into federated username
:param unicode username: A CAS backend returned username :return: The federated username: ``username`` @ :attr:`suffix`. :rtype: unicode """
""" Bases: :class:`JsonAttributes`
A federated user as returned by a CAS provider (username and attributes) """ #: The user username returned by the CAS backend on successful ticket validation #: A foreign key to :class:`FederatedIendityProvider` #: The last ticket used to authenticate :attr:`username` against :attr:`provider` #: Last update timestamp. Usually, the last time :attr:`ticket` has been set.
def __str__(self): return self.federated_username
def federated_username(self): """The federated username with a suffix for the current :class:`FederatedUser`."""
def get_from_federated_username(cls, username): """ :return: A :class:`FederatedUser` object from a federated ``username`` :rtype: :class:`FederatedUser` """ else:
def clean_old_entries(cls): """remove old unused :class:`FederatedUser`""" last_update__lt=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT)) )
""" Bases: :class:`django.db.models.Model`
An association between a CAS provider ticket and a (username, session) for processing SLO """ #: the federated username with the ``@`` component #: the session key for the session :attr:`username` has been authenticated using :attr:`ticket` #: The ticket used to authenticate :attr:`username`
def clean_deleted_sessions(cls): """remove old :class:`FederateSLO` objects for which the session does not exist anymore"""
""" Bases: :class:`JsonAttributes`
Local cache of the user attributes, used when needed """ #: The username of the user for which we cache attributes
def __str__(self): return self.username
def clean_old_entries(cls): """Remove :class:`UserAttributes` for which no more :class:`User` exists."""
""" Bases: :class:`django.db.models.Model`
A user logged into the CAS """ #: The session key of the current authenticated user #: The username of the current authenticated user #: Last time the authenticated user did something (auth, fetch ticket, etc…) #: Last time the user logged in
""" Remove the current :class:`User`. If ``settings.CAS_FEDERATE`` is ``True``, also delete the corresponding :class:`FederateSLO` object. """ username=self.username, session_key=self.session_key ).delete()
def clean_old_entries(cls): """ Remove :class:`User` objects inactive for more than :django:setting:`SESSION_COOKIE_AGE` and send corresponding SingleLogOut requests. """ last_login__lt=(timezone.now() - timedelta(seconds=settings.CAS_TGT_VALIDITY)) )
def clean_deleted_sessions(cls): """Remove :class:`User` objects where the corresponding session does not exist anymore."""
def attributs(self): """ Property. A fresh :class:`dict` for the user attributes, using ``settings.CAS_AUTH_CLASS`` if possible, and if not, try to fallback to cached attributes (actually only used for ldap auth class with bind password check method). """ else:
def __str__(self): return u"%s - %s" % (self.username, self.session_key)
""" Send SLO requests to all services the user is logged in.
:param request: The current django HttpRequest to display possible failure to the user. :type request: :class:`django.http.HttpRequest` or :obj:`NoneType<types.NoneType>` """ [ticket_class.objects.filter(user=self) for ticket_class in ticket_classes] ): "Error during SLO for user %s: %s" % ( self.username, error ) ) request, messages.WARNING, _(u'Error during service logout %s') % error )
""" Generate a ticket using ``ticket_class`` for the service ``service`` matching ``service_pattern`` and asking or not for authentication renewal with ``renew``
:param type ticket_class: :class:`ServiceTicket` or :class:`ProxyTicket` or :class:`ProxyGrantingTicket`. :param unicode service: The service url for which we want a ticket. :param ServicePattern service_pattern: The service pattern matching ``service``. Beware that ``service`` must match :attr:`ServicePattern.pattern` and the current :class:`User` must pass :meth:`ServicePattern.check_user`. These checks are not done here and you must perform them before calling this method. :param bool renew: Should be ``True`` if authentication has been renewed. Must be ``False`` otherwise. :return: A :class:`Ticket` object. :rtype: :class:`ServiceTicket` or :class:`ProxyTicket` or :class:`ProxyGrantingTicket`. """ (a.name, a.replace if a.replace else a.name) for a in service_pattern.attributs.all() ) (a.attribut, (a.pattern, a.replace)) for a in service_pattern.replacements.all() ) replacements[key][0], replacements[key][1], subval ) else: user=self, attributs=service_attributs, service=service, renew=renew, service_pattern=service_pattern, single_log_out=service_pattern.single_log_out )
""" Return the url to which the user must be redirected to after a Service Ticket has been generated
:param unicode service: The service url for which we want a ticket. :param ServicePattern service_pattern: The service pattern matching ``service``. Beware that ``service`` must match :attr:`ServicePattern.pattern` and the current :class:`User` must pass :meth:`ServicePattern.check_user`. These checks are not done here and you must perform them before calling this method. :param bool renew: Should be ``True`` if authentication has been renewed. Must be ``False`` otherwise. :return unicode: The service url with the ticket GET param added. :rtype: unicode """
""" Bases: :class:`exceptions.Exception`
Base exception of exceptions raised in the ServicePattern model"""
""" Bases: :class:`ServicePatternException`
Exception raised when a non-allowed username tries to get a ticket for a service """
""" Bases: :class:`ServicePatternException`
Exception raised when a user tries to get a ticket for a service and does not meet a condition """
""" Bases: :class:`ServicePatternException`
Exception raised when a user tries to get a ticket for a service using as username an attribute not present on this user """
""" Bases: :class:`django.db.models.Model`
Allowed service patterns against which services are tested """
#: service patterns are sorted using the :attr:`pos` attribute default=100, verbose_name=_(u"position"), help_text=_(u"service patterns are sorted using the position attribute") ) #: A name for the service (this can bedisplayed to the user on the login page) max_length=255, unique=True, blank=True, null=True, verbose_name=_(u"name"), help_text=_(u"A name for the service") ) #: A regular expression matching services. "Will usually looks like #: '^https://some\\.server\\.com/path/.*$'. As it is a regular expression, special character #: must be escaped with a '\\'. max_length=255, unique=True, verbose_name=_(u"pattern"), help_text=_( "A regular expression matching services. " "Will usually looks like '^https://some\\.server\\.com/path/.*$'." "As it is a regular expression, special character must be escaped with a '\\'." ), validators=[utils.regexpr_validator] ) #: Name of the attribute to transmit as username, if empty the user login is used max_length=255, default="", blank=True, verbose_name=_(u"user field"), help_text=_("Name of the attribute to transmit as username, empty = login") ) #: A boolean allowing to limit username allowed to connect to :attr:`usernames`. default=False, verbose_name=_(u"restrict username"), help_text=_("Limit username allowed to connect to the list provided bellow") ) #: A boolean allowing to deliver :class:`ProxyTicket` to the service. default=False, verbose_name=_(u"proxy"), help_text=_("Proxy tickets can be delivered to the service") ) #: A boolean allowing the service to be used as a proxy callback (via the pgtUrl GET param) #: to deliver :class:`ProxyGrantingTicket`. default=False, verbose_name=_(u"proxy callback"), help_text=_("can be used as a proxy callback to deliver PGT") ) #: Enable SingleLogOut for the service. Old validaed tickets for the service will be kept #: until ``settings.CAS_TICKET_TIMEOUT`` after what a SLO request is send to the service and #: the ticket is purged from database. 
A SLO can be send earlier if the user log-out. default=False, verbose_name=_(u"single log out"), help_text=_("Enable SLO for the service") ) #: An URL where the SLO request will be POST. If empty the service url will be used. #: This is usefull for non HTTP proxied services like smtp or imap. max_length=255, default="", blank=True, verbose_name=_(u"single log out callback"), help_text=_(u"URL where the SLO request will be POST. empty = service url\n" u"This is usefull for non HTTP proxied services.") )
def __str__(self): return u"%s: %s" % (self.pos, self.pattern)
""" Check if ``user`` if allowed to use theses services. If ``user`` is not allowed, raises one of :class:`BadFilter`, :class:`UserFieldNotDefined`, :class:`BadUsername`
:param User user: a :class:`User` object :raises BadUsername: if :attr:`restrict_users` is ``True`` and :attr:`User.username` is not within :attr:`usernames`. :raises BadFilter: if a :class:`FilterAttributValue` condition of :attr:`filters` cannot be verified. :raises UserFieldNotDefined: if :attr:`user_field` is defined and its value is not within :attr:`User.attributs`. :return: ``True`` :rtype: bool """ else: else: "User constraint failed for %s, service %s: %s do not match %s %s." % ( (user.username, self.name) + bad_filter ) ) "Cannot use %s a loggin for user %s on service %s because it is absent" % ( self.user_field, user.username, self.name ) )
def validate(cls, service): """ Get a :class:`ServicePattern` instance from a service url.
:param unicode service: A service url :return: A :class:`ServicePattern` instance matching ``service``. :rtype: :class:`ServicePattern` :raises ServicePattern.DoesNotExist: if no :class:`ServicePattern` is matching ``service``. """
""" Bases: :class:`django.db.models.Model`
A list of allowed usernames on a :class:`ServicePattern` """ #: username allowed to connect to the service max_length=255, verbose_name=_(u"username"), help_text=_(u"username allowed to connect to the service") ) #: ForeignKey to a :class:`ServicePattern`. :class:`Username` instances for a #: :class:`ServicePattern` are accessible through its :attr:`ServicePattern.usernames` #: attribute. ServicePattern, related_name="usernames", on_delete=models.CASCADE )
def __str__(self): return self.value
""" Bases: :class:`django.db.models.Model`
A replacement of an attribute name for a :class:`ServicePattern`. It also tell to transmit an attribute of :attr:`User.attributs` to the service. An empty :attr:`replace` mean to use the original attribute name. """ #: Name the attribute: a key of :attr:`User.attributs` max_length=255, verbose_name=_(u"name"), help_text=_(u"name of an attribute to send to the service, use * for all attributes") ) #: The name of the attribute to transmit to the service. If empty, the value of :attr:`name` #: is used. max_length=255, blank=True, verbose_name=_(u"replace"), help_text=_(u"name under which the attribute will be show " u"to the service. empty = default name of the attribut") ) #: ForeignKey to a :class:`ServicePattern`. :class:`ReplaceAttributName` instances for a #: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.attributs` #: attribute. ServicePattern, related_name="attributs", on_delete=models.CASCADE )
def __str__(self): if not self.replace: return self.name else: return u"%s → %s" % (self.name, self.replace)
""" Bases: :class:`django.db.models.Model`
A filter on :attr:`User.attributs` for a :class:`ServicePattern`. If a :class:`User` do not have an attribute :attr:`attribut` or its value do not match :attr:`pattern`, then :meth:`ServicePattern.check_user` will raises :class:`BadFilter` if called with that user. """ #: The name of a user attribute max_length=255, verbose_name=_(u"attribute"), help_text=_(u"Name of the attribute which must verify pattern") ) #: A regular expression the attribute :attr:`attribut` value must verify. If :attr:`attribut` #: if a list, only one of the list values needs to match. max_length=255, verbose_name=_(u"pattern"), help_text=_(u"a regular expression"), validators=[utils.regexpr_validator] ) #: ForeignKey to a :class:`ServicePattern`. :class:`FilterAttributValue` instances for a #: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.filters` #: attribute. ServicePattern, related_name="filters", on_delete=models.CASCADE )
def __str__(self): return u"%s %s" % (self.attribut, self.pattern)
""" Bases: :class:`django.db.models.Model`
A replacement (using a regular expression) of an attribute value for a :class:`ServicePattern`. """ #: Name the attribute: a key of :attr:`User.attributs` max_length=255, verbose_name=_(u"attribute"), help_text=_(u"Name of the attribute for which the value must be replace") ) #: A regular expression matching the part of the attribute value that need to be changed max_length=255, verbose_name=_(u"pattern"), help_text=_(u"An regular expression maching whats need to be replaced"), validators=[utils.regexpr_validator] ) #: The replacement to what is mached by :attr:`pattern`. groups are capture by \\1, \\2 … max_length=255, blank=True, verbose_name=_(u"replace"), help_text=_(u"replace expression, groups are capture by \\1, \\2 …") ) #: ForeignKey to a :class:`ServicePattern`. :class:`ReplaceAttributValue` instances for a #: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.replacements` #: attribute. ServicePattern, related_name="replacements", on_delete=models.CASCADE )
def __str__(self): return u"%s %s %s" % (self.attribut, self.pattern, self.replace)
""" Bases: :class:`JsonAttributes`
Generic class for a Ticket """ #: ForeignKey to a :class:`User`. #: A boolean. ``True`` if the ticket has been validated #: The service url for the ticket #: ForeignKey to a :class:`ServicePattern`. The :class:`ServicePattern` corresponding to #: :attr:`service`. Use :meth:`ServicePattern.validate` to find it. ServicePattern, related_name="%(class)s", on_delete=models.CASCADE ) #: Date of the ticket creation #: A boolean. ``True`` if the user has just renew his authentication #: A boolean. Set to :attr:`service_pattern` attribute #: :attr:`ServicePattern.single_log_out` value.
#: Max duration between ticket creation and its validation. Any validation attempt for the #: ticket after :attr:`creation` + VALIDITY will fail as if the ticket do not exists. #: Time we keep ticket with :attr:`single_log_out` set to ``True`` before sending SingleLogOut #: requests.
"""raised in :meth:`Ticket.get` then ticket prefix and ticket classes mismatch"""
def __str__(self): return u"Ticket-%s" % self.pk
def send_slos(queryset_list): """ Send SLO requests to each ticket of each queryset of ``queryset_list``
:param list queryset_list: A list of :class:`Ticket` querysets :return: A list of possibly encountered :class:`Exception` :rtype: list """ # sending SLO to timed-out validated tickets executor=ThreadPoolExecutor(max_workers=settings.CAS_SLO_MAX_PARALLEL_REQUESTS) )
def clean_old_entries(cls): """Remove old ticket and send SLO to timed-out services""" # removing old validated ticket and non validated expired tickets ( Q(single_log_out=False) & Q(validate=True) ) | ( Q(validate=False) & Q(creation__lt=(timezone.now() - timedelta(seconds=cls.VALIDITY))) ) ).delete() creation__lt=(timezone.now() - timedelta(seconds=cls.TIMEOUT)) )
"""Send a SLO request to the ticket service""" # On logout invalidate the Ticket "Sending SLO requests to service %s for user %s" % ( self.service, self.user.username ) ) else: session.post( url.encode('utf-8'), data={'logoutRequest': xml.encode('utf-8')}, timeout=settings.CAS_SLO_TIMEOUT ) )
""" Return the ticket class of ``ticket``
:param unicode ticket: A ticket :param list classes: Optional argument. A list of possible :class:`Ticket` subclasses :return: The class corresponding to ``ticket`` (:class:`ServiceTicket` or :class:`ProxyTicket` or :class:`ProxyGrantingTicket`) if found among ``classes``, ``None`` otherwise. :rtype: :obj:`type` or :obj:`NoneType<types.NoneType>` """ if classes is None: # pragma: no cover (not used) classes = [ServiceTicket, ProxyTicket, ProxyGrantingTicket]
""" The username to send on ticket validation
:return: The value of the corresponding user attribute if :attr:`service_pattern`.user_field is set, the user username otherwise. """ self.service_pattern.user_field ): # the list is not empty because we wont generate a ticket with a user_field # that evaluate to False else:
""" generate attributes list for template rendering
:return: A list of (attribute name, attribute value) of all user attributes flattened (no nested list) :rtype: :obj:`list` of :obj:`tuple` of :obj:`unicode` """ else:
""" Search the database for a valid ticket with provided arguments
:param unicode ticket: A ticket value :param bool renew: Is authentication renewal needed :param unicode service: Optional argument. The ticket service :raises Ticket.DoesNotExist: if no class is found for the ticket prefix :raises cls.DoesNotExist: if ``ticket`` value is not found in the database :return: a :class:`Ticket` instance :rtype: Ticket """ # If the method class is the ticket abstract class, search for the submitted ticket # class using its prefix. Assuming ticket is a ProxyTicket or a ServiceTicket # else use the method class else: # If ticket prefix is wrong, raise DoesNotExist # search for the ticket that is not yet validated and is still valid value=ticket, validate=False, creation__gt=(timezone.now() - timedelta(seconds=ticket_class.VALIDITY)) ) # if service is specified, add it to the queryset # only require renew if renew is True, otherwise it does not matter if renew is True # or False. # fetch the ticket ``MultipleObjectsReturned`` is never raised as the ticket value # is unique across the database # For ServiceTicket and Proxyticket, mark it as validated before returning # If no class found for the ticket, raise DoesNotExist else:
""" Bases: :class:`Ticket`
A Service Ticket """ #: The ticket prefix used to differentiate it from other tickets types #: The ticket value
def __str__(self): return u"ServiceTicket-%s" % self.pk
""" Bases: :class:`Ticket`
A Proxy Ticket """ #: The ticket prefix used to differentiate it from other tickets types #: The ticket value
def __str__(self): return u"ProxyTicket-%s" % self.pk
""" Bases: :class:`Ticket`
A Proxy Granting Ticket """ #: The ticket prefix used to differentiate it from other tickets types #: ProxyGranting ticket are never validated. However, they can be used during :attr:`VALIDITY` #: to get :class:`ProxyTicket` for :attr:`user` #: The ticket value
def __str__(self): return u"ProxyGrantingTicket-%s" % self.pk
""" Bases: :class:`django.db.models.Model`
A list of proxies on :class:`ProxyTicket` """ #: Service url of the PGT used for getting the associated :class:`ProxyTicket` #: ForeignKey to a :class:`ProxyTicket`. :class:`Proxy` instances for a #: :class:`ProxyTicket` are accessible through its :attr:`ProxyTicket.proxies` #: attribute.
def __str__(self): return self.url
""" Bases: :class:`django.db.models.Model`
The last new available version sent """
def send_mails(cls): """ For each new django-cas-server version, if the current instance is not up to date send one mail to ``settings.ADMINS``. """ ( '%sA new version of django-cas-server is available' ) % settings.EMAIL_SUBJECT_PREFIX, u''' A new version of the django-cas-server is available.
Your version: %s New version: %s
Upgrade using: * pip install -U django-cas-server * fetching the last release on https://github.com/nitmir/django-cas-server/ or on https://pypi.org/project/django-cas-server/
After upgrade, do not forget to run: * ./manage.py migrate * ./manage.py collectstatic and to reload your wsgi server (apache2, uwsgi, gunicord, etc…)
--\u0020 django-cas-server '''.strip() % (VERSION, LAST_VERSION), settings.SERVER_EMAIL, ["%s <%s>" % admin for admin in settings.ADMINS], fail_silently=False, ) except smtplib.SMTPException as error: # pragma: no cover (should not happen) logger.error("Unable to send new version mail: %s" % error) |