banner
小鱼

小鱼's Blog

FastAPI 與 mongoDB 實現用戶註冊登錄

# OAuth2 實現密碼哈希與 Bearer JWT 令牌驗證#

## JWT 簡介#

JWT 即JSON 網路令牌(JSON Web Tokens)。

JWT 是一種將 JSON 對象編碼為沒有空格,且難以理解的長字符串的標準。JWT 的內容如下所示:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c


image

JWT 字符串沒有加密,任何人都能用它恢復原始信息。

但 JWT 使用了簽名機制。接受令牌時,可以用簽名校驗令牌。

使用 JWT 創建有效期為一周的令牌。第二天,使用者持令牌再次訪問時,仍為登錄狀態。

令牌於一周後過期,屆時,使用者身份驗證就會失敗。只有再次登錄,才能獲得新的令牌。

如果使用者(或第三方)篡改令牌的過期時間,因為簽名不匹配會導致身份驗證失敗。

如需深入了解 JWT 令牌,了解它的工作方式,請參閱 JWT 官網

## 安裝 python-jose#

pip install python-jose[cryptography]

Python-jose 需要安裝配套的加密後端。推薦的後端是:pyca/cryptography

Python-jose 支持 PyJWT 的所有功能,還支持與其它工具集成時可能會用到的一些其它功能。

## 密碼哈希化#

哈希 是指把特定內容轉換為亂碼形式的字節序列(其實就是字符串)。

每次傳入完全相同的內容時(比如,完全相同的密碼),返回的都是完全相同的亂碼。

但這個亂碼無法轉換回傳入的密碼。

### 安裝 passlib#

Passlib 是處理密碼哈希的 Python 包。

它支持很多安全哈希算法及配套工具。

推薦的算法是 Bcrypt

先安裝附帶 Bcrypt 的 PassLib

pip install passlib[bcrypt]

passlib 甚至可以讀取 Django、Flask 的安全插件等工具創建的密碼。

例如,把 Django 應用的數據共享給 FastAPI 應用的數據庫。或利用同一個數據庫,可以逐步把應用從 Django 遷移到 FastAPI。使用者可以同時從 Django 應用或 FastAPI 應用登錄。

### 密碼哈希與校驗#

首先引入 passlib

# 引入 passlib 用於密碼哈希化
from passlib.context import CryptContext

創建一個哈希密碼上下文對象

# 創建一個密碼哈希化的上下文對象,CryptContext 是用於處理密碼哈希和驗證的工具
# 它能接受一個或者多個密碼哈希方案作為參數
# 這裡使用 bcrypt 算法
# 並且 deprecated 參數設置為 auto,表示當有更安全的算法出現時,會自動使用更安全的算法
# 它有 verify() 和 hash() 兩個方法,分別用於校驗密碼和獲取 hash 後的密碼
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

接著創建三個工具函數

1、將使用者的密碼哈希化

# 獲取 hash 後的密碼
def get_password_hash(password):
    # 使用 bcrypt 算法對密碼進行哈希
    return pwd_context.hash(password)

2、校驗接收的密碼和存儲的哈希密碼是否一致

# 校驗密碼
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校驗密碼
    return pwd_context.verify(plain_password, hashed_password)

3、用於身份驗證,使用前面兩個函數,來校驗,並返回使用者

# 通過用戶名密碼獲取用戶信息進行校驗
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

## 處理 JWT 令牌#

導入已安裝的模塊。

創建用於 JWT 令牌簽名的隨機密鑰。

使用以下命令,生成安全的隨機密鑰:

openssl rand -hex 32

如果沒有 openssl 可以到 [官網](Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions (slproweb.com)) 下載

啟動 window OpenSSL Command Prompt


image

複製生成的隨機密鑰 例如

0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9

然後,把生成的密鑰複製到變量SECRET_KEY,注意,不要使用本例所示的密鑰。

創建指定 JWT 令牌簽名算法的變量 ALGORITHM,本例中的值為 "HS256"

創建設置令牌過期時間的變量。

定義令牌端點響應的 Pydantic 模型。

創建生成新的訪問令牌的工具函數。

首先引入 jwt

from jose import JWTError,jwt

