from datetime import datetime

import matplotlib.pyplot as plt
from meteostat import Stations, Daily

plt.rcParams['font.sans-serif'] = 'SimHei'  # SimHei font (needed for CJK characters in plots)
plt.rcParams['axes.unicode_minus'] = False
chongqing = (29.56301, 106.55156)  # latitude, longitude of Chongqing
# Find weather stations near Chongqing
stations = Stations()
nearby_stations = stations.nearby(*chongqing).fetch(5)
# Take the ID of the nearest station
station_id = nearby_stations.index[0]
# Set the time range
start = datetime(2008, 1, 1)
end = datetime(2024, 5, 25)
# Fetch the daily data
data = Daily(station_id, start, end)
data = data.fetch()
data.head()
Here we use the Python meteostat library to access weather data for Chongqing; for the detailed retrieval steps, see the earlier article on accessing global historical weather data with the meteostat library.
import pandas as pd
df = pd.DataFrame()
df['tavg'] = data['tavg']
# Define the split ratios
train_ratio = 0.7
val_ratio = 0.1
test_ratio = 0.2
# Compute the split indices
train_split = int(train_ratio * len(df))
val_split = int((train_ratio + val_ratio) * len(df))
# Split the dataset chronologically
train_set = df.iloc[:train_split]
val_set = df.iloc[train_split:val_split]
test_set = df.iloc[val_split:]
plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(train_set, color='c', alpha=0.3)
plt.title('Training set')
plt.subplot(3, 1, 2)
plt.plot(val_set, color='b', alpha=0.3)
plt.title('Validation set')
plt.subplot(3, 1, 3)
plt.plot(test_set, color='r', alpha=0.3)
plt.title('Test set')
plt.xticks(rotation=45)
plt.show()
Here we build a single-series time-series model: an Attention+LSTM network trained on the tavg (daily mean temperature) column.
from sklearn.preprocessing import MinMaxScaler

def normalize_dataframe(train_set, val_set, test_set):
    scaler = MinMaxScaler()
    scaler.fit(train_set)  # fit the scaler on the training set only
    train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index=train_set.index)
    val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index=val_set.index)
    test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index=test_set.index)
    return train, val, test
train, val, test = normalize_dataframe(train_set, val_set, test_set)
Apply min-max normalization to the training, validation, and test sets. Normalization is used here because the raw data has no unit-of-measure issues, and with a single series there are no cross-feature scale differences to worry about. Generally, though, standardization, which gives the data zero mean and unit variance, is the better fit for attention. Why does standardization suit attention mechanisms better?
Handling features with different units: attention typically processes combinations of features, and standardization removes the effect of differing units so all features sit on a common scale.
Faster convergence: standardized data usually converges more quickly because its numeric range stays more stable during training.
Gradient behavior: standardized inputs tend to make the gradient distribution more even, helping to avoid vanishing or exploding gradients.
The standardization formula is z = (x - μ) / σ, where μ and σ are the mean and standard deviation estimated on the training set. For a detailed explanation of these parameters, see the earlier article on feature engineering and data transformation. When standardization is used instead, the code is as follows:
# Standardize the data
# Compute the mean and standard deviation on the training set
mean = train_set.mean()
std = train_set.std()
# Standardize each split with the training-set statistics
train = (train_set - mean) / std
val = (val_set - mean) / std
test = (test_set - mean) / std
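Equivalently, this can be done with sklearn's StandardScaler, mirroring the normalize_dataframe helper above (a minimal sketch; the scaler is fit on the training set only):

from sklearn.preprocessing import StandardScaler

def standardize_dataframe(train_set, val_set, test_set):
    scaler = StandardScaler()
    scaler.fit(train_set)  # fit on the training set only, as with MinMaxScaler
    # note: StandardScaler uses the population std (ddof=0) while pandas .std() defaults
    # to ddof=1, so the results differ very slightly from the manual version above
    train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index=train_set.index)
    val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index=val_set.index)
    test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index=test_set.index)
    return train, val, test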
The author also ran the full pipeline on standardized data and recorded the resulting test-set metrics, to allow a comparison against the normalized version used below for model building. [As it turned out, normalization outperformed standardization on this dataset.]
import numpy as np

def prepare_data(data, win_size):
    # slide a window of length win_size over the series: each window is one sample,
    # and the value immediately after the window is its target
    X = []
    y = []
    for i in range(len(data) - win_size):
        temp_x = data[i:i + win_size]
        temp_y = data[i + win_size]
        X.append(temp_x)
        y.append(temp_y)
    X = np.asarray(X)
    y = np.asarray(y)
    X = np.expand_dims(X, axis=-1)  # add a feature axis: (samples, win_size) -> (samples, win_size, 1)
    return X, y
win_size = 30
# Training set
X_train, y_train = prepare_data(train['tavg'].values, win_size)
# Validation set
X_val, y_val = prepare_data(val['tavg'].values, win_size)
# Test set
X_test, y_test = prepare_data(test['tavg'].values, win_size)
print("Training set shapes:", X_train.shape, y_train.shape)
print("Validation set shapes:", X_val.shape, y_val.shape)
print("Test set shapes:", X_test.shape, y_test.shape)
Window-based sample construction is not elaborated further here; see the earlier article on the many forms of time-series forecasting models.
Converting the datasets to PyTorch tensors
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# device: where tensors are stored and computed on
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # use the GPU if one is available
# Convert the NumPy arrays to PyTorch tensors:
# torch.tensor(..., dtype=torch.float32) converts each array, and .to(device) moves it to the chosen device
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_validation_tensor=torch.tensor(X_val, dtype=torch.float32).to(device)
y_validation_tensor= torch.tensor(y_val,dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
# Wrap the tensors in datasets for the training, validation, and test splits
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# Define the batch size
batch_size = 64  # larger values suit stronger hardware; common choices range from 32 to 256
# Create the data loaders; shuffle=False keeps the samples in temporal order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# Inspect the shape of one batch
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter)  # manually pull a single batch with next()
print('Sample input shape: ', sample_x.shape)
print('Sample output shape: ', sample_y.shape)
The datasets are now PyTorch tensors with matching data loaders for training, validation, and testing; each input batch has shape (batch_size, win_size, 1) and each target batch has shape (batch_size,).
# Model hyperparameters
model_params = {
    'lstm': {
        'input_size': X_train.shape[2],  # input feature dimension (1 for this univariate series)
        'hidden_size': 256,              # LSTM hidden dimension
        'num_layers': 1,                 # number of LSTM layers
        'output_size': 1                 # output dimension
    },
    'attention': {
        'num_heads': 8                   # number of attention heads
    }
}
# Multi-head attention layer
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        # wrap PyTorch's built-in multi-head attention
        self.attention = nn.MultiheadAttention(hidden_size, num_heads)

    def forward(self, lstm_output):
        # lstm_output shape: (batch_size, seq_length, hidden_size)
        # nn.MultiheadAttention expects (seq_length, batch_size, hidden_size) by default
        lstm_output = lstm_output.permute(1, 0, 2)  # move the sequence axis first
        # self-attention: the LSTM output serves as query, key, and value
        attn_output, attn_weights = self.attention(lstm_output, lstm_output, lstm_output)
        attn_output = attn_output.permute(1, 0, 2)  # restore (batch, seq, hidden)
        return attn_output, attn_weights
# Attention_LSTM model
class Attention_LSTM(nn.Module):
    def __init__(self, lstm_params, attention_params):
        super(Attention_LSTM, self).__init__()
        self.hidden_size = lstm_params['hidden_size']
        self.num_layers = lstm_params['num_layers']
        # LSTM layer
        self.lstm = nn.LSTM(lstm_params['input_size'], lstm_params['hidden_size'], lstm_params['num_layers'], batch_first=True)
        # multi-head attention layer
        self.attention = MultiHeadAttention(lstm_params['hidden_size'], attention_params['num_heads'])
        # fully connected head
        self.fc1 = nn.Linear(lstm_params['hidden_size'], 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 16)
        self.fc5 = nn.Linear(16, lstm_params['output_size'])
        self.relu = nn.ReLU()  # ReLU activation

    def forward(self, x):
        # initialize the hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        # LSTM forward pass
        lstm_out, _ = self.lstm(x, (h0, c0))
        # apply multi-head attention over the LSTM outputs
        attn_out, _ = self.attention(lstm_out)
        # keep only the last time step
        out = self.relu(attn_out[:, -1, :])
        # fully connected head
        out = self.relu(self.fc1(out))  # FC layer 1
        out = self.relu(self.fc2(out))  # FC layer 2
        out = self.relu(self.fc3(out))  # FC layer 3
        out = self.relu(self.fc4(out))  # FC layer 4
        out = self.fc5(out)             # output layer
        return out
# Unpack the hyperparameters
lstm_params = model_params['lstm']
attention_params = model_params['attention']
# Instantiate the model
model = Attention_LSTM(lstm_params, attention_params).to(device)
print(model)
The model is a long short-term memory network (LSTM) combined with an attention mechanism: the LSTM encodes the 30-day input window, multi-head attention re-weights the hidden states across time steps, and a stack of fully connected layers maps the last time step to the one-day-ahead prediction.
One thing to watch with the attention parameter num_heads: nn.MultiheadAttention requires the embedding dimension to be divisible by the number of heads, so here hidden_size=256 with 8 heads gives 32 dimensions per head.
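A quick sanity check for this constraint, written against the model_params dict defined above (a minimal sketch):

# nn.MultiheadAttention raises an error when embed_dim % num_heads != 0
assert model_params['lstm']['hidden_size'] % model_params['attention']['num_heads'] == 0, \
    "hidden_size must be divisible by num_heads"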
To keep the attention block from overfitting, a dropout layer is often added right after it; alternatively, an L2 penalty can be added through the optimizer (a sketch follows the code below). The variant with a Dropout() layer looks like this:
# Multi-head attention layer with dropout
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        # multi-head attention layer
        self.attention = nn.MultiheadAttention(hidden_size, num_heads)
        self.dropout = nn.Dropout(p=0.1)  # dropout layer to curb overfitting

    def forward(self, lstm_output):
        # lstm_output shape: (batch_size, seq_length, hidden_size)
        # nn.MultiheadAttention expects (seq_length, batch_size, hidden_size) by default
        lstm_output = lstm_output.permute(1, 0, 2)  # move the sequence axis first
        attn_output, attn_weights = self.attention(lstm_output, lstm_output, lstm_output)
        attn_output = self.dropout(attn_output)  # apply dropout
        attn_output = attn_output.permute(1, 0, 2)  # restore (batch, seq, hidden)
        return attn_output, attn_weights
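For the L2 route, Adam's weight_decay argument applies the penalty inside the optimizer; if you choose this option, swap it in for the optimizer defined in the training section below. The strength 1e-4 is an illustrative, untuned choice:

# L2 regularization via the optimizer; 1e-4 is an assumed, untuned value
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)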
Model training
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 150
train_losses = []
val_losses = []
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            val_loss += loss.item()
    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')
# Plot the loss curves
plt.figure()
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
Model prediction and evaluation
# Save the model weights
torch.save(model.state_dict(), 'Attention+lstm.pth')
# Reload the model
lstm_model = Attention_LSTM(lstm_params, attention_params).to(device)
lstm_model.load_state_dict(torch.load('Attention+lstm.pth', map_location=device))
lstm_model.eval()
# Predict on the test set
predictions = []
with torch.no_grad():
    for inputs, _ in test_loader:
        outputs = lstm_model(inputs)
        predictions.extend(outputs.cpu().numpy())
predictions = np.array(predictions)
from sklearn import metrics
from sklearn.metrics import r2_score

y_pred = predictions.flatten()  # (N, 1) -> (N,)
mse = metrics.mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = metrics.mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print("Mean squared error (MSE):", mse)
print("Root mean squared error (RMSE):", rmse)
print("Mean absolute error (MAE):", mae)
print("R-squared:", r2)
Visualization
# Invert the min-max scaling with the training-set extremes
df_max = train_set['tavg'].max()
df_min = train_set['tavg'].min()
# Align predictions with their timestamps: the first prediction targets the
# (win_size + 1)-th day of the test set
pred_index = test_set.index[win_size:]
plt.figure(figsize=(15, 4), dpi=300)
plt.subplot(2, 1, 1)
plt.plot(train_set, color='c', label='Training set')
plt.plot(val_set, color='r', label='Validation set')
plt.plot(test_set, color='b', label='Test set')
plt.plot(pred_index, predictions * (df_max - df_min) + df_min, color='y', label='Test-set predictions')
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(test_set, color='b', label='Test set')
plt.plot(pred_index, predictions * (df_max - df_min) + df_min, color='y', label='Test-set predictions')
plt.legend()
plt.show()