banner
小鱼

小鱼's Blog

FastAPI と mongoDB を使用したユーザー登録とログイン

# OAuth2 によるパスワードハッシュと Bearer JWT トークンの検証#

## JWT の紹介#

JWT はJSON Web トークン(JSON Web Tokens)です。

JWT は、JSON オブジェクトを空白なしで、理解しにくい長い文字列にエンコードする標準です。JWT の内容は以下の通りです:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c


image

JWT 文字列は暗号化されておらず、誰でもそれを使用して元の情報を復元できます。

しかし、JWT は署名メカニズムを使用しています。トークンを受け取る際に、署名を使用してトークンを検証できます。

JWT を使用して、有効期限が 1 週間のトークンを作成します。翌日、ユーザーがトークンを持って再度アクセスすると、依然としてログイン状態です。

トークンは 1 週間後に期限切れとなり、その時点でユーザーの認証は失敗します。再度ログインすることで、新しいトークンを取得できます。

ユーザー(または第三者)がトークンの有効期限を改ざんした場合、署名が一致しないため、認証が失敗します。

JWT トークンについて詳しく知り、その動作を理解するには、JWT 公式サイトを参照してください。

## python-jose のインストール#

pip install python-jose[cryptography]

Python-joseは、関連する暗号化バックエンドをインストールする必要があります。推奨されるバックエンドは:pyca/cryptographyです。

Python-jose は、PyJWT のすべての機能をサポートしており、他のツールと統合する際に使用される可能性のあるいくつかの他の機能もサポートしています。

## パスワードハッシュ化#

ハッシュとは、特定の内容をランダムな形式のバイト列(実際には文字列)に変換することを指します。

完全に同じ内容(例えば、完全に同じパスワード)を渡すたびに、返されるのは完全に同じランダムな文字列です。

しかし、このランダムな文字列は、渡されたパスワードに戻すことはできません。

### passlib のインストール#

Passlib は、パスワードハッシュを処理するための Python パッケージです。

多くの安全なハッシュアルゴリズムと関連ツールをサポートしています。

推奨されるアルゴリズムはBcryptです。

Bcrypt を含む PassLib を最初にインストールします。

pip install passlib[bcrypt]

passlibは、Django や Flask のセキュリティプラグインなどのツールが作成したパスワードを読み取ることさえできます。

例えば、Django アプリのデータを FastAPI アプリのデータベースと共有することができます。また、同じデータベースを利用して、アプリを Django から FastAPI に段階的に移行することができます。ユーザーは、Django アプリまたは FastAPI アプリのいずれからでもログインできます。

### パスワードのハッシュと検証#

まず、passlib をインポートします。

# パスワードハッシュ化のためにpasslibをインポート
from passlib.context import CryptContext

ハッシュパスワードコンテキストオブジェクトを作成します。

# パスワードハッシュ化のためのコンテキストオブジェクトを作成します。CryptContextはパスワードハッシュと検証を処理するためのツールです。
# 1つまたは複数のパスワードハッシュスキームをパラメータとして受け取ることができます。
# ここではbcryptアルゴリズムを使用します。
# さらに、deprecatedパラメータをautoに設定し、より安全なアルゴリズムが登場した場合に自動的により安全なアルゴリズムを使用します。
# verify()とhash()の2つのメソッドがあり、それぞれパスワードの検証とハッシュ後のパスワードを取得するために使用されます。
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

次に、3 つのツール関数を作成します。

  1. ユーザーのパスワードをハッシュ化します。
# ハッシュ後のパスワードを取得します。
def get_password_hash(password):
    # bcryptアルゴリズムを使用してパスワードをハッシュ化します。
    return pwd_context.hash(password)
  1. 受信したパスワードと保存されたハッシュパスワードが一致するかどうかを検証します。
# パスワードを検証します。
def verify_password(plain_password, hashed_password):
    # bcryptアルゴリズムを使用してパスワードを検証します。
    return pwd_context.verify(plain_password, hashed_password)
  1. 認証に使用し、前の 2 つの関数を使用して検証し、ユーザーを返します。
# ユーザー名とパスワードを使用してユーザー情報を取得して検証します。
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

## JWT トークンの処理#

インストール済みのモジュールをインポートします。

JWT トークン署名用のランダムキーを作成します。

以下のコマンドを使用して、安全なランダムキーを生成します:

openssl rand -hex 32

openssl がない場合は、公式サイトからダウンロードできます。