接著聲明
1、隨機密鑰 SECRET_KEY 也就是上面 openSSL 生成的
2、JWT 令牌簽名算法的變量 ALGORITHM ,本例中使用的是 'HS256'
3、設置令牌過期時間的變量 ACCESS_TOKEN_EXPIRE_MINUTES 分鐘數

SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9" 
ALGORITHM = "HS256" 
ACCESS_TOKEN_EXPIRE_MINUTES = 30

定義一個返回 Token 的模型類

class Token(BaseModel):
    access_token: str
    token_type: str

引入 timedelta 類型

from datetime import datetime, timedelta

創建一個新的 生成訪問令牌 的函數

# 生成訪問令牌的函數
# data 表示的是要存進 token 中的數據,比如用戶名、用戶 ID 等
# expires_delta 指的是過期時間,它是一個 timedelta 類型的變量
# 如果沒有傳的話,就使用默認的過期時間,這裡寫死為 15 分鐘
def create_access_token(data: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存進 token 中的數據,將其複製一份,防止修改原數據
    to_encode = data.copy()

    # expires_delta 是 timedelta 類型的變量,指的是過期的時間
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 為空,就使用默認的過期時間
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 將過期時間存進 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入參是 用戶信息 , 隨機的密鑰 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

修改 登錄獲取 token 的接口

@app.post("/token", response_model=Token):
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 調用校驗用戶的函數,傳入用戶名和密碼
    user = authenticate_user(fake_users_db,form_data.username,form_data.password)
    # 如果用戶不存在
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="用戶名或密碼錯誤",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # 如果用戶存在,就生成 token
    # 將之前定義好的 token 過期時間 賦值給 access_token_expires 傳到 expires_delta 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 調用生成 token 的函數,傳入用戶信息和過期時間
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 將生成 token 封裝成 Token 模型類
    token = Token(access_token=access_token, token_type="bearer")

    # 將 token 返回
    return token

## 更新依賴項#

更新 get_current_user 以接收與之前相同的令牌,但這裡用的是 JWT 令牌。

解碼並校驗接收到的令牌,然後,返回當前用戶。

如果令牌無效,則直接返回 HTTP 錯誤。

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無效的認證憑證",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定義好的 OpenSSL生成的隨機密鑰 SECRET_KEY 以及加密方式 
        # 調用 jwt 來解碼 token,獲取用戶的信息,payload 的類型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 從 token 中獲取用戶名 ,也就是之前生成 token 的時候傳入的 username
        username = payload.get("username")
        # 如果用戶名不存在
        if username is None:
            # 抛出前面定義的異常
            raise credentials_exception
    except JWTError:
        # 如果解碼失敗,抛出異常
        raise credentials_exception
    # 將用戶名傳給 get_user 函數,獲取用戶信息
    user = get_user(fake_users_db, username=username)
    if user is None:
        # 如果用戶不存在,抛出異常
        raise credentials_exception
    return user

代碼中的

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無效的認證憑證",
        headers={"WWW-Authenticate": "Bearer"},
    )

用於表示驗證憑據失敗的異常情況。也就是在這段代碼中,如果在解碼 jwt 令牌時發生任何異常
(比如令牌無效、過期、被篡改等)都會引發這個異常

它返回了 status_code 表示未授權的狀態 401 、 detail 錯誤的信息 、 headers 令牌的類型

## 獲得當前登錄用戶的信息#

# 通過 get_current_active_user 獲取當前登錄並且沒有被禁用的用戶 current_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

## 完整代碼#

# 創建一個密碼哈希化的上下文對象,CryptContext 是用於處理密碼哈希和驗證的工具
# 它能接受一個或者多個密碼哈希方案作為參數
# 這裡使用 bcrypt 算法
# 並且 deprecated 參數設置為 auto,表示當有更安全的算法出現時,會自動使用更安全的算法
# 它有 verify() 和 hash() 兩個方法,分別用於校驗密碼和獲取 hash 後的密碼
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通過 /token 中的 username 和 password 獲取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌簽名算法的變量,這裡使用 HS256 算法,還有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 隨機生成一個 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的過期時間,單位為分鐘
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 定義一個返回 token 模型類
class Token(BaseModel):
    access_token: str
    token_type: str

# 定義一個 token 數據模型類
class TokenData(BaseModel):
    username: Union[str, None] = None

