You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

568 lines
24 KiB

"""
Author:陆绍超
Project name:swDLiner
Created on 2024/05/07 下午1:20
"""
import json
import logging
import os
from datetime import datetime

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import tornado.httpclient
import tornado.ioloop
import tornado.web
from torch.utils.data import Dataset, DataLoader
from tornado.escape import json_decode
from tornado.log import LogFormatter

from TCN import TemporalConvNet
class NormalizedScaler:
    """Column-wise min-max scaler for 2-D NumPy arrays and torch tensors.

    ``fit`` records the per-column minimum and maximum.  ``transform`` maps
    every column to ``(x - min) / (max - min)``; columns whose range is at or
    below ``1e-2`` are treated as constant and mapped to 0 instead of being
    divided by a (near-)zero range.  ``y_transform`` / ``inverse_transform``
    apply the same statistics restricted to ``target_column_indices``
    (``None`` means "all columns").
    """

    def __init__(self):
        self.min_value = 0.
        self.max_value = 1.0
        # Positions of the target columns inside the fitted table.
        self.target_column_indices = None
        # Boolean mask of near-constant columns; stays None until fit() runs.
        self.maxmin_zeros = None

    def fit(self, data):
        """Record per-column min/max of ``data`` (reduced along axis 0).

        ``data`` is expected to be a NumPy array; the statistics are later
        converted to tensors on demand.
        """
        self.min_value = data.min(0)
        self.max_value = data.max(0)
        # Flag columns whose range is too small to divide by safely.
        self.maxmin_zeros = ((self.max_value - self.min_value) <= 1e-2)

    def _require_fitted(self):
        """Raise a clear error when the scaler is used before ``fit``."""
        if self.maxmin_zeros is None:
            raise RuntimeError("NormalizedScaler.fit() must be called before transforming data")

    def _bounds(self, data, indices=None):
        """Return ``(min, max, constant_mask)`` for ``indices``, typed like ``data``.

        The mask is always returned as a NumPy boolean array; min/max are
        converted to tensors (same dtype and device) when ``data`` is one.
        """
        low, high, constant = self.min_value, self.max_value, self.maxmin_zeros
        if indices is not None:
            low, high, constant = low[indices], high[indices], constant[indices]
        if torch.is_tensor(data):
            low = torch.from_numpy(np.asarray(low)).type_as(data).to(data.device)
            high = torch.from_numpy(np.asarray(high)).type_as(data).to(data.device)
        return low, high, constant

    @staticmethod
    def _scale(data, low, high, constant):
        """Min-max scale ``data``; constant columns are left at 0."""
        if not np.any(constant):
            return (data - low) / (high - low)
        if torch.is_tensor(data):
            scaled = torch.zeros_like(data)
        else:
            # Allocate a floating result so integer input is not truncated
            # when the scaled values are written column by column.
            scaled = np.zeros(data.shape, dtype=np.result_type(data.dtype, np.float32))
        for col in range(data.shape[1]):
            if not constant[col]:
                scaled[:, col] = (data[:, col] - low[col]) / (high[col] - low[col])
        return scaled

    def transform(self, data):
        """Scale every column of ``data`` with the fitted statistics."""
        self._require_fitted()
        low, high, constant = self._bounds(data)
        return self._scale(data, low, high, constant)

    def y_transform(self, data):
        """Scale target-only ``data`` using the statistics of the target columns."""
        self._require_fitted()
        low, high, constant = self._bounds(data, self.target_column_indices)
        return self._scale(data, low, high, constant)

    def inverse_transform(self, data):
        """Map scaled target values back to the original value range."""
        self._require_fitted()
        low, high, _ = self._bounds(data, self.target_column_indices)
        return (data * (high - low)) + low
