# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
User and Role Models
====================
Stores users and their roles in the database.
"""
from hashlib import sha256
from datetime import datetime
from flask.ext.login import UserMixin
from pyfarm.core.enums import STRING_TYPES, PY3
from pyfarm.master.application import app, db, login_serializer
from pyfarm.master.config import config
from pyfarm.models.core.mixins import ReprMixin
from pyfarm.models.core.functions import split_and_extend
__all__ = ("User", "Role")
SHA256_ASCII_LENGTH = 64 # static length of a sha256 string
# roles the user is a member of
UserRole = db.Table(
config.get("table_user_role"),
db.Column(
"user_id", db.Integer,
db.ForeignKey("%s.id" % config.get("table_user")),
doc="The id of the associated user"),
db.Column(
"role_id", db.Integer,
db.ForeignKey("%s.id" % config.get("table_role")),
doc="The id of the associated role")
)
[docs]class User(db.Model, UserMixin, ReprMixin):
"""
Stores information about a user including the roles they belong to
"""
__tablename__ = config.get("table_user")
REPR_COLUMNS = ("id", "username")
id = db.Column(db.Integer, primary_key=True, nullable=False)
active = db.Column(
db.Boolean,
default=True,
doc="Enables or disables a particular user across the entire "
"system")
username = db.Column(
db.String(config.get("max_username_length")),
unique=True, nullable=False,
doc="The username used to login.")
password = db.Column(
db.String(SHA256_ASCII_LENGTH),
doc="The password used to login")
email = db.Column(
db.String(config.get("max_email_length")),
unique=True,
doc="Contact email for registration and possible "
"notifications")
expiration = db.Column(
db.DateTime,
doc="User expiration. If this value is set then the user "
"will no longer be able to access PyFarm past the "
"expiration.")
onetime_code = db.Column(
db.String(SHA256_ASCII_LENGTH),
doc="SHA256 one time use code which can be used for unique "
"urls such as for password resets.")
last_login = db.Column(
db.DateTime,
doc="The last date that this user was logged in.")
#
# Relationships
#
roles = db.relationship(
"Role",
secondary=UserRole,
backref=db.backref("users", lazy="dynamic"))
@classmethod
[docs] def create(cls, username, password, email=None, roles=None):
# create the list or roles to add
if roles is None:
roles = []
elif isinstance(roles, STRING_TYPES):
roles = [roles]
# create the user with the proper initial values
user = cls(
username=username,
password=cls.hash_password(password),
email=email)
user.roles.extend(map(Role.create, roles))
# commit and return
db.session.add(user)
db.session.commit()
return user
@classmethod
[docs] def get(cls, id_or_username):
"""Get a user model either by id or by the user's username"""
try:
id_or_username = int(id_or_username)
except ValueError:
pass
if isinstance(id_or_username, int):
return cls.query.filter_by(id=id_or_username).first()
elif isinstance(id_or_username, STRING_TYPES):
return cls.query.filter_by(username=id_or_username).first()
else:
raise TypeError("string or integer required for User.get()")
@classmethod
[docs] def hash_password(cls, value):
value = app.secret_key + value
if PY3:
value = value.encode("utf-8")
return sha256(value).hexdigest()
[docs] def get_auth_token(self):
return login_serializer.dumps([str(self.id), self.password])
[docs] def get_id(self):
return self.id
[docs] def check_password(self, password):
"""checks the password provided against the stored password"""
assert isinstance(password, STRING_TYPES)
return self.hash_password(password) == self.password
[docs] def is_active(self):
"""returns true if the user and the roles it belongs to are active"""
now = datetime.utcnow()
# user is not active
if not self.active:
return False
# user has expired
if self.expiration is not None and now > self.expiration:
return False
# TODO: there's probably some way to cache this information
return all(role.is_active() for role in self.roles)
[docs] def has_roles(self, allowed=None, required=None):
"""checks the provided arguments against the roles assigned"""
if not allowed and not required:
return True
allowed = split_and_extend(allowed)
required = split_and_extend(required)
if allowed:
# Ask the database if the user has any of the allowed roles. For
# smaller numbers of roles this is very slightly slower with
# SQLite in :memory: but is a good amount faster over the network
# or with large role sets.
return bool(
User.query.filter(
User.roles.any(
Role.name.in_(allowed))
).filter(User.id == self.id).count())
if required:
# Ask the database for all roles matching ``required``. In order
# for this to return True is the number of entries found must
# be equal to len(required).
count = Role.query.filter(
Role.name.in_(required)).filter(User.id == self.id).count()
return count == len(required)
[docs]class Role(db.Model):
"""
Stores role information that can be used to give a user access
to individual resources.
"""
__tablename__ = config.get("table_role")
id = db.Column(
db.Integer,
primary_key=True,
nullable=False)
active = db.Column(
db.Boolean,
default=True,
doc="Enables or disables a role. Disabling a role "
"will prevent any users of this role from accessing "
"PyFarm")
name = db.Column(
db.String(config.get("max_role_length")),
unique=True, nullable=False,
doc="The name of the role")
expiration = db.Column(
db.DateTime,
doc="Role expiration. If this value is set then the role, and "
"anyone assigned to it, will no longer be able to access "
"PyFarm past the expiration.")
description = db.Column(
db.Text,
doc="Human description of the role.")
@classmethod
[docs] def create(cls, name, description=None):
"""
Creates a role by the given name or returns an existing
role if it already exists.
"""
if isinstance(name, Role):
return name
role = Role.query.filter_by(name=name).first()
if role is None:
role = cls(name=name, description=description)
db.session.add(role)
db.session.commit()
return role
[docs] def is_active(self):
if self.expiration is None:
return self.active
return self.active and datetime.utcnow() < self.expiration