# 生成訪問令牌的函數
# userData 表示的是要存進 token 中的數據,比如用戶名、用戶 ID 等
# expires_delta 指的是過期時間,它是一個 timedelta 類型的變量
# 如果沒有傳的話,就使用默認的過期時間,這裡寫死為 15 分鐘
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存進 token 中的數據,將其複製一份,防止修改原數據
    to_encode = userData.copy()

    # expires_delta 是 timedelta 類型的變量,指的是過期的時間
    # 使用 datetime.utcnow() 獲取當前時間 + 過期時間 得到 token 到期的時間
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 為空,就使用默認的過期時間
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 將到期時間存進 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入參是 用戶信息 , 隨機的密鑰 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 模擬數據庫
fake_users_db = {
    "bryce": {
        "username": "bryce",
        "full_name": "bryce Yu",
        "email": "[email protected]",
        # 123123
        "hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
        "disabled": False,
    },
    "yu": {
        "username": "yu",
        "full_name": "yu",
        "email": "[email protected]",
        # 123456
        "hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
        "disabled": True,
    },
}

# 創建一個 用戶 模型類
class UserAuth(BaseModel):
    username: str
    # 先使用 Union[str, None],後面會改成 EmailStr
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

#  創建一個 用戶 數據庫 模型類
class UserDB(UserAuth):
    hashed_password: str

# 校驗密碼
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校驗密碼
    return pwd_context.verify(plain_password, hashed_password)

# 獲取 hash 後的密碼
def get_password_hash(password):
    # 使用 bcrypt 算法對密碼進行哈希
    return pwd_context.hash(password)