class Dataset_GUISANLI_minute(Dataset):
    """Sliding-window training dataset over a sensor DataFrame.

    Item ``i`` is a pair ``(x, y)``: ``x`` holds ``seq_len`` consecutive rows
    of ``column_order`` starting at row ``i``; ``y`` holds the following
    ``pred_len`` rows restricted to ``target``.  With ``scale=True`` a
    ``NormalizedScaler`` is re-fitted on each input window and applied to both
    parts; with ``scale=False`` the raw values are returned.
    """

    def __init__(self, size=None, target=None, column_order=None, scale=True):
        """
        Args:
            size: optional ``(seq_len, pred_len)``; defaults to ``(120, 60)``.
            target: names of the columns to predict.
            column_order: names (and order) of the input columns.
            scale: whether to min-max scale each window.
        """
        if target is None:
            self.target = ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP']
        else:
            self.target = target
        if column_order is None:
            # Default input columns, in the order they are fed to the model.
            self.column_order = ['Do', 'Do1', 'Do2', 'inCod', 'inFlow', 'inNH3N', 'inPh',
                                 'outCod', 'outFlow', 'outFlowNow', 'outNH3N', 'outPh',
                                 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj2']
        else:
            self.column_order = column_order
        if size is None:
            self.seq_len = 120
            self.pred_len = 60
        else:
            self.seq_len = size[0]
            self.pred_len = size[1]
        self.scale = scale
        self.scaler = NormalizedScaler()
        self.df_raw = None

    def read_data(self, df_raw):
        """Attach ``df_raw`` and derive the input / target tables from it.

        Raises:
            KeyError: if ``df_raw`` lacks any column listed in ``column_order``.
        """
        self.df_raw = df_raw
        missing = [column for column in self.column_order if column not in df_raw.columns]
        if missing:
            print(f"DataFrame must contain columns: {self.column_order}")
            # Selecting the columns below would raise KeyError anyway; raise
            # it here with the exact list of missing names.
            raise KeyError(f"missing columns: {missing}")
        # Select the input columns in the configured order.
        df_data = df_raw[self.column_order]
        self.data_x = df_data
        self.data_y = df_data
        if self.target:
            self.data_y = self.data_y[self.target]
        if self.scale:
            # Positions of the target columns inside the input table; the
            # scaler needs them to (de)normalize target-only data.
            column_indices_1 = [self.data_x.columns.get_loc(col) for col in self.target]
            self.scaler.target_column_indices = column_indices_1

    def __getitem__(self, index):
        """Return ``(x, y)`` float32 tensors for window ``index``.

        Raises:
            IndexError: if ``index`` is outside ``range(len(self))``.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"window index {index} out of range")
        s_begin = index
        s_end = s_begin + self.seq_len
        r_begin = s_end
        r_end = r_begin + self.pred_len
        seq_x = self.data_x[s_begin:s_end]
        seq_y = self.data_y[r_begin:r_end]
        if self.scale:
            # Statistics come from the input window only and are reused for
            # the label window.
            self.scaler.fit(seq_x.values)
            x_data = self.scaler.transform(seq_x.values)
            y_data = self.scaler.y_transform(seq_y.values)
        else:
            x_data = seq_x.values
            y_data = seq_y.values
        return torch.from_numpy(x_data).to(torch.float32), torch.from_numpy(y_data).to(torch.float32)

    def __len__(self):
        """Number of complete (input, label) windows; 0 when the data is too short."""
        return max(0, len(self.data_x) - self.seq_len - self.pred_len + 1)

    def inverse_transform(self, data):
        """Undo the scaling of target values using the most recently fitted window."""
        return self.scaler.inverse_transform(data)
class Pred_GUISANLI_minute():
    """Inference-time window provider over the live sensor DataFrame.

    Index 0 is the most recent ``seq_len`` rows of ``df_raw``; higher indices
    step one row further into the past.  With ``scale=True`` each window is
    min-max scaled with statistics fitted on that window.
    """

    def __init__(self, size=None, target=None, column_order=None, scale=True, sn=None):
        """
        Args:
            size: optional ``(seq_len, pred_len)``; defaults to ``(120, 60)``.
            target: names of the predicted columns.
            column_order: names (and order) of the input columns.
            scale: whether to min-max scale each window.
            sn: identifier stored on the instance as ``self.sn``.
        """
        if column_order is None:
            self.column_order = ['Do', 'Do1', 'Do2', 'inCod', 'inFlow', 'inNH3N', 'inPh',
                                 'outCod', 'outFlow', 'outFlowNow', 'outNH3N', 'outPh',
                                 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj2']
        else:
            self.column_order = column_order
        if target is None:
            self.target = ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP']  # 6 targets
        else:
            self.target = target
        if size is None:
            self.seq_len = 120
            self.pred_len = 60
        else:
            self.seq_len = size[0]
            self.pred_len = size[1]
        self.scale = scale
        self.scaler = NormalizedScaler()
        self.sn = sn
        self.df_raw = None

    def get_df_raw(self, df_raw):
        """Attach ``df_raw`` as the backing table (despite the name, this is a setter).

        Only prints a warning when configured columns are missing; the table
        is attached regardless.
        """
        self.df_raw = df_raw
        if not all(column in self.df_raw.columns for column in self.column_order):
            print(f"DataFrame must contain columns: {self.column_order}")

    def __getitem__(self, index):
        """Return ``(dates, x)`` for the window ending ``index`` rows before the end.

        ``dates`` is the array of ``date`` values of the window and ``x`` a
        float32 tensor of its ``column_order`` columns.

        Raises:
            IndexError: if ``index`` is outside ``range(len(self))``; without
                this check a negative start would silently slice wrong rows.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"window index {index} out of range")
        self.data_x = self.df_raw[self.column_order]  # model input columns
        self.data_date = self.df_raw['date']  # timestamps
        if self.scale:
            # Positions of the target columns inside the input table; the
            # scaler needs them to denormalize the predictions.
            column_indices_1 = [self.data_x.columns.get_loc(col) for col in self.target]
            self.scaler.target_column_indices = column_indices_1
        s_begin = len(self.data_x) - self.seq_len - index
        s_end = s_begin + self.seq_len
        seq_date = self.data_date[s_begin:s_end]
        seq_x = self.data_x[s_begin:s_end]
        if self.scale:
            self.scaler.fit(seq_x.values)
            x_data = self.scaler.transform(seq_x.values)
        else:
            x_data = seq_x.values
        return seq_date.values, torch.from_numpy(x_data).to(torch.float32)

    def __len__(self):
        """Number of complete input windows; 0 when no or too little data is attached."""
        if self.df_raw is None:
            return 0
        return max(0, len(self.df_raw) - self.seq_len + 1)

    def inverse_transform(self, data):
        """Undo the scaling of target values using the most recently fitted window."""
        return self.scaler.inverse_transform(data)
