
如何快速實現REST API集成以優化業務流程
from pydantic import BaseModel
from typing import List
import pandas as pd
import os
我們需要創建一個從 SerpApi 的 Google Image Scraper API 收集的項目列表,設置要創建的 csv 文檔的名稱,并為訓練數據定義一個分數。這里的分數概念很簡單:TestData(測試數據)將包含我們收集的所有圖像,而 TrainingData(訓練數據)則只會從中選取一小部分圖像。為了這些目的,我們需要構建一個可以傳遞給相關端點的對象。
class ClassificationsArray(BaseModel):
file_name: str
classifications_array: List[str]
train_data_fraction: float
這里提到的分數有一個簡單的解釋。測試數據集將包含所有圖像,而訓練數據集將只包含其中的一小部分。這是為了在我們使用尚未訓練的圖像訓練模型后測試模型,即測試數據集的差異。
現在我們已經定義了負責命令的對象,讓我們定義 CSVCreator 類:
class CSVCreator:
def __init__(self, ClassificationsArray):
self.classifications_array = ClassificationsArray.classifications_array
self.file_name = ClassificationsArray.file_name
self.rows = []
self.train_data_fraction = ClassificationsArray.train_data_fraction
def gather(self):
for label in self.classifications_array:
images = os.listdir("datasets/test/{}".format(label))
for image in images:
row = ["datasets/test/{}/{}".format(label, image), label]
self.rows.append(row)
def create(self):
df = pd.DataFrame(self.rows, columns = ['path', 'label'])
df.to_csv("datasets/csv/{}.csv".format(self.file_name), index=False)
train_df = df.sample(frac = self.train_data_fraction)
train_df.to_csv("datasets/csv/{}_train.csv".format(self.file_name), index=False)
它接收我們提供的參數列表,這些參數是我們對SerpApi的Google Images Scraper API進行查詢時所使用的。基于這些參數,它會從相應文件夾中的每張圖像生成一個CSV文件。在所有圖像處理完畢后,它會隨機選取一部分樣本,并創建一個用于訓練的CSV文件。
現在,讓我們在main.py
中定義一個函數來執行這一操作。
## main.py
from create import CSVCreator, ClassificationsArray
這些類是執行該操作所必需的,而負責具體操作的函數位于 main.py
中。
@app.post("/create/")
def create_csv(arr: ClassificationsArray):
csv = CSVCreator(arr)
csv.gather()
csv.create()
return {"status": "Complete"}
舉個直觀的例子,如果你前往并使用以下參數進行嘗試:http://localhost:8000/docs
/create/
{
"file_name": "apples_and_oranges",
"classifications_array": [
"Apple",
"Orange"
],
"train_data_fraction": 0.8
}
您將在 called 和 中創建兩個 csv 文件datasets/csv
apples_and_oranges.csv
apples_and_oranges_train.csv
apples_and_oranges.csv
將是測試 CSV,將被排序,將包含所有圖像,如下所示:
路徑 | 標簽 |
---|---|
datasets/test/Apple/37.png | Apple |
datasets/test/Apple/24.jpg | Apple |
datasets/test/Apple/77.jpg | Apple |
datasets/test/Apple/85.jpg | Apple |
datasets/test/Apple/81.png | Apple |
datasets/test/Apple/2.png | Apple |
datasets/test/Apple/12.jpg | Apple |
datasets/test/Apple/39.jpg | Apple |
datasets/test/Apple/64.jpg | Apple |
datasets/test/Apple/44.jpg | Apple |
“apples_and_oranges_train.csv” 將作為訓練用的 CSV 文件,其內容將被隨機排列,并且會包含 80% 的圖像,具體細節如下:
路徑 | 標簽 |
---|---|
datasets/test/Apple/38.jpg | Apple |
datasets/test/Orange/55.jpg | Orange |
datasets/test/Orange/61.jpg | Orange |
datasets/test/Apple/23.jpg | Apple |
datasets/test/Orange/62.png | Orange |
datasets/test/Orange/39.jpg | Orange |
datasets/test/Apple/76.jpg | Apple |
datasets/test/Apple/33.jpg | Apple |
這兩個項目將用于創建 Dataset 項。
我們需要一個對象來指定訓練操作的詳細信息,并在多個類之間共享使用以避免循環導入:
## commands.py
from pydantic import BaseModel
class TrainCommands(BaseModel):
model_name: str = "apples_and_oranges"
criterion: str = "CrossEntropyLoss"
annotations_file: str = "apples_and_oranges"
optimizer: str = "SGD"
lr: float = 0.001
momentum: float = 0.9
batch_size: int = 4
n_epoch: int = 2
n_labels: int = None
image_height: int = 500
image_width: int = 500
transform: bool = True
target_transform: bool = True
shuffle: bool = True
讓我們分解此對象中的項:
鑰匙 | 解釋 |
---|---|
model_name | 不帶擴展名的輸出模型名稱 |
criterion | 訓練過程的標準名稱 |
annotations_file | 不包含訓練文件和擴展名 |
optimizer | 優化器名稱 |
LR | 優化器的學習率 |
momentum | Optimizer 的動量 |
batch_size | 每個批次在 Custom Dataloader 中獲取的項目數 |
n_epoch | 要對訓練文件運行的 epoch 數 |
n_labels | 要訓練的標簽數量,自動收集到另一個類中 |
image_height | 所需的固定圖像高度 |
image_width | 所需的固定圖像寬度 |
transform | 是否應應用輸入轉換 |
target_transform | 是否應應用標簽轉換 |
shuffle | Dataloader 是否應該對數據集進行 shuffle 以獲取新項目 |
僅僅固定圖像的高度和寬度是不夠的,因為這樣做可能會導致圖像失真。本周,我們不會實施任何去噪的變換操作,但這類操作在批量加載時是必要的,因為批量中的張量圖像需要保持相同的大小。
現在我們已經有了所需的命令,讓我們開始了解創建 dataset 和 dataloader 的要求:
## dataset.py
import os
import pandas as pd
import numpy as np
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from commands import TrainCommands
然后讓我們初始化我們的 dataset 類:
class CustomImageDataset(Dataset):
def __init__(self, tc: TrainCommands, type: str):
transform = tc.transform
target_transform = tc.target_transform
annotations_file = tc.annotations_file
self.image_height = tc.image_height
self.image_width = tc.image_width
if type == "train":
annotations_file = "{}_train".format(annotations_file)
self.img_labels = pd.read_csv("datasets/csv/{}.csv".format(annotations_file))
unique_labels = list(set(self.img_labels['label'].to_list()))
tc.n_labels = len(unique_labels)
dict_labels = {}
for label in unique_labels:
dict_labels[label] = unique_labels.index(label)
self.dict_labels = dict_labels
if transform == True:
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
else:
self.transform == False
if target_transform == True:
self.target_transform = transforms.Compose([
transforms.ToTensor(),
])
else:
self.transform == False
我們使用參數(parameter)來定義我們的操作目標:是初始化數據庫,還是指定數據庫具有的類型(如 train 或 test)。
if type == "train":
annotations_file = "{}_train".format(annotations_file)
self.img_labels = pd.read_csv("datasets/csv/{}.csv".format(annotations_file))
要定義要在模型整形中使用的標簽列表,我們使用以下行:
unique_labels = list(set(self.img_labels['label'].to_list()))
tc.n_labels = len(unique_labels)
dict_labels = {}
for label in unique_labels:
dict_labels[label] = unique_labels.index(label)
self.dict_labels = dict_labels
這為我們提供了一個要分類的項目的字典,每個項目都有自己唯一的整數:
## self.dict_labels
{
"Apple": 0,
"Orange": 1
}
我們必須為 input 和 label 定義某些轉換。這些轉換定義了如何將它們轉換為用于訓練的張量,以及在轉換后應應用哪些操作:
if transform == True:
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
else:
self.transform == False
if target_transform == True:
self.target_transform = transforms.Compose([
transforms.ToTensor(),
])
else:
self.transform == False
我們還定義一個函數,為我們提供給定數據集中的圖像數量:
def __len__(self):
return len(self.img_labels)
最后,我們需要定義要給出的內容,一個 image 的張量和一個 label 的張量:
def __getitem__(self, idx):
img_path = os.path.join(self.img_labels.iloc[idx, 0])
label = self.img_labels.iloc[idx, 1]
label = self.dict_labels[label]
label_arr = np.full((len(self.dict_labels), 1), 0, dtype=float) #[0.,0.]
label_arr[label] = 1.0 #[0.,1.]
image = Image.open(img_path).convert('RGB')
image = image.resize((self.image_height,self.image_width), Image.ANTIALIAS)
if not self.transform == False:
image = self.transform(image)
if not self.target_transform == False:
label = self.target_transform(label_arr)
return image, label
讓我們逐個部分地分解它。
以下行將獲取具有給定索引的圖像路徑:
img_path = os.path.join(self.img_labels.iloc[idx, 0])
假設數據集是訓練數據集,索引為 0:
datasets/test/Apple/38.jpg | Apple |
路徑包含的原因是我們目前將所有文件都保存在同一目錄中。該圖像數據來源于一個dataframe,因此不會引發問題。接下來的幾行代碼將從相關標簽的索引中創建一個one-hot向量,例如使用self.img_labels.iloc[0, 0]
label = self.img_labels.iloc[idx, 1]
label = self.dict_labels[label]
label_arr = np.full((len(self.dict_labels), 1), 0, dtype=float) #[0.,0.]
label_arr[label] = 1.0 #[0.,1.]
我進行評論的原因是,在我們的例子中存在兩個標簽,即“Apple”和“Orange”,因此one-hot向量的大小將根據這兩個標簽來定義。接下來,我們會將這個向量轉換為numpy數組,以便進一步轉換為張量。
image = Image.open(img_path).convert('RGB')
image = image.resize((self.image_height,self.image_width), Image.ANTIALIAS)
我們將圖像轉換為 RGB 格式,以獲得一個包含三個維度的向量,其中第三個維度代表顏色。接著,我們使用 ANTIALIAS 方法對圖像進行大小調整,以確保調整后的圖像仍然能夠被人類視覺所識別。盡管我之前提到過,這樣的處理通常并不足夠,但目前我們就按照這種方式來進行。
現在是自定義數據加載器:
class CustomImageDataLoader:
def __init__(self, tc: TrainCommands, cid: CustomImageDataset):
batch_size = tc.batch_size
train_data = cid(tc, type = "train")
test_data = cid(tc, type = "test")
self.train_dataloader = DataLoader(train_data, batch_size = tc.batch_size, shuffle = tc.shuffle)
self.test_dataloader = DataLoader(test_data, batch_size = batch_size, shuffle = tc.shuffle)
def iterate_training(self):
train_features, train_labels = next(iter(self.train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Ladabels batch shape: {train_labels.size()}")
return train_features, train_labels
如前所述,我們首先使用參數進行初始化,并在其中定義了數據集。接著,我們利用Pytorch的DataLoader函數來聲明一個數據加載器。在調用數據加載器時,我們通過batch size參數來指定每次迭代中要獲取的圖像數量。
在迭代過程中,我們會使用在初始化階段定義的自定義圖像數據集(Custom Image Dataset),該數據集為我們提供圖像及其對應的標簽。self.train_dataloader
和self.test_dataloader
分別代表訓練集和測試集的數據加載器。當我們從數據集中獲取一批圖像時,train_features
將表示這批圖像的張量,而train_labels
則代表這些圖像對應的標簽的張量。
現在我們已經擁有了一個自定義的圖像數據集以及一個用于從數據集中加載圖像的自定義數據加載器。接下來,我們將使用object來進行自動訓練。訓練類和模型的要求是:TrainCommands
# train.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataset import CustomImageDataLoader, CustomImageDataset
from commands import TrainCommands
我們還聲明我們要使用的 CNN 模型:
class CNN(nn.Module):
def __init__(self, tc: TrainCommands):
super().__init__()
n_labels = tc.n_labels
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.flatten = nn.Flatten(start_dim=1)
self.fc1 = nn.Linear(16*122*122, 120) # Manually calculated I will explain next week
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, n_labels) #unique label size
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.flatten(x)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
這里需要強調的一點是,在我們的案例中,輸出大小?n_labels
?被設定為 2,因為我們僅在兩個類別間進行分類。此外,還有一個計算步驟是基于我手動確定的圖像嵌入大小以及圖像的高度和寬度。總體而言,這是一個高度通用的圖像分類函數。在接下來的幾周里,我們將探討如何自動完成我原先手動進行的大小計算,并研究如何向該函數中添加更多層,以實現更進一步的自動化。
現在,我們來定義一個訓練函數?AppleOrangeTrainCommands
,該函數將使用自定義的數據集和數據加載器。
class Train:
def __init__(self, tc: TrainCommands, cnn: CNN, cidl: CustomImageDataLoader, cid: CustomImageDataset):
self.loader = cidl(tc, cid)
self.cnn = cnn(tc)
self.criterion = getattr(nn, tc.criterion)()
self.optimizer = getattr(torch.optim, tc.optimizer)(self.cnn.parameters(), lr=tc.lr, momentum=tc.momentum)
self.n_epoch = tc.n_epoch
self.model_name = tc.model_name
def train(self):
for epoch in range(self.n_epoch): # how many times it'll loop over
running_loss = 0.0
for i, data in enumerate(self.loader.train_dataloader):
inputs, labels = data
self.optimizer.zero_grad()
outputs = self.cnn(inputs)
loss = self.criterion(outputs, labels.squeeze())
loss.backward()
self.optimizer.step()
running_loss = running_loss + loss.item()
if i % 5 == 4:
print(
f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0
torch.save(self.cnn.state_dict(), "models/{}.pt".format(self.model_name))
讓我們逐個部分分解它。以下行使用訓練命令使用自定義數據集初始化自定義訓練數據集。
self.loader = cidl(tc, cid)
下一行使用訓練命令聲明模型 (卷積神經網絡):
self.cnn = cnn(tc)
以下行負責創建標準:
self.criterion = getattr(nn, tc.criterion)()
在我們的例子中,使用torch.nn.CrossEntropyLoss()
是等價的。
下一行用于創建具有所需參數的優化器:
self.optimizer = getattr(torch.optim, tc.optimizer)(self.cnn.parameters(), lr=tc.lr, momentum=tc.momentum)
這相當于說,在未來的幾周內,我們將開發一種機制,允許為可選參數指定可選的名稱,從而能夠靈活地配置優化器和標準。但就目前而言,現有的方法已經足夠使用。例如,我們可以使用 torch.optim.SGD(CNN.parameters(), lr=0.001, momentum=0.9)
來初始化一個帶有特定學習率和動量的隨機梯度下降優化器。
最后,我們需要設定要運行的訓練周期(epoch)數,并確定保存模型的名稱:
self.n_epoch = tc.n_epoch
self.model_name = tc.model_name
在以下部分中,我們將迭代 epochs,聲明損失,并使用 function 從數據集中調用圖像和標簽:
def train(self):
for epoch in range(self.n_epoch): # loop over the dataset multiple times
running_loss = 0.0
for i, data in enumerate(self.loader.train_dataloader):
數據將以元組形式出現:
inputs, labels = data
然后我們在 optimizer 中將梯度歸零:
self.optimizer.zero_grad()
運行預測:
outputs = self.cnn(inputs)
然后,使用我們的標準將預測與實際標簽進行比較,以計算損失:
loss = self.criterion(outputs, labels.squeeze())
標簽在此處被擠壓以匹配要在 criterion function 中計算的輸入的形狀。
然后我們運行反向傳播以自動重新累積梯度:
loss.backward()
我們逐步執行優化器:
self.optimizer.step()
然后更新 :running_loss
running_loss = running_loss + loss.item()
以下行是每 5 個步驟的流程輸出:
if i % 5 == 4:
print(
f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0
最后,我們將模型保存到所需的位置:
torch.save(self.cnn.state_dict(), "models/{}.pt".format(self.model_name))
現在我們已經萬事俱備,接下來將在main.py
中聲明一個端點,通過這個端點我們可以接收訓練命令。
# main.py
from fastapi import FastAPI
from add import Download, Query
from create import CSVCreator, ClassificationsArray
from dataset import CustomImageDataLoader, CustomImageDataset
from train import CNN, Train
from commands import TrainCommands
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.post("/add/")
def create_query(query: Query):
## Create unique links
serpapi = Download(query)
serpapi.download_all_images()
return {"status": "Complete"}
@app.post("/create/")
def create_csv(arr: ClassificationsArray):
csv = CSVCreator(arr)
csv.gather()
csv.create()
return {"status": "Complete"}
@app.post("/train/")
def train(tc: TrainCommands):
trainer = Train(tc, CNN, CustomImageDataLoader, CustomImageDataset)
trainer.train()
return {"status": "Success"}
/train/endpoint 將接收您的命令,并自動為您訓練一個模型:
現在,如果您訪問并使用以下路徑嘗試我們的服務:localhost:8000/docs/train/,并附帶相應的參數。
{
"model_name": "apples_and_oranges",
"criterion": "CrossEntropyLoss",
"annotations_file": "apple_orange",
"optimizer": "SGD",
"lr": 0.001,
"momentum": 0.9,
"batch_size": 4,
"n_epoch": 2,
"n_labels": 0,
"image_height": 500,
"image_width": 500,
"transform": true,
"target_transform": true,
"shuffle": true
}
您可以從終端觀察訓練過程,因為我們為 epochs 聲明了 print 函數:
訓練結束后,您將在文件夾中保存一個模型,其中包含您指定的所需名稱:
我衷心感謝SerpApi的杰出團隊,正是他們的努力使得這篇博文得以問世。同時,我也非常感謝讀者的關注與支持。在接下來的幾周里,我們將深入探討如何提升本文提及的某些部分的效率與可定制性。此外,我們還將更全面地討論FastAPI的異步處理機制,以及如何實現對SerpApi的Google Images Scraper API的異步調用。
原文鏈接:https://serpapi.com/blog/automatic-training-fastapi-pytorch-serpapi/