Windows OpenSSL コマンドプロンプトを起動します。


image

生成されたランダムキーをコピーします。例えば、

0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9

次に、生成されたキーを変数SECRET_KEYにコピーします。注意:この例に示されているキーを使用しないでください。

指定された JWT トークン署名アルゴリズムの変数ALGORITHMを作成します。この例では、値は"HS256"です。

トークンの有効期限を設定する変数を作成します。

トークンエンドポイントの応答に対する Pydantic モデルを定義します。

新しいアクセストークンを生成するためのツール関数を作成します。

まず、jwt をインポートします。

from jose import JWTError,jwt

次に、以下を宣言します。

  1. ランダムキーSECRET_KEY(上記の openSSL で生成されたもの)
  2. JWT トークン署名アルゴリズムの変数ALGORITHM(この例では 'HS256' を使用)
  3. トークンの有効期限を設定する変数ACCESS_TOKEN_EXPIRE_MINUTES(分数)
SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9" 
ALGORITHM = "HS256" 
ACCESS_TOKEN_EXPIRE_MINUTES = 30

トークンを返すモデルクラスを定義します。

class Token(BaseModel):
    access_token: str
    token_type: str

timedelta 型をインポートします。

from datetime import datetime, timedelta

新しいアクセストークンを生成する関数を作成します。

# アクセストークンを生成する関数
# dataはトークンに格納するデータ(ユーザー名、ユーザーIDなど)を示します。
# expires_deltaは有効期限を示し、timedelta型の変数です。
# もし渡されなければ、デフォルトの有効期限(ここでは15分)を使用します。
def create_access_token(data: dict, expires_delta: Union[timedelta,None] = None):
    # to_encodeはトークンに格納するデータで、元のデータを変更しないようにコピーします。
    to_encode = data.copy()

    # expires_deltaはtimedelta型の変数で、有効期限を指します。
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # expires_deltaが空であれば、デフォルトの有効期限を使用します。
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 有効期限をトークンに格納します。
    to_encode.update({"exp": expire})
    # jwt.encode()メソッドを使用してトークンを生成します。引数はユーザー情報、ランダムなキー、暗号化アルゴリズムです。
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

ログインしてトークンを取得するインターフェースを修正します。

@app.post("/token", response_model=Token):
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # ユーザーを検証する関数を呼び出し、ユーザー名とパスワードを渡します。
    user = authenticate_user(fake_users_db,form_data.username,form_data.password)
    # ユーザーが存在しない場合
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="ユーザー名またはパスワードが間違っています",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # ユーザーが存在する場合、トークンを生成します。
    # 以前に定義したトークンの有効期限をaccess_token_expiresに割り当て、expires_deltaに渡します。
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # トークン生成関数を呼び出し、ユーザー情報と有効期限を渡します。
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 生成されたトークンをTokenモデルクラスに封装します。
    token = Token(access_token=access_token, token_type="bearer")

    # トークンを返します。
    return token

## 依存関係の更新#

get_current_userを更新して、以前と同じトークンを受け取りますが、ここでは JWT トークンを使用します。

受信したトークンをデコードして検証し、現在のユーザーを返します。

トークンが無効な場合は、HTTP エラーを直接返します。

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無効な認証情報",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # トークンと定義されたOpenSSLで生成されたランダムキーSECRET_KEYおよび暗号化方式を使用して
        # jwtを呼び出してトークンをデコードし、ユーザー情報を取得します。payloadの型はdictです。
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # トークンからユーザー名を取得します。これは以前にトークンを生成したときに渡されたユーザー名です。
        username = payload.get("username")
        # ユーザー名が存在しない場合
        if username is None:
            # 前に定義した例外を投げます。
            raise credentials_exception
    except JWTError:
        # デコードに失敗した場合、例外を投げます。
        raise credentials_exception
    # ユーザー名をget_user関数に渡して、ユーザー情報を取得します。
    user = get_user(fake_users_db, username=username)
    if user is None:
        # ユーザーが存在しない場合、例外を投げます。
        raise credentials_exception
    return user

コード内の

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無効な認証情報",
        headers={"WWW-Authenticate": "Bearer"},
    )

は、認証情報の失敗を示す例外の状況を表します。このコードの中で、jwt トークンのデコード中に何らかの例外が発生した場合
(例えば、トークンが無効、期限切れ、改ざんされたなど)この例外が引き起こされます。