def load_model(weights_path, num_inputs=32, num_outputs=6, seq_len=120, pred_len=60):
    """Build a ``TemporalConvNet`` and load its weights if the file exists.

    Args:
        weights_path: path to a state-dict checkpoint.
        num_inputs: number of input feature columns.
        num_outputs: number of predicted columns (last channel width).
        seq_len: input window length passed to the network.
        pred_len: prediction horizon passed to the network.

    Returns:
        The model; left with its freshly initialised weights when
        ``weights_path`` does not exist (a message is printed in that case).
    """
    predict_model = TemporalConvNet(seq_len=seq_len,
                                    pred_len=pred_len,
                                    num_inputs=num_inputs,
                                    num_channels=[64, 128, 256, 128, 64, 32, num_outputs])
    if os.path.exists(weights_path):
        # SECURITY NOTE: torch.load unpickles the file. Checkpoints under
        # ./Upload can be written through the upload endpoint, so only load
        # files from trusted sources (consider weights_only=True where the
        # installed torch version supports it).
        # map_location: the rest of this file runs the model on CPU, so a
        # checkpoint saved from a GPU must be remapped to load on CPU hosts.
        model_weights = torch.load(weights_path, map_location='cpu')
        predict_model.load_state_dict(model_weights)
    else:
        print("模型权重不存在")
    return predict_model
