""" Author:陆绍超 Project name:swDLiner Created on 2024/05/07 下午1:20 """ import os import pandas as pd import numpy as np import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from TCN import TemporalConvNet import json import tornado.web from tornado.escape import json_decode from tornado.log import LogFormatter import logging # 添加了5:07 from datetime import datetime class NormalizedScaler: def __init__(self): self.min_value = 0. self.max_value = 1.0 self.target_column_indices = None def fit(self, data): self.min_value = data.min(0) self.max_value = data.max(0) # 计算最小值和最大值 self.maxmin_zeros = ((self.max_value - self.min_value) <= 1e-4) # print(self.maxmin_zeros) def transform(self, data): max_value = torch.from_numpy(self.max_value).type_as(data).to(data.device) if torch.is_tensor( data) else self.max_value min_value = torch.from_numpy(self.min_value).type_as(data).to(data.device) if torch.is_tensor( data) else self.min_value if any(self.maxmin_zeros): normalized_data = torch.zeros_like(data) if torch.is_tensor(data) else np.zeros_like(data) # 对每一列进行归一化,除非该列的最大值和最小值相等 for col in range(data.shape[1]): if not self.maxmin_zeros[col]: normalized_data[:, col] = (data[:, col] - min_value[col]) / (max_value[col] - min_value[col]) else: normalized_data = (data - min_value) / (max_value - min_value) return normalized_data def y_transform(self, data): max_value = torch.from_numpy(self.max_value[self.target_column_indices]).type_as(data).to( data.device) if torch.is_tensor( data) else self.max_value[self.target_column_indices] min_value = torch.from_numpy(self.min_value[self.target_column_indices]).type_as(data).to( data.device) if torch.is_tensor( data) else self.min_value[self.target_column_indices] maxmin_zeros = self.maxmin_zeros[self.target_column_indices] if any(self.maxmin_zeros): normalized_data = torch.zeros_like(data) if torch.is_tensor(data) else np.zeros_like(data) # 对每一列进行归一化,除非该列的最大值和最小值相等 for col in range(data.shape[1]): if not maxmin_zeros[col]: normalized_data[:, col] = (data[:, col] - min_value[col]) / (max_value[col] - min_value[col]) else: normalized_data = (data - min_value) / (max_value - min_value) return normalized_data def inverse_transform(self, data): max_value = torch.from_numpy(self.max_value[self.target_column_indices]).type_as(data).to( data.device) if torch.is_tensor( data) else self.max_value[self.target_column_indices] min_value = torch.from_numpy(self.min_value[self.target_column_indices]).type_as(data).to( data.device) if torch.is_tensor( data) else self.min_value[self.target_column_indices] return (data * (max_value - min_value)) + min_value class Dataset_GUISANLI_minute(Dataset): def __init__(self, size=None, target=None, column_order=None, scale=True): if target is None: self.target = ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'] else: self.target = target if column_order is None: # 列名列表,按照这个顺序排列 self.column_order = ['Do', 'Do1', 'Do2', 'inCod', 'inFlow', 'inNH3N', 'inPh', 'outCod', 'outFlow', 'outFlowNow', 'outNH3N', 'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj2'] else: self.column_order = column_order if size is None: self.seq_len = 120 self.pred_len = 60 else: self.seq_len = size[0] self.pred_len = size[1] self.scale = scale self.scaler = NormalizedScaler() self.df_raw = None def read_data(self, df_raw): self.df_raw = df_raw ''' df_raw.columns: ['date', ...(other features), target feature] ''' if not all(column in df_raw.columns for column in self.column_order): print(f"DataFrame must contain columns: 
{self.column_order}") # 使用reindex方法按照列名列表对列进行排列 df_data = df_raw[self.column_order] self.data_x = df_data self.data_y = df_data if self.target: self.data_y = self.data_y[self.target] if self.scale: # 获取列名对应的列索引列表,给反标准化做准备 column_indices_1 = [self.data_x.columns.get_loc(col) for col in self.target] self.scaler.target_column_indices = column_indices_1 def __getitem__(self, index): s_begin = index s_end = s_begin + self.seq_len r_begin = s_end r_end = r_begin + self.pred_len seq_x = self.data_x[s_begin:s_end] seq_y = self.data_y[r_begin:r_end] if self.scale: self.scaler.fit(seq_x.values) x_data = self.scaler.transform(seq_x.values) y_data = self.scaler.y_transform(seq_y.values) return torch.from_numpy(x_data).to(torch.float32), torch.from_numpy(y_data).to(torch.float32) def __len__(self): return len(self.data_x) - self.seq_len - self.pred_len + 1 def inverse_transform(self, data): return self.scaler.inverse_transform(data) class Pred_GUISANLI_minute(): def __init__(self, size=None, target=None, column_order=None, scale=True, sn=None): if column_order is None: self.column_order = ['Do', 'Do1', 'Do2', 'inCod', 'inFlow', 'inNH3N', 'inPh', 'outCod', 'outFlow', 'outFlowNow', 'outNH3N', 'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj2'] else: self.column_order = column_order if target is None: self.target = ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'] # 6 else: self.target = target if size is None: self.seq_len = 120 self.pred_len = 60 else: self.seq_len = size[0] self.pred_len = size[1] self.scale = scale self.scaler = NormalizedScaler() self.sn = sn self.df_raw = None def get_df_raw(self, df_raw): self.df_raw = df_raw ''' df_raw.columns: ['date', ...(other features), target feature] ''' # 列名列表,按照这个顺序排列 if not all(column in self.df_raw.columns for column in self.column_order): print(f"DataFrame must contain columns: {self.column_order}") def __getitem__(self, index): self.data_x = self.df_raw[self.column_order] # 预测数据 self.data_date = self.df_raw['date'] # 时间数据 if self.scale: # 获取列名对应的列索引列表,给反标准化做准备 column_indices_1 = [self.data_x.columns.get_loc(col) for col in self.target] self.scaler.target_column_indices = column_indices_1 # 使用reindex方法按照列名列表对列进行排列 s_begin = len(self.data_x) - self.seq_len - index s_end = s_begin + self.seq_len seq_date = self.data_date[s_begin:s_end] seq_x = self.data_x[s_begin:s_end] if self.scale: # 测试是否为数据的部分,已经为测试标签联合测试 # print('==start' * 20) # print(seq_x) # print('==end' * 20) self.scaler.fit(seq_x.values) x_data = self.scaler.transform(seq_x.values) return seq_date.values, torch.from_numpy(x_data).to(torch.float32) def __len__(self): if self.df_raw is None: return 0 elif (len(self.df_raw) - self.seq_len + 1) < 0: return 0 else: return len(self.df_raw) - self.seq_len + 1 def inverse_transform(self, data): return self.scaler.inverse_transform(data) def load_model(weights_path, num_inputs=32, num_outputs=6): predict_model = TemporalConvNet(seq_len=120, pred_len=60, num_inputs=num_inputs, num_channels=[64, 128, 256, 128, 64, 32, num_outputs]) # 加载模型 if os.path.exists(weights_path): model_weights = torch.load(weights_path) # 读取权重文件 predict_model.load_state_dict(model_weights) # 模型加载权重 else: print("模型权重不存在") return predict_model def config_init(): # 从文件中读取JSON并转换回字典 config_load = { '20210225GUISANLI': {'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth', 'data_loader': '20210225GUISANLI', 'SN': '20210225GUISANLI', 'target': ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'], 'columns': ['Do', 'Do1', 'Do2', 'inCod', 'inFlowNow', 'inNH3N', 
def load_model(weights_path, num_inputs=32, num_outputs=6):
    predict_model = TemporalConvNet(seq_len=120, pred_len=60, num_inputs=num_inputs,
                                    num_channels=[64, 128, 256, 128, 64, 32, num_outputs])
    # load the weights if the file exists
    if os.path.exists(weights_path):
        model_weights = torch.load(weights_path)  # read the weights file
        predict_model.load_state_dict(model_weights)  # load the weights into the model
    else:
        print("Model weights file does not exist")
    return predict_model


def config_init():
    # station configuration (could also be read back from a JSON file, see the commented-out block below)
    config_load = {
        '20210225GUISANLI': {
            'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
            'data_loader': '20210225GUISANLI',
            'SN': '20210225GUISANLI',
            'target': ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'],
            'columns': ['Do', 'Do1', 'Do2', 'inCod', 'inFlowNow', 'inNH3N', 'inPh', 'outCod', 'outFlowNow',
                        'outNH3N', 'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj2'],
        },
        '20210207GUIGWULI': {
            'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
            'data_loader': '20210207GUIGWULI',
            'SN': '20210207GUIGWULI',
            'target': ['Do', 'outCod', 'outNH3N', 'outPh', 'outTN', 'outTP'],
            'columns': ['Do', 'inCod', 'inFlowNow', 'inNH3N', 'inPh', 'outCod', 'outFlowNow', 'outNH3N',
                        'outPh', 'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_tj1'],
        },
        '20210309ZHANGMUZ': {
            'model': './Upload/GUIGWULI/TCN_weights_GUIGWULIm1.pth',
            'data_loader': '20210309ZHANGMUZ',
            'SN': '20210309ZHANGMUZ',
            'target': ['outCOD', 'outNH3N', 'outPH', 'outTN', 'outTP'],
            'columns': ['inCOD', 'inFlowNow', 'inNH3N', 'inPH', 'outCOD', 'outFlowNow', 'outNH3N', 'outPH',
                        'outTN', 'outTP', 'yw_bz', 'yw_mc1', 'yw_mc2', 'yw_mc3', 'yw_mc4', 'yw_tj1', 'yw_tj2',
                        'yw_tj3', 'yw_tj4'],
        },
    }
    # with open('config', 'r', encoding='utf-8') as f:
    #     config_load = json.load(f)
    #     config_load = dict(config_load)
    configs = {}
    for key, val in config_load.items():
        config_item = {}
        for k, v in val.items():
            if k == 'model':
                config_item[k] = load_model(weights_path=v,
                                            num_inputs=len(val.get('columns', [])),
                                            num_outputs=len(val.get('target', [])))
            elif k == 'data_loader':
                config_item[k] = Pred_GUISANLI_minute(sn=v,
                                                      target=val.get('target', None),
                                                      column_order=val.get('columns', None))
            elif k == 'SN':
                config_item[k] = v
            elif k == 'target':
                config_item[k] = v
            elif k == 'columns':
                config_item[k] = v
            else:
                raise ValueError("Invalid configuration key")
        configs[key] = config_item
    return configs


configs = config_init()


def pseudo_model_predict(model, pred_data_loader):
    # try to load the most recent window from pred_data_loader
    if len(pred_data_loader) > 0:
        # pred_data_loader behaves like a sequence with at least one element
        date, predict_data = pred_data_loader[0]
    else:
        return {}
    try:
        # add a batch dimension: PyTorch models expect batched input
        predict_data = torch.unsqueeze(predict_data, 0)
        # put the model into evaluation mode
        model.eval()
        # run inference
        predict_result = model(predict_data)
        # drop the batch dimension
        predict_result = torch.squeeze(predict_result, 0)
        # undo the normalization of the predictions
        predict_result = pred_data_loader.inverse_transform(predict_result)
        # convert the predictions to a numpy array
        predict_result = predict_result.detach().numpy()
        # build a time index starting one minute after the last input timestamp
        start_time = pd.Timestamp(date[-1])
        date = pd.date_range(start=start_time + pd.Timedelta(minutes=1), periods=len(predict_result), freq='T')
        # build a DataFrame with the timestamps as a column
        df = pd.DataFrame(date, columns=['date'])
        # prediction headers follow the column order produced by the data loader
        target_headers = list(pred_data_loader.target)
        # attach the predicted values
        df[target_headers] = predict_result
        print(df)
        # convert the DataFrame to a JSON string
        json_str = df.to_json(orient='records')
        print(json_str)
    except Exception as e:
        # catch any exception raised during inference or post-processing
        print(f"An error occurred: {e}")
        # fall back to an empty dict in place of the JSON string
        json_str = {}
    return json_str
# Prediction request handler
class PredictHandler(tornado.web.RequestHandler):
    def get(self, keyword):
        if keyword in configs.keys():
            json_str = pseudo_model_predict(configs[keyword]['model'], configs[keyword]['data_loader'])
            # build the response payload
            response = {"prediction": json_str}
            # set the Content-Type to application/json
            self.set_header("Content-Type", "application/json")
            # send the result back to the client
            self.write(json.dumps(response))
        else:
            self.write("Unknown keyword.")


# Model upload handler
class UploadHandler(tornado.web.RequestHandler):
    def post(self):
        # read the form fields
        group_name = self.get_body_argument('groupName')
        model_file = self.request.files['modelFile'][0]
        csv_file = self.request.files['csvTable'][0]
        # create the group directory
        save_path = os.path.join('./Upload', group_name)
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        # save the model file
        model_filename = model_file.filename
        model_path = os.path.join(save_path, model_filename)
        with open(model_path, 'wb') as f:
            f.write(model_file.body)
        # save the CSV file
        csv_filename = csv_file.filename
        csv_path = os.path.join(save_path, csv_filename)
        with open(csv_path, 'wb') as f:
            f.write(csv_file.body)
        self.write(f'Files for group "{group_name}" have been uploaded and saved successfully.')


async def train(data_set, predict_model, pth_save_name):
    print('Model training started')
    random_seed = 240510  # set a random seed for reproducibility
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    # wrap the dataset in a DataLoader for training (no validation split here)
    train_dataloader = DataLoader(data_set, batch_size=16, shuffle=True, num_workers=0, drop_last=False)
    loss_function = nn.MSELoss()  # MSE as the regression loss
    optimizer = torch.optim.Adam(predict_model.parameters(), lr=0.0001)  # Adam optimizer
    epochs = 4  # number of training epochs
    train_epoch_loss = []  # average loss of each training epoch
    for epoch in range(epochs):
        # train --------------------------------------------------------------------------------------------------
        predict_model.train()
        train_step_loss = []
        for step, data in enumerate(train_dataloader):
            sample, label = data
            optimizer.zero_grad()  # clear gradients (PyTorch accumulates them by default)
            pre = predict_model(sample)
            loss = loss_function(pre, label)
            loss.backward()
            optimizer.step()
            train_step_loss.append(loss.item())
        train_average_loss = sum(train_step_loss) / len(train_step_loss)
        train_epoch_loss.append(train_average_loss)
        print(f"[epoch {epoch + 1}] train_epoch_loss = {train_average_loss:.4f}")
    torch.save(predict_model.state_dict(), pth_save_name)
    print('Model training finished')
    return predict_model
# ==========================================
# Periodic data acquisition
# ==========================================
http_client = tornado.httpclient.AsyncHTTPClient()


async def generate_data():
    global http_client
    global configs
    try:
        for k1, v1 in configs.items():
            SN = v1['SN']
            # request headers
            headers = {
                'Authority': 'iot.gxghzh.com:8888',
                'Method': 'POST',
                'Path': '/exeCmd',
                'Scheme': 'https',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Encoding': 'gzip, deflate, br, zstd',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                'Cmd': 'GetAllConfigs',
                'Content-Length': '2',
                'Content-Type': 'application/json;charset=UTF-8',
                'Origin': 'http://127.0.0.1:6810',
                'Priority': 'u=1, i',
                'Referer': 'http://127.0.0.1:6810/',
                'Sec-Ch-Ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
                'Sec-Ch-Ua-Mobile': '?0',
                'Sec-Ch-Ua-Platform': '"Windows"',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'cross-site',
                'Sn': SN,
                'Token': '45a73a59b3d23545',
                'Uid': '0',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/124.0.0.0 '
                              'Safari/537.36 Edg/124.0.0.0 '
            }
            # send the POST request
            response = await http_client.fetch("https://iot.gxghzh.com:8888/exeCmd", method="POST",
                                               headers=headers, body=json.dumps({}))
            # check the response status code
            if response.code == 200:
                # decode the response body (assumed to be JSON)
                response_data = response.body.decode('utf-8')
                # parse the string into a JSON object
                response_data = json.loads(response_data)
                SwitchTerminals = response_data['Result']['SwitchTerminals']
                AnalogTerminals = response_data['Result']['AnalogTerminals']
                # current timestamp
                current_time = datetime.now()
                item_dict = {'date': current_time}  # attach the current time to the new row
                item_key_list_1 = []
                item_key_list_2 = []
                for child in AnalogTerminals:
                    key = child['key']
                    value = child['value']
                    item_dict[key] = value
                    item_key_list_1.append(key)
                for child in SwitchTerminals:
                    key = child['key']
                    value = child['value']
                    item_dict[key] = value
                    item_key_list_2.append(key)
                if v1['data_loader'].df_raw is None:  # create the DataFrame on the first call
                    # current date
                    current_date = datetime.now().strftime("%Y%m%d")
                    # build the file name
                    file_pattern = f"./{current_date}_{SN}.csv"
                    # check whether the file already exists in the current folder
                    file_exists = os.path.exists(file_pattern)
                    if file_exists:
                        v1['data_loader'].get_df_raw(pd.read_csv(file_pattern, parse_dates=True))
                    else:
                        item_key_list_1 = sorted(item_key_list_1)
                        item_key_list_2 = sorted(item_key_list_2)
                        sort_list = ['date'] + item_key_list_1 + item_key_list_2
                        print(sort_list, len(sort_list))
                        v1['data_loader'].get_df_raw(pd.DataFrame(columns=sort_list))
                # append the new row with concat
                v1['data_loader'].df_raw = pd.concat([v1['data_loader'].df_raw, pd.DataFrame([item_dict])],
                                                     ignore_index=True)
                # json_str_GUISANLI = pseudo_model_predict(v1['model'], v1['data_loader'])
                print(f'Request succeeded, status code: {response.code}')
            else:
                print(f'Request failed, status code: {response.code}')
            print("============ df snapshot (once per minute) ====================")
            print(v1['data_loader'].df_raw.tail())  # show the tail of df_raw once per minute
            print(f"shape:{v1['data_loader'].df_raw.shape}")
            print("===============================================")
            # every 10 rows: retrain the model and persist the data to disk
            if len(v1['data_loader'].df_raw) % 10 == 0:
                # current date
                current_date = datetime.now().strftime("%Y%m%d")
                # build the file name
                file_pattern = f"./{current_date}_{SN}.csv"
                data_set = Dataset_GUISANLI_minute(target=v1.get('target', None),
                                                   column_order=v1.get('columns', None))
                data_set.read_data(v1['data_loader'].df_raw)
                predict_model = v1['model']
                configs[k1]['model'] = await train(data_set=data_set, predict_model=predict_model,
                                                   pth_save_name=f"./{current_date}_{SN}_TCN.pth")
                v1['data_loader'].df_raw.to_csv(file_pattern, index=False)
                # df_new = v1['data_loader'].df_raw.iloc[-120:, :].copy()
                # del v1['data_loader'].df_raw
                # v1['data_loader'].df_raw = df_new
                # v1['data_loader'].df_raw.reset_index(drop=True, inplace=True)
    except tornado.httpclient.HTTPError as e:
        print("HTTP Error:", e)
    except Exception as e:
        print("Exception:", e)
    # note: the shared AsyncHTTPClient is reused by the periodic callback, so it is not closed here


# create the Tornado application
app = tornado.web.Application([
    (r"/predict/(\w+)", PredictHandler),
    (r"/upload", UploadHandler),
])

# logging configuration
# logger = logging.getLogger()
# logger.setLevel(logging.INFO)
#
# formatter = LogFormatter(
#     fmt='%(color)s[%(asctime)s] %(levelname)s - %(message)s%(end_color)s',
#     datefmt='%Y-%m-%d %H:%M:%S'
# )
#
# # log file
# file_handler = logging.FileHandler("tornado.log")
# file_handler.setFormatter(formatter)
# logger.addHandler(file_handler)

if __name__ == "__main__":
    # start the server
    app.listen(8886)
    print("Server started on port 8886")
    # call generate_data every 60 seconds
    tornado.ioloop.PeriodicCallback(generate_data, 60000).start()
    tornado.ioloop.IOLoop.current().start()
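# Example requests against the running service (a sketch for reference only; the port
# matches the app.listen(8886) call above, '20210225GUISANLI' is one of the configured
# SNs, and the uploaded file names are placeholders):
#   curl http://127.0.0.1:8886/predict/20210225GUISANLI
#   curl -F "groupName=GUIGWULI" -F "modelFile=@model.pth" -F "csvTable=@data.csv" \
#        http://127.0.0.1:8886/upload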