これは、status_code 未認可の状態 401、detail エラー情報、headers トークンのタイプを返します。

## 現在のログインユーザーの情報を取得#

# get_current_active_userを通じて、現在ログインしていて無効化されていないユーザーcurrent_userを取得します。
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

## 完全なコード#

# パスワードハッシュ化のためのコンテキストオブジェクトを作成します。CryptContextはパスワードハッシュと検証を処理するためのツールです。
# 1つまたは複数のパスワードハッシュスキームをパラメータとして受け取ることができます。
# ここではbcryptアルゴリズムを使用します。
# さらに、deprecatedパラメータをautoに設定し、より安全なアルゴリズムが登場した場合に自動的により安全なアルゴリズムを使用します。
# verify()とhash()の2つのメソッドがあり、それぞれパスワードの検証とハッシュ後のパスワードを取得するために使用されます。
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# /tokenでusernameとpasswordを使用してトークンを取得し、トークンを返します。
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWTトークン署名アルゴリズムの変数。ここではHS256アルゴリズムを使用します。他のアルゴリズムもあります(例えばRS256、ES256など)。
ALGORITHM = "HS256"
# OpenSSLを使用してランダムに生成されたSECRET_KEY。
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# トークンの有効期限(分単位)。
ACCESS_TOKEN_EXPIRE_MINUTES = 30;

# トークンモデルクラスを定義します。
class Token(BaseModel):
    access_token: str
    token_type: str

# トークンデータモデルクラスを定義します。
class TokenData(BaseModel):
    username: Union[str, None] = None

# アクセストークンを生成する関数
# userDataはトークンに格納するデータ(ユーザー名、ユーザーIDなど)を示します。
# expires_deltaは有効期限を示し、timedelta型の変数です。
# もし渡されなければ、デフォルトの有効期限(ここでは15分)を使用します。
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encodeはトークンに格納するデータで、元のデータを変更しないようにコピーします。
    to_encode = userData.copy()

    # expires_deltaはtimedelta型の変数で、有効期限を指します。
    # datetime.utcnow()を使用して現在の時間を取得し、有効期限を加算してトークンの有効期限を取得します。
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # expires_deltaが空であれば、デフォルトの有効期限を使用します。
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 有効期限をトークンに格納します。
    to_encode.update({"exp": expire})
    # jwt.encode()メソッドを使用してトークンを生成します。引数はユーザー情報、ランダムなキー、暗号化アルゴリズムです。
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# ユーザー名とパスワードを使用してユーザー情報を取得して検証します。
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# モックデータベースを作成します。
fake_users_db = {
    "bryce": {
        "username": "bryce",
        "full_name": "bryce Yu",
        "email": "[email protected]",
        # 123123
        "hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
        "disabled": False,
    },
    "yu": {
        "username": "yu",
        "full_name": "yu",
        "email": "[email protected]",
        # 123456
        "hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
        "disabled": True,
    },
}

# ユーザーモデルクラスを作成します。
class UserAuth(BaseModel):
    username: str
    # 最初はUnion[str, None]を使用しますが、後でEmailStrに変更します。
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

# ユーザーデータベースモデルクラスを作成します。
class UserDB(UserAuth):
    hashed_password: str

# パスワードを検証します。
def verify_password(plain_password, hashed_password):
    # bcryptアルゴリズムを使用してパスワードを検証します。
    return pwd_context.verify(plain_password, hashed_password)

# ハッシュ後のパスワードを取得します。
def get_password_hash(password):
    # bcryptアルゴリズムを使用してパスワードをハッシュ化します。
    return pwd_context.hash(password)

# ユーザー名とパスワードを使用してユーザー情報を取得して検証します。
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# dbデータベースからユーザー名がusernameのユーザー情報を取得します。
def get_user(db, username: str):
    # 現在はユーザー名をトークンとして使用するのは安全ではありません。
    if username in db:
        user_dict = db[username]
        return UserDB(**user_dict)