def config_init():
    """Build the runtime configuration for every monitored site.

    Each entry of the static table below is expanded into a dict with the
    same keys, where ``'model'`` is replaced by a loaded network
    (``load_model``) and ``'data_loader'`` by a ``Pred_GUISANLI_minute``
    instance; ``'SN'``, ``'target'`` and ``'columns'`` are copied unchanged.

    Raises:
        ValueError: if an entry contains an unrecognised key.
    """
    # Static site table (loading it from a JSON 'config' file is disabled).
    site_table = {
        '20210225GUISANLI': {'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
                             'data_loader': '20210225GUISANLI',
                             'SN': '20210225GUISANLI',
                             'target': ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'],
                             'columns': ['Do', 'Do1', 'Do2', 'inCod', 'inFlowNow', 'inNH3N', 'inPh', 'outCod',
                                         'outFlowNow', 'outNH3N', 'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1',
                                         'yw_mc2', 'yw_tj2'],
                             },
        '20210207GUIGWULI': {'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
                             'data_loader': '20210207GUIGWULI',
                             'SN': '20210207GUIGWULI',
                             'target': ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'],
                             'columns': ['Do', 'inCod', 'inFlowNow', 'inNH3N', 'inPh', 'outCod', 'outFlowNow',
                                         'outNH3N', 'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj1'],
                             },
        '20210309ZHANGMUZ': {'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
                             'data_loader': '20210309ZHANGMUZ',
                             'SN': '20210309ZHANGMUZ',
                             'target': ['outCOD', 'outNH3N', 'outPH', 'outTN', 'outTP'],
                             'columns': ['inCOD', 'inFlowNow', 'inNH3N', 'inPH', 'outCOD', 'outFlowNow', 'outNH3N',
                                         'outPH', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_mc3', 'yw_mc4',
                                         'yw_tj1', 'yw_tj2', 'yw_tj3', 'yw_tj4']
                             },
    }
    copied_fields = ('SN', 'target', 'columns')
    built = {}
    for site_key, raw_entry in site_table.items():
        entry = {}
        for field, value in raw_entry.items():
            if field == 'model':
                # Network width follows the entry's column / target counts.
                entry[field] = load_model(weights_path=value,
                                          num_inputs=len(raw_entry.get('columns', [])),
                                          num_outputs=len(raw_entry.get('target', [])))
            elif field == 'data_loader':
                entry[field] = Pred_GUISANLI_minute(sn=value,
                                                    target=raw_entry.get('target', None),
                                                    column_order=raw_entry.get('columns', None))
            elif field in copied_fields:
                entry[field] = value
            else:
                raise ValueError("配置错误")
        built[site_key] = entry
    return built
# Module-level site configuration, built once at import time by config_init().
# Read by PredictHandler and generate_data; generate_data also replaces the
# 'model' entry after each training run.
configs = config_init()
def pseudo_model_predict(model, pred_data_loader):
    """Run one forecast on the latest window and return it as a JSON string.

    Args:
        model: network called as ``model(batch)``; must provide ``eval()``.
        pred_data_loader: window provider supporting ``len()``, ``[0]``
            (returning ``(dates, tensor)``) and ``inverse_transform``; its
            ``target`` attribute names the predicted columns.

    Returns:
        A JSON string (``orient='records'``) with one record per predicted
        minute, holding ``date`` plus one field per target column.  An empty
        dict is returned when no window is available or any step fails
        (the error is printed).
    """
    if len(pred_data_loader) <= 0:
        return {}
    try:
        # Index 0 is the most recent window.
        date, predict_data = pred_data_loader[0]
        # Add the batch dimension expected by the model.
        predict_data = torch.unsqueeze(predict_data, 0)
        model.eval()
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            predict_result = model(predict_data)
        # Drop the batch dimension again.
        predict_result = torch.squeeze(predict_result, 0)
        # Map the scaled outputs back to physical values.
        predict_result = pred_data_loader.inverse_transform(predict_result)
        predict_result = predict_result.detach().numpy()
        # Forecast timestamps start one minute after the last observed row.
        start_time = pd.Timestamp(date[-1])
        date = pd.date_range(start=start_time + pd.Timedelta(minutes=1), periods=len(predict_result), freq='min')
        df = pd.DataFrame(date, columns=['date'])
        # Label the output columns in the loader's own target order: that is
        # the order the scaler denormalizes in, and the number of targets
        # differs between sites. Fall back to the legacy fixed list only when
        # the loader exposes no target names.
        target_headers = list(getattr(pred_data_loader, 'target', None)
                              or ['outCod', 'outTN', 'outNH3N', 'outTP', 'outPh', 'Do'])
        df[target_headers] = predict_result
        print(df)
        json_str = df.to_json(orient='records')
        print(json_str)
    except Exception as e:  # best-effort endpoint: report and return empty
        print(f"An error occurred: {e}")
        json_str = {}
    return json_str
