1.自动初始化数据库里的配置表 2.自动配置postman里的参数和触发发送按钮 3.统计服务器中需要解析的文件里的文件数量 4.每30秒一扫描配置文件里字段标签,判断是否结束入库,如入库结束,开始进行判断全量表与增量表里的数量是否一致,如标签在测试经验内最长时间后一直不变,跳出,输出错误 5.各文件名称、路径与表名统一写在一个csv文件里,脚本执行结束后,分别输出一个全量测试报告和增量测试报告,在每条后面标注成功或失败状态
import os import requests import cx_Oracle import paramiko import unittest import time import csv import datetime import re class sqltest(unittest.TestCase): def test1_dataInitialize(self): ####################连接数据库################################## conn = cx_Oracle.connect( 'wg_zc_ts_csjzxn/oracle_1Q#Zg@XXX.XXX.XXX.XX:XXXX/WG_ZC') # 用自己的实际数据库用户名、密码、主机ip地址 替换即可 filedataInitialize = open("userinfo.csv", 'r') table1 = csv.reader(filedataInitialize) for filedata in table1: ########################初始化配置表######################### curs1 = conn.cursor() sql1 = "update Original_to_core set ALLSTATE='0',UPSTATE='0' where ORIGTABLE ='"+filedata[0]+"'" sql2 = "update collection_config set FILENAMECONTRASTKEY='' where FILENAME='"+filedata[0]+"'" curs2 = conn.cursor() sql3="select ALLSTATE from Original_to_core where ORIGTABLE ='"+filedata[0]+"'" curs3 = conn.cursor() sql4="select UPSTATE from Original_to_core where ORIGTABLE ='"+filedata[0]+"'" curs4 = conn.cursor() sql5="select FILENAMECONTRASTKEY from collection_config where FILENAME='"+filedata[0]+"'" rr1=curs1.execute(sql1) rr2 =curs1.execute(sql2) rr3=curs2.execute(sql3) rr4=curs3.execute(sql4) rr5=curs4.execute(sql5) conn.commit() row1 = curs2.fetchone() print("ALLSTATE的值为", row1[0]) ################判断是否重置通过########################### self.assertEqual(0, row1[0], "配置文件sql中ALLSTATE的值重置不通过") row2 = curs3.fetchone() print("UPSTATE的值为", row2[0]) self.assertEqual(0, row2[0], "配置文件sql中UPSTATE的值重置不通过") # print(type(row[0])) row3 = curs4.fetchone() # print("FILENAMECONTRASTKEY的值为", row3[0]) self.assertEqual(None, row3[0], "配置文件sql中FILENAMECONTRASTKEY的值重置不通过") # a = row[0] curs1.close() curs2.close() curs3.close() conn.close() ########################连接ftp服务器################################# def test2_postman(self): hostname = "XXX.XXX.XXX.XXX port = 'XX' username = "XXXX" password = "XXXXXXXXXX" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname, port, username, password, compress=True) sftp_client = client.open_sftp() ##############################触发postman发送数据#################### url = "http://XXX.XXX.XXX.XXX:8086/tcoamp-web-rest/controlller/upDateCollectionByRedis" file = open("userinfo.csv", 'r') table = csv.reader(file) file2 = open("userinforesult.csv", "w") file3 = open("userinforesultINC.csv", "w") for rowfile in table: cursstart = conn.cursor() sqlstart = "select RUNNUM from Original_to_core where ORIGTABLE ='" + rowfile[0] + "'" # sql语句 print(sqlstart) rrfinal = cursstart.execute(sqlstart) rowstart = cursstart.fetchone() print("RUNNUM的初始值为",rowstart[0]) userinfo = {} userinfo["fileName"] = rowfile[4] userinfo["filePath"] = rowfile[3] response = requests.get(url, params=userinfo).text print(response) r = response.find("success") print(r) if r > -1: result = "执行成功" print("执行成功") else: result = "执行失败" print("执行失败") self.assertEqual(result, "执行成功", "插入数据执行不成功") # startTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) time.sleep(5) ###################查询解析文件csv中的数量################################## remote_file = sftp_client.open(rowfile[5]) # 文件路径 try: countfinal = -1 for count, line in enumerate(sftp_client.open(rowfile[5], 'rU')): pass countfinal += 1 print(countfinal) print(countfinal) finally: remote_file.close() #####################全量表与增量表里的数量与csv文件里做对比################## conn = cx_Oracle.connect( 'wg_zc_ts_csjzxn/oracle_1Q#Zg@XXX.XXX.XXX.X:XXXX/WG_ZC') # 用自己的实际数据库用户名、密码、主机ip地址 替换即可 flag=rowstart[0] startTime = time.strftime("%H:%M:%S", time.localtime()) h, m, s = startTime.strip().split(':') # .split() startmiao=int(h) * 3600 + int(m) * 60 + int(s) print("startmiao为",startmiao) while(True): curfinal = conn.cursor() sqlfinal = "select RUNNUM from Original_to_core where ORIGTABLE ='" + rowfile[0] + "'" # sql语句 print(sqlfinal) rrfinal = curfinal.execute(sqlfinal) rowsfinal = curfinal.fetchone() print("RUNNUM的终止值为", rowsfinal[0]) if rowsfinal[0]> flag: print("dddd",rowfile[1]) curs11 = conn.cursor() sqlAll = "select count(*) from "+rowfile[1]+"" # sql语句 print(sqlAll) rr11 = curs11.execute(sqlAll) row11 = curs11.fetchone() print(row11[0]) print(type(row11[0])) a1 = row11[0] print("全量的数为:", a1) curs22 = conn.cursor() sqlAdd = "select count(*) from "+rowfile[2]+"" print(sqlAdd) rr22 = curs22.execute(sqlAdd) row22 = curs22.fetchone() print(row22[0]) print(type(row22[0])) a2 = row22[0] print("增量的数为:", a2) resultdata={} print("对比全量", countfinal, a1) if countfinal == a1: compareresult1 = "采集文件中的数量与入库中全量的数量一致" else: compareresult1 = "采集文件中的数量与入库中全量的数量不一致" print(rowfile[1],"的数量",compareresult1) self.assertEqual(compareresult1,'采集文件中的数量与入库中全量的数量一致',"全量表解析不成功") print("对比增量", countfinal, a2) if countfinal == a2: compareresult2 = "采集文件中的数量与入库中增量的数量一致" else: compareresult2 = "采集文件中的数量与入库中增量的数量不一致" print(rowfile[2],"的数量",compareresult2) self.assertEqual(compareresult2,'采集文件中的数量与入库中增量的数量一致',"增量表解析不成功") if compareresult1=="采集文件中的数量与入库中全量的数量一致" : file2.write(rowfile[0] + "," + rowfile[1] + "," + rowfile[2] + "," + rowfile[3] + "," + rowfile[4] + "," + rowfile[5]+ "," + "全量入库测试成功" + "\n") else: file2.write(rowfile[0] + "," + rowfile[1] + "," + rowfile[2] + "," + rowfile[3] + "," + rowfile[4] + "," + rowfile[5]+ "," + "全量入库测试失败" + "\n") if compareresult2 == "采集文件中的数量与入库中全量的数量一致": file3.write(rowfile[0] + "," + rowfile[1] + "," + rowfile[2] + "," + rowfile[3] + "," + rowfile[ 4] + "," + rowfile[5]+ "," + "增量入库测试成功" + "\n") else: file3.write(rowfile[0] + "," + rowfile[1] + "," + rowfile[2] + "," + rowfile[3] + "," + rowfile[ 4] + "," + rowfile[5]+ "," + "增量入库测试失败" + "\n") curs11.close() curs22.close() break else: time.sleep(10) startend = time.strftime("%H:%M:%S", time.localtime()) h, m, s = startend.strip().split(':') # .split() endmiao = int(h) * 3600 + int(m) * 60 + int(s) print("end秒为",endmiao) wait=endmiao-startmiao if wait>60: waitresult="配置表RUNNUM无变化" self.assertNotEquals(waitresult, "配置表RUNNUM无变化", "配置表更改不成功") break conn.close() file.close() file2.close() file3.close() if __name__ == '__main__': sqltestObj=sqltest() sqltestObj.test1_dataInitialize() sqltestObj.test2_postman()
测试结果截图
配置表
全量表测试结果
增量表测试结果