| @@ -11,17 +11,20 @@ def get_user(username): | |||
| :param username: The username | |||
| :type username: str | |||
| :return: user | |||
| :return: A dictionary of user attributes | |||
| """ | |||
| db.connect() | |||
| cursor = db.cursor() | |||
| query = ("SELECT userid, username, email, password, temporary_password, login_attempts, last_login_attempt from users where username = %s") | |||
| user = None | |||
| fields = [ | |||
| 'userid', 'username', 'email', 'password', 'temporary_password', | |||
| 'login_attempts', 'last_login_attempt', 'authenticator_secret', | |||
| ] | |||
| query = ("SELECT {} from users where username = %s".format(', '.join(fields))) | |||
| try: | |||
| cursor.execute(query, (username,)) | |||
| users = cursor.fetchall() | |||
| if len(users): | |||
| user = users[0] | |||
| return {fields[i]: users[0][i] for i in range(len(fields))} | |||
| except mysql.connector.Error as err: | |||
| logger.error("Failed executing query: %s", err) | |||
| cursor.fetchall() | |||
| @@ -29,7 +32,8 @@ def get_user(username): | |||
| finally: | |||
| cursor.close() | |||
| db.close() | |||
| return user | |||
| return None | |||
| def get_users(): | |||
| @@ -1,12 +1,11 @@ | |||
| import web | |||
| from views.forms import login_form | |||
| from views.utils import get_nav_bar, csrf_protected, get_render | |||
| from views.utils import get_nav_bar, csrf_protected, get_render, check_password, get_totp_token | |||
| import models.session | |||
| import models.user | |||
| import logging | |||
| import random | |||
| import string | |||
| import bcrypt | |||
| import time | |||
| logger = logging.getLogger(__name__) | |||
| @@ -55,32 +54,30 @@ class Login(): | |||
| if user is None: | |||
| return render.login(nav, login_form, "- User authentication failed") | |||
| userid, username, _, password_hash, temporary_password, login_attempts, last_login_attempt = user | |||
| if login_attempts > login_attempts_threshold and last_login_attempt + login_timeout > time.time(): | |||
| if user["login_attempts"] > login_attempts_threshold and user["last_login_attempt"] + login_timeout > time.time(): | |||
| return render.login(nav, login_form, "- There have been too many incorrect login attempts for your account. You have to wait a minute before you can log in.") | |||
| if bcrypt.checkpw(data.password.encode('UTF-8'), password_hash.encode('UTF-8')): | |||
| if login_attempts > login_attempts_threshold: | |||
| logger.info("User %s logged in succesfully after %s attempts", username, login_attempts) | |||
| if check_password(data.password, user["password"]): | |||
| if user["login_attempts"] > login_attempts_threshold: | |||
| logger.info("User %s logged in succesfully after %s attempts", user["username"], user["login_attempts"]) | |||
| if not models.user.is_verified(userid): | |||
| if not models.user.is_verified(user["userid"]): | |||
| return render.login(nav, login_form, "- User not authenticated yet. Please check you email.") | |||
| if temporary_password: | |||
| if user["temporary_password"]: | |||
| logger.info("A password reset was requested for user %s, but they logged in with the old password before resetting") | |||
| models.user.set_temporary_password(userid, "") | |||
| models.user.set_temporary_password(user["userid"], "") | |||
| models.user.set_login_attempts(userid, 0, time.time()) | |||
| self.login(username, userid, data.remember) | |||
| models.user.set_login_attempts(user["userid"], 0, time.time()) | |||
| self.login(user["username"], user["userid"], data.remember) | |||
| raise web.seeother("/") | |||
| elif temporary_password and bcrypt.checkpw(data.password.encode('UTF-8'), temporary_password.encode('UTF-8')): | |||
| session.temporary_userid = userid | |||
| elif check_password(data.password, user["temporary_password"]): | |||
| session.temporary_userid = user["userid"] | |||
| raise web.seeother("/reset") | |||
| else: | |||
| logger.warning("Incorrect login attempt on user %s by IP %s", username, web.ctx.ip) | |||
| models.user.set_login_attempts(userid, login_attempts+1, time.time()) | |||
| if login_attempts == login_attempts_threshold: | |||
| logger.warning("Incorrect login attempt on user %s by IP %s", user["username"], web.ctx.ip) | |||
| models.user.set_login_attempts(user["userid"], user["login_attempts"]+1, time.time()) | |||
| if user["login_attempts"] == login_attempts_threshold: | |||
| return render.login(nav, login_form, "- Too many incorrect login attempts. You have to wait a minute before trying again.") | |||
| else: | |||
| return render.login(nav, login_form, "- User authentication failed") | |||
| @@ -1,11 +1,11 @@ | |||
| import web | |||
| from views.forms import register_form | |||
| from views.utils import get_nav_bar, csrf_protected, password_weakness, get_render, sendmail | |||
| from views.utils import (get_nav_bar, csrf_protected, password_weakness, get_render, | |||
| sendmail, hash_password) | |||
| from uuid import uuid4 | |||
| import models.register | |||
| import models.user | |||
| import logging | |||
| import bcrypt | |||
| import re | |||
| logger = logging.getLogger(__name__) | |||
| @@ -48,7 +48,7 @@ class Register: | |||
| if weakness is not None: | |||
| return render.register(nav, register, weakness) | |||
| password_hash = bcrypt.hashpw(data.password.encode('UTF-8'), bcrypt.gensalt()) | |||
| password_hash = hash_password(data.password) | |||
| # Create a verify token | |||
| while True: | |||
| @@ -1,10 +1,10 @@ | |||
| import web | |||
| from uuid import uuid4 | |||
| from views.forms import reset_form, request_reset_form | |||
| from views.utils import get_nav_bar, csrf_protected, get_render, password_weakness, sendmail | |||
| from views.utils import (get_nav_bar, csrf_protected, get_render, password_weakness, | |||
| sendmail, hash_password, check_password) | |||
| import models.user | |||
| import logging | |||
| import bcrypt | |||
| logger = logging.getLogger(__name__) | |||
| @@ -24,10 +24,10 @@ class RequestReset: | |||
| render = get_render() | |||
| user = models.user.get_user(data.username) | |||
| if user and user[2] == data.email: | |||
| if user and user["email"] == data.email: | |||
| password = uuid4().hex | |||
| password_hash = bcrypt.hashpw(password.encode('UTF-8'), bcrypt.gensalt()) | |||
| models.user.set_temporary_password(user[0], password_hash) | |||
| password_hash = hash_password(password) | |||
| models.user.set_temporary_password(user["userid"], password_hash) | |||
| sendmail( | |||
| 'Reset your Beelance password', | |||
| @@ -72,10 +72,9 @@ class Reset: | |||
| userid = session.temporary_userid | |||
| username = models.user.get_user_name_by_id(userid) | |||
| user = models.user.get_user(username) | |||
| temporary_password = user[4] | |||
| # Check that the temporary password is correct | |||
| if not bcrypt.checkpw(data.temporary.encode('UTF-8'), temporary_password.encode('UTF-8')): | |||
| if not check_password(data.temporary, user["temporary_password"]): | |||
| return render.reset(nav, reset_form, "Incorrect temporary password") | |||
| # Check that the passwords match | |||
| @@ -88,7 +87,7 @@ class Reset: | |||
| return render.reset(nav, reset_form, weakness) | |||
| # Set the new password and log the user in | |||
| password_hash = bcrypt.hashpw(data.password.encode('UTF-8'), bcrypt.gensalt()) | |||
| password_hash = hash_password(data.password) | |||
| models.user.set_password(userid, password_hash) | |||
| models.user.set_temporary_password(userid, "") | |||
| @@ -1,5 +1,6 @@ | |||
| import web | |||
| import os | |||
| import bcrypt | |||
| import logging | |||
| import smtplib | |||
| from email.message import EmailMessage | |||
| @@ -178,3 +179,24 @@ def password_weakness(password, username): | |||
| return "The password is too common. Choose something more unique." | |||
| return None | |||
| def hash_password(password): | |||
| """ | |||
| Create a hash of the given password, with a random salt. | |||
| :param password: The password to hash | |||
| :return: The generated hash | |||
| """ | |||
| return bcrypt.hashpw(password.encode('UTF-8'), bcrypt.gensalt()) | |||
| def check_password(password, password_hash): | |||
| """ | |||
| Check if the entered password matches the hashed password. | |||
| :param password: The password to check | |||
| :param password_hash: The password hash to check against | |||
| :return: True if the password matches, or False if it doesn't | |||
| """ | |||
| return password and password_hash and bcrypt.checkpw(password.encode('UTF-8'), password_hash.encode('UTF-8')) | |||