# Prediction request handler
class PredictHandler(tornado.web.RequestHandler):
    """Serve ``GET /predict/<keyword>`` with the latest forecast of one site."""

    def get(self, keyword):
        """Write ``{"prediction": ...}`` as JSON, or a plain message for unknown keywords."""
        if keyword not in configs:
            self.write("Unknown keyword.")
            return
        site = configs[keyword]
        prediction = pseudo_model_predict(site['model'], site['data_loader'])
        # Wrap the forecast and send it back as a JSON document.
        payload = json.dumps({"prediction": prediction})
        self.set_header("Content-Type", "application/json")
        self.write(payload)
# Model upload request handler
class UploadHandler(tornado.web.RequestHandler):
    """Serve ``POST /upload``: store a model file and a CSV under ``./Upload/<groupName>/``.

    Expects the form field ``groupName`` and the file fields ``modelFile`` and
    ``csvTable``.  All three names come from the client, so each one is
    validated as a single path component before it is used on disk.
    """

    @staticmethod
    def _safe_component(name):
        """Return ``name`` if it is a plain single path component, else raise HTTP 400."""
        if (not name
                or name in ('.', '..')
                or '/' in name
                or '\\' in name
                or '\x00' in name):
            raise tornado.web.HTTPError(400, "invalid path component")
        return name

    def _uploaded_file(self, field):
        """Return the first uploaded file of ``field`` or raise HTTP 400 when absent."""
        try:
            return self.request.files[field][0]
        except (KeyError, IndexError):
            raise tornado.web.HTTPError(400, f"missing file field: {field}") from None

    def post(self):
        """Validate the request, then write both uploaded files to the group directory."""
        group_name = self._safe_component(self.get_body_argument('groupName'))
        uploads = [self._uploaded_file('modelFile'), self._uploaded_file('csvTable')]
        # Keep only the final component of each client-supplied filename
        # (some clients send a full path) and validate it; do this for both
        # files before anything is written.
        filenames = [
            self._safe_component(os.path.basename(upload.filename.replace('\\', '/')))
            for upload in uploads
        ]
        # Create the group directory.
        save_path = os.path.join('./Upload', group_name)
        os.makedirs(save_path, exist_ok=True)
        # NOTE(security): stored model files may later be unpickled by
        # torch.load; restrict this endpoint to trusted clients.
        for upload, filename in zip(uploads, filenames):
            with open(os.path.join(save_path, filename), 'wb') as f:
                f.write(upload.body)
        # The message echoes user input, so send it as plain text, not HTML.
        self.set_header("Content-Type", "text/plain; charset=UTF-8")
        self.write(f'Files for group "{group_name}" have been uploaded and saved successfully.')