# このメソッドは、依存関係oauth2_schemeを使用してトークンを取得し、トークンの有効性を検証します。
# トークンが有効な場合、get_user関数を呼び出して現在のユーザー情報を取得し、ユーザー情報を返します。
# 無効な場合は例外を投げます。
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無効な認証情報",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # トークンと定義されたOpenSSLで生成されたランダムキーSECRET_KEYおよび暗号化方式を使用して
        # jwtを呼び出してトークンをデコードし、ユーザー情報を取得します。payloadの型はdictです。
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # トークンからユーザー名を取得します。これは以前にトークンを生成したときに渡されたユーザー名です。
        username = payload.get("username")
        # ユーザー名が存在しない場合
        if username is None:
            # 前に定義した例外を投げます。
            raise credentials_exception
    except JWTError:
        # デコードに失敗した場合、例外を投げます。
        raise credentials_exception
    # ユーザー名をget_user関数に渡して、ユーザー情報を取得します。
    user = get_user(fake_users_db, username=username)
    if user is None:
        # ユーザーが存在しない場合、例外を投げます。
        raise credentials_exception
    return user

# get_current_active_userを通じて、現在ログインしていて無効化されていないユーザーcurrent_userを取得します。
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# テスト用に、パスワードのハッシュ値を取得します。
@app.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)

### プロセスのまとめ#

  1. ログイン

最初に /token メソッドを呼び出してユーザーがログインします。このメソッドは OAuth2PasswordRequestForm を通じてフォームデータを取得します。

フォームデータには username と password が含まれています。

この時、authenticate メソッドを呼び出し、モックユーザーデータベースとユーザー名、パスワードを渡します。

authenticate メソッド内で、get_user メソッドを使用してモックデータベース内のユーザー名が一致するユーザーを比較します。

ユーザーが見つかった場合、verify_password メソッドを呼び出し、見つかったユーザーのデータベースに保存されているハッシュパスワードとログインパスワードを渡します。

verify_password メソッド内では、passlib が提供するコンテキスト暗号化 CryptContext クラスのインスタンスオブジェクトを呼び出し、

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

の verify メソッドを使用して検証を行い、結果を返します。true の場合、get_user 関数で取得した user が返されます。

次に、トークンの有効期限を設定し、create_access_token 関数を通じてユーザー情報を渡します。

create_access_token 内では、ユーザー情報と有効期限を統合して dict を作成し、jwt.encode を呼び出してトークンを生成します。

生成されたトークンが返されます。

  1. 現在のログインユーザーを取得

/users/me インターフェースを呼び出します。リクエストヘッダーには Authorization 値が必要で、値は 'Bearer ' + ログイン時に返されたトークンです。

インターフェース関数は、get_current_active_user 依存関係を呼び出して、無効化されていない現在のユーザーを取得します。

get_current_active_user は、get_current_user 関数を呼び出して現在のユーザーを取得し、さらに無効化されていないかどうかを確認します。

get_current_user は、依存関係 oauth2_scheme を使用してリクエストヘッダー内のトークンを取得し、

jwt.decode を使用してトークンをデコードし、ログイン時に保存されたユーザー情報(ユーザー名など)を取得します。

ユーザー名を使用して、データベースからユーザーのデータを取得し、返します。

注意!リクエスト中のAuthorizationレスポンスヘッダーの値はBearerで始まります。

### インターフェーステスト#

フォームを使用して /token を呼び出します。


image

現在のログインユーザーを取得するための GET リクエスト。


image

# MongoDB に接続#

## pymongo のインポート#

まず、MongoDB 操作用の Python ライブラリをインストールする必要があります。

pip install pymongo

次に、ファイル内でインポートします。

# mongodbデータベースをインポート
from pymongo import MongoClient

## MongoDB 接続の設定#

# mongodbデータベース接続を作成
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')

# データベースを作成し、データベースの操作はdbオブジェクトを使用して行います。
# 例えば、db.users.insert_one()はusersコレクションにデータを挿入することを示します。
db = client['fastapi']

## MongoDB を使用したユーザー登録の追加#

ユーザー登録用の新しいインターフェースを追加します。

コードは以下の通りです。

@app.post("/users/register")
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="ユーザーはすでに存在します",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # ユーザー情報をjson形式に変換します。
    json_compatible_item_data = jsonable_encoder(userdict)
    # ユーザー情報をデータベースに保存します。
    db.users.insert_one(json_compatible_item_data)
    # ユーザー情報を返します。
    return user

## ログイン時にユーザーをクエリする#

以前の get_user 関数を修正します。

# dbデータベースからユーザー名がusernameのユーザー情報を取得します。
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

## ユーザー情報の修正#

ユーザー更新モデルを作成します。パスワードを更新しないこともできるため、パスワードは空にできます。

# ユーザー更新モデルクラスを作成します。
# パスワードを更新しないこともできるため、パスワードは空にできます。
class UserUpdate(UserAuth):
    password: Union[str, None] = None

