FastAPI は、API を構築するための現代的で高速(高性能)な Web フレームワークで、Python 3.6 + を使用し、標準の Python 型ヒントに基づいています。
# インストールと起動#
まず、FastAPI をインストールします。
pip install "fastapi[all]"
上記のインストールにはuvicorn
も含まれており、コードを実行するためのサーバーとして使用できます。
コードを実行する際は、以下のコマンドを使用します。
uvicorn main:app --reload
サーバーをバックグラウンドで起動するには、次のコマンドを実行します。
nohup uvicorn main:app --host 0.0.0.0 --port 8000 > /ファイルパス/log.log 2>&1 &
次に、以下を実行します。
bg
バックグラウンドで一時停止しているジョブを、バックグラウンドで実行を続けるように切り替えます。
最後に、以下を実行します。
disown
現在のターミナルセッションからジョブを切り離し、ターミナルを終了しても実行を続けます。
uvicorn main:app
コマンドの意味は以下の通りです:
main
:main.py
ファイル(Python の「モジュール」)。app
:main.py
ファイル内でapp = FastAPI()
によって作成されたオブジェクトで、他の名前でも可能です。--reload
:コードを更新した後にサーバーを再起動します。このオプションは開発時のみ使用します。
この時、localhost:8000 にアクセスすると呼び出すことができます。
# Hello World#
## 新しい main.py ファイルを作成#
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
コードの説明
from fastapi import FastAPI
FastAPI
は、API に必要なすべての機能を提供する Python クラスです。
app = FastAPI()
ここでの変数app
はFastAPI
クラスの「インスタンス」となります。
このインスタンスは、すべての API を作成するための主要なインタラクションオブジェクトとなります。
このapp
も、起動コマンド内でuvicorn
によって参照されます。
myapi = FastAPI () と書くことも可能です。
起動コマンドは uvicorn main --reload に変わります。
@app.get("/")
FastAPIに、下の関数が次のアクセスリクエストを処理する責任があることを伝えます。
リクエストのパスは / です。
GET 操作を使用して呼び出します。
同様に、@app.post ()、@app.put ()、@app.delete () を使用することもできます。
async def root():
Python 関数を定義し、関数名は root です。
async は、これは非同期関数であることを示します。
return {"message": "Hello World"}
返す内容です。
## ファイルを実行#
uvicorn main:app --reload
実行が成功しました。
# パスパラメータ#
## パスにパラメータを渡す#
@app.get("/items/{item_id}")
async def read_item(item_id):
return {"item_id": item_id}
{item_id} は、これは渡すことができるパラメータを示します。
この時、localhost:8000/items/1 にリクエストすると、
{ "item_id": "1"}
が返されます。
## パラメータの型を制限する#
同時に、このパラメータに型制限を与えることができます。
@app.get("/items/{item_id}")
async def read_item(item_id:int):
return {"item_id": item_id}
関数の引数の後に **: 型 ** を追加するだけです。
この時、文字列を渡すと、422 エラーが返されます。
型はstr
、float
、bool
などを使用できます。
注意!
順序の問題に注意する必要があります。
例えば:
from fastapi import FastAPI
app = FastAPI()
@app.get("/users/me")
async def read_user_me():
return {"user_id": "the current user"}
@app.get("/users/{user_id}")
async def read_user(user_id: str):
return {"user_id": user_id}
現在、2 つのインターフェースがありますが、/users/{user_id} が /users/me の前にある場合、
_パス操作_は順番に実行されるため、
/users/me にリクエストすると、/users/{user_id} インターフェースが実行され、「me」文字列が返されます。
したがって、/users/me が /users/{user_id} の前にあることを確認する必要があります。
## 列挙型のデフォルト値を作成#
### 列挙型クラスを作成#
from fastapi import FastAPI
# 列挙型をインポート
from enum import Enum
# 列挙型クラスを作成
class ModelNameEnum(str, Enum):
fire = "fire"
peach = "peach"
firepeach = "firepeach"
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/items/{item_id}")
async def read_item(item_id:str):
return {"item_id": item_id}
@app.get("/models/{model_name}")
async def get_model(model_name: ModelNameEnum):
# 列挙型の値を使って判断
if model_name is ModelNameEnum.fire:
return {"model_name": model_name, "message": "Fire Fire Fire"}
if model_name.value == "peach":
return {"model_name": model_name, "message": "Peach Peach Peach"}
return {"model_name": model_name, "message": "FirePeach FirePeach FirePeach"}
コードの説明:
async def get_model(model_name: ModelNameEnum):
定義した引数 model_name は ModelNameEnum 列挙型です。
if model_name is ModelNameEnum.fire:
ここでの is と == の役割は同じで、列挙メンバーと比較するために使用されます。
ただし、== を使用する場合は、引数 **.value** を使って値を取得する必要があります。
model_name.value
### テストリクエスト#
## ファイルパスを引数として渡す#
渡す必要がある引数がパスである場合、
/files/{file_path:path}
をリクエストパスとして使用できます。
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
return {"file_path": file_path}
/home/test/text.txt パスを渡す必要がある場合、
リクエストの引数は localhost:8000/files//home/test/text.txt で、注意点は 2 つの **/** です。
# クエリパラメータ#
## デフォルト値#
パスパラメータに属さない他の関数パラメータを宣言すると、それらは自動的に「クエリ文字列」パラメータとして解釈されます。
例えば、以下のコードのように:
from fastapi import FastAPI
app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
return fake_items_db[skip : skip + limit]
関数の引数内で
async def read_item(skip: int = 0, limit: int = 10):
デフォルト値 skip は 0、limit は 10 として定義されています。
したがって、
-
http://127.0.0.1:8000/items/?skip=0&limit=10 にアクセスすると、渡されるパラメータは skip = 0、limit = 10 です。
-
http://127.0.0.1:8000/items/?skip=1 にアクセスすると、渡されるパラメータは skip = 1、limit = 10 です。limit が渡されない場合、デフォルトで 10 になります。
-
http://127.0.0.1:8000/items/?limit=1 にアクセスすると、渡されるパラメータは skip = 0、limit = 1 です。skip が渡されない場合、デフォルトで 0 になります。
## オプション値#
Union をインポートする必要があります。
from typing import Union
@app.get("/items/{item_id}")
async def read_itemUnion(item_id: str, q: Union[str, None] = None):
if q:
return {"item_id": item_id, "q": q}
return {"item_id": item_id}
この時、q は渡すことも渡さないこともできるパラメータです。
さらに bool 型の引数を追加することもできます。
@app.get("/items/{item_id}")
async def read_itemUnion(item_id: str, q: Union[str, None] = None, short: bool = False):
item = {"item_id": item_id}
if q:
# qをitemのキーの1つとして追加し、updateで新しいキーをマージできます
item.update({"q": q})
if not short:
item.update({"description": "This is an amazing item that has a long description"})
return item
update ({"XX" : "XX"}) を使用すると、元のキーのペアにこのキーを追加できます。
テストアクセス
## 複数のパスとクエリパラメータ#
@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(
user_id: int, item_id: str, q: Union[str, None] = None, short: bool = False
):
item = {"item_id": item_id, "owner_id": user_id}
if q:
item.update({"q": q})
if not short:
item.update(
{"description": "This is an amazing item that has a long description"}
)
return item
複数のパラメータのリクエストをサポートします。
## 必須のクエリパラメータ#
非パスのパラメータにデフォルト値のないパラメータを指定した場合、そのパラメータは必須となります。
つまり、@app.get (/XXX/{ XXX}) で指定されているのではなく、関数の引数で指定されているものです。
また、Union[str, None] = None
が設定されていない場合、そのパラメータは必須です。
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
item = {"item_id": item_id, "needy": needy}
return item
例えば、ここでの needy パラメータは必須です。
# リクエストボディ#
クライアント(例えばブラウザ)からデータをAPIに送信する必要がある場合、それを「リクエストボディ」として送信します。
リクエストボディは、クライアントがAPIに送信するデータです。レスポンスボディは、APIがクライアントに送信するデータです。
あなたの API はほぼ常にレスポンスボディを送信します。しかし、クライアントは必ずしもリクエストボディを送信する必要はありません。
## BaseModel をインポート#
from pydantic import BaseModel
リクエストボディを宣言するために pydantic モデルを使用する必要があります。
## データモデルを作成#
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
これで、Item という名前のリクエストボディの構造タイプを宣言しました。
{
"name": "Foo",
"description": "An optional description",
"price": 45.2,
"tax": 3.5
}
次に、POST メソッドを追加し、引数を Item モデルタイプとして指定できます。
@app.post("/items/")
async def create_item(item: Item):
return item
テストリクエスト
## モデルを使用#
関数の内部では、モデルオブジェクトのすべての属性に直接アクセスできます。
ただし、モデル.dict ()を使用してモデルの辞書を取得する必要があります。
** モデル.dict ()** は、モデル内の各属性を個別に取り出します。
@app.post("/items/")
async def create_item(item: Item):
item_dict = item.dict()
if item.tax:
price_with_tax = item.price + item.tax
item_dict.update({"price_with_tax": price_with_tax})
return item_dict
## リクエストボディとパスパラメータを組み合わせる#
@app.put("/items/{item_id}")
async def create_item2(item_id: int, item: Item):
return {"item_id": item_id, "item": item, **item.dict()}
パスパラメータとリクエストボディを同時に渡すことができます。
item.dict () を返す際には ** を追加することに注意してください。
item.dict () を返すと、モデル内の各属性が個別に取り出されます。
## リクエストボディ、パスパラメータ、クエリパラメータを組み合わせる#
@app.put("/items/{item_id}")
async def create_item(item_id: int, item: Item, q: Union[str, None] = None):
result = {"item_id": item_id, **item.dict()}
if q:
result.update({"q": q})
return result
# パラメータの検証#
クエリパラメータの検証#
まず、Query をインポートする必要があります。
from fastapi import FastAPI, Query
### 長さの検証#
@app.get("/items/")
async def read_items(q: Union[str, None] = Query(default=None, min_length=2, max_length=5)):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
if q:
results.update({"q": q})
return results
パラメータの後に Query () を追加できます。
デフォルト値None
を置き換えるためにQuery(default=None)
を使用する必要があります。Query の最初のパラメータもデフォルト値を定義するために使用されます。
上記のコードは、q フィールドの最小長さが 2、最大長さが 5 であることを示しています。
### 正規表現の検証#
@app.get("/items/")
async def read_items(q: Union[str, None] = Query(default=None, min_length=2, max_length=5, regex='^query$')):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
if q:
results.update({"q": q})
return results
regex='^query$'
上記のコードは、q の内容が query である必要があることを指定しています。
### デフォルト値#
デフォルト値も指定できます。
default = 'query'
## パスパラメータの検証#
Path をインポートする必要があります。
from fastapi import FastAPI, Query, Path
メソッドを追加します。
from fastapi import FastAPI, Path, Query
app = FastAPI()
@app.get("/items/{item_id}")
async def read_items(
*,
item_id: int = Path(title="The ID of the item to get", ge=0, le=1000),
q: str,
size: float = Query(gt=0, lt=10.5),
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results
@app.get("/items4/{item_id}")
async def read_items6(
*,
item_id: int = Path(title="The ID of the item to get", ge=0, le=1000),
q: str,
size: float = Query(gt=0, lt=10.5),
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results
gt
:より大きい(g
reatert
han)ge
:より大きいまたは等しい(g
reater than ore
qual)lt
:より小さい(l
esst
han)le
:より小さいまたは等しい(l
ess than ore
qual)
関数の最初のパラメータとして*
を渡します。
Python はその*
に対して何も行いませんが、その後のすべてのパラメータはキーワードパラメータ(キーと値のペア)として呼び出されることを知ります。たとえそれらにデフォルト値がなくても。
つまり、最初のパラメータとして*
を追加しない場合、関数の引数にq: str
だけを記入することはできません。
なぜなら、デフォルト値がないからです。
# リクエストボディに複数のモデルパラメータ#
from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
class User(BaseModel):
username: str
full_name: Union[str, None] = None
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, user: User):
results = {"item_id": item_id, "item": item, "user": user}
return results
その後、引数は以下の形式で使用できます。
{
"item": {
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2
},
"user": {
"username": "dave",
"full_name": "Dave Grohl"
}
}
## リクエストボディの単一値#
同じリクエストボディの中に新しいキーと値のペアを追加したい場合、
例えば、
{
"item": {
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2
},
"user": {
"username": "dave",
"full_name": "Dave Grohl"
},
"key": "key"
}
この場合、Body をインポートできます。
from fastapi import FastAPI, Query, Path, Body
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, user: User, key: str = Body()):
results = {"item_id": item_id, "item": item, "user": user, "key": key}
return results
以下のコードを追加することで実現できます。
key: str = Body()
同様に、Body () 内でも Query や Path と同様に検証を追加できます。
key: str = Body(gt=1)
テストインターフェース
## 単一のリクエストボディパラメータを埋め込む#
以下のようなリクエスト方式を実現したい場合、
{
"item": {
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2
}
}
ではなく、
{
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2
}
この場合、Body () の embed パラメータを使用できます。
例えば、
item: Item = Body(embed=True)
from typing import Union
from fastapi import Body, FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item = Body(embed=True)):
results = {"item_id": item_id, "item": item}
return results
# リクエストボディ内のネストされたモデル#
## サブタイプを持つ List フィールド#
まず、List をインポートします。
from typing import List, Union
例えば、複数のタグを渡す必要がある場合、
以下のコードを使用して List を表すことができます。
# 内容の型を制限し、str型のみを渡すことができます
tags: List[str] = []
# 内容の型を制限せず、str、intなども渡すことができます
tags: List = []
コードは以下の通りです。
from typing import List, Union
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
tags: List[str] = []
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
results = {"item_id": item_id, "item": item}
return results
## Set 型#
List とは異なり、Set の要素は重複できないため、タグを保存するのにより適しています。
重複する要素は自動的にマージされます。
同様に、typing から Set をインポートする必要があります。
from typing import Set
使用コード:
tags: Set[str] = set()
コードは以下の通りです。
class Message(BaseModel):
role: str
content: str
tags: Set[str] = set()
@app.post("/chat")
async def chat(message: Message, model: str = Body()):
return {"message": message, "model": model}
## ネストされたモデル#
モデル内に別のモデルをネストすることができます。
例えば、
{
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2,
"tags": ["rock", "metal", "bar"],
"image": {
"url": "http://example.com/baz.jpg",
"name": "The Foo live"
}
}
まず、サブモデルを定義します。
class Image(BaseModel):
url: str
name: str
次に、元のモデルに追加します。
class Message(BaseModel):
role: str
content: str
tags: Set[str] = set()
image: Union[Image, None] = None
## 特殊な型と検証#
通常の単一値型(str
、int
、float
など)以外にも、str
から派生したより複雑な単一値型を使用できます。
外部の特殊型は Pydantic からインポートできます。具体的にはPydantic の公式サイトを参照してください。
例えば、image 内の URL はアドレスであるべきなので、Pydantic は HttpUrl 型を提供して検証します。
同様に、以下をインポートします。
from pydantic import BaseModel, HttpUrl
次に、image モデルクラスを修正します。
class Image(BaseModel):
url: HttpUrl
name: str
この時、URL が http または https で始まらない場合、エラーが発生します。
## 一連のサブモデルを持つ属性#
同様に、Pydantic モデルをlist
、set
などのサブタイプとして使用できます。
例えば、
class Message(BaseModel):
role: str
content: str
tags: Set[str] = set()
image: Union[List[Image], None] = None
# レスポンスモデル#
## ユーザー登録のレスポンス#
任意の_パス操作_でresponse_model
パラメータを使用して、レスポンスに使用するモデルを宣言できます:
例えば、ユーザー登録時、登録成功後にユーザーデータを返します。
ただし、パスワードは直接返すことができないため、パスワードのないレスポンスモデルを設定できます。
ユーザーが返す定義されたモデルの出力です。
まず、Any 型と EmailStr をインポートします。
from typing import Any
from pydantic import EmailStr
登録ユーザーのモデルを定義します。
class User(BaseModel):
username: str
full_name: Union[str, None] = None
email: EmailStr
password: str
登録成功後に返すレスポンスモデルを定義します。
class UserOut(BaseModel):
username: str
full_name: Union[str, None] = None
email: EmailStr
実装コードは以下の通りです。
@app.post("/user/", response_model=UserOut)
async def create_user(user: User) -> Any:
return user
ここでの Any は、返される user が任意の型であることを示します。
テストインターフェース
## 予め定義されたモデルレスポンス#
まず、レスポンスモデルを定義します。
class Item2(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: float = 10
tags: List[str] = []
次に、デフォルトの返却を設定します。
item2s = {
"fire": {"name": "Fire", "price": 3.2},
"peach": {"name": "Peach", "description": "Peach Peach Peach", "price": 2.2, "tags": ["aa", "bb", "cc"]},
"firepeach": {"name": "firepeach", "description": "firepeach firepeach firepeach", "price": 2.2}
}
インターフェースの実装は以下の通りです。
@app.get("/item2s/{item_id}", response_model=Item2)
async def read_item2(item_id: str):
return item2s[item_id]
この時、リクエスト引数として item_id を送信すると、item2s 内の対応する item_id のデータが返されます。
テスト呼び出し
デフォルト値を持つパラメータを返したくない場合は、response_model_exclude_unset=Trueというパラメータを追加できます。
コードを以下のように修正します。
@app.get("/item2s/{item_id}", response_model=Item2, response_model_exclude_unset=True)
async def read_item2(item_id: str):
return item2s[item_id]
この時、インターフェース呼び出しでは、デフォルト値のデータ内容や空のデータ内容は返されません。
デフォルト値と異なるものは引き続き返されます。
# 追加のモデル#
ユーザーデータを保存する際に、データベースに保存するための新しいモデルが必要です。
パスワードは平文で保存できないため、ハッシュ値で保存することができます。
UserDB モデルを追加します。
class UserDB(BaseModel):
username: str
hashed_password: str
email: EmailStr
full_name: Union[str, None] = None
偽のパスワードハッシュ関数を定義します。
def fake_password_hasher(raw_password: str):
return "hashpassword" + raw_password
UserDB モデルタイプのデータを作成し、データベースに保存する関数を定義します。
def fake_save_user(user_in: User):
fake_hash_password = fake_password_hasher(user_in.password)
# **を使用して辞書をキーワード引数に展開します
# そして、展開されたuser_inをキーワード引数としてuser_dbに渡します
# パスワードをhashed_passwordに暗号化してuser_dbに割り当てます
user_db = UserDB(**user_in.dict(), hashed_password=fake_hash_password)
print("ユーザーの偽の保存が成功しました!");
return user_db
新しいユーザー登録インターフェースを追加します。
@app.post("/user2/", response_model=UserOut)
async def create_user2(user: User):
user_save = fake_save_user(user)
return user_save
テストインターフェース
## **user.dict()
について#
### Pydantic モデル#
user
はUser
クラスの Pydantic モデルです。
Pydantic モデルには.dict()
メソッドがあり、このメソッドはモデルデータを持つdict
(辞書)を返します。
したがって、User モデルのオブジェクトを作成すると、
user_in = User(username="john", password="secret", email="[email protected]")
user.dict()
を呼び出すと、
user_dict = user.dict()
user_dict のデータは次のようになります。
{
'username': 'john',
'password': 'secret',
'email': '[email protected]',
'full_name': None,
}
つまり、すべての属性が個別に取り出されます。
### dict の解包#
user_dict
のようなdict
を**user_dict
形式で関数(またはクラス)に渡すと、Python はそれを「解包」します。user_dict
のキーと値をキーワード引数として直接渡します。
したがって、次のコードを使用すると、
UserDB(**user_dict)
# または
UserDB(**user.dict())
この時、得られる UserDB は次のようになります。
UserDB(
'username': 'john',
'password': 'secret',
'email': '[email protected]',
'full_name': None,
)
または、より正確には、user_dict
を使用して将来含まれる可能性のある任意の内容を表すことができます。
UserDB(
username = user_dict["username"],
password = user_dict["password"],
email = user_dict["email"],
full_name = user_dict["full_name"],
)
次に、追加のキーワード引数hashed_password=hashed_password
を追加すると、次のようになります。
UserDB(**user.dict(), hashed_password=hashed_password)
最終的な結果は次のようになります。
UserDB(
username = user_dict["username"],
password = user_dict["password"],
email = user_dict["email"],
full_name = user_dict["full_name"],
hashed_password = hashed_password,
)
## コードの重複について#
コードの重複は、バグ、安全性の問題、コードの不整合の可能性を高めます(ある位置でコードを更新したが、他の位置で更新しなかった場合など)。
上記のモデルは多くのデータを共有し、重複する属性名と型を持っています。
したがって、上記のdict と解包に基づいて、UserBase の基底クラスを直接定義できます。
class UserBase(BaseModel):
username: str
email: EmailStr
full_name: Union[str, None] = None
その後、他のモデルはこの基底クラスを基にして、新しい属性を割り当てるだけです。
例えば、
class User(UserBase):
password: str
class UserDB(UserBase):
hashed_password: str
class UserOut(UserBase):
# passは何も処理せず、UserBaseと一致させることを示します
pass
後のインターフェースの使用では、基本的にすべてのフィールドが存在するため、新しいフィールドを解決するだけで済みます。
## Union モデル#
レスポンスを 2 つの型のUnion
として宣言できます。これは、レスポンスが 2 つの型のいずれかであることを意味します。
これにより、ドキュメントに anyOf として表示されます。
例えば、
response_model=Union[PlaneItem, CarItem]
コードは以下の通りです。
from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class BaseItem(BaseModel):
description: str
type: str
class CarItem(BaseItem):
type = "car"
class PlaneItem(BaseItem):
type = "plane"
size: int
items = {
"item1": {"description": "All my friends drive a low rider", "type": "car"},
"item2": {
"description": "Music is my aeroplane, it's my aeroplane",
"type": "plane",
"size": 5,
},
}
@app.get("/items/{item_id}", response_model=Union[PlaneItem, CarItem])
async def read_item(item_id: str):
return items[item_id]
## モデルリスト#
同様の方法で、オブジェクトリストで構成されるレスポンスを宣言できます。
response_model=List[Item]
コードは以下の通りです。
from typing import List
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str
items = [
{"name": "Foo", "description": "There comes my hero"},
{"name": "Red", "description": "It's my aeroplane"},
]
@app.get("/items/", response_model=List[Item])
async def read_items():
return items
# レスポンスステータスコード#
status_code
を使用してステータスコードを指定できます。
例えば、
@app.post("/user2/", response_model=UserOut, status_code=201)
async def create_user2(user: User):
user_saved = fake_save_user(user)
return user_saved
成功した場合、201 のステータスコードが返されます。
また、FastAPI は便利な変数も提供しています。
インポートします。
from fastapi import status
@app.post("/user2/", response_model=UserOut, status_code=status.HTTP_201_CREATED)
async def create_user2(user: User):
user_saved = fake_save_user(user)
return user_saved
一般的なステータスコードは以下の通りです。
各ステータスコードおよび適用シーンについての詳細は、HTTP レスポンスステータスコード - HTTP | MDNを参照してください。
# フォームデータ#
JSON とは異なり、HTML フォーム(<form></form>
)は、サーバーにデータを送信する際に通常「特殊」なエンコーディングを使用します。
受信するのは JSON ではなく、フォームフィールドの場合はForm
を使用します。
フォームを使用するには、事前にpython-multipart
をインストールする必要があります。
実行します。
pip install python-multipart
FastAPI が提供する Form をインポートします。
from fastapi import FastAPI, Form
app = FastAPI()
フォーム(Form
)パラメータを作成する方法は、Body
やQuery
と同様です。
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
return {"username": username}
例えば、OAuth2 仕様の「パスワードフロー」では、username
とpassword
をフォームフィールドを介して送信する必要があります。
この仕様では、フィールド名をusername
とpassword
にする必要があり、JSON を使用してはなりません。
Form
を使用すると、Body
(およびQuery
、Path
、Cookie
)と同様に、メタデータや検証を宣言できます。
フォームデータの「メディアタイプ」エンコーディングは一般にapplication/x-www-form-urlencoded
です。
ただし、ファイルを含むフォームのエンコーディングはmultipart/form-data
です。
# リクエストファイル#
# エラー処理#
特定の状況では、クライアントにエラーメッセージを返す必要があります。
ここで言うクライアントには、フロントエンドのブラウザ、他のアプリケーション、IoT デバイスなどが含まれます。
クライアントにエラーメッセージを返す必要があるシーンは主に以下の通りです:
- クライアントが操作を実行する権限がない
- クライアントがリソースにアクセスする権限がない
- クライアントがアクセスしようとしているアイテムが存在しない
- その他...
これらの状況に遭遇した場合、通常は4XX(400 から 499)HTTP ステータスコードを返す必要があります。
4XXステータスコードは、リクエストが成功したことを示す2XX(200 から 299)HTTP ステータスコードに似ています。
ただし、4XXステータスコードは、クライアント側で発生したエラーを示します。
## HTTPException#
クライアントに HTTP エラー応答を返すには、HTTPException
を使用できます。
from fastapi import HTTPException
例えば、
@app.get("/item2s/{item_id}", response_model=Item2, response_model_exclude_unset=True)
async def read_item2(item_id: str):
if item_id not in item2s:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="そのファイルは存在しません")
return item2s[item_id]
テスト呼び出し
## カスタム例外ハンドラーのインストール#
カスタムハンドラーを追加するには、Starlette の例外ツールを使用します。
トリガーされるカスタム例外をUnicornException
と呼ぶと仮定します。
FastAPI がこの例外をグローバルに処理する必要があります。
この場合、@app.exception_handler()
を使用してカスタム例外コントローラーを追加できます。
また、FastAPI がラップしたものを直接インポートすることもできます。
from fastapi import Request
from fastapi.response import JSONResponse
コードは以下の通りです。
class UnicornException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={"message": f"あはは! {exc.name} ここでエラーが発生しました..."},
)
@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
if name == "yolo":
raise UnicornException(name="カスタムエラー名")
return {"unicorn_name": name}
{"message": f"あはは! {exc.name} ここでエラーが発生しました..."}
内の f はカスタムエラー名を取得できます。
テスト結果
# JSON 互換エンコーダーとデータの更新#
特定の状況では、データ型(Pydantic モデルなど)を JSON 互換のデータ型(dict
、list
など)に変換する必要があります。
例えば、データベースに保存する必要がある場合です。
このような要求に対して、FastAPIはjsonable_encoder()
関数を提供します。
## jsonable_encoder () を使用してデータを更新#
仮のデータベースfake_db
があり、これは JSON 互換のデータのみを受け入れます。
例えば、datetime
のようなオブジェクトは JSON 互換ではないため、これらのオブジェクトは ISO 形式のstr
型オブジェクトに変換する必要があります。
同様に、このデータベースは Pydantic モデル(属性を持つオブジェクト)を受け入れず、dict
のみを受け入れます。
この場合、jsonable_encoder
を使用します。
データを更新するには、HTTP PUT
操作を使用します。
これは、Pydantic モデルなどのオブジェクトを受け取り、JSON 互換のバージョンを返します:
入力データを JSON 形式で保存するためのデータ(例えば、NoSQL データベースを使用する場合)に変換できます。datetime
をstr
に変換します。
from datetime import datetime
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
# 偽のデータベース
fake_db = {
"fee": {"title": "foo", "timestamp": datetime.now()}
}
# ItemJモデルを定義
class ItemJ(BaseModel):
title: str
timestamp: datetime
description: str | None = None
# putを使用して更新
@app.put("/itemsJ/{id}")
def update_itemJ(id: str, item: ItemJ):
# 時間をstr型のJSON形式に変換
json_compatible_item_data = jsonable_encoder(item)
fake_db[id] = json_compatible_item_data
return fake_db[id]
テスト呼び出し
# セキュリティ#
仮定として、バックエンド API はあるドメインに存在します。
フロントエンドは別のドメインに存在するか、(モバイルアプリケーション内で)同じドメインの異なるパスに存在します。
そして、フロントエンドはバックエンドのusernameとpasswordを使用してユーザーの身元を確認する必要があります。
当然、FastAPIはOAuth2認証をサポートしています。
## 概要#
Depends 依存と OAuth2PasswordBearer 認証をインポートします。
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/oauth/")
async def read_items(token: str = Depends(oauth2_scheme)):
return {"token": token}
この時、API ドキュメント localhost:8000/docs にアクセスすると、
ページの右上に「Authorize」ボタンが表示されます。
_パス操作_の右上にもクリック可能な小さなロックアイコンが表示されます。
Authorizeボタンをクリックすると、認証フォームが表示され、username
とpassword
およびその他のオプションフィールドを入力します:
もし必要がなければ、python-multipart をインストールする必要があります。
実行します。
pip install python-multipart
これは、OAuth2 がフォームデータを介してusername
とpassword
を送信するためです。
### パスワードフロー#
Password
フローは、OAuth2 が安全性と認証を処理するための方法(フロー)を定義しています。
OAuth2 の設計目標は、バックエンドまたは API がサーバーからユーザーの身元を確認することから独立していることです。
ただし、この例では、FastAPIアプリケーションが API と認証を処理します。
以下は簡略化された実行フローです:
- ユーザーがフロントエンドに
username
とpassword
を入力し、Enterをクリックします。 - (ユーザーのブラウザで実行される)フロントエンドが
username
とpassword
を API に指定された URL に送信します(tokenUrl="token"
で宣言)。 - API は
username
とpassword
を確認し、トークン(Token
)で応答します(この機能はまだ実装されていません): - トークンはユーザーを確認するための文字列に過ぎません。
- 一般的に、トークンは一定の時間後に期限切れになります。
- 期限切れ後、ユーザーは再度ログインする必要があります。
- こうすることで、トークンが盗まれてもリスクが低くなります。なぜなら、トークンは永久的なキーとは異なり、ほとんどのケースで長期間有効ではないからです。
- フロントエンドは一時的にトークンをどこかに保存します。
- ユーザーがフロントエンドをクリックして、フロントエンドアプリケーションの他の部分に移動します。
- フロントエンドは API からさらにデータを取得する必要があります:
- 指定されたエンドポイント(Endpoint)で認証を行います。
- したがって、API で認証する際には、
Authorization
リクエストヘッダーにBearer
+ トークンの値を送信する必要があります。 - もしトークンが
foobar
であれば、Authorization
リクエストヘッダーは:Bearer foobar
となります。
### OAuth2PasswordBearer#
FastAPIは異なる抽象レベルのセキュリティツールを提供しています。
この例では、OAuth2のPasswordフローとBearerトークン(Token
)を使用します。
そのために、OAuth2PasswordBearer
クラスを使用します。
OAuth2PasswordBearer
のインスタンスを作成する際に、tokenUrl
パラメータを渡す必要があります。このパラメータは、クライアント(ユーザーのブラウザで実行されるフロントエンド)がusername
とpassword
を送信し、トークンを取得するための URL を含みます。
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
ここで、tokenUrl="token"
はまだ作成されていない相対 URLtoken
を指します。この相対 URL は./token
に相当します。
相対 URL を使用することは非常に重要で、アプリケーションがプロキシの背後にあるような高度なユースケースに遭遇したときでも、正常に動作することを保証します。
### 使用#
oauth2_scheme
変数はOAuth2PasswordBearer
のインスタンスであり、呼び出し可能です。
呼び出し方は次の通りです。
oauth2_scheme(some, parameters)
したがって、Depends
(依存)を使用してoauth2_scheme
を依存項目として渡すことができます。
次に、Depends
を使用してoauth2_scheme
を依存項目として渡します。
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
return {"token": token}
この依存項目は、文字列(str
)を受け取って_パス操作関数_の引数token
として使用します。
## 現在のユーザーを取得#
### ユーザーモデルを作成#
class UserAuth(BaseModel):
username: str
# 後でEmailStrに変更するために、最初はUnion[str, None]を使用します。
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
### 依存項目を作成#
get_current_user
依存項目を作成します。
get_current_user
は、以前に作成した同じoauth2_scheme
を依存項目として持ちます。
以前にパス操作で行ったのと同様に、新しい依存項目get_current_user
は、サブ依存項目oauth2_scheme
からstr
型のtoken
を受け取ります。
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
return user
### ユーザー情報を取得#
oauth2_scheme
から取得したトークンを使用してユーザーを取得します。
get_current_user
は、作成した(擬似)ツール関数fake_decode_tokenを使用し、str
型のトークンを受け取り、Pydantic のUser
モデルを返します。
def fake_decode_token(token):
return User(username=token + "fakedecoded", email="[email protected]", full_name="John Doe")
### 現在のユーザーを注入#
これで、パス操作内でget_current_user
をDepends
として使用できます。
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
## パスワードと Bearer を使用した簡単な OAuth2#
### 概要#
これから、FastAPIのセキュリティユーティリティを使用してusername
とpassword
を取得します。
OAuth2 は「パスワードフロー」を使用する際、クライアント / ユーザーがusername
とpassword
フィールドをフォームデータとして送信する必要があることを規定しています。
また、仕様ではフィールド名をこのように命名する必要があると明記しています。したがって、user-name
やemail
は機能しません。
ただし、データベースモデルは他の名前を使用できます。
ただし、ログインパス操作では、これらの名前を使用して仕様と互換性を持たせる必要があります(たとえば、統合された API ドキュメントシステムの能力を持たせるため)。
仕様では、username
とpassword
はフォームデータとして送信する必要があると明記されています(したがって、ここでは JSON を使用できません)。
### username
とpassword
を取得#
#
OAuth2PasswordRequestForm
が必要です。
まず、インポートします。
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
# user_dictが存在しない場合
if not user_dict:
raise HTTPException(status_code=400, detail="ユーザー名またはパスワードが間違っています")
# UserのデータをUserDBモデルクラスに渡します
user = UserDB(**user_dict)
# パスワードをハッシュ化します
hashed_password = fake_hash_password(form_data.password)
# パスワードが間違っている場合
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="ユーザー名またはパスワードが間違っています")
# ユーザー名とトークンのタイプを返します
return {"access_token": user.username, "token_type": "bearer"}
OAuth2PasswordRequestForm
は、リクエストフォームを宣言するクラスです:
username
。password
。- オプションの
scope
フィールドは、空白で区切られた文字列からなる大きな文字列です。 - オプションの
grant_type
です。
#### パスワードのハッシュ化#
「ハッシュ」とは、特定の内容(この場合はパスワード)を、見た目がランダムなバイト列(単なる文字列)に変換することを意味します。
同じ内容(完全に同じパスワード)を渡すたびに、完全に同じランダムな文字列が得られます。
ただし、ランダムな文字列からパスワードを逆変換することはできません。
# パスワードをハッシュ化します
hashed_password = fake_hash_password(form_data.password)
# パスワードが間違っている場合
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="ユーザー名またはパスワードが間違っています")
#### トークンを返す#
token
エンドポイントのレスポンスは JSON オブジェクトである必要があります。
token_type
フィールドが必要です。この例では「Bearer」トークンを使用しているため、トークンタイプは「bearer
」である必要があります。
また、access_token
フィールドも必要で、これは私たちのアクセス令牌を含む文字列です。
この簡単な例では、非常に安全でない方法で同じusername
をトークンとして返します。
# ユーザー名とトークンのタイプを返します
return {"access_token": user.username, "token_type": "bearer"}
#### 依存項目を更新#
現在、依存項目を更新します。
このユーザーが有効である場合にのみ、current_user
を取得したいと考えています。
そのため、get_current_active_user
という追加の依存項目を作成し、その依存項目はget_current_user
を依存項目として持ちます。
ユーザーが存在しないか、無効な場合、これらの 2 つの依存項目は HTTP エラーを返します。
したがって、私たちのエンドポイントでは、ユーザーが存在し、認証が通過し、有効な状態である場合にのみ、そのユーザーを取得できます。
# トークンを介してユーザー情報を取得
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無効な認証情報",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# 現在のアクティブなユーザーを取得します。ログイン後はアクティブです。
# 認証が通過し、有効な状態である場合にのみ、このユーザーを取得できます。
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="無効なユーザー")
return current_user
#### 偽のデータベースを作成#
# 偽のデータベース
fake_users_db = {
"bryce": {
"username": "bryce",
"full_name": "bryce Yu",
"email": "[email protected]",
"hashed_password": "hash123123",
"disabled": False,
},
"yu": {
"username": "yu",
"full_name": "yu",
"email": "[email protected]",
"hashed_password": "hash123456",
"disabled": True,
},
}
### 完全なコード(重要!!!!)#
ユーザーがログインして現在のユーザーを取得するための完全なコードは以下の通りです。
# ユーザーモデルクラスを作成
class UserAuth(BaseModel):
username: str
# 最初はUnion[str, None]を使用し、後でEmailStrに変更します。
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
# ユーザーデータベースモデルクラスを作成
class UserDB(UserAuth):
hashed_password: str
# パスワードをハッシュ化する擬似関数を作成
def fake_hash_password(raw_password: str):
# パスワードをハッシュ化する擬似関数。実際にはbcryptや他のハッシュアルゴリズムを使用するべきです。
return "hash" + raw_password
# トークンを介してユーザー情報を取得
def fake_decode_token(token):
# トークンを介してユーザー情報を取得します。現在はユーザー名をトークンとして安全でない方法で使用します。
user = get_user(fake_users_db, token)
return user
# dbデータベースから、ユーザー名がusernameのユーザー情報を取得
def get_user(db, username: str):
# 現在はユーザー名をトークンとして安全でない方法で使用します。
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
# oauth2_schemeを介してトークンを取得し、トークンを使用して現在のログインユーザー情報を取得します。
# oauth2_schemeは認証スキームであり、リクエストヘッダーからAuthorizationフィールドの値を取得し、それをトークンに解析します。
async def get_current_user(token: str = Depends(oauth2_scheme)):
# トークンを介してユーザー情報を取得します。
user = fake_decode_token(token)
# ユーザーが存在しない場合
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無効な認証情報",
headers={"WWW-Authenticate": "Bearer"},
)
# ユーザーが存在する場合、ユーザー情報を返します。
return user
# 現在のアクティブなユーザーを取得します。
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
# get_current_userを介して現在のログインユーザーcurrent_userを取得します。
# 次に、ユーザーが無効かどうかを確認します。
if current_user.disabled:
raise HTTPException(status_code=400, detail="ユーザーが無効です")
return current_user
# フォームを介してユーザーをログインさせます。
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# ユーザー名を使用して擬似データベースからユーザーを取得します。
user_dict = fake_users_db.get(form_data.username)
# user_dictが存在しない場合
if not user_dict:
raise HTTPException(status_code=400, detail="ユーザー名またはパスワードが間違っています")
# UserのデータをUserDBモデルクラスに渡し、UserDBのインスタンスを取得してパスワードを比較します。
user = UserDB(**user_dict)
# パスワードをハッシュ化し、データベース内のパスワードと比較します。
hashed_password = fake_hash_password(form_data.password)
# パスワードが間違っている場合
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="ユーザー名またはパスワードが間違っています")
# トークンとトークンのタイプを返します。現在はユーザー名をトークンとして安全でない方法で使用します。
return {"access_token": user.username, "token_type": "bearer"}
# このインターフェースを呼び出すときは、ヘッダーにAuthorization: Bearer tokenを追加する必要があります。tokenはログイン時に返されたトークンです。
# OpenApiを使用してテストする際には、ヘッダーにAuthorization: Bearer tokenが自動的に追加されます。
# 実際のリクエストでは、手動でAuthorization: Bearer tokenを追加する必要があります。
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
# get_current_active_userを介して現在のログインユーザーcurrent_userを取得します。
return current_user
#### フローのまとめ#
まず、/token
エンドポイントを呼び出し、フォームを介してusername
とpassword
を送信すると、エンドポイントはトークンを返します。
次に、ヘッダーの Authorization にトークンの値を追加する必要があります。/user/me
を呼び出して現在のログインユーザーを取得します。
この例では Bearer を使用しているため、Authorization の値は「Bearer 」+ 返されたトークンです。
OpenApi を使用してテストする際には、トークンの値が自動的に Authorization に追加されます。
実際のプロジェクトで使用する場合は、手動で Authorization の値をヘッダーに追加する必要があります。