async def train(data_set, predict_model, pth_save_name):
    """Train ``predict_model`` on ``data_set`` for 4 epochs and save its weights.

    Args:
        data_set: map-style dataset yielding ``(sample, label)`` tensor pairs.
        predict_model: the network to optimise (updated in place).
        pth_save_name: path the state dict is written to after training.

    Returns:
        ``predict_model``.  When ``data_set`` holds no samples the model is
        returned untouched and nothing is written.

    NOTE: although declared ``async`` the body never awaits, so training runs
    to completion before control returns to the event loop.
    """
    print('模型训练开始')
    try:
        sample_count = len(data_set)
    except (TypeError, ValueError):
        # len() raises ValueError when a dataset's __len__ returns a negative
        # number (fewer rows than one full window) and TypeError when it has
        # no __len__; treat both as "nothing to train on".
        sample_count = 0
    if sample_count <= 0:
        # Without this guard the average-loss computation below divides by zero.
        print('训练数据不足,跳过本次训练')
        return predict_model
    random_seed = 240510  # fixed seed for reproducibility
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    train_dataloader = DataLoader(
        data_set,
        batch_size=16,
        shuffle=True,
        num_workers=0,
        drop_last=False)
    loss_function = nn.MSELoss()  # MSE regression loss
    optimizer = torch.optim.Adam(predict_model.parameters(), lr=0.0001)
    epochs = 4
    train_epoch_loss = []  # mean loss of each epoch
    for epoch in range(epochs):
        predict_model.train()
        train_step_loss = []
        for sample, label in train_dataloader:
            optimizer.zero_grad()  # gradients accumulate by default in PyTorch
            pre = predict_model(sample)
            loss = loss_function(pre, label)
            loss.backward()
            optimizer.step()
            train_step_loss.append(loss.item())
        train_average_loss = sum(train_step_loss) / len(train_step_loss)
        train_epoch_loss.append(train_average_loss)
        print(f"[在第{epoch + 1:}个epoch,训练的]: train_epoch_loss = {train_average_loss:.4f}")
    torch.save(predict_model.state_dict(), pth_save_name)
    print('模型训练完成')
    return predict_model
# ==========================================
# Periodic data collection
# ==========================================
# Module-level asynchronous HTTP client, created at import time and used by
# generate_data for its device requests.
http_client = tornado.httpclient.AsyncHTTPClient()
def _device_request_headers(sn):
    """Return the HTTP headers sent with the ``GetAllConfigs`` command for device ``sn``."""
    # NOTE(review): the access token is hard-coded here; consider moving it
    # to configuration that is kept out of the source tree.
    return {
        'Authority': 'iot.gxghzh.com:8888',
        'Method': 'POST',
        'Path': '/exeCmd',
        'Scheme': 'https',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cmd': 'GetAllConfigs',
        'Content-Length': '2',
        'Content-Type': 'application/json;charset=UTF-8',
        'Origin': 'http://127.0.0.1:6810',
        'Priority': 'u=1, i',
        'Referer': 'http://127.0.0.1:6810/',
        'Sec-Ch-Ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'cross-site',
        'Sn': sn,
        'Token': '45a73a59b3d23545',
        'Uid': '0',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/124.0.0.0 '
                      'Safari/537.36 Edg/124.0.0.0 '
    }


def _append_reading(loader, sn, result):
    """Append one timestamped row built from ``result`` to ``loader.df_raw``.

    ``result`` is the ``'Result'`` object of the device response; every entry
    of its ``AnalogTerminals`` and ``SwitchTerminals`` lists contributes one
    ``key -> value`` column.  On the first call the table is restored from
    today's CSV if it exists, otherwise created empty.
    """
    switch_terminals = result['SwitchTerminals']
    analog_terminals = result['AnalogTerminals']
    row = {'date': datetime.now()}  # stamp the reading with the current time
    analog_keys = []
    switch_keys = []
    for child in analog_terminals:
        row[child['key']] = child['value']
        analog_keys.append(child['key'])
    for child in switch_terminals:
        row[child['key']] = child['value']
        switch_keys.append(child['key'])
    if loader.df_raw is None:
        # First reading for this device in this process.
        current_date = datetime.now().strftime("%Y%m%d")
        file_pattern = f"./{current_date}_{sn}.csv"
        if os.path.exists(file_pattern):
            # parse_dates=True would only parse the index; name the column so
            # restored timestamps match the datetime values appended below.
            loader.get_df_raw(pd.read_csv(file_pattern, parse_dates=['date']))
        else:
            sort_list = ['date'] + sorted(analog_keys) + sorted(switch_keys)
            print(sort_list, len(sort_list))
            loader.get_df_raw(pd.DataFrame(columns=sort_list))
    loader.df_raw = pd.concat([loader.df_raw, pd.DataFrame([row])], ignore_index=True)


