I am creating a basic authentication register/login flow with FastAPI. However, after the user has successfully registered and logged in, the token is not recognized. It works fine through Swagger UI at "/docs", but not from the main app.

Here is my code: main.py

import uvicorn
from fastapi import Depends, HTTPException
from auth import AuthHandler
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

auth_handler = AuthHandler()
users = []


@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
    return templates.TemplateResponse("register.html", {"request": request})


@app.post('/', response_class=HTMLResponse)
def register(request: Request, username: str = Form(...), password: str = Form(...)):
    if len(users) != 0:
        for x in users:
            if x['username'] == username:
                print('Username is taken!')
                raise HTTPException(status_code=400, detail='Username is taken!')
    hashed_password = auth_handler.get_password_hash(password)
    users.append({
        'username': username,
        'password': hashed_password
    })
    print('User:', username, 'registered!')
    return templates.TemplateResponse("success.html", {"request": request})


@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
    return templates.TemplateResponse("login.html", {"request": request})


@app.post('/login')
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    user = None
    for x in users:
        if x['username'] == username:
            user = x
            break
    if (user is None) or (not auth_handler.verify_password(password, user['password'])):
        print('Invalid username and/or password!')
        raise HTTPException(status_code=401, detail='Invalid username and/or password!')
    token = auth_handler.encode_token(user['username'])
    return {'token': token}


@app.get('/protected')
def protected(username=Depends(auth_handler.auth_wrapper)):
    return {'name': username}


if __name__ == '__main__':
    uvicorn.run(app)

Here is my code: auth.py

import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from datetime import datetime, timedelta


class AuthHandler():
    security = HTTPBearer()
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    secret = 'SECRET'

    def get_password_hash(self, password):
        return self.pwd_context.hash(password)

    def verify_password(self, plain_password, hashed_password):
        return self.pwd_context.verify(plain_password, hashed_password)

    def encode_token(self, user_id):
        payload = {
            'exp': datetime.utcnow() + timedelta(days=0, minutes=5),
            'iat': datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            self.secret,
            algorithm='HS256'
        )

    def decode_token(self, token):
        try:
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Signature has expired')
        except jwt.InvalidTokenError as e:
            raise HTTPException(status_code=401, detail='Invalid token')

    def auth_wrapper(self, auth: HTTPAuthorizationCredentials = Security(security)):
        return self.decode_token(auth.credentials)

Here is my form template; register.html and login.html are the same:

<!doctype html>
<html lang="en">
<head>
    <link rel="stylesheet" href="../static/styles.css">
    <title>Document</title>
</head>
<body>
    <div id="form">
        <form method="post">
            <h3>Login</h3>
            <label for="username">Username:</label><br>
            <input type="text" name="username" id="username"><br>
            <label for="password">Password:</label><br>
            <input type="text" name="password" id="password"><br><br>
            <input type="submit" value="Submit" id="sub">
        </form>
    </div>
</body>
</html>

The error I get when going to 127.0.0.1/protected is:

{"detail":"Not authenticated"}

How can I fix this, so that it recognizes the token from the user just like in docs?

Pro Girl
  • You need to actually _include_ the token. The documentation is written as if the request is coming from a dynamic frontend application written in JavaScript, where you include the token in an `Authorization` header. You'll have to use cookies if you want to submit regular HTML forms with authentication details automagically included. – MatsLindh Sep 06 '22 at 12:12
  • @MatsLindh thank you, do you know a simple way to do that with FastAPI? – Pro Girl Sep 06 '22 at 12:27
  • It might be easier to use `fastapi-login` or a similar package if you don't want to handle it yourself: https://fastapi-login.readthedocs.io/advanced_usage/ - otherwise you can return a redirect response with a cookie set instead of the token as JSON, and then use that cookie in your authentication validation function. – MatsLindh Sep 06 '22 at 12:59
  • To implement the approach described by @MatsLindh above, have a look at the `/submit` endpoint of the _appB.py_ code snippet (Solution 1) of [this answer](https://stackoverflow.com/a/73599289/17865804). – Chris Sep 06 '22 at 14:23
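
To make the point in the comments concrete: `HTTPBearer` only sees a token that the client explicitly puts in an `Authorization: Bearer ...` header, which a plain browser navigation to `/protected` never does. Below is a minimal, stdlib-only sketch of what such a request looks like. The helper name `build_protected_request` and the base URL `http://127.0.0.1:8000` (uvicorn's default port) are assumptions for illustration, and the request is only built, not sent.

```python
import urllib.request


def build_protected_request(token: str, base_url: str = "http://127.0.0.1:8000") -> urllib.request.Request:
    """Build (but do not send) a GET request for /protected that carries the token."""
    request = urllib.request.Request(f"{base_url}/protected", method="GET")
    # HTTPBearer expects exactly this header shape: "Authorization: Bearer <token>".
    request.add_header("Authorization", f"Bearer {token}")
    return request


if __name__ == "__main__":
    # Hypothetical token value; in practice this is the string returned by POST /login.
    req = build_protected_request("token-returned-by-login")
    print(req.full_url)
    print(req.get_header("Authorization"))
```

Swagger UI adds this header for you after you click "Authorize", which is why the same endpoint works from "/docs" but not from a regular form submission.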

1 Answer

I found a solution following the advice from @MatsLindh. I also simplified the code, imports, etc. considerably. You will still need the HTML files with the form and the fields you need; in my case I just used email and password.

Please note that this is not "best practice", especially storing the password, even hashed, in the same database table as the rest of the user data.

import uvicorn
import sqlite3
import jwt
from datetime import datetime, timedelta
from fastapi import FastAPI, Form, HTTPException, Cookie, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from passlib.context import CryptContext


connection = sqlite3.connect("users.db", check_same_thread=False)
cursor = connection.cursor()

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


class AuthHandler:
    secret = 'SECRET'  # any key works; it is used to sign and verify the token (HS256 signs, it does not encrypt)
    bcrypt_obj = CryptContext(schemes=["bcrypt"], deprecated="auto")

    def get_password_hash(self, password):
        return self.bcrypt_obj.hash(password)

    def verify_password(self, plain_password, hashed_password):
        return self.bcrypt_obj.verify(plain_password, hashed_password)

    def encode_token(self, user_id):
        payload = {
            'exp': datetime.utcnow() + timedelta(days=0, minutes=5),
            'iat': datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            self.secret,
            algorithm='HS256'
        )

    def decode_token(self, token):
        try:
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Signature has expired')
        except jwt.InvalidTokenError as e:
            raise HTTPException(status_code=401, detail='Invalid token')

    def grant_access(self, token, users_db):
        # Decode once, then check whether any user matches the token's subject.
        email = self.decode_token(token)
        return any(x['email'] == email for x in users_db)


auth_handler = AuthHandler()


@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
    return templates.TemplateResponse("register.html", {"request": request})


@app.post('/', response_class=HTMLResponse)
def register(email: str = Form(...), password: str = Form(...)):

    cursor.execute("select * from users where email=:e", {"e": email})
    user_with_same_email_list = cursor.fetchall()
    if len(user_with_same_email_list) != 0:
        print(user_with_same_email_list)
        print('Email already registered!')
        raise HTTPException(status_code=400, detail='Email already registered!')
    else:
        hashed_password = auth_handler.get_password_hash(password)
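        # Bound parameters keep user input out of the SQL text (no string concatenation).
        # NOTE: user_id is hard-coded to 1 here; a real table would generate it.
        sqlite_insert_query = "INSERT INTO users (user_id, username, email, hashed_password, eoa) " \
                              "VALUES (1, '', ?, ?, '')"
        cursor.execute(sqlite_insert_query, (email, hashed_password))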
        connection.commit()
        print('User with email:', email, 'registered!')
        response = RedirectResponse(url="/login")
        response.status_code = 302
        return response


@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
    return templates.TemplateResponse("login.html", {"request": request})


@app.post('/login')
def login(email: str = Form(...), password: str = Form(...)):
    cursor.execute("select * from users where email=:e", {"e": email})
    user_with_same_email_list = cursor.fetchall()
    if len(user_with_same_email_list) == 0:
        print('No user with this email!')
        raise HTTPException(status_code=400, detail='No user with this email!')
    elif (email is None) or (not auth_handler.verify_password(password, user_with_same_email_list[0][3])):
        print('Invalid email and/or password!')
        raise HTTPException(status_code=401, detail='Invalid email and/or password!')
    else:
        token = auth_handler.encode_token(email)
        response = RedirectResponse(url="/check_cookie")
        response.status_code = 302
        response.set_cookie(key="Authorization", value=token, secure=True, httponly=True)
        return response


@app.get("/check_cookie")
async def check_cookie(Authorization: str | None = Cookie(None)):
    if Authorization:
        email = auth_handler.decode_token(Authorization)
        cursor.execute("select * from users where email=:e", {"e": email})
        user_with_same_email_list = cursor.fetchall()
        if len(user_with_same_email_list) == 0:
            print('Invalid token')
            raise HTTPException(status_code=401, detail='Invalid token')
        else:
            print('Access granted!')
            response = RedirectResponse(url="/protected")
            response.status_code = 302
            return response
    else:
        print("No token found")
        raise HTTPException(status_code=401, detail='No token found')


@app.get('/protected', response_class=HTMLResponse)
async def protected(request: Request, email=Depends(check_cookie)):
    return templates.TemplateResponse("logged_in.html", {"request": request})


@app.get("/logged_out", response_class=HTMLResponse)
async def logged_out(request: Request):
    return templates.TemplateResponse("logged_out.html", {"request": request})


@app.get("/logout")
async def route_logout_and_remove_cookie():
    response = RedirectResponse(url="/logged_out")
    response.delete_cookie("Authorization", domain="127.0.0.1")
    return response


if __name__ == '__main__':
    uvicorn.run(app)
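
One detail worth isolating from the code above is the step that writes user-supplied values into the `users` table. Here is a minimal sketch of that step with bound parameters, assuming a simplified `users(email, hashed_password)` table (the real schema is not shown in this answer) and a hypothetical helper named `register_user`:

```python
import sqlite3


def register_user(connection: sqlite3.Connection, email: str, hashed_password: str) -> bool:
    """Insert a user with bound parameters; return False if the email already exists."""
    cursor = connection.cursor()
    cursor.execute("SELECT 1 FROM users WHERE email = :e", {"e": email})
    if cursor.fetchone() is not None:
        return False
    # Placeholders bind the values separately from the SQL text,
    # so quotes in the input cannot alter the statement.
    cursor.execute(
        "INSERT INTO users (email, hashed_password) VALUES (?, ?)",
        (email, hashed_password),
    )
    connection.commit()
    return True


if __name__ == "__main__":
    # In-memory database with the assumed, simplified schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (email TEXT PRIMARY KEY, hashed_password TEXT)")
    print(register_user(conn, "o'brien@example.com", "not-a-real-hash"))
    print(register_user(conn, "o'brien@example.com", "not-a-real-hash"))
```

The apostrophe in the sample email is deliberate: with string concatenation it would break the statement, while a bound parameter stores it unchanged.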
Pro Girl
  • You should set the `HttpOnly` flag when creating the cookie, as shown in **appB.py** (Solution 1) of [this answer](https://stackoverflow.com/a/73599289/17865804). A cookie with the `HttpOnly` attribute is inaccessible to the JavaScript `Document.cookie` API; it's **only sent to the server**. This precaution [helps mitigate cross-site scripting (XSS) attacks](https://developer.mozilla.org/en-US/docs/Web/HTTP/Cookies#restrict_access_to_cookies). – Chris Sep 08 '22 at 16:48
  • Thanks @Chris, I edited the answer to set secure=True, httponly=True when creating the cookie. – Pro Girl Sep 09 '22 at 13:52
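
To see what the `HttpOnly` and `Secure` flags discussed above look like on the wire, here is a small stdlib-only sketch that builds the value of a `Set-Cookie` header for the token. The helper name `build_auth_cookie_header` and the `Path=/` attribute are assumptions for illustration; this is not how FastAPI is invoked, only a view of the header a cookie with these flags produces.

```python
from http.cookies import SimpleCookie


def build_auth_cookie_header(token: str) -> str:
    """Return a Set-Cookie header value for the token with HttpOnly and Secure set."""
    cookie = SimpleCookie()
    cookie["Authorization"] = token
    cookie["Authorization"]["httponly"] = True  # not readable via document.cookie
    cookie["Authorization"]["secure"] = True    # only sent over secure (HTTPS) connections
    cookie["Authorization"]["path"] = "/"       # assumed: valid for the whole site
    return cookie["Authorization"].OutputString()


if __name__ == "__main__":
    # Hypothetical token value, standing in for the JWT created at login.
    print(build_auth_cookie_header("header.payload.signature"))
```

Once the browser has stored such a cookie, it sends it back automatically on later requests to the same site, which is what lets a plain HTML form flow reach the protected page without any JavaScript.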