这是我将机器学习与深度学习各类算法进行汇总,对京东评论数据集进行文本分类。包含各种机器学习算法(sklearn实现)与常用文本分类模型(keras、keras-bert、fastnlp)实现。这算是我自己能力的一个总结。以后做中文文本分类就靠这个大杀器了。
项目源码地址在这里:
github的:
https://github.com/yingdajun/TextClassifierMLAndDLByBertOrFastnlpDemo
码云的:
https://gitee.com/www.ydj.com/TextClassifierMLAndDLByBertOrFastnlpDemo
使用工具:
anconda 中jupyter notebook ,jieba 0.39,sklearn 0.22,keras 2.3.1,torch 1.01,fastnlp 0.5.5
# 机器学习部分
1.数据预处理
读取数据并且将当前数据进行清理的函数,这个是针对于机器学习的预处理
import numpy as np import re import pandas as pd # clean useless characters ''' html_clean = ['& ldquo ;', '& hellip ;', '& rdquo ;', '& yen ;'] punctuation_replace = '[,。!?]+' strange_num = ['①','②','③','④'] ''' punctuation_remove = '[:;……()『』《》【】~!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~]+' def clean(sent): sent = re.sub(r'ldquo', "", sent) sent = re.sub(r'hellip', "", sent) sent = re.sub(r'rdquo', "", sent) sent = re.sub(r'yen', "", sent) sent = re.sub(r'⑦', "7", sent) sent = re.sub(r'(, ){2,}', "", sent) sent = re.sub(r'(! ){2,}', "", sent) # delete too many!,?,。等 sent = re.sub(r'(? ){2,}', "", sent) sent = re.sub(r'(。 ){2,}', "", sent) sent = re.sub(punctuation_remove, "", sent) #delete punctuations s = ' '.join(sent.split()) #delete additional space return s def sent_filter(l): l_new = [] for s,k in enumerate(l): if len(k) > 2: l_new.append(k) return l_new # 这里是深度学习模式下的读取数据集 def dl_load_data_and_labels(good_data_file, bad_data_file, mid_data_file): #load reviews and save them in the list good_examples = list(open(good_data_file, "r", encoding='utf-8').readlines()) good_examples = [s.strip() for s in good_examples] bad_examples = list(open(bad_data_file, "r", encoding='utf-8').readlines()) bad_examples = [s.strip() for s in bad_examples] mid_examples = list(open(mid_data_file, "r", encoding='utf-8').readlines()) mid_examples = [s.strip() for s in mid_examples] #Call the clean () and sent_filter () functions to process the comments, save them in the x_text list good_examples = [clean(sent) for sent in good_examples] bad_examples = [clean(sent) for sent in bad_examples] mid_examples = [clean(sent) for sent in mid_examples] good_examples = [i.strip() for i in good_examples] bad_examples = [i.strip() for i in bad_examples] mid_examples = [i.strip() for i in mid_examples] good_examples = sent_filter(good_examples) bad_examples = sent_filter(bad_examples) mid_examples = sent_filter(mid_examples) x_text = good_examples + bad_examples + mid_examples #Add a label for each comment and save it in y good_labels = [[1, 0, 0] for _ in good_examples] bad_labels = [[0, 1, 0] for _ in bad_examples] mid_labels = [[0, 0, 1] for _ in mid_examples] y = np.concatenate([good_labels, bad_labels, mid_labels], 0) return [x_text, y] # 机器学习模式下的读取到的数据集 def ml_load_data_and_labels(good_data_file, bad_data_file, mid_data_file): #load reviews and save them in the list good_examples = list(open(good_data_file, "r", encoding='utf-8').readlines()) good_examples = [s.strip() for s in good_examples] bad_examples = list(open(bad_data_file, "r", encoding='utf-8').readlines()) bad_examples = [s.strip() for s in bad_examples] mid_examples = list(open(mid_data_file, "r", encoding='utf-8').readlines()) mid_examples = [s.strip() for s in mid_examples] #Call the clean () and sent_filter () functions to process the comments, save them in the x_text list good_examples = [clean(sent) for sent in good_examples] bad_examples = [clean(sent) for sent in bad_examples] mid_examples = [clean(sent) for sent in mid_examples] good_examples = [i.strip() for i in good_examples] bad_examples = [i.strip() for i in bad_examples] mid_examples = [i.strip() for i in mid_examples] good_examples = sent_filter(good_examples) bad_examples = sent_filter(bad_examples) mid_examples = sent_filter(mid_examples) x_text = good_examples + bad_examples + mid_examples #Add a label for each comment and save it in y good_labels = [0 for _ in good_examples] bad_labels = [1 for _ in bad_examples] mid_labels = [2 for _ in mid_examples] y = np.concatenate([good_labels, bad_labels, mid_labels], 0) return [x_text, y] # when you use tensorflow, you need to generate batches yourself, this function may helpe you def batch_iter(data, batch_size, num_epochs, shuffle=True): """ Generates a batch iterator for a dataset. """ data = np.array(data) data_size = len(data) num_batches_per_epoch = int((len(data)-1)/batch_size) + 1 for epoch in range(num_epochs): # Shuffle the data at each epoch if shuffle: shuffle_indices = np.random.permutation(np.arange(data_size)) shuffled_data = data[shuffle_indices] else: shuffled_data = data for batch_num in range(num_batches_per_epoch): start_index = batch_num * batch_size end_index = min((batch_num + 1) * batch_size, data_size) yield shuffled_data[start_index:end_index]2 数据读取
good_data_file = "./data/good_cut_jieba.txt" bad_data_file = "./data/bad_cut_jieba.txt" mid_data_file = "./data/mid_cut_jieba.txt" x_text, y = ml_load_data_and_labels(good_data_file, bad_data_file, mid_data_file) print(y)3.导入机器学习包与导入停用词
import sklearn #机器学习算法模型 from sklearn.linear_model import LogisticRegression from sklearn.tree import DecisionTreeClassifier from sklearn.ensemble import RandomForestClassifier,BaggingClassifier,AdaBoostClassifier from sklearn.svm import SVC,LinearSVC from sklearn.naive_bayes import BernoulliNB,MultinomialNB,GaussianNB from sklearn.neighbors import KNeighborsClassifier # 特征提取 from sklearn.feature_extraction.text import CountVectorizer,TfidfVectorizer from sklearn.model_selection import train_test_split #Pipeline 使用一系列 (key, value) 键值对来构建,其中 key 是你给这个步骤起的名字, value 是一个评估器对象: from sklearn.pipeline import Pipeline #准确率,精确率,召回率,f1 from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,classification_report import xgboost as xgb import joblib # 读取停用词 stwlist=[line.strip() for line in open('stopword.txt','r',encoding='utf-8').readlines()]4.文本向量化工具
# 创建各类cv=CountVectorizer()和tf_idf工具 cv=CountVectorizer(min_df=3, max_df=0.5, ngram_range=(1,2), stop_words = stwlist) tdf=TfidfVectorizer()5.各种管道模型
注意高斯模型并不能接受太多大维度的矩阵,因此需要将向量化的矩阵加上todesnse,但是速度很慢
#%% # Count vectoriser --> LogisticRegression() # 分类模型 #1.逻辑回归 lr=LogisticRegression() # 贝叶斯 #2.多项式贝叶斯 mb=MultinomialNB() gb=GaussianNB() #3.伯努利贝叶斯 bb=BernoulliNB() # 支持向量机 #4.支持向量机 svc=SVC(kernel='rbf') svc1=SVC(kernel='linear') svc2=SVC(kernel='poly') svc3=SVC(kernel='sigmoid') #5. linearsvc=LinearSVC() #6.决策树 dtc=DecisionTreeClassifier(random_state=22) #7.随机森林 rfc=RandomForestClassifier(random_state=22) #9.KNN分类器 knn=KNeighborsClassifier() modelList=[lr,mb,bb,svc,svc1,svc2,svc3,linearsvc,dtc,rfc,knn] #11个模型 m_len=len(modelList) # # 形成 9个模型 2个提取特征 5个指标 # # 提取特征分类器 textVectoriser=[cv,tdf] textv_len=len(textVectoriser) new_ticks = [] name=[] # modelNamelist=['逻辑回归','多项式贝叶斯','伯努利贝叶斯','高斯贝叶斯','RBF核SVM' # ,'线性核SVM','多项式核SVM','sigmoid核SVM' # ,'线性分类SVM','决策树','随机森林','KNN'] # modelNamelist2=['lr','mb','gb','bb','svc','svc1','svc2','svc3','l'+'\n'+'svc','dtc','rfc','knn'] modelNamelist=['逻辑回归','多项式贝叶斯','伯努利贝叶斯','RBF核SVM' ,'线性核SVM','多项式核SVM','sigmoid核SVM' ,'线性分类SVM','决策树','随机森林','KNN'] modelNamelist2=['lr','mb','bb','svc','svc1','svc2','svc3','l'+'\n'+'svc','dtc','rfc','knn'] # textVectorNamelist = ['词袋','TDF'] for i in range(m_len): new_ticks.append([modelNamelist2[i]]) name.append(modelNamelist[i]) name_dict={"name":modelNamelist,"model":modelList} label_dict={"name":modelNamelist2,"model":modelList} accuracy_score_list=[] # 莫名其妙搞不出来 for i in range(m_len): for j in range(textv_len): # pipeline =make_pipeline(textVectoriser[j], modelList[i]) train_vec=textVectoriser[j].fit_transform(x_train) test_vec=textVectoriser[j].transform(x_test) # train_vec_dense=train_vec.todense() # test_vec_dense=test_vec.todense() modelList[i].fit(train_vec,y_train) # print(pipeline) # pred=pipeline.predict(x_test) pred=modelList[i].predict(test_vec) # modelList[i].fit(x_train,y_train) # pred=modelList[i].predict(x_test) # pred=pipeline.predict(np.array(X_test).reshape(-1,1)) print('='*150) if(j==0): print('当前模型是:',modelNamelist[i],'当前文本向量化是','词袋',"当前准确率是:",round(accuracy_score(y_test,pred),5)) if(j==1): print('当前模型是:',modelNamelist[i],'当前文本向量化是','TF-IDF',"当前准确率是:",round(accuracy_score(y_test,pred),5)) # fpr[i], tpr[i], _ = roc_curve(y_test, y_pred) # roc_auc[i] = auc(fpr[i], tpr[i]) accuracy_score_list.append(round(accuracy_score(y_test,pred),5)) #准确率这些是不支持二分类以上的分类的 # precision_score_list.append(precision_score(y_test,pred)) # f1_score_list.append(f1_score(y_test,pred)) # recall_score_list.append(f1_score(y_test,pred)) # #获取标签与最后结果 # fpr[i], tpr[i], _ = roc_curve(y_test, pred) # roc_auc[i] = auc(fpr[i], tpr[i]) # fpr, tpr, thresholds = roc_curve(y_test, pred, pos_label=2) # # fpr,tpr,thresholds=roc_curve(y_test,y_0) #计算fpr,tpr,thresholds # # auc=roc_auc_score(y_test,y_0) #计算auc # #画曲线图 # plt.figure() # plt.plot(fpr,tpr) # plt.title('$ROC curve$') # plt.show() # classification_report_list.append(classification_report(y_test,pred)) # modelClass.append(pipeline)xgboost也可以
import xgboost as xgb # 算法参数 # 应该是用于分类 params = { 'booster': 'gbtree', 'objective': 'multi:softmax', 'num_class': 3, 'gamma': 0.1, 'max_depth': 6, 'lambda': 2, 'subsample': 0.7, 'colsample_bytree': 0.75, 'min_child_weight': 3, 'silent': 0, 'eta': 0.1, 'seed': 1, 'nthread': 4, } for i in range(m_len): for j in range(textv_len): # pipeline =make_pipeline(textVectoriser[j], modelList[i]) train_vec=textVectoriser[j].fit_transform(x_train) test_vec=textVectoriser[j].transform(x_test) plst = params.items() dtrain = xgb.DMatrix(train_vec, y_train) # 生成数据集格式 num_rounds = 500 model = xgb.train(plst, dtrain, num_rounds) # xgboost模型训练 # 对测试集进行预测 dtest = xgb.DMatrix(test_vec) pred = model.predict(dtest) # 计算准确率 accuracy = accuracy_score(y_test,y_pred) # print('当前是xgboost') print('='*150) if(j==0): print('当前模型是:xgboost','当前文本向量化是','词袋',"当前准确率是:",round(accuracy_score(y_test,pred),5)) if(j==1): print('当前模型是:xgboost','当前文本向量化是','TF-IDF',"当前准确率是:",round(accuracy_score(y_test,pred),5)) # fpr[i], tpr[i], _ = roc_curve(y_test, y_pred) # roc_auc[i] = auc(fpr[i], tpr[i]) modelNamelist.append('xgboost') accuracy_score_list.append(round(accuracy,5)) print("accuarcy: %.2f%%" % (accuracy*100.0))绘制图像
import matplotlib.pyplot as plt from pylab import * mpl.rcParams['font.sans-serif'] = ['SimHei'] #plot根据列表绘制出有意义的图形,linewidth是图形线宽,可省略 # plt.plot(input_values,squares,linewidth=5) plt.figure(figsize=(12,5),dpi=80) plt.bar(range(len(accuracy_score_list)),accuracy_score_list,linewidth=5) #设置图标标题 plt.title("不同管道模型准确率",fontsize = 24) #设置坐标轴标签 plt.xlabel("模型类型",fontsize = 0.2) plt.ylabel("准确率",fontsize = 0.5) #设置刻度标记的大小 plt.tick_params(axis='both',labelsize = 14) #打开matplotlib查看器,并显示绘制图形 #这是一半 plt.xticks(range(new_ticks),new_ticks) plt.show()#深度学习模型
数据处理
# 保证映射后结构一样 from keras.preprocessing.sequence import pad_sequences # 文本预处理 from keras.preprocessing.text import Tokenizer # 将类别映射成需要的格式 from keras.utils.np_utils import to_categorical # 这个是连接层 from keras.layers.merge import concatenate # 搭建模型 from keras.models import Sequential, Model # 这个是层的搭建 from keras.layers import Dense, Embedding, Activation, Input from keras.layers import Convolution1D, Flatten, Dropout, MaxPool1D from keras.layers import BatchNormalization from keras.layers import Conv1D,MaxPooling1D # 导入使用到的库 from keras.preprocessing.sequence import pad_sequences from keras.preprocessing.text import Tokenizer from keras.layers.merge import concatenate from keras.models import Sequential, Model from keras.layers import Dense, Embedding, Activation, merge, Input, Lambda, Reshape from keras.layers import Convolution1D, Flatten, Dropout, MaxPool1D, GlobalAveragePooling1D from keras.layers import LSTM, GRU, TimeDistributed, Bidirectional from keras.utils.np_utils import to_categorical from keras import initializers from keras import backend as K from keras.engine.topology import Layer # from sklearn.naive_bayes import MultinomialNB # from sklearn.linear_model import SGDClassifier # from sklearn.feature_extraction.text import TfidfVectorizer # import pandas as pd # import numpy as np # 数据处理 # from data_helper_ml import load_data_and_labels # 数据可视化 import matplotlib.pyplot as plt # 文本标签分类数量 NUM_CLASS=3 # 输入维度 INPUT_SIZE=64 # # 序列对齐文本数据 # LENTH=100 # Tokenizer是一个用于向量化文本,或将文本转换为序列 tokenizer = Tokenizer(filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',lower=True,split=" ") tokenizer.fit_on_texts(x_text) vocab = tokenizer.word_index #映射成数字 x_train_word_ids = tokenizer.texts_to_sequences(x_train) x_test_word_ids = tokenizer.texts_to_sequences(x_test) #让他共同化 x_train_padded_seqs = pad_sequences(x_train_word_ids, maxlen=INPUT_SIZE) x_test_padded_seqs = pad_sequences(x_test_word_ids, maxlen=INPUT_SIZE)CNN
def cnn(): model = Sequential() model.add(Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE)) #使用Embeeding层将每个词编码转换为词向量 model.add(Conv1D(256, 5, padding='same')) model.add(MaxPooling1D(3, 3, padding='same')) model.add(Conv1D(128, 5, padding='same')) model.add(MaxPooling1D(3, 3, padding='same')) model.add(Conv1D(64, 3, padding='same')) model.add(Flatten()) model.add(Dropout(0.1)) model.add(BatchNormalization()) # (批)规范化层 model.add(Dense(256, activation='relu')) model.add(Dropout(0.1)) model.add(Dense(NUM_CLASS, activation='softmax')) model.summary()textCNN
def textCNN(): main_input = Input(shape=(64,), dtype='float64') # 词嵌入(使用预训练的词向量) embedder = Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE, trainable=False) embed = embedder(main_input) # 词窗大小分别为3,4,5 cnn1 = Conv1D(256, 3, padding='same', strides=1, activation='relu')(embed) cnn1 = MaxPooling1D(pool_size=48)(cnn1) cnn2 = Conv1D(256, 4, padding='same', strides=1, activation='relu')(embed) cnn2 = MaxPooling1D(pool_size=47)(cnn2) cnn3 = Conv1D(256, 5, padding='same', strides=1, activation='relu')(embed) cnn3 = MaxPooling1D(pool_size=46)(cnn3) # 合并三个模型的输出向量 cnn = concatenate([cnn1, cnn2, cnn3], axis=-1) flat = Flatten()(cnn) drop = Dropout(0.2)(flat) main_output = Dense(NUM_CLASS, activation='softmax')(drop) model = Model(inputs=main_input, outputs=main_output) model.summary() # 使用Word2Vec词向量的TextCNN # w2v_model=Word2Vec.load('sentiment_analysis/w2v_model.pkl') # # 预训练的词向量中没有出现的词用0向量表示 # embedding_matrix = np.zeros((len(vocab) + 1, 300)) # for word, i in vocab.items(): # try: # embedding_vector = w2v_model[str(word)] # embedding_matrix[i] = embedding_vector # except KeyError: # continue # #构建TextCNN模型 # def TextCNN_model_2(): # # 模型结构:词嵌入-卷积池化*3-拼接-全连接-dropout-全连接 # main_input = Input(shape=(INPUT_SIZE,), dtype='float64') # # 词嵌入(使用预训练的词向量) # embedder = Embedding(len(vocab) + 1, 300, input_length=INPUT_SIZE, weights=[embedding_matrix], trainable=False) # #embedder = Embedding(len(vocab) + 1, 300, input_length=50, trainable=False) # embed = embedder(main_input) # # 词窗大小分别为3,4,5 # cnn1 = Conv1D(256, 3, padding='same', strides=1, activation='relu')(embed) # cnn1 = MaxPooling1D(pool_size=38)(cnn1) # cnn2 = Conv1D(256, 4, padding='same', strides=1, activation='relu')(embed) # cnn2 = MaxPooling1D(pool_size=37)(cnn2) # cnn3 = Conv1D(256, 5, padding='same', strides=1, activation='relu')(embed) # cnn3 = MaxPooling1D(pool_size=36)(cnn3) # # 合并三个模型的输出向量 # cnn = concatenate([cnn1, cnn2, cnn3], axis=-1) # flat = Flatten()(cnn) # drop = Dropout(0.2)(flat) # main_output = Dense(3, activation='softmax')(drop) # model = Model(inputs=main_input, outputs=main_output) # model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy']) # one_hot_labels = keras.utils.to_categorical(y_train, num_classes=NUM_CLASS) # 将标签转换为one-hot编码 # # model.fit(x_train_padded_seqs, one_hot_labels, batch_size=800, epochs=20) # # #y_test_onehot = keras.utils.to_categorical(y_test, num_classes=3) # 将标签转换为one-hot编码 # # result = model.predict(x_test_padded_seqs) # 预测样本属于每个类别的概率 # # result_labels = np.argmax(result, axis=1) # 获得最大概率对应的标签 # # y_predict = list(map(str, result_labels)) # # print('准确率', metrics.accuracy_score(y_test, y_predict)) # # print('平均f1-score:', metrics.f1_score(y_test, y_predict, average='weighted'))RNN
def rnn(): # 模型结构:词嵌入-LSTM-全连接 model = Sequential() model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE)) model.add(LSTM(256, dropout=0.2, recurrent_dropout=0.1)) model.add(Dense(NUM_CLASSM, activation='softmax')) model.summary()Bi-GRU
def digru(): # 模型结构:词嵌入-双向GRU*2-全连接 model = Sequential() # 64是序列号 model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE)) model.add(Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1, return_sequences=True))) model.add(Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1))) model.add(Dense(NUM_CLASSM_C, activation='softmax')) model.summary()CNN+RNN 串联
def clstm(): # 模型结构:词嵌入-卷积池化-GRU*2-全连接 model = Sequential() model.add(Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE)) model.add(Convolution1D(256, 3, padding='same', strides = 1)) model.add(Activation('relu')) model.add(MaxPool1D(pool_size=2)) model.add(GRU(256, dropout=0.2, recurrent_dropout=0.1, return_sequences = True)) model.add(GRU(256, dropout=0.2, recurrent_dropout=0.1)) model.add(Dense(NUM_CLASS, activation='softmax')) model.summary()CNN+RNN 并联
def blstm(): # 模型结构:词嵌入-卷积池化-全连接 ---拼接-全连接 # -双向GRU-全连接 main_input = Input(shape=(INPUT_SIZE,), dtype='float64') embed = Embedding(len(vocab)+1, 300, input_length=INPUT_SIZE)(main_input) cnn = Convolution1D(256, 3, padding='same', strides = 1, activation='relu')(embed) cnn = MaxPool1D(pool_size=4)(cnn) cnn = Flatten()(cnn) cnn = Dense(256)(cnn) rnn = Bidirectional(GRU(256, dropout=0.2, recurrent_dropout=0.1))(embed) rnn = Dense(256)(rnn) con = concatenate([cnn,rnn], axis=-1) main_output = Dense(NUM_CLASS, activation='softmax')(con) model = Model(inputs = main_input, outputs = main_output)fasttext
# 模型结构:词嵌入(n-gram)-最大化池化-全连接 # 生成n-gram组合的词(以3为例) ngram = 3 # 将n-gram词加入到词表 def create_ngram(sent, ngram_value): return set(zip(*[sent[i:] for i in range(ngram_value)])) ngram_set = set() for sentence in x_train_padded_seqs: for i in range(2, ngram+1): set_of_ngram = create_ngram(sentence, i) ngram_set.update(set_of_ngram) # 给n-gram词汇编码 start_index = len(vocab) + 2 token_indice = {v: k + start_index for k, v in enumerate(ngram_set)} # 给n-gram词汇编码 indice_token = {token_indice[k]: k for k in token_indice} max_features = np.max(list(indice_token.keys())) + 1 # 将n-gram词加入到输入文本的末端 def add_ngram(sequences, token_indice, ngram_range): new_sequences = [] for sent in sequences: new_list = sent[:] for i in range(len(new_list) - ngram_range + 1): for ngram_value in range(2, ngram_range + 1): ngram = tuple(new_list[i:i + ngram_value]) if ngram in token_indice: new_list.append(token_indice[ngram]) new_sequences.append(new_list) return new_sequences x_train = add_ngram(x_train_word_ids, token_indice, ngram) x_test = add_ngram(x_test_word_ids, token_indice, ngram) # x_train = pad_sequences(x_train, maxlen=25) # x_test = pad_sequences(x_test, maxlen=25) x_train_padded_seqs = pad_sequences(x_train_word_ids, maxlen=INPUT_SIZE) x_test_padded_seqs = pad_sequences(x_test_word_ids, maxlen=INPUT_SIZE) def fasttext(): model = Sequential() model.add(Embedding(max_features, 300, input_length=INPUT_SIZE)) model.add(GlobalAveragePooling1D()) model.add(Dense(NUM_CLASS, activation='softmax')) model.summary()训练模型:
model=cnn() #rnn model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy']) one_hot_labels = to_categorical(y_train, num_classes=NUM_CLASS) # 将标签转换为one-hot编码 # one_hot_labels=y_train model.fit(x_train_padded_seqs, one_hot_labels,epochs=5, batch_size=800) y_predict = model.predict_classes(x_test_padded_seqs) # 预测的是类别,结果就是类别号 y_predict = list(map(str, y_predict))
keras_bert
import pandas as pd import codecs, gc import numpy as np from sklearn.model_selection import KFold from keras_bert import load_trained_model_from_checkpoint, Tokenizer from keras.metrics import top_k_categorical_accuracy from keras.layers import * from keras.callbacks import * from keras.models import Model import keras.backend as K from keras.optimizers import Adam from keras.utils import to_categorical maxlen = INPUT_SIZE #设置序列长度为120,要保证序列长度不超过512 #预训练好的模型 # 还是放在原有样本中 # path=os.path.join(dirpath,config_path) # os.path.join() config_path = 'bert_config.json' # config_path=os.path.join(dirpath,config_path) checkpoint_path = 'bert_model.ckpt' # checkpoint_path=os.path.join(dirpath,checkpoint_path) dict_path = 'vocab.txt' # dict_path=os.path.join(dirpath,checkpoint_path) #将词表中的词编号转换为字典 token_dict = {} with codecs.open(dict_path, 'r', 'utf8') as reader: for line in reader: token = line.strip() token_dict[token] = len(token_dict) #重写tokenizer class OurTokenizer(Tokenizer): def _tokenize(self, text): R = [] for c in text: if c in self._token_dict: R.append(c) elif self._is_space(c): R.append('[unused1]') # 用[unused1]来表示空格类字符 else: R.append('[UNK]') # 不在列表的字符用[UNK]表示 return R tokenizer = OurTokenizer(token_dict) #让每条文本的长度相同,用0填充 def seq_padding(X, padding=0): L = [len(x) for x in X] ML = max(L) return np.array([ np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X ]) #data_generator只是一种为了节约内存的数据方式 class data_generator: def __init__(self, data, batch_size=32, shuffle=True): self.data = data self.batch_size = batch_size self.shuffle = shuffle self.steps = len(self.data) // self.batch_size if len(self.data) % self.batch_size != 0: self.steps += 1 def __len__(self): return self.steps def __iter__(self): while True: idxs = list(range(len(self.data))) if self.shuffle: np.random.shuffle(idxs) X1, X2, Y = [], [], [] for i in idxs: d = self.data[i] text = d[0][:maxlen] x1, x2 = tokenizer.encode(first=text) y = d[1] X1.append(x1) X2.append(x2) Y.append([y]) if len(X1) == self.batch_size or i == idxs[-1]: X1 = seq_padding(X1) X2 = seq_padding(X2) Y = seq_padding(Y) yield [X1, X2], Y[:, 0, :] [X1, X2, Y] = [], [], [] #bert模型设置 def build_bert(nclass): bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None) #加载预训练模型 for l in bert_model.layers: l.trainable = True x1_in = Input(shape=(None,)) x2_in = Input(shape=(None,)) x = bert_model([x1_in, x2_in]) x = Lambda(lambda x: x[:, 0])(x) # 取出[CLS]对应的向量用来做分类 p = Dense(nclass, activation='softmax')(x) model = Model([x1_in, x2_in], p) model.compile(loss='categorical_crossentropy', optimizer=Adam(1e-5), #用足够小的学习率 metrics=['accuracy', acc_top2]) print(model.summary()) return model #计算top-k正确率,当预测值的前k个值中存在目标类别即认为预测正确 def acc_top2(y_true, y_pred): return top_k_categorical_accuracy(y_true, y_pred, k=2) #训练数据、测试数据和标签转化为模型输入格式 DATA_LIST = [] for data_row in train_df1.iloc[:].itertuples(): DATA_LIST.append((xtrain, to_categorical(ytraim, NUM_CLASS))) DATA_LIST = np.array(DATA_LIST) DATA_LIST_TEST = [] for data_row in test_df1.iloc[:].itertuples(): DATA_LIST_TEST.append((xtest, to_categorical(0, NUM_CLASS))) DATA_LIST_TEST = np.array(DATA_LIST_TEST) #交叉验证训练和测试模型 def run_cv(nfold, data, data_labels, data_test): kf = KFold(n_splits=nfold, shuffle=True, random_state=520).split(data) train_model_pred = np.zeros((len(data), 3)) test_model_pred = np.zeros((len(data_test), 3)) for i, (train_fold, test_fold) in enumerate(kf): X_train, X_valid, = data[train_fold, :], data[test_fold, :] model = build_bert(NUM_CLASS) early_stopping = EarlyStopping(monitor='val_acc', patience=3) #早停法,防止过拟合 plateau = ReduceLROnPlateau(monitor="val_acc", verbose=1, mode='max', factor=0.5, patience=2) #当评价指标不在提升时,减少学习率 checkpoint = ModelCheckpoint('./bert_dump/' + str(i) + '.hdf5', monitor='val_acc',verbose=2, save_best_only=True, mode='max', save_weights_only=True) #保存最好的模型 train_D = data_generator(X_train, shuffle=True) valid_D = data_generator(X_valid, shuffle=True) test_D = data_generator(data_test, shuffle=False) #模型训练 model.fit_generator( train_D.__iter__(), steps_per_epoch=len(train_D), epochs=2, validation_data=valid_D.__iter__(), validation_steps=len(valid_D), callbacks=[early_stopping, plateau, checkpoint], ) # model.load_weights('./bert_dump/' + str(i) + '.hdf5') # return model train_model_pred[test_fold, :] = model.predict_generator(valid_D.__iter__(), steps=len(valid_D), verbose=1) test_model_pred += model.predict_generator(test_D.__iter__(), steps=len(test_D), verbose=1) del model gc.collect() #清理内存 K.clear_session() #clear_session就是清除一个session # break return train_model_pred, test_model_pred # import os # print(os.getcwd()) #打印出当前工作路径 # 很容易就崩溃了 #n折交叉验证 train_model_pred, test_model_pred = run_cv(2, DATA_LIST, None, DATA_LIST_TEST)(3)fastnlp部分:
先将当前文本进行读取然后导出成fastnlp需要的样子。
# 数据分割 from sklearn.model_selection import train_test_split # 数据管道 from sklearn.pipeline import Pipeline,make_pipeline # 数据分割 x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.2, random_state=2017) data_dict1={"raw_words":x_train,"target":y_train} data_dict2={"raw_words":x_test} df_train=pd.DataFrame(data_dict1) df_train.head() # 为fastnlp做准备 df_train.to_csv('train.txt',sep='\t', index=False,header=None,encoding='utf-8') df_test=pd.DataFrame(data_dict2) df_test.head() # 为fastnlp做准备 df_test.to_csv('test.txt',sep='\t', index=False,header=None,encoding='utf-8') #导入Pytorch包 import torch import torch.nn as nn from fastNLP.io.loader import CSVLoader dataset_loader = CSVLoader(headers=('raw_words','target'), sep='\t') testset_loader = CSVLoader( headers=['raw_words'],sep='\t') # 表示将CSV文件中每一行的第一项将填入'raw_words' field,第二项填入'target' field。 # 其中项之间由'\t'分割开来 train_path=r'train.txt' test_path=r'test.txt' dataset = dataset_loader._load(train_path) testset = testset_loader._load(test_path) # 将句子分成单词形式, 详见DataSet.apply()方法 import jieba from itertools import chain print(jieba.__version__) # from itertools import chain # ''' # @params: # data: 数据的列表,列表中的每个元素为 [文本字符串,0/1标签] 二元组 # @return: 切分词后的文本的列表,列表中的每个元素为切分后的词序列 # ''' def get_tokenized(data,words=True): def tokenizer(text): return [tok for tok in jieba.cut(text, cut_all=False)] if words: #按词语进行编码 return tokenizer(data) else: #按字进行编码 return [tokenizer(review) for review in data] print(dataset) dataset.apply(lambda ins:get_tokenized(ins['raw_words']), new_field_name='words', is_input=True) print(dataset) dataset.apply(lambda ins: len(ins['words']) ,new_field_name='seq_len', is_input=True) print(dataset) dataset.apply(lambda x: int(x['target']), new_field_name='target', is_target=True) print(dataset) #testset.apply(lambda ins: list(chain.from_iterable(get_tokenized(ins['raw_words']))), new_field_name='words', is_input=True) testset.apply(lambda ins: get_tokenized(ins['raw_words']), new_field_name='words', is_input=True) testset.apply(lambda ins: len(ins['words']) ,new_field_name='seq_len',is_input=True) print(testset) ### from fastNLP import Vocabulary #将DataSet按照ratio的比例拆分,返回两个DataSet #ratio (float) -- 0<ratio<1, 返回的第一个DataSet拥有 (1-ratio) 这么多数据,第二个DataSet拥有`ratio`这么多数据 train_data, dev_data = dataset.split(0.1, shuffle=False) print(train_data) print(len(train_data),len(dev_data),len(testset)) vocab = Vocabulary(min_freq=2).from_dataset(dataset, field_name='words') vocab.index_dataset(train_data, dev_data, testset, field_name='words', new_field_name='words') from fastNLP.embeddings import StaticEmbedding,StackEmbedding fastnlp_embed = StaticEmbedding(vocab, model_dir_or_name='cn-char-fastnlp-100d',min_freq=2) # # 不知道咋用 # from fastNLP.models import ESIM # # 这个不照 # model_scim=ESIM(fastnlp_embed,num_labels=2, dropout_rate=0.3, dropout_embed=0.1) # print(model_scim) from fastNLP.models.star_transformer import STSeqCls # 这个不照 model_stsc=STSeqCls(fastnlp_embed,num_cls=3, hidden_size=300 , num_layers=4, num_head=8 , head_dim=32, max_len=512, cls_hidden_size=600, emb_dropout=0.1, dropout=0.1) # ESIM(fastnlp_embed,num_labels=2, dropout_rate=0.3, dropout_embed=0.1) print(model_stsc) from fastNLP.models import CNNText model_CNN = CNNText(fastnlp_embed, num_classes=3,dropout=0.1) print(model_CNN) from fastNLP import Trainer, CrossEntropyLoss, AccuracyMetric,BCELoss trainer_CNN = Trainer(model=model_CNN, train_data=train_data, dev_data=dev_data,loss=CrossEntropyLoss(), metrics=AccuracyMetric()) trainer_CNN.train()