First we will have to install the python-jose library
pip install "python-jose[cryptography]"
Next we will create a file called oauth2.py in the app folder
In said file we will import the following:
from jose import JWTError, jwt
We will need to provide 3 things
The Secret Key
The Algorithm that we want to use
The expiration time of the token - if we do not provide this, the token never expires and the user stays logged in forever
from jose import JWTError, jwt
# Secret key for the tokens.
# NOTE(review): hard-coded here for the walkthrough; in a real deployment load
# it from configuration or the environment instead of committing it.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm that we want to use for the tokens.
ALGORITHM = "HS256"
# Expiration time of a token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
Creating the JWT function and returning the token
from jose import JWTError, jwt
from datetime import date, datetime, timedelta
# Secret key passed to jwt.encode when creating a token.
# NOTE(review): hard-coded here for the walkthrough; in a real deployment load
# it from configuration or the environment instead of committing it.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm name passed to jwt.encode.
ALGORITHM = "HS256"
# Number of minutes added to the current time to compute a token's expiry.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict) -> str:
    """Build a signed JWT from ``data`` plus an expiration claim.

    Args:
        data: Claims to place in the token payload. The dict is copied, so
            the caller's object is never modified.

    Returns:
        The encoded token produced by ``jwt.encode``, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Imported locally so this function does not rely on the file-level
    # ``from datetime import ...`` line being extended.
    from datetime import timezone

    # Work on a copy so the caller's dictionary stays untouched.
    to_encode = data.copy()

    # Use a timezone-aware UTC timestamp. The "exp" claim is expressed in
    # seconds since the epoch (UTC); a naive local time from datetime.now()
    # would be interpreted as if it were UTC, shifting the real expiry by the
    # machine's UTC offset.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # to_encode is a dictionary, so we add the expiration time as a claim.
    to_encode["exp"] = expire

    # Encode the payload with the secret key and the chosen algorithm.
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Now that we can create the token, we will use it in the login route in the auth.py file
from os import access
from fastapi import APIRouter, Depends, status, HTTPException, Response
from sqlalchemy.orm import Session
from .. import database, schemas, models, utils, oauth2
# Router on which the login route is registered; its routes carry the tag below.
# NOTE(review): the tag is spelled "Authentification" - rename it to
# "Authentication" only if nothing depends on the current spelling.
router = APIRouter(tags=['Authentification'])
@router.post('/login')
def login(user_credentials: schemas.UserLogin, db: Session = Depends(database.get_db)):
    """Check the submitted credentials and return a bearer access token.

    Looks up the user by email, verifies the submitted password with
    ``utils.verify`` and, on success, returns a token whose payload carries
    the user's id.

    Raises:
        HTTPException: 404 with detail "Invalid Credentials" when no user has
            the given email or the password does not verify.
    """
    user = (
        db.query(models.User)
        .filter(models.User.email == user_credentials.email)
        .first()
    )

    # Both failure modes produce the same response, so they share one guard.
    # The password check only runs when a user was found (short-circuit).
    # NOTE(review): 404 is kept so existing clients see the same status;
    # 401/403 is the more conventional answer for bad credentials.
    if not user or not utils.verify(user_credentials.password, user.password):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Invalid Credentials",
        )

    # REMEMBER: the payload content is our choice; here it is only the user id.
    access_token = oauth2.create_access_token(data={"user_id": user.id})
    return {"access_token": access_token, "token_type": "bearer"}
If we use Postman to test the login endpoint, we should get the following data back: