# OAuth2 Implementation of Password Hashing and Bearer JWT Token Verification
## Introduction to JWT
JWT stands for JSON Web Token.
JWT is a standard for encoding a JSON object into a long, compact string without spaces. A JWT looks like this:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
A JWT is encoded, not encrypted: anyone who holds the string can decode it and read the original information.
However, a JWT is signed. When the server receives a token, it can check the signature to confirm that it issued the token and that the contents have not been altered.
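To make the "not encrypted" point concrete, here is a minimal sketch that reads the header and payload of the sample token above using only the Python standard library, with no key involved. The helper names `b64url_decode` and `read_segments` are illustrative and do not come from any JWT library.

```python
import base64
import json

# The sample token shown above, split across lines only for readability.
SAMPLE_TOKEN = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
    ".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)


def b64url_decode(segment: str) -> bytes:
    """Decode one base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def read_segments(token: str) -> tuple:
    """Return (header, payload) as dicts WITHOUT checking the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


header, payload = read_segments(SAMPLE_TOKEN)
print(header)
print(payload)
```

Because the payload is readable by anyone, a token should never carry secrets such as passwords.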
For example, suppose the server issues a token that is valid for one week. When the user returns the next day with the token, they are still logged in.
After one week the token expires and authentication fails. The user can only obtain a new token by logging in again.
If a user (or a third party) tampers with the token's expiration time, authentication will fail due to a signature mismatch.
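The tampering case can be sketched with the standard library. HS256 is HMAC-SHA256 computed over the `header.payload` text, so changing the payload without knowing the key leaves a signature that no longer matches. This is only an illustration of the idea (in the application itself python-jose does this work); `DEMO_SECRET`, `sign`, and `signature_matches` are hypothetical names.

```python
import base64
import hashlib
import hmac
import json

DEMO_SECRET = b"demo-secret-not-for-production"  # hypothetical key for this sketch


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as base64url without padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign(claims: dict, secret: bytes) -> str:
    """Build an HS256-signed token of the form header.payload.signature."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url_encode(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url_encode(signature)}"


def signature_matches(token: str, secret: bytes) -> bool:
    """Recompute the signature over header.payload and compare in constant time."""
    header, payload, signature = token.split(".")
    expected = hmac.new(secret, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(b64url_encode(expected), signature)


token = sign({"username": "bryce", "exp": 1700000000}, DEMO_SECRET)

# Simulate an attacker pushing the expiration time far into the future
# while reusing the original signature.
header, _payload, signature = token.split(".")
forged_payload = b64url_encode(json.dumps({"username": "bryce", "exp": 9999999999}).encode())
forged_token = f"{header}.{forged_payload}.{signature}"

print(signature_matches(token, DEMO_SECRET))         # original token
print(signature_matches(forged_token, DEMO_SECRET))  # tampered token
```

The first check succeeds and the second fails, which is the signature mismatch described above.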
For more in-depth information about JWT tokens and how they work, please refer to the JWT official website.
## Installing python-jose
pip install "python-jose[cryptography]"
Python-jose requires a compatible cryptography backend. The recommended backend is: pyca/cryptography.
Python-jose supports all features of PyJWT and also supports some additional features that may be used when integrating with other tools.
## Password Hashing
Hashing converts some content (here, a password) into a sequence of bytes (in practice, a string) that looks like gibberish.
Passing in exactly the same content (for example, exactly the same password) always yields the same gibberish.
However, the gibberish cannot be converted back into the original password.
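These two properties can be observed with any hash function. The sketch below uses SHA-256 from the standard library purely as an illustration; it is fast and unsalted, so it is not a suitable way to store passwords. Bcrypt, used later in this article, additionally mixes in a random salt, so two bcrypt hashes of the same password differ as strings even though both verify correctly.

```python
import hashlib


def sha256_hex(text: str) -> str:
    """Hash text with SHA-256 and return the digest as a hex string."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


first = sha256_hex("secret-password")
second = sha256_hex("secret-password")
other = sha256_hex("secret-passwore")  # one character changed

print(first == second)  # same input, same digest
print(first == other)   # different input, different digest
print(len(first))       # SHA-256 hex digests are always 64 characters long
```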
### Installing passlib
Passlib is a Python package for handling password hashing.
It supports many secure hashing algorithms and accompanying tools.
The recommended algorithm is Bcrypt.
First, install PassLib with Bcrypt:
pip install "passlib[bcrypt]"
passlib can even read password hashes created by Django, by Flask security plugins, and by many other tools.
This makes it possible, for example, to share user data between a Django application and a FastAPI application, or to migrate an application gradually from Django to FastAPI while both use the same database. Users can then log in from either the Django application or the FastAPI application.
### Password Hashing and Verification
First, import passlib:
# Import passlib for password hashing
from passlib.context import CryptContext
Create a password hashing context object:
# CryptContext is a tool for handling password hashing and verification
# schemes lists the hashing schemes to support; the first one (here bcrypt) is the default for new hashes
# deprecated="auto" marks every scheme except the default as deprecated, so hashes created with
# an older scheme can still be verified but are flagged as needing an upgrade
# Its hash() and verify() methods create a password hash and check a password against a hash, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
Next, create three utility functions:
- Hash the user's password:
# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)
- Verify if the received password matches the stored hashed password:
# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)
- For authentication, use the previous two functions to verify and return the user:
# Authenticate user information using username and password
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
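The flow of these three functions can be tried without installing anything. The sketch below swaps bcrypt for PBKDF2 from the standard library, so it is a stand-in for illustration and not what passlib does internally. The names `hash_password`, `check_password`, `authenticate`, and `demo_db` are hypothetical.

```python
import hashlib
import hmac
import os
from typing import Optional


def hash_password(password: str, salt: Optional[bytes] = None) -> str:
    """Return 'salt_hex$digest_hex' using PBKDF2 (a stand-in for bcrypt here)."""
    salt = salt if salt is not None else os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, _digest_hex = stored.split("$")
    candidate = hash_password(password, bytes.fromhex(salt_hex))
    return hmac.compare_digest(candidate, stored)


def authenticate(db: dict, username: str, password: str):
    """Return the user record on success, False otherwise."""
    user = db.get(username)
    if not user:
        return False
    if not check_password(password, user["hashed_password"]):
        return False
    return user


demo_db = {"bryce": {"username": "bryce", "hashed_password": hash_password("123123")}}

print(authenticate(demo_db, "bryce", "123123"))   # the user record
print(authenticate(demo_db, "bryce", "wrong"))    # False
print(authenticate(demo_db, "nobody", "123123"))  # False
```

Returning False for both an unknown user and a wrong password means the caller cannot tell which of the two failed, which avoids revealing whether a username exists.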
## Handling JWT Tokens
Import the installed modules.
Create a random key for signing JWT tokens.
Use the following command to generate a secure random key:
openssl rand -hex 32
If you don't have openssl, you can download it from the official website. On Windows, run the command in the OpenSSL Command Prompt.
The command prints a random key, for example:
0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9
Then, copy the generated key into the variable SECRET_KEY. Do not use the key shown in this example; generate your own.
Create a variable ALGORITHM that specifies the JWT signing algorithm; in this example its value is "HS256".
Create a variable to set the token expiration time.
Define a Pydantic model for the token endpoint response.
Create a utility function to generate a new access token.
First, import jwt:
from jose import JWTError, jwt
Next, declare:
- The random key SECRET_KEY, which is the one generated by OpenSSL above.
- The JWT token signing algorithm variable ALGORITHM, which in this example is 'HS256'.
- The variable ACCESS_TOKEN_EXPIRE_MINUTES for setting the token expiration time in minutes.
SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
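If openssl is not at hand, a key of the same shape as the one generated above can be produced from Python with the standard `secrets` module. The sketch also shows one possible way to keep the key out of the source file by reading it from an environment variable; the variable name `JWT_SECRET_KEY` and both function names are assumptions made for this example.

```python
import os
import secrets


def generate_secret_key() -> str:
    """Return 32 random bytes as 64 hex characters, like `openssl rand -hex 32`."""
    return secrets.token_hex(32)


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the key from the environment; fall back to a fresh key for local runs."""
    return os.environ.get(env_var) or generate_secret_key()


key = generate_secret_key()
print(len(key))
```

Note that the fallback creates a new key on every start, so tokens issued before a restart stop verifying. That is acceptable for local experiments but not for a deployed service.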
Define a model class for returning the token:
from pydantic import BaseModel
class Token(BaseModel):
    access_token: str
    token_type: str
Import datetime and timedelta, plus Union for the optional parameter:
from datetime import datetime, timedelta
from typing import Union
Create a new function to generate access tokens:
# Function to generate access tokens
# userData is the data to be stored in the token, such as the username or user ID
# expires_delta is the token lifetime, given as a timedelta
# If it is not provided, a default lifetime of 15 minutes is used
def create_access_token(userData: dict, expires_delta: Union[timedelta, None] = None):
    # Copy the data so that the caller's dict is not modified
    to_encode = userData.copy()
    # Expiration time = current UTC time + lifetime
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default lifetime
        expire = datetime.utcnow() + timedelta(minutes=15)
    # Store the expiration time in the token as the "exp" claim
    to_encode.update({"exp": expire})
    # jwt.encode() builds and signs the token; its arguments are the claims, the secret key, and the signing algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
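Inside the token, the expiration time ends up as the `exp` claim, a number of seconds since the Unix epoch. The following standard-library sketch shows how such a value can be computed and checked. It is an illustration under the assumption of timezone-aware UTC datetimes, not a copy of what python-jose does; `expiry_timestamp` and `is_expired` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

DEFAULT_LIFETIME = timedelta(minutes=15)  # mirrors the fallback in create_access_token


def expiry_timestamp(issued_at: datetime, lifetime: Optional[timedelta] = None) -> int:
    """Return the exp claim as whole seconds since the Unix epoch (UTC)."""
    expires_at = issued_at + (lifetime or DEFAULT_LIFETIME)
    return int(expires_at.timestamp())


def is_expired(exp: int, now: datetime) -> bool:
    """A token counts as expired once the current time reaches its exp value."""
    return int(now.timestamp()) >= exp


issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
exp = expiry_timestamp(issued, timedelta(minutes=30))

print(exp - int(issued.timestamp()))                    # lifetime in seconds
print(is_expired(exp, issued + timedelta(minutes=29)))  # still inside the lifetime
print(is_expired(exp, issued + timedelta(minutes=31)))  # past the lifetime
```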
Modify the login endpoint so that it returns the token:
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    # authenticate_user returns False for an unknown user or a wrong password
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The user is authenticated, so generate the token
    # Turn the previously defined expiration time into a timedelta to pass as expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")
    # Return the token
    return token
## Updating Dependencies
Update get_current_user to accept the same token as before, but this time as a JWT.
Decode and verify the received token, then return the current user.
If the token is invalid, return an HTTP error directly.
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Verify the token with SECRET_KEY (the random key generated with OpenSSL) and the signing algorithm
        # jwt.decode() returns the claims stored in the token; payload is a dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(fake_users_db, username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user
In the code:
credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Invalid authentication credentials",
    headers={"WWW-Authenticate": "Bearer"},
)
This defines the exception raised when credential verification fails. It is raised if decoding the JWT fails (for example, because the token is malformed, expired, or has been tampered with), if the token carries no username, or if no matching user exists.
It returns status code 401 (Unauthorized), the error message in detail, and a WWW-Authenticate header naming the Bearer scheme.
## Obtaining Information of the Currently Logged-in User
# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
## Complete Code
from datetime import datetime, timedelta
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()
# CryptContext is a tool for handling password hashing and verification
# schemes lists the hashing schemes to support; the first one (here bcrypt) is the default for new hashes
# deprecated="auto" marks every scheme except the default as deprecated, so hashes created with
# an older scheme can still be verified but are flagged as needing an upgrade
# Its hash() and verify() methods create a password hash and check a password against a hash, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Clients obtain a token by posting the username and password to /token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT signing algorithm; HS256 is used here (other algorithms such as RS256 and ES256 also exist)
ALGORITHM = "HS256"
# SECRET_KEY generated randomly with OpenSSL
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# Token expiration time in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Define a model class to return the token
class Token(BaseModel):
    access_token: str
    token_type: str
# Define a token data model class
class TokenData(BaseModel):
    username: Union[str, None] = None
# Function to generate access tokens
# userData is the data to be stored in the token, such as the username or user ID
# expires_delta is the token lifetime, given as a timedelta
# If it is not provided, a default lifetime of 15 minutes is used
def create_access_token(userData: dict, expires_delta: Union[timedelta, None] = None):
    # Copy the data so that the caller's dict is not modified
    to_encode = userData.copy()
    # Expiration time = current UTC time (datetime.utcnow()) + lifetime
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default lifetime
        expire = datetime.utcnow() + timedelta(minutes=15)
    # Store the expiration time in the token as the "exp" claim
    to_encode.update({"exp": expire})
    # jwt.encode() builds and signs the token; its arguments are the claims, the secret key, and the signing algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
# Simulated database
fake_users_db = {
    "bryce": {
        "username": "bryce",
        "full_name": "bryce Yu",
        "email": "[email protected]",
        # Plaintext password: 123123
        "hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
        "disabled": False,
    },
    "yu": {
        "username": "yu",
        "full_name": "yu",
        "email": "[email protected]",
        # Plaintext password: 123456
        "hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
        "disabled": True,
    },
}
# Create a User model class
class UserAuth(BaseModel):
    username: str
    # Initially use Union[str, None], later will change to EmailStr
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
# Create a User database model class
class UserDB(UserAuth):
    hashed_password: str
# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)
# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)
# Authenticate user information using username and password
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
# Get user information from the db database by username
def get_user(db, username: str):
    # Returns None implicitly when the username is not in the database
    if username in db:
        user_dict = db[username]
        return UserDB(**user_dict)
# This function uses the oauth2_scheme dependency to obtain the token and verify its validity
# If the token is valid, it calls get_user to obtain the current user's information and returns it
# If invalid, it raises an exception
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Verify the token with SECRET_KEY (the random key generated with OpenSSL) and the signing algorithm
        # jwt.decode() returns the claims stored in the token; payload is a dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(fake_users_db, username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user
# Get the currently logged-in user who is active and not disabled using get_current_active_user
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # Get the currently logged-in user current_user through get_current_user
    # Then check if the user is disabled
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="User is disabled")
    return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    # authenticate_user returns False for an unknown user or a wrong password
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The user is authenticated, so generate the token
    # Turn the previously defined expiration time into a timedelta to pass as expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")
    # Return the token
    return token
# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
# For testing only: get the hash value of a password
@app.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)
### Process Summary
- Login
First, call the /token endpoint to log the user in. It receives the form data through OAuth2PasswordRequestForm.
The form data contains the username and password.
The endpoint then calls authenticate_user, passing in the simulated user database, the username, and the password.
authenticate_user calls get_user to look up the user with that username in the simulated database.
If the user is found, verify_password is called with the login password and the hashed password stored for that user.
In the verify_password method:
The verify method of the CryptContext instance provided by passlib performs the check and returns the result. If it is true, authenticate_user returns the user obtained from get_user.
Next, the endpoint sets the token expiration time and calls create_access_token, passing in the user information.
create_access_token combines the user information and the expiration time into one dict and calls jwt.encode to generate the token.
Finally, the generated token is returned.
- Obtain the currently logged-in user
Call the /users/me endpoint with an Authorization request header whose value is 'Bearer ' followed by the token returned at login.
The endpoint function depends on get_current_active_user to obtain the currently logged-in user who is not disabled.
get_current_active_user obtains the current user through get_current_user and then checks whether the user is disabled.
get_current_user uses the oauth2_scheme dependency to read the token from the request header.
It then calls jwt.decode to verify and decode the token and recover the user information stored at login, such as the username.
Using the username, the user data is fetched from the database and returned.
Note! The value of the Authorization request header must start with Bearer followed by a space.
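The two client-side steps in this summary, posting the login form and then sending the token back in the Authorization header, can be sketched without any HTTP library. The helpers below only build and parse the values; all three function names are illustrative, and the parsing function is a simplified picture of what a server does with the header rather than FastAPI's own code.

```python
from typing import Optional
from urllib.parse import urlencode


def login_form_body(username: str, password: str) -> bytes:
    """Encode credentials the way an HTML form would (application/x-www-form-urlencoded)."""
    return urlencode({"username": username, "password": password}).encode("ascii")


def bearer_headers(access_token: str) -> dict:
    """Headers to send with requests to protected endpoints such as /users/me."""
    return {"Authorization": f"Bearer {access_token}"}


def extract_bearer_token(authorization: str) -> Optional[str]:
    """Return the token if the header uses the Bearer scheme, else None."""
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(login_form_body("bryce", "123123"))
print(bearer_headers("abc.def.ghi"))
print(extract_bearer_token("Bearer abc.def.ghi"))
```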
### Interface Testing
Form call to /token
Get request to obtain the currently logged-in user
# Connecting to MongoDB
## Importing pymongo
First, you need to install the library for Python to operate on MongoDB:
pip install pymongo
Then, import it in the file:
# Import MongoDB database
from pymongo import MongoClient
## Configuring MongoDB Connection
# Create MongoDB database connection
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')
# Select the database; all database operations are done through the db object
# For example, db.users.insert_one() inserts one document into the users collection
db = client['fastapi']
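The connection string above works because the username and password contain only letters. If a credential contains characters such as `@` or `/`, it has to be percent-encoded before it goes into the URI. A minimal sketch using the standard library follows; `build_mongo_uri` is a hypothetical helper, not part of pymongo.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a mongodb:// URI, percent-encoding the credentials."""
    return f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


print(build_mongo_uri("fastapi", "fastapi", "localhost", 27017, "fastapi"))
print(build_mongo_uri("fastapi", "p@ss/word", "localhost", 27017, "fastapi"))
```

The resulting string can be passed to MongoClient in place of the literal used above.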
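## Register User Using MongoDB
Add a new user registration endpoint. It accepts a UserInDB model, which is UserAuth plus a required plaintext password field.
The code is as follows:
@app.post("/users/register")
async def register_user(user: UserInDB):
    userdict = user.dict()
    # Reject the request if the username is already taken
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="User already exists",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})
    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")
    # Convert user information to JSON-compatible data
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.insert_one(json_compatible_item_data)
    # Return user information without the password
    return {"username": user.username, "email": user.email, "full_name": user.full_name}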
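## Query User During Login
Modify the previous get_user function so that it queries MongoDB. It no longer takes the database as a parameter, so the calls in authenticate_user and get_current_user must drop that argument as well:
# Get user information from the database by username
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)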
## Modify User Information
Create a user update model. Its password field is optional because an update may leave the password unchanged:
# Create a User Update model class
# The password is optional because it may not be updated
class UserUpdate(UserAuth):
    password: Union[str, None] = None
And a UserOut model for responses, which contains no password:
# User model class for returning
class UserOut(UserAuth):
    pass
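When an endpoint declares a model such as UserOut as its response_model, only the fields declared on that model are sent back, so a stored hashed_password never reaches the client. The idea can be pictured with plain dictionaries. This is a conceptual sketch, not how FastAPI or Pydantic implement it; `PUBLIC_FIELDS` and `to_public` are hypothetical names.

```python
PUBLIC_FIELDS = ("username", "email", "full_name", "disabled")  # the UserAuth fields


def to_public(user_record: dict) -> dict:
    """Keep only the fields declared on the output model; drop everything else."""
    return {name: user_record.get(name) for name in PUBLIC_FIELDS}


stored = {
    "username": "bryce",
    "email": None,
    "full_name": "bryce Yu",
    "disabled": False,
    "hashed_password": "not-a-real-hash",
}

print(to_public(stored))
```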
The endpoint for obtaining the currently logged-in user also needs to be modified so that it returns UserOut:
# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
Then add the update endpoint:
@app.put("/users/update", response_model=UserOut)
async def update_user(userInDB: UserUpdate, current_user: UserAuth = Depends(get_current_active_user)):
    # Convert the currently logged-in user's information to dict
    current_user_dict = current_user.dict()
    # Users may only update their own record: the submitted username must match the logged-in username
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="Unauthorized modification",
        )
    # Convert the user information to be updated to dict
    userdict = userInDB.dict()
    # If the password is not empty, update the password
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # If the password is empty, keep the currently logged-in user's hashed password
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})
    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")
    # Convert user information to JSON-compatible data
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # Return user information
    return {"username": userInDB.username, "email": userInDB.email, "full_name": userInDB.full_name}
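The core of the update endpoint is deciding what ends up in the `$set` document: a new hash when a password was submitted, the existing hash otherwise, and never the plaintext. That decision can be isolated and checked on its own. In this sketch `demo_hash` is a stand-in for get_password_hash() and `build_update` is a hypothetical helper.

```python
import hashlib
from typing import Optional


def demo_hash(password: str) -> str:
    """Stand-in for get_password_hash(); NOT suitable for real passwords."""
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def build_update(submitted: dict, current_hashed_password: str) -> dict:
    """Return the fields to write with $set, never including a plaintext password."""
    fields = dict(submitted)  # copy so the input is left untouched
    new_password: Optional[str] = fields.pop("password", None)
    if new_password:
        fields["hashed_password"] = demo_hash(new_password)
    else:
        fields["hashed_password"] = current_hashed_password
    return fields


unchanged = build_update({"username": "bryce", "full_name": "Bryce", "password": None}, "old-hash")
changed = build_update({"username": "bryce", "full_name": "Bryce", "password": "new-pass"}, "old-hash")

print(unchanged)
print(changed)
```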
## Viewing MongoDB Data
Use MongoDB Compass to view:
# Further Organization
Use APIRouter to split the endpoints into separate modules.
The directory structure will be:
MyApi
└── app                      # "app" is a Python package
    ├── __init__.py          # This file makes "app" a Python package
    ├── main.py              # "main" module, e.g., import app.main
    ├── dependencies.py      # "dependencies" module, e.g., import app.dependencies
    └── routers              # "routers" is a Python subpackage
        ├── __init__.py      # Makes "routers" a Python subpackage
        └── users.py         # "users" submodule, e.g., import app.routers.users
Note that an empty __init__.py file should be placed in each folder to tell Python that the folder is a package.
Each directory or subdirectory has one.
This is what allows code to be imported from one file into another.
For example, in app/main.py, you can directly use:
# Absolute import
from app.routers import users
# Relative import
from .routers import users
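The effect of the package layout can be reproduced in a throwaway directory. The sketch below creates a small package with the same shape as the tree above and imports its main module, whose relative import resolves because the folders are packages. The package name `demo_app` and the `router_name` attribute are made up for this illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway project that mirrors the layout above (names are illustrative).
project_root = Path(tempfile.mkdtemp())
package = project_root / "demo_app"
(package / "routers").mkdir(parents=True)

(package / "__init__.py").write_text("")
(package / "routers" / "__init__.py").write_text("")
(package / "routers" / "users.py").write_text("router_name = 'users'\n")
# main.py uses a relative import, which only works inside a package.
(package / "main.py").write_text("from .routers import users\n")

# Make the project root importable, as running from that directory would.
sys.path.insert(0, str(project_root))
importlib.invalidate_caches()
main = importlib.import_module("demo_app.main")

print(main.users.router_name)
```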
## Router Configuration
First, create a routers folder in the app folder to store the routes.
In the folder, create a new users.py file.
First, import:
from fastapi import APIRouter
Configure the router:
# Configure the router
router = APIRouter(
    # Define the prefix for accessing the route; there should be no slash after the prefix
    # By setting the prefix here, you don't need to write the prefix in every path operation
    prefix="/users",
    # To display custom tags in the documentation, we can use the tags parameter
    tags=["users"],
    # To display custom responses in the documentation, we can use the responses parameter to define return values
    responses={404: {"description": "Not found"}},
)
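To see why the prefix must not end with a slash, it helps to picture a router as a table that stores each handler under prefix + path. The toy class below models only that one behavior; it is not FastAPI's APIRouter, and `MiniRouter` is a made-up name.

```python
from typing import Callable, Dict


class MiniRouter:
    """Toy model of a router: stores handlers under prefix + path."""

    def __init__(self, prefix: str = "") -> None:
        if prefix.endswith("/"):
            # "/users/" + "/me" would produce "/users//me"
            raise ValueError("prefix must not end with '/'")
        self.prefix = prefix
        self.routes: Dict[str, Callable] = {}

    def get(self, path: str) -> Callable:
        """Decorator that registers a handler for the given path."""
        def register(handler: Callable) -> Callable:
            self.routes[self.prefix + path] = handler
            return handler
        return register


router = MiniRouter(prefix="/users")


@router.get("/me")
def read_users_me() -> str:
    return "current user"


print(sorted(router.routes))
```

With the prefix set once on the router, each path operation only declares the part of the path that is specific to it.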
Then, you can move all user-related routes here and change app to router:
# With the router prefix, this endpoint is served at /users/token
@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(form_data.username, form_data.password)
    # authenticate_user returns False for an unknown user or a wrong password
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The user is authenticated, so generate the token
    # Turn the previously defined expiration time into a timedelta to pass as expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")
    # Return the token
    return token
# Get the currently logged-in user who is active and not disabled using get_current_active_user
@router.get("/me", response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
# For testing only: get the hash value of a password
@router.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)
@router.post("/register", response_model=UserOut)
async def register_user(user: UserInDB):
    userdict = user.dict()
    # Reject the request if the username is already taken
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="User already exists",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})
    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")
    # Convert user information to JSON-compatible data
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.insert_one(json_compatible_item_data)
    # Return user information; response_model=UserOut filters out the password
    return user
@router.put("/update", response_model=UserOut)
async def update_user(userInDB: UserUpdate, current_user: UserAuth = Depends(get_current_active_user)):
    # Convert the currently logged-in user's information to dict
    current_user_dict = current_user.dict()
    # Users may only update their own record: the submitted username must match the logged-in username
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="Unauthorized modification",
        )
    # Convert the user information to be updated to dict
    userdict = userInDB.dict()
    # If the password is not empty, update the password
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # If the password is empty, keep the currently logged-in user's hashed password
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})
    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")
    # Convert user information to JSON-compatible data
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # Return user information
    return {"username": userInDB.username, "email": userInDB.email, "full_name": userInDB.full_name}
At this point, many errors will occur because users.py does not yet import the names it uses (for example Depends, HTTPException, and the model classes).
## Dependencies
When there are many places that require the same dependencies, you can store them all in dependencies for unified management.
In the /app directory, create a new dependencies.py file.
You can place all the imports and custom variables from users in dependencies.py.
For example:
# Path: dependencies.py
# This file is used to handle dependencies.
# For example, if some dependencies are used in several places in the application,
# you can place these dependencies in this file
from fastapi import Depends, HTTPException, status
# Import OAuth2PasswordBearer class to declare oauth2_scheme variable for authentication
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Import Union
from typing import Union
# Import BaseModel for parsing request bodies
from pydantic import BaseModel, HttpUrl, EmailStr
# Import datetime and timedelta for expiration times
from datetime import datetime, timedelta
# Import jsonable_encoder to convert models to JSON-compatible data
from fastapi.encoders import jsonable_encoder
# Import passlib for password hashing
from passlib.context import CryptContext
# Import JWT for generating tokens
from jose import JWTError, jwt
# Import MongoDB database
from pymongo import MongoClient
# Create MongoDB database connection (same URI as configured earlier)
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')
# Select the database; all database operations are done through the db object
# For example, db.users.insert_one() inserts one document into the users collection
db = client['fastapi']
# CryptContext is a tool for handling password hashing and verification
# schemes lists the hashing schemes to support; the first one (here bcrypt) is the default for new hashes
# deprecated="auto" marks every scheme except the default as deprecated, so hashes created with
# an older scheme can still be verified but are flagged as needing an upgrade
# Its hash() and verify() methods create a password hash and check a password against a hash, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Clients obtain a token by posting the username and password to the login endpoint
# The login route now lives under the /users prefix, so the token URL is users/token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/token")
# JWT signing algorithm; HS256 is used here (other algorithms such as RS256 and ES256 also exist)
ALGORITHM = "HS256"
# SECRET_KEY generated randomly with OpenSSL
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# Token expiration time in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Define a model class to return the token
class Token(BaseModel):
    access_token: str
    token_type: str
# Define a token data model class
class TokenData(BaseModel):
    username: Union[str, None] = None
# Create a User model class
class UserAuth(BaseModel):
    username: str
    # EmailStr validates the address format (it requires the email-validator package)
    email: Union[EmailStr, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = False
# Create a User database model class
class UserDB(UserAuth):
    hashed_password: str
# Model for registration requests: UserAuth plus a required plaintext password
class UserInDB(UserAuth):
    password: str
# Create a User Update model class
# The password is optional because it may not be updated
class UserUpdate(UserAuth):
    password: Union[str, None] = None
# User model class for returning
class UserOut(UserAuth):
    pass
# Function to generate access tokens
# userData is the data to be stored in the token, such as the username or user ID
# expires_delta is the token lifetime, given as a timedelta
# If it is not provided, a default lifetime of 15 minutes is used
def create_access_token(userData: dict, expires_delta: Union[timedelta, None] = None):
    # Copy the data so that the caller's dict is not modified
    to_encode = userData.copy()
    # Expiration time = current UTC time (datetime.utcnow()) + lifetime
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default lifetime
        expire = datetime.utcnow() + timedelta(minutes=15)
    # Store the expiration time in the token as the "exp" claim
    to_encode.update({"exp": expire})
    # jwt.encode() builds and signs the token; its arguments are the claims, the secret key, and the signing algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)
# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)
# Authenticate user information using username and password
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
# Get user information from the database by username
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)
# This function uses the oauth2_scheme dependency to obtain the token and verify its validity
# If the token is valid, it calls get_user to obtain the current user's information and returns it
# If invalid, it raises an exception
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Verify the token with SECRET_KEY (the random key generated with OpenSSL) and the signing algorithm
        # jwt.decode() returns the claims stored in the token; payload is a dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user
# Get the currently logged-in user who is active and not disabled using get_current_active_user
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # Get the currently logged-in user current_user through get_current_user
    # Then check if the user is disabled
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="User is disabled")
    return current_user
Then, in /routers/users.py, import dependencies:
from fastapi import APIRouter, Depends
# Import everything from app/dependencies.py; users.py lives in app/routers,
# so the relative import goes up one package level
from ..dependencies import *
## main.py
At this point, create a new main.py file in the /app directory:
from fastapi import FastAPI
# Import CORS middleware for cross-origin requests
from fastapi.middleware.cors import CORSMiddleware
# Import users from routers
from .routers import users
app = FastAPI()
# CORS settings
app.add_middleware(
    CORSMiddleware,
    # Allow all domain names to access
    allow_origins=["*"],
    # Allow requests to carry credentials such as cookies
    allow_credentials=True,
    # Allow all request methods
    allow_methods=["*"],
    # Allow all request headers
    allow_headers=["*"],
)
# Include the users routes from the router
app.include_router(users.router)
## Starting the Project
Navigate to the MyApi directory and execute:
uvicorn app.main:app --reload
This will start the program.
At this point, you can also see that the tags take effect in the OpenAPI documentation: the endpoints are grouped under users.