async def _checkpoint_and_train(site_key, entry, sn):
    """Save the collected table to today's CSV, then retrain when enough rows exist."""
    loader = entry['data_loader']
    current_date = datetime.now().strftime("%Y%m%d")
    file_pattern = f"./{current_date}_{sn}.csv"
    # Persist first so that a failure during training cannot lose the data.
    loader.df_raw.to_csv(file_pattern, index=False)
    data_set = Dataset_GUISANLI_minute(target=entry.get('target', None),
                                       column_order=entry.get('columns', None))
    data_set.read_data(loader.df_raw)
    # One training sample needs a full input window plus a full label window.
    if len(loader.df_raw) < data_set.seq_len + data_set.pred_len:
        return
    configs[site_key]['model'] = await train(data_set=data_set,
                                             predict_model=entry['model'],
                                             pth_save_name=f"./{current_date}_{sn}_TCN.pth")


async def _poll_device(site_key, entry):
    """Fetch one reading for a site, store it, and checkpoint every 10th row."""
    sn = entry['SN']
    loader = entry['data_loader']
    response = await http_client.fetch("https://iot.gxghzh.com:8888/exeCmd", method="POST",
                                       headers=_device_request_headers(sn),
                                       body=json.dumps({}))
    appended = False
    if response.code == 200:
        response_data = json.loads(response.body.decode('utf-8'))
        _append_reading(loader, sn, response_data['Result'])
        appended = True
        print(f'请求成功,状态码:{response.code}')
    else:
        print(f'请求失败,状态码:{response.code}')
    if loader.df_raw is None:
        # Nothing has been collected yet, so there is nothing to show or save.
        return
    print("============ 每隔一分钟展示df ====================")
    print(loader.df_raw.tail())
    print(f"shape:{loader.df_raw.shape}")
    print("===============================================")
    # Checkpoint only when this tick actually added the 10th, 20th, ... row,
    # so a failed request does not re-trigger training on unchanged data.
    if appended and len(loader.df_raw) % 10 == 0:
        await _checkpoint_and_train(site_key, entry, sn)


async def generate_data():
    """Poll every configured device once; intended to run once per minute.

    Errors are reported per device so that one failing site does not stop the
    others.  The shared module-level ``http_client`` is deliberately NOT
    closed here: this coroutine is scheduled repeatedly, and a closed
    ``AsyncHTTPClient`` rejects every later ``fetch``.
    """
    for site_key, entry in configs.items():
        try:
            await _poll_device(site_key, entry)
        except tornado.httpclient.HTTPError as e:
            print("HTTP Error:", e)
        except Exception as e:  # keep the periodic job alive; report and go on
            print("Exception:", e)
# Create the Tornado application and register its routes:
#   /predict/<keyword> -> PredictHandler (keyword is captured by (\w+))
#   /upload            -> UploadHandler
app = tornado.web.Application([
(r"/predict/(\w+)", PredictHandler),
(r"/upload", UploadHandler),
])
# Logging configuration (currently disabled)
# logger = logging.getLogger()
# logger.setLevel(logging.INFO)
#
# formatter = LogFormatter(
# fmt='%(color)s[%(asctime)s] %(levelname)s - %(message)s%(end_color)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
#
# # Write log records to the file tornado.log
# file_handler = logging.FileHandler("tornado.log")
# file_handler.setFormatter(formatter)
# logger.addHandler(file_handler)
if __name__ == "__main__":
    # Bind the HTTP server to its port.
    app.listen(8886)
    print("Server started on port 8886")
    # Schedule generate_data every 60000 ms (once per minute).
    collector = tornado.ioloop.PeriodicCallback(generate_data, 60000)
    collector.start()
    # Hand control to the event loop; this call blocks until the loop stops.
    event_loop = tornado.ioloop.IOLoop.current()
    event_loop.start()