import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
plt.rcParams['font.sans-serif'] = 'SimHei'  # set a font that can also render CJK characters in plots
plt.rcParams['axes.unicode_minus'] = False
df = pd.read_excel('df.xlsx',index_col=0, parse_dates=['數據時間'])
# Define the split ratios
train_ratio = 0.7
val_ratio = 0.1
test_ratio = 0.2
# Compute the split indices
train_split = int(train_ratio * len(df))
val_split = int((train_ratio + val_ratio) * len(df))
# Split the dataset
train_set = df.iloc[:train_split]
val_set = df.iloc[train_split:val_split]
test_set = df.iloc[val_split:]
plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(train_set, color='c', alpha=0.3)
plt.title('Training set')
plt.subplot(3, 1, 2)
plt.plot(val_set, color='b', alpha=0.3)
plt.title('Validation set')
plt.subplot(3, 1, 3)
plt.plot(test_set, color='r', alpha=0.3)
plt.title('Test set')
plt.xticks(rotation=45)
plt.show()
This reads the time series from the Excel file, splits it into training, validation and test sets, and plots each of the three parts as a separate time series.
from sklearn.preprocessing import MinMaxScaler

def normalize_dataframe(train_set, val_set, test_set):
    scaler = MinMaxScaler()
    scaler.fit(train_set)  # fit the scaler on the training set only
    train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index=train_set.index)
    val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index=val_set.index)
    test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index=test_set.index)
    return train, val, test

train, val, test = normalize_dataframe(train_set, val_set, test_set)
def prepare_data(data, win_size):
    X = []
    y = []
    for i in range(len(data) - win_size):
        temp_x = data[i:i + win_size]   # a window of win_size consecutive values
        temp_y = data[i + win_size]     # the value right after the window is the target
        X.append(temp_x)
        y.append(temp_y)
    X = np.asarray(X)
    y = np.asarray(y)
    X = np.expand_dims(X, axis=-1)      # add a feature dimension: (samples, win_size, 1)
    return X, y
win_size = 30
# Training set
X_train, y_train = prepare_data(train['data'].values, win_size)
# Validation set
X_val, y_val = prepare_data(val['data'].values, win_size)
# Test set
X_test, y_test = prepare_data(test['data'].values, win_size)
print("Training set shapes:", X_train.shape, y_train.shape)
print("Validation set shapes:", X_val.shape, y_val.shape)
print("Test set shapes:", X_test.shape, y_test.shape)
This defines a function that converts the time series into input windows and labels for training the LSTM model, and applies it to the training, validation and test sets. Reading the training-set shapes: the series is split into windows of 30 time steps, giving 907 such windows, and each window is paired with the value at the next time step as its prediction target; the other sets are analogous. Up to this point the preprocessing is essentially the same as it would be in TensorFlow; from here on the workflow starts to differ.
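As a quick sanity check, here is a minimal sketch (on a toy NumPy array rather than the real data) of how prepare_data slices a 1-D series into windows:
toy = np.arange(10)                  # toy series 0..9
X_toy, y_toy = prepare_data(toy, 3)  # windows of length 3
print(X_toy.shape, y_toy.shape)      # (7, 3, 1) and (7,)
print(X_toy[0].ravel(), y_toy[0])    # window [0 1 2] -> target 3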
import torch
import torch.nn as nn                    # modules and layers for building neural networks
import torch.optim as optim              # optimisation algorithms
from torch.utils.data import TensorDataset, DataLoader, Subset
# device is where tensors are stored and computed: use the GPU if one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Convert the NumPy arrays to PyTorch tensors with dtype torch.float32 and move them to the chosen device
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_validation_tensor = torch.tensor(X_val, dtype=torch.float32).to(device)
y_validation_tensor = torch.tensor(y_val, dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
# Build datasets for the training, validation and test sets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# Batch size: stronger hardware allows larger values; common choices range from 32 to 256
batch_size = 32
# Create the data loaders; shuffle=True would reshuffle the data every epoch, but the windows are kept in time order here
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# Print the shape of one training batch
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter)  # fetch a single batch manually with next()
print('Sample input shape: ', sample_x.shape)
print('Sample output shape: ', sample_y.shape)
This converts the raw arrays into PyTorch tensors and builds the data loaders used to train and validate the model. With a fixed batch size, the loaders feed the model in efficient mini-batches, which speeds up training. torch.Size([32, 30, 1]) means each batch holds 32 samples, each a univariate time series of length 30; torch.Size([32]) means the matching batch of labels holds 32 values, one target (the value right after each input window) per sample.
# Model hyperparameters
model_params = {
    'lstm': {
        'input_size': X_train.shape[2],  # number of input features per time step
        'hidden_size': 256,              # LSTM hidden-state dimension
        'num_layers': 1,                 # number of stacked LSTM layers
        'output_size': 1                 # output dimension
    }
}
# Define the LSTM model
class LSTMModel(nn.Module):
    def __init__(self, params):
        super(LSTMModel, self).__init__()
        self.hidden_size = params['hidden_size']
        self.num_layers = params['num_layers']
        # LSTM layer
        self.lstm = nn.LSTM(params['input_size'], params['hidden_size'], params['num_layers'], batch_first=True)
        # Fully connected layers
        self.fc1 = nn.Linear(params['hidden_size'], 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 16)
        self.fc5 = nn.Linear(16, params['output_size'])
        self.relu = nn.ReLU()  # ReLU activation

    def forward(self, x):
        # Initialise the hidden state and cell state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))
        out = self.relu(out[:, -1, :])   # keep the last time step's output and apply ReLU
        out = self.relu(self.fc1(out))   # fully connected layer 1
        out = self.relu(self.fc2(out))   # fully connected layer 2
        out = self.relu(self.fc3(out))   # fully connected layer 3
        out = self.relu(self.fc4(out))   # fully connected layer 4
        out = self.fc5(out)              # output layer
        return out

# Initialise the model
lstm_model = LSTMModel(model_params['lstm']).to(device)
# Print the model architecture
print(lstm_model)
This defines and initialises a network that combines an LSTM layer with a stack of fully connected layers for the time-series data.
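As an optional check (not part of the original listing), the number of trainable parameters can be printed to get a feel for the model's size:
total_params = sum(p.numel() for p in lstm_model.parameters() if p.requires_grad)  # count trainable parameters
print('Trainable parameters:', total_params)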
criterion = nn.MSELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
# Train the model
num_epochs = 150
train_losses = []
val_losses = []
for epoch in range(num_epochs):
    lstm_model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = lstm_model(X_batch)
        loss = criterion(outputs.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    lstm_model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = lstm_model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            val_loss += loss.item()
    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')

# Plot the loss curves
plt.figure()
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
This trains the LSTM with PyTorch, optimising the parameters over 150 epochs with the MSE loss and the Adam optimizer, then plots the training and validation loss curves. The early-stopping section later in this post repeats this loop with more detailed line-by-line comments.
# Save the model
torch.save(lstm_model.state_dict(), 'lstm_model.pth')
# Reload the model
lstm_model = LSTMModel(model_params['lstm']).to(device)
lstm_model.load_state_dict(torch.load('lstm_model.pth'))
lstm_model.eval()
# Predict on the test set
predictions = []
with torch.no_grad():
    for inputs, _ in test_loader:
        outputs = lstm_model(inputs)
        predictions.extend(outputs.cpu().numpy())
# Convert the predictions to a NumPy array
predictions = np.array(predictions)
Model evaluation
from sklearn import metrics
from sklearn.metrics import r2_score

y_pred = predictions.flatten()  # flatten the (N, 1) prediction array
mse = metrics.mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = metrics.mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print("MSE:", mse)
print("RMSE:", rmse)
print("MAE:", mae)
print("R² (goodness of fit):", r2)
Prediction visualization
# Undo the min-max scaling using the training-set minimum and maximum
df_max = train_set['data'].max()
df_min = train_set['data'].min()
pred_original = predictions.flatten() * (df_max - df_min) + df_min  # predictions back on the original scale
plt.figure(figsize=(15, 4), dpi=300)
plt.subplot(2, 1, 1)
plt.plot(train_set, color='c', label='Training set')
plt.plot(val_set, color='r', label='Validation set')
plt.plot(test_set, color='b', label='Test set')
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D'),
         pred_original, color='y', label='Test prediction')
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(test_set, color='b', label='Test set')
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D'),
         pred_original, color='y', label='Test prediction')
plt.legend()
plt.show()
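An equivalent way to undo the scaling is to reuse the fitted MinMaxScaler's inverse_transform instead of the manual min/max arithmetic. This sketch assumes the scaler created inside normalize_dataframe is also returned (or otherwise kept in scope) as scaler, which the code above does not currently do:
# Sketch only: assumes `scaler` is the MinMaxScaler fitted on the training set
pred_original = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()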
Early stopping
# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
# Training parameters
num_epochs = 150                   # maximum number of epochs
patience = 10                      # how many epochs without improvement to tolerate
min_delta = 1e-4                   # smallest change in validation loss that counts as an improvement
save_path = 'best_lstm_model.pth'  # where to save the best model
train_losses = []                  # training loss per epoch
val_losses = []                    # validation loss per epoch
# Early-stopping state
best_loss = float('inf')           # best validation loss so far, initialised to infinity
current_patience = 0               # epochs since the last improvement
for epoch in range(num_epochs):
    lstm_model.train()                        # training mode
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()                 # clear gradients
        outputs = lstm_model(X_batch)         # forward pass
        loss = criterion(outputs.squeeze(), y_batch)  # compute the loss
        loss.backward()                       # backward pass
        optimizer.step()                      # update the parameters
        train_loss += loss.item()             # accumulate the training loss
    train_loss /= len(train_loader)           # average training loss
    train_losses.append(train_loss)

    lstm_model.eval()                         # evaluation mode
    val_loss = 0
    with torch.no_grad():                     # no gradients needed for validation
        for X_batch, y_batch in val_loader:
            outputs = lstm_model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            val_loss += loss.item()
    val_loss /= len(val_loader)               # average validation loss
    val_losses.append(val_loss)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')

    # Early-stopping check
    if val_loss < best_loss - min_delta:
        best_loss = val_loss                  # new best validation loss
        current_patience = 0                  # reset the patience counter
        torch.save(lstm_model.state_dict(), save_path)  # save the best model so far
    else:
        current_patience += 1                 # one more epoch without improvement
        if current_patience >= patience:      # stop once patience runs out
            print(f"Early stopping at epoch {epoch+1}.")
            break
# Plot the training and validation loss curves
plt.figure()
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
Adding early stopping to training helps prevent overfitting and saves training time. Early stopping is a regularisation technique: training is halted once the validation loss stops improving, so the model does not keep fitting the training set at the expense of test-set performance, and no compute is wasted on epochs that no longer help.
How the logic above works: best_loss is initialised to infinity; after every epoch the validation loss val_loss is compared with it, and if the improvement exceeds min_delta the best validation loss is updated to the current value, the patience counter current_patience is reset, and the model's state_dict is saved as the best model so far. Otherwise the counter is incremented, and once it reaches patience training stops to prevent overfitting. In this way training ends automatically when the validation loss stops improving meaningfully, and only the best model needs to be kept, saving both storage space and training time.
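After early stopping, the weights in memory are from the last epoch rather than necessarily the best one, so it is worth reloading the checkpoint saved at save_path before evaluating on the test set; a minimal sketch:
# Restore the best weights saved by the early-stopping loop
lstm_model.load_state_dict(torch.load(save_path))
lstm_model.eval()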