# 通過用戶名密碼獲取用戶信息進行校驗
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# 從 db 數據庫中,獲取用戶名為 username 的用戶信息
def get_user( username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

# 這個方法會就是通過依賴 oauth2_scheme 來獲取 token,並校驗 token 的有效性
# 如果 token 有效,調用 get_user 函數,獲取當前的用戶信息,返回用戶信息
# 如果無效,就拋出異常
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無效的認證憑證",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定義好的 OpenSSL生成的隨機密鑰 SECRET_KEY 以及加密方式 
        # 調用 jwt 來解碼 token,獲取用戶的信息,payload 的類型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 從 token 中獲取用戶名 ,也就是之前生成 token 的時候傳入的 username
        username = payload.get("username")
        # 如果用戶名不存在
        if username is None:
            # 抛出前面定義的異常
            raise credentials_exception
    except JWTError:
        # 如果解碼失敗,抛出異常
        raise credentials_exception
    # 將用戶名傳給 get_user 函數,獲取用戶信息
    user = get_user(username=username)
    if user is None:
        # 如果用戶不存在,抛出異常
        raise credentials_exception
    return user


# 獲取沒有被禁用的用戶,這個函數會 調用依賴 get_current_user 函數,獲得當前登錄的用戶
# 接著判斷這個用戶是否被禁用,如果被禁用,就拋出異常,如果沒有被禁用,就返回用戶信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # 通過 get_current_user 獲取當前登錄的用戶 current_user
    # 接著判斷用戶是否被禁用
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用戶被禁用")
    return current_user

### 流程總結#

1、登錄

首先調用 /token 方法進行用戶登錄,方法通過 OAuth2PasswordRequestForm 獲得表單數據

表單數據中存有 username 和 password,

此時調用 authenticate 方法,傳入 模擬的用戶數據庫,以及用戶名和密碼

在 authenticate 方法中,使用 get_user 方法比較 模擬數據庫中用戶名相同的用戶

如果找到了用戶,調用 verify_password 方法,傳入找到的用戶數據庫存放的哈希密碼和登錄密碼

verify_password 方法中

調用 passlib 提供的上下文加密 CryptContext 類實例對象

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

的 verify 方法進行校驗,會返回校驗結果,如果為 true,就會把 get_user 函數獲取到的 user 返回

接著設置好 token 過期時間,通過 create_access_token 函數,傳入用戶信息

create_access_token 中會將用戶信息和過期時間內整合成一個 dict 調用 jwt.encode 生成 token

接著把生成的 token 返回

2、獲取當前的登錄用戶

調用 /users/me 接口,消息頭中需要添加 Authorization 值,值為 'Bearer ' + 登錄返回的 token

接口函數調用依賴 get_current_active_user 來獲取沒有被禁用的當前的用戶

get_current_active_user 會通過 get_current_user 函數獲取當前用戶,進一步判斷有沒有被禁用

get_current_user 會通過依賴 oauth2_scheme 獲取消息頭中的 token

並使用 jwt.decode 解碼 token 從而獲得登錄時存進去的用戶信息,比如用戶名

使用用戶名,從數據庫中得到 用戶的數據,並返回

注意!請求中 Authorization 響應頭的值以 Bearer 開頭。

### 接口測試#

表單調用 /token


image

Get 請求獲取當前登錄用戶


image

# 連接 mongoDB#

## 引入 pymongo#

首先需要安裝 python 對 mongoDB 操作的庫

pip install pymongo

接著在 文件中 引入

# 引入 mongodb 數據庫
from pymongo import MongoClient

## 配置 mongoDB 連接#

# 創建 mongodb 數據庫連接
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')


# 創建數據庫, 數據庫的操作都使用 db 對象來完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一條數據
db = client['fastapi']

## 註冊用戶使用 mongoDB 新增#

新增一個用戶註冊的接口

代碼如下

@app.post("/users/register")
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="用戶已存在",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # 將用戶信息轉換為 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 將用戶信息存進數據庫
    db.users.insert_one(json_compatible_item_data)
    # 返回用戶信息
    return user

## 登錄時使用查詢用戶#

修改之前的 get_user 函數

# 從 db 數據庫中,獲取用戶名為 username 的用戶信息
def get_user( username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

## 修改用戶信息#

創建用戶更新模型,因為可以不更新密碼,所以密碼可以為空

# 創建一個 用戶 更新 模型類
# 因為可以不更新密碼,所以密碼可以為空
class UserUpdate(UserAuth):
    password: Union[str, None] = None

以及一個 UserOut 模型,用來返回,不返回密碼

# 用來返回的用戶模型類
class UserOut(UserAuth):
    pass

同時獲取當前登錄用戶的模型也要修改

# 通過 get_current_active_user 獲取當前登錄並且沒有被禁用的用戶 current_user
@app.get("/users/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
@app.put("/users/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 將當前登錄用戶的信息轉換為 dict
    current_user_dict = current_user.dict()

    # 判斷要更新的用戶名和當前登錄的用戶名是否一致
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="無權修改",
        )
    
    # 將要更新的用戶信息轉換為 dict
    userdict = userInDB.dict()

    # 如果密碼不為空,就更新密碼
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # 如果密碼為空,就使用當前登錄用戶的密碼
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # 將用戶信息中的密碼刪除,不能將明文密碼存進數據庫
    userdict.pop("password")

    # 將用戶信息轉換為 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 將用戶信息存進數據庫
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # 返回用戶信息
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

## 查看 mongo 庫數據#

使用 mongo compass 查看


image

# 進一步整理#

使用 router 的方式來實現接口

文件的目錄將會是

├── MyApi
│	├── app							# 「app」是一個 Python 包
│   	├── __init__.py				# 這個文件使「app」成為一個 Python 包
│   	├── main.py					# 「main」模塊,例如 import app.main
│   	├── dependencies.py		    # 「dependencies」模塊,例如 import app.dependencies
│   	└── routers					# 「routers」是一個「Python 子包」
│   	    ├── __init__.py			# 使「routers」成為一個「Python 子包」
│   	    └── users.py			# 「users」子模塊,例如 import app.routers.users

注意文件夾中要放入一個 __init__.py 空文件,用來告訴 python 這是文件目錄

每個目錄或子目錄中都有一個。

這就是能將代碼從一個文件導入到另一個文件的原因。

例如 在 app/main.py 中 可以直接使用

# 絕對路徑
from app.routers import items
# 相對路徑
from .routers import items

## routers 路由#

首先在 app 文件夾中

創建一個 routers 文件夾,用來存放路由

在文件夾中新建一個 users.py

文件中首先引入

from fastapi import APIRouter

對此路由進行配置

# 對路由進行配置
router = APIRouter(
    # 定義訪問路由的前綴,前綴後面不能有斜杠
    # 在這裡設置了前綴,就不用在每個路徑操作中都寫上前綴了
    prefix="/users",
    # 為了在文檔中顯示自定義的標籤,我們可以使用 tags 參數
    tags=["users"],
    # 為了在文檔中顯示自定義的響應,我們可以使用 responses 參數定義返回值
    responses={404: {"description": "Not found"}},
)

接著就可以將直接 user 相關的路由全部移動到這裡,將 app 改為 router

@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 調用校驗用戶的函數,傳入用戶名和密碼
    user = authenticate_user(form_data.username,form_data.password)
    # 如果用戶不存在
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="用戶名或密碼錯誤",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # 如果用戶存在,就生成 token
    # 將之前定義好的 token 過期時間 賦值給 access_token_expires 傳到 expires_delta 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 調用生成 token 的函數,傳入用戶信息和過期時間
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 將生成 token 封裝成 Token 模型類
    token = Token(access_token=access_token, token_type="bearer")

    # 將 token 返回
    return token
    

# 通過 get_current_active_user 獲取當前登錄並且沒有被禁用的用戶 current_user
@router.get("/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# 測試使用,獲取密碼的哈希值
@router.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)


@router.post("/register",response_model=UserOut)
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="用戶已存在",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # 將用戶信息中的密碼刪除,不能將明文密碼存進數據庫
    userdict.pop("password")

    # 將用戶信息轉換為 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 將用戶信息存進數據庫
    db.users.insert_one(json_compatible_item_data)
    # 返回用戶信息
    return user

@router.put("/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 將當前登錄用戶的信息轉換為 dict
    current_user_dict = current_user.dict()

    # 判斷要更新的用戶名和當前登錄的用戶名是否一致
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="無權修改",
        )
    
    # 將要更新的用戶信息轉換為 dict
    userdict = userInDB.dict()

    # 如果密碼不為空,就更新密碼
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # 如果密碼為空,就使用當前登錄用戶的密碼
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # 將用戶信息中的密碼刪除,不能將明文密碼存進數據庫
    userdict.pop("password")

    # 將用戶信息轉換為 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 將用戶信息存進數據庫
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # 返回用戶信息
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

這時候會出現很多錯誤,因為之前 import 的庫都不存在

## dependencies 依賴項#

當存在有很多地方都要用到的相同的依賴項的時候

可以全部放在 dependencies 中統一存放

在 /app 目錄下 ,新增一個 dependencies.py 文件

就可以將,之前 users 中那些 import 和 一直自定義的變量 全部放到 dependencies.py 中

例如:

# Path: dependencies.py

# 這個文件是用來處理依賴的,
# 比如需要一些在應用程序的好幾個地方所使用的依賴項,
# 那麼就可以將這些依賴項放在這個文件中
from fastapi import Depends, HTTPException, status
# 引入 OAuth2PasswordBearer 類,用於聲明 oauth2_scheme 變量,用於身份驗證
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
# 引入 Union
from typing import Union
# 引入 BaseModel,用來解析請求體
from pydantic import BaseModel,HttpUrl,EmailStr
# 引入時間模塊
from datetime import datetime,timedelta
# 引入 JSONCompatibleEncoder 用於將模型轉換為 JSON
from fastapi.encoders import jsonable_encoder
# 引入 passlib 用於密碼哈希化
from passlib.context import CryptContext
# 引入 JWT 用於生成 token
from jose import JWTError,jwt
# 引入 mongodb 數據庫
from pymongo import MongoClient


# 創建 mongodb 數據庫連接
client = MongoClient('mongodb://fastapi:[email protected]:27017/fastapi')

# 創建數據庫, 數據庫的操作都使用 db 對象來完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一條數據
db = client['fastapi']


# 創建一個密碼哈希化的上下文對象,CryptContext 是用於處理密碼哈希和驗證的工具
# 它能接受一個或者多個密碼哈希方案作為參數
# 這裡使用 bcrypt 算法
# 並且 deprecated 參數設置為 auto,表示當有更安全的算法出現時,會自動使用更安全的算法
# 它有 verify() 和 hash() 兩個方法,分別用於校驗密碼和獲取 hash 後的密碼
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通過 /token 中的 username 和 password 獲取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌簽名算法的變量,這裡使用 HS256 算法,還有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 隨機生成一個 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的過期時間,單位為分鐘
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 定義一個返回 token 模型類
class Token(BaseModel):
    access_token: str
    token_type: str

# 定義一個 token 數據模型類
class TokenData(BaseModel):
    username: Union[str, None] = None


# 創建一個 用戶 模型類
class UserAuth(BaseModel):
    username: str
    # 先使用 Union[str, None],後面會改成 EmailStr
    email: Union[EmailStr, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = False

#  創建一個 用戶 數據庫 模型類
class UserDB(UserAuth):
    hashed_password: str

class UserInDB(UserAuth):
    password: str

# 創建一個 用戶 更新 模型類
# 因為可以不更新密碼,所以密碼可以為空
class UserUpdate(UserAuth):
    password: Union[str, None] = None

# 用來返回的用戶模型類
class UserOut(UserAuth):
    pass


# 生成訪問令牌的函數
# userData 表示的是要存進 token 中的數據,比如用戶名、用戶 ID 等
# expires_delta 指的是過期時間,它是一個 timedelta 類型的變量
# 如果沒有傳的話,就使用默認的過期時間,這裡寫死為 15 分鐘
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存進 token 中的數據,將其複製一份,防止修改原數據
    to_encode = userData.copy()

    # expires_delta 是 timedelta 類型的變量,指的是過期的時間
    # 使用 datetime.utcnow() 獲取當前時間 + 過期時間 得到 token 到期的時間
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 為空,就使用默認的過期時間
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 將到期時間存進 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入參是 用戶信息 , 隨機的密鑰 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 校驗密碼
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校驗密碼
    return pwd_context.verify(plain_password, hashed_password)

# 獲取 hash 後的密碼
def get_password_hash(password):
    # 使用 bcrypt 算法對密碼進行哈希
    return pwd_context.hash(password)


# 通過用戶名密碼獲取用戶信息進行校驗
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 從 db 數據庫中,獲取用戶名為 username 的用戶信息
def get_user( username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)


# 這個方法會就是通過依賴 oauth2_scheme 來獲取 token,並校驗 token 的有效性
# 如果 token 有效,調用 get_user 函數,獲取當前的用戶信息,返回用戶信息
# 如果無效,就拋出異常
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無效的認證憑證",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定義好的 OpenSSL生成的隨機密鑰 SECRET_KEY 以及加密方式 
        # 調用 jwt 來解碼 token,獲取用戶的信息,payload 的類型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 從 token 中獲取用戶名 ,也就是之前生成 token 的時候傳入的 username
        username = payload.get("username")
        # 如果用戶名不存在
        if username is None:
            # 抛出前面定義的異常
            raise credentials_exception
    except JWTError:
        # 如果解碼失敗,抛出異常
        raise credentials_exception
    # 將用戶名傳給 get_user 函數,獲取用戶信息
    user = get_user(username=username)
    if user is None:
        # 如果用戶不存在,抛出異常
        raise credentials_exception
    return user



# 獲取沒有被禁用的用戶,這個函數會 調用依賴 get_current_user 函數,獲得當前登錄的用戶
# 接著判斷這個用戶是否被禁用,如果被禁用,就拋出異常,如果沒有被禁用,就返回用戶信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # 通過 get_current_user 獲取當前登錄的用戶 current_user
    # 接著判斷用戶是否被禁用
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用戶被禁用")
    return current_user

接著在 /routers/users.py 中 引入 dependencies 即可

from fastapi import APIRouter, Depends

# 引入 dependencies.py 中的依賴
from dependencies import *

## main.py#

這時候在 /app 目錄中新增一個 main.py 文件

from fastapi import FastAPI
# 引入 CORS 中間件,用於跨域
from fastapi.middleware.cors import CORSMiddleware
# 從 routers 中導入 users
from .routers import users

app = FastAPI()
  

# 跨域設置
app.add_middleware(
    CORSMiddleware,
    # 允許所有的域名訪問
    allow_origins=["*"],
    # 允許所有的請求攜帶 cookie
    allow_credentials=True,
    # 允許所有的請求方法
    allow_methods=["*"],
    # 允許所有的請求頭
    allow_headers=["*"],
)

# 引入 router 中的 users 路由
app.include_router(users.router)

## 啟動項目#

進入 MyApi 目錄中執行

uvicorn app.main:app --reload

即可啟動程序

此時 從 openApi 的文檔中也能看到

Tags 生效了


image

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。