问题来自欧洲航天局的竞赛卫星碰撞风险预测 当检测到物体接近卫星轨道并有可能与卫星碰撞时会测量该物体与卫星的多个物理量,包括时间点、与卫星的相对位置和速度、加速度等,每一个时间点的这些数据被称为一个CDM,而一次潜在的碰撞事件(event)包含多个时间点的CDM. 对一个event,物体与卫星最接近的时间点称为 tca ( time of closest approach), 而时间点由距离tca的远近来划分(time_to_tca,单位是天)。同时对于每一个CDM,可以根据数据评估出碰撞风险(risk),该风险为取以10为底的对数后的值。 赛题提供了162634行、103列的训练数据,每一行数据为一个CDM,可以参考下图。进入数据下载页面下载train_data.csv即可,test数据可以不用。 这个问题属于时间序列预测问题,如表格中所示,每个event都有独立的id, 而一个event才是一个完整的样本,一个样本会包含多个CDM,这些CDM包含随时间(time_to_tca变化的量,所以并不互相独立。在训练数据中,一个event可能包含1行至23行数据。 而在test_data.csv中,只包含了time_to_tca>2的数据,赛题要求根据每个event的time_tca>2部分的数据预测出time_to_tca=0时的risk,因为物体与卫星最接近时的risk最具有参考性,赛题其实要求我们提前两天预测出最终的risk。但最终risk赛题中并未给出,由官方保留,所以test_data.csv并不可用。作为替代,从train_data中划分训练集、验证集和测试集,并以最后一个CDM的risk作为最终risk(训练数据中也没有time_to_tca=0时的数据).
由于是时间序列预测问题,采用常用的LSTM网络。由于在test_data也包含了risk,所以在训练数据中也保留risk,event_id是样本的标号,要去掉,于是一行数据有102列。输入数据时,将每个样本的数据通过填充全0行,同一扩充成25行。最终输入数据size应该是(batch_size, 25, 102). 我们的LSTM代码。
from tensorflow import keras from tensorflow.keras import layers class MyRNN(keras.Model): def __init__(self, units): super(MyRNN, self).__init__() self.fc1 = layers.Dense(64) # 第一层全连接层,用于数据降维 self.rnn = keras.Sequential([ layers.BatchNormalization(), # 防止梯度弥散 layers.LSTM(units, dropout=0.2, return_sequences=True, unroll=True), # 网络主体,三层LSTM层,加入dropout防止过拟合 layers.LSTM(units, dropout=0.2, return_sequences=True, unroll=True), layers.LSTM(units, dropout=0.2, unroll=True), layers.BatchNormalization(), ]) self.fc2 = layers.Dense(64, activation="sigmoid") self.outlayer = layers.Dense(1) # 输出层 def call(self, inputs, training=True): x = inputs x = self.fc1(x) x = self.rnn(x, training=training) x = self.fc2(x) x = self.outlayer(x) return x观察训练数据,很多event的最后一个CDM的time_to_tca依然较大,而我们需要预测的是time_to_tca减至0时的风险,这些数据参考性较低,应从训练数据中剔除,我们只选择了包含time_to_tca<1数据的event,并将每个event的数据加工成(25, 102)的size. 我们的训练集、验证集和测试集全部从train_data.csv中选取,取3:1:1的比例。 以下代码用于数据的预处理,并返回(*,25,102)size的array.
import tensorflow as tf import numpy as np import pandas as pd from sklearn import preprocessing class DataReader(): df = None train_stats = None data_array = None fin_risk = None # 数据读取与初步处理 def pre_data(self): df = pd.read_csv("train_data.csv") # [162634, 103] encoder = preprocessing.LabelEncoder() df['c_object_type'] = encoder.fit_transform(df['c_object_type']) # 对非数字参量进行编码 df.fillna(value=0, inplace=True) self.df = df return df # 统计每个event的final risk和最小time to tca def get_risk(self): df = self.pre_data() fin_risk = np.zeros(13154) # 统计每个event的final risk用于训练和测试 fin_timetotca = np.zeros(13154) # 统计每个event的最小time to tca,为后续筛选数据做准备 event_id = 0 for index, row in df.iterrows(): # for循环逐行读取数据 if row['event_id'] == event_id: # 每个event独自保存 fin_risk[event_id] = row['risk'] fin_timetotca[event_id] = row['time_to_tca'] else: event_id = event_id + 1 fin_risk[event_id] = row['risk'] fin_timetotca[event_id] = row['time_to_tca'] self.fin_risk = fin_risk self.fin_timetotca = fin_timetotca return fin_risk, fin_timetotca # 得到每个参量的平均值和方差,为归一化做准备 def get_stats(self): df = self.pre_data() event = df.pop('event_id') risk = df.pop('risk') train_stats = df.describe() self.train_stats = train_stats.transpose() df['event_id'] = event df['risk'] = risk return self.train_stats # 归一化 def norm(self, x): train_stats = self.get_stats() return (x - train_stats['mean']) / train_stats['std'] def get_norm_data(self): self.get_stats() df = self.df event = df.pop('event_id') risk = df.pop('risk') df = self.norm(df) df['risk'] = risk df['event_id'] = event return df # 筛选数据,并划分为训练集+验证集和测试集 def get_data_array(self): df = self.get_norm_data() fin_risk, final_timetotca = self.get_risk() final_risk = np.zeros(7293) # 经过运行下面代码,打印data_array.shape得到的训练集+验证集包含event数量为7293 test_risk = np.zeros(1902) # 测试集取time to tca>2时,测试集包含event数量为1902(取time to tca>1时数量为2047) data_array = [] test_array = [] temp_vector = [] event_id = 0 n = 0 k = 0 k1 = 0 for index, row in df.iterrows(): if row['event_id'] < 3000: # 取测试集,通过改变该条件可以得到不同的测试集 if row['event_id'] == event_id: if final_timetotca[event_id] < 1: # 筛选最小time to tca<1的数据,这样对final risk的估计才更准确 if row['time_to_tca'] > -0.671: # 2经过归一化后对应-0.671,1对应-1.169 temp_vector.append(row.values[:-1]) n = n + 1 else: if n > 0: # 若该event有数据被保存 if n < 25: for i in range(25 - n): # 数据维度统一化,填充全0行到25行 temp_vector.append(np.zeros(102, )) test_array.append(np.array(temp_vector)) test_risk[k1] = fin_risk[event_id] # 保存final risk k1 = k1+1 temp_vector = [] event_id = event_id + 1 n = 0 if final_timetotca[event_id] < 1: # 下一个event的第一行 if row['time_to_tca'] > -0.671: temp_vector.append(row.values[:-1]) n = n + 1 else: if row['event_id'] == event_id: # 训练集+验证集,数据筛选方式与测试集同理 if final_timetotca[event_id] < 1: temp_vector.append(row.values[:-1]) n = n + 1 else: if n > 0: if n < 25: for i in range(25 - n): temp_vector.append(np.zeros(102, )) data_array.append(np.array(temp_vector)) final_risk[k] = fin_risk[event_id] k = k+1 temp_vector = [] event_id = event_id + 1 n = 0 if final_timetotca[event_id] < 1: temp_vector.append(row.values[:-1]) n = n + 1 test_array = np.array(test_array) data_array = np.array(data_array) print(data_array.shape, test_array.shape) return data_array, final_risk, test_array, test_risk # 打乱训练集和验证集的分布 def get_shuffle_data(self): data_array, fin_risk, test_array, test_risk = self.get_data_array() index = np.array(range(7293)) np.random.shuffle(index) data_array = data_array[index] fin_risk = fin_risk[index] return data_array, fin_risk完整代码 写代码时比赛已经结束,只当是一个练习,代码和结果只具有一定参考性。