また、パスワードを返さないための UserOut モデルも作成します。

# 返すためのユーザーモデルクラス
class UserOut(UserAuth):
    pass

同時に、現在ログインしているユーザーのモデルも修正する必要があります。

# get_current_active_userを通じて、現在ログインしていて無効化されていないユーザーcurrent_userを取得します。
@app.get("/users/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
@app.put("/users/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 現在ログインしているユーザーの情報をdictに変換します。
    current_user_dict = current_user.dict()

    # 更新するユーザー名と現在ログインしているユーザー名が一致するかどうかを確認します。
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="権限がありません",
        )
    
    # 更新するユーザー情報をdictに変換します。
    userdict = userInDB.dict()

    # パスワードが空でない場合、パスワードを更新します。
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # パスワードが空の場合、現在ログインしているユーザーのパスワードを使用します。
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # ユーザー情報からパスワードを削除します。明文パスワードをデータベースに保存することはできません。
    userdict.pop("password")

    # ユーザー情報をjson形式に変換します。
    json_compatible_item_data = jsonable_encoder(userdict)
    # ユーザー情報をデータベースに保存します。
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # ユーザー情報を返します。
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

## MongoDB のデータを確認#

Mongo Compass を使用して確認します。


image

# さらなる整理#

ルーター方式を使用してインターフェースを実装します。

ファイルのディレクトリは次のようになります。

├── MyApi
│	├── app							# 「app」はPythonパッケージです。
│   	├── __init__.py				# このファイルにより「app」はPythonパッケージになります。
│   	├── main.py					# 「main」モジュール、例えばimport app.main
│   	├── dependencies.py		    # 「dependencies」モジュール、例えばimport app.dependencies
│   	└── routers					# 「routers」は「Pythonサブパッケージ」です。
│   	    ├── __init__.py			# 「routers」をPythonサブパッケージにします。
│   	    └── users.py			# 「users」サブモジュール、例えばimport app.routers.users

フォルダー内には必ず__init__.pyの空ファイルを置いて、Python にこれがファイルディレクトリであることを知らせます。

各ディレクトリまたはサブディレクトリにはこれがあります。

これが、コードを 1 つのファイルから別のファイルにインポートできる理由です。

例えば、app/main.py 内で直接使用できます。

# 絶対パス
from app.routers import items
# 相対パス
from .routers import items

## routers ルーティング#

まず、app フォルダー内に routers フォルダーを作成し、ルーティングを格納します。

フォルダー内に新しい users.py を作成します。

ファイル内でまずインポートします。

from fastapi import APIRouter

ルーティングを設定します。

# ルーティングを設定します。
router = APIRouter(
    # アクセスルートのプレフィックスを定義します。プレフィックスの後にスラッシュを付けてはいけません。
    # ここでプレフィックスを設定すれば、各パス操作でプレフィックスを書く必要はありません。
    prefix="/users",
    # ドキュメントにカスタムタグを表示するために、tagsパラメータを使用できます。
    tags=["users"],
    # ドキュメントにカスタムレスポンスを表示するために、responsesパラメータで返り値を定義できます。
    responses={404: {"description": "Not found"}},
)

次に、ユーザー関連のルートをすべてここに移動し、app を router に変更します。

@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # ユーザーを検証する関数を呼び出し、ユーザー名とパスワードを渡します。
    user = authenticate_user(form_data.username,form_data.password)
    # ユーザーが存在しない場合
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="ユーザー名またはパスワードが間違っています",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # ユーザーが存在する場合、トークンを生成します。
    # 以前に定義したトークンの有効期限をaccess_token_expiresに割り当て、expires_deltaに渡します。
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # トークン生成関数を呼び出し、ユーザー情報と有効期限を渡します。
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 生成されたトークンをTokenモデルクラスに封装します。
    token = Token(access_token=access_token, token_type="bearer")

    # トークンを返します。
    return token
    

# get_current_active_userを通じて、現在ログインしていて無効化されていないユーザーcurrent_userを取得します。
@router.get("/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# テスト用に、パスワードのハッシュ値を取得します。
@router.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)


@router.post("/register",response_model=UserOut)
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="ユーザーはすでに存在します",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # ユーザー情報からパスワードを削除します。明文パスワードをデータベースに保存することはできません。
    userdict.pop("password")

    # ユーザー情報をjson形式に変換します。
    json_compatible_item_data = jsonable_encoder(userdict)
    # ユーザー情報をデータベースに保存します。
    db.users.insert_one(json_compatible_item_data)
    # ユーザー情報を返します。
    return user

@router.put("/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 現在ログインしているユーザーの情報をdictに変換します。
    current_user_dict = current_user.dict()

    # 更新するユーザー名と現在ログインしているユーザー名が一致するかどうかを確認します。
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="権限がありません",
        )
    
    # 更新するユーザー情報をdictに変換します。
    userdict = userInDB.dict()

    # パスワードが空でない場合、パスワードを更新します。
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # パスワードが空の場合、現在ログインしているユーザーのパスワードを使用します。
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # ユーザー情報からパスワードを削除します。明文パスワードをデータベースに保存することはできません。
    userdict.pop("password")

    # ユーザー情報をjson形式に変換します。
    json_compatible_item_data = jsonable_encoder(userdict)
    # ユーザー情報をデータベースに保存します。
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # ユーザー情報を返します。
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

この時、多くのエラーが発生します。なぜなら、以前にインポートしたライブラリが存在しないからです。

## 依存関係#

多くの場所で使用される同じ依存関係がある場合は、すべてを dependencies に統一して保存できます。

/app ディレクトリ内に新しい dependencies.py ファイルを追加します。

これにより、以前の users 内のインポートとカスタム変数をすべて dependencies.py に移動できます。

例えば:

# Path: dependencies.py

# このファイルは依存関係を処理するためのもので、
# アプリケーションのいくつかの場所で使用される依存関係を格納できます。
from fastapi import Depends, HTTPException, status
# OAuth2PasswordBearerクラスをインポートし、oauth2_scheme変数を宣言して認証に使用します。
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
# Unionをインポートします。
from typing import Union
# リクエストボディを解析するためにBaseModelをインポートします。
from pydantic import BaseModel,HttpUrl,EmailStr
# 時間モジュールをインポートします。
from datetime import datetime,timedelta
# モデルをJSONに変換するためにJSONCompatibleEncoderをインポートします。
from fastapi.encoders import jsonable_encoder
# パスワードハッシュ化のためにpasslibをインポートします。
from passlib.context import CryptContext
# トークン生成のためにJWTをインポートします。
from jose import JWTError,jwt
# mongodbデータベースをインポートします。
from pymongo import MongoClient

# mongodbデータベース接続を作成します。
client = MongoClient('mongodb://fastapi:[email protected]:27017/fastapi')

# データベースを作成し、データベースの操作はdbオブジェクトを使用して行います。
# 例えば、db.users.insert_one()はusersコレクションにデータを挿入することを示します。
db = client['fastapi']

# パスワードハッシュ化のためのコンテキストオブジェクトを作成します。CryptContextはパスワードハッシュと検証を処理するためのツールです。
# 1つまたは複数のパスワードハッシュスキームをパラメータとして受け取ることができます。
# ここではbcryptアルゴリズムを使用します。
# さらに、deprecatedパラメータをautoに設定し、より安全なアルゴリズムが登場した場合に自動的により安全なアルゴリズムを使用します。
# verify()とhash()の2つのメソッドがあり、それぞれパスワードの検証とハッシュ後のパスワードを取得するために使用されます。
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# /tokenでusernameとpasswordを使用してトークンを取得し、トークンを返します。
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWTトークン署名アルゴリズムの変数。ここではHS256アルゴリズムを使用します。他のアルゴリズムもあります(例えばRS256、ES256など)。
ALGORITHM = "HS256"
# OpenSSLを使用してランダムに生成されたSECRET_KEY。
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# トークンの有効期限(分単位)。
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# トークンモデルクラスを定義します。
class Token(BaseModel):
    access_token: str
    token_type: str

# トークンデータモデルクラスを定義します。
class TokenData(BaseModel):
    username: Union[str, None] = None

# ユーザーモデルクラスを作成します。
class UserAuth(BaseModel):
    username: str
    # 最初はUnion[str, None]を使用しますが、後でEmailStrに変更します。
    email: Union[EmailStr, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = False

# ユーザーデータベースモデルクラスを作成します。
class UserDB(UserAuth):
    hashed_password: str

class UserInDB(UserAuth):
    password: str

# ユーザー更新モデルクラスを作成します。
# パスワードを更新しないこともできるため、パスワードは空にできます。
class UserUpdate(UserAuth):
    password: Union[str, None] = None

# 返すためのユーザーモデルクラス
class UserOut(UserAuth):
    pass

# アクセストークンを生成する関数
# userDataはトークンに格納するデータ(ユーザー名、ユーザーIDなど)を示します。
# expires_deltaは有効期限を示し、timedelta型の変数です。
# もし渡されなければ、デフォルトの有効期限(ここでは15分)を使用します。
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encodeはトークンに格納するデータで、元のデータを変更しないようにコピーします。
    to_encode = userData.copy()

    # expires_deltaはtimedelta型の変数で、有効期限を指します。
    # datetime.utcnow()を使用して現在の時間を取得し、有効期限を加算してトークンの有効期限を取得します。
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # expires_deltaが空であれば、デフォルトの有効期限を使用します。
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 有効期限をトークンに格納します。
    to_encode.update({"exp": expire})
    # jwt.encode()メソッドを使用してトークンを生成します。引数はユーザー情報、ランダムなキー、暗号化アルゴリズムです。
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# パスワードを検証します。
def verify_password(plain_password, hashed_password):
    # bcryptアルゴリズムを使用してパスワードを検証します。
    return pwd_context.verify(plain_password, hashed_password)

# ハッシュ後のパスワードを取得します。
def get_password_hash(password):
    # bcryptアルゴリズムを使用してパスワードをハッシュ化します。
    return pwd_context.hash(password)

# ユーザー名とパスワードを使用してユーザー情報を取得して検証します。
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# dbデータベースからユーザー名がusernameのユーザー情報を取得します。
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

# 依存関係oauth2_schemeを使用してトークンを取得し、トークンの有効性を検証します。
# トークンが有効な場合、get_user関数を呼び出して現在のユーザー情報を取得し、ユーザー情報を返します。
# 無効な場合は例外を投げます。
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="無効な認証情報",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # トークンと定義されたOpenSSLで生成されたランダムキーSECRET_KEYおよび暗号化方式を使用して
        # jwtを呼び出してトークンをデコードし、ユーザー情報を取得します。payloadの型はdictです。
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # トークンからユーザー名を取得します。これは以前にトークンを生成したときに渡されたユーザー名です。
        username = payload.get("username")
        # ユーザー名が存在しない場合
        if username is None:
            # 前に定義した例外を投げます。
            raise credentials_exception
    except JWTError:
        # デコードに失敗した場合、例外を投げます。
        raise credentials_exception
    # ユーザー名をget_user関数に渡して、ユーザー情報を取得します。
    user = get_user(username=username)
    if user is None:
        # ユーザーが存在しない場合、例外を投げます。
        raise credentials_exception
    return user

# 無効化されていないユーザーを取得します。この関数は、get_current_user関数を呼び出して現在ログインしているユーザーを取得します。
# その後、このユーザーが無効化されているかどうかを確認し、無効化されている場合は例外を投げ、無効化されていない場合はユーザー情報を返します。
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # get_current_userを通じて現在ログインしているユーザーcurrent_userを取得します。
    # ユーザーが無効化されているかどうかを確認します。
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="ユーザーが無効化されています")
    return current_user

次に、/routers/users.py 内で dependencies をインポートします。

from fastapi import APIRouter, Depends

# dependencies.py内の依存関係をインポートします。
from dependencies import *

## main.py#

この時、/app ディレクトリ内に新しい main.py ファイルを追加します。

from fastapi import FastAPI
# CORSミドルウェアをインポートし、クロスオリジンを許可します。
from fastapi.middleware.cors import CORSMiddleware
# routersからusersをインポートします。
from .routers import users

app = FastAPI()
  

# クロスオリジン設定
app.add_middleware(
    CORSMiddleware,
    # すべてのドメインからのアクセスを許可します。
    allow_origins=["*"],
    # すべてのリクエストがcookieを持つことを許可します。
    allow_credentials=True,
    # すべてのリクエストメソッドを許可します。
    allow_methods=["*"],
    # すべてのリクエストヘッダーを許可します。
    allow_headers=["*"],
)

# router内のusersルーティングをインクルードします。
app.include_router(users.router)

## プロジェクトの起動#

MyApi ディレクトリに移動し、次のコマンドを実行します。

uvicorn app.main:app --reload

これでプログラムが起動します。

この時、openApi のドキュメントからもタグが有効になっていることが確認できます。


image

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。