物理服务器通过光纤接入到网络或存储交换机,交换机接口功率不定时出现功率过低或过高的情况,导致服务器至交换机连接中断。
交换机接口功率不稳定,并未达到报警阀值,所以未产生任何报警信息。( 可能导致功率不稳定的原因很多:网卡、光纤线、光模块、交换机端口等等等等。)
根据如上场景,本人设计并实施了一个小小的项目:通过程序定时采集交换机光纤接口收、发功率,将采集的数据结构化,写入postgresql数据库,再通过grafana对采集的数据进行图形化展示,对有故障隐患的端口进行特殊颜色展示,方便运维工程师尽早发现问题。
先来看看成果。(使用 python + 开源工具 完成) 这是Grafana的截图,异常的接口功率通过特殊颜色展示,阀值可以自定义。每条记录包含设备名称、管理IP、接口编号、收发功率和采集时间。 每1小时采集一次数据,运维人员可定期查看。如发现异常可以及时处理。
思路很重要。 首先,数据采集。即从目标设备或系统中采集出需要的数据。 其次,数据结构化。定时获取相关数据后进行清洗、过滤、结构化处理。整理成流行的json,例如: 然后,将数据写入pg数据库。 最后,使用pg做数据源,通过Grafana,将采集的数据进行图形化展示。 我在上面的流程中增加了kafka集群。利用统一的接口,将数据传递流式化。至于mq的其他好处,我会在其他文章里详细说明。
总体架构如下:
如上架构,统一了数据处理流程及数据传输接口。 再深入思考一下,这个项目相当于建立了一个运维自动化工具模板,可以把实施工作进行模块化分解,并能将其他分散的工具整合到一个统一的架构体系中,避免形成 “孤岛”,为后续统一运维平台、运维数据仓库的建设打下良好基础。
代码中变量名、ip地址、用户名、密码、表名称和字段名都为示例,可根据需求进行修改。 程序目录结构如下: program是主程序。 consumer是消费者程序,读取mq数据并通过sql写入pg。 class内封装有数据采集、结构化、mq生产者函数。
注:代码适用于Cisco存储及网络交换机,其余厂商设备需要调整数据处理相关内容。
7k_int_consumer.py
import json from kafka import KafkaConsumer import psycopg2 consumer = KafkaConsumer('7k_intpower', group_id= 'a', bootstrap_servers='192.168.103.230:9092') print('consumer success') print(consumer.topics()) # 打印所有topic conn= psycopg2.connect(database="network",user="postgres",password="123.abc",host="192.168.103.194",port="5432") cur= conn.cursor() #创建指针对象 cur.execute("select * from device_info;") for msg in consumer: recv_dict= eval(msg.value.decode('utf-8')) cur.execute("INSERT INTO intpower (topic, top_offset, device_ip, interface, tx_power, rx_power, time) VALUES ("+"'"+str(msg.topic)+"'"+ ','+ str(msg.offset)+ \ ','+"'"+ recv_dict['device']+"'"+ ','+"'"+str(recv_dict['interface'])+"'"+ ','+recv_dict['txpower']+','+recv_dict['rxpower']+','+"'"+recv_dict['timestamp']+\ "'"+ ");") conn.commit() cur.close() conn.close()7k_int_program.py
from db.intpower_class import intPower import schedule import multiprocessing as mp import time def task_intpower(): func_int, ip= intPower().func_getIntPower, '192.168.249.35 192.168.249.36 192.168.249.3 192.168.249.4' pool= mp.Pool(processes= 4) for ip in ip.split(): pool.apply_async(func_int, args= (ip, )) pool.close() pool.join() if __name__ == '__main__': schedule.every(3600).seconds.do(task_intpower) while True: schedule.run_pending() time.sleep(1)intpower_class.py
import time, datetime, paramiko from kafka import KafkaProducer import json class intPower(object): """docstring for int_power""" def __init__(self): pass def func_getIntPower(self, ip): user, passwd= 'network', '123.abc' timestamp= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect(hostname=ip, username=user, password=passwd) print("登录成功 " + str(ip)+ ' 正在抓取信息... '+ timestamp) msg = lambda x: json.dumps(x).encode('utf-8') command = ssh_client.invoke_shell() command.send('terminal length 0\n') command.send("sh interface transceiver details\n") time.sleep(5) command.send('terminal length 30\n') command.send("exit\n") output_list = command.recv(655350).decode('utf-8').split('Ethernet') ssh_client.close interface_list= [inter_info for inter_info in output_list if 'transceiver is present' in inter_info] interface_list2= [i.split('\r\n') for i in interface_list] tx_list= [k for j in interface_list2 for k in j if 'Tx Power' in k] rx_list= [k for j in interface_list2 for k in j if 'Rx Power' in k] tx_list2= [k.split() for k in tx_list] rx_list2= [k.split() for k in rx_list] producer= KafkaProducer(bootstrap_servers='192.168.103.230:9092') if len(interface_list2)== len(tx_list2)== len(rx_list2): for n in range(len(rx_list2)): res_dict= { 'interface': interface_list2[n][0], 'txpower': tx_list2[n][2], 'rxpower': rx_list2[n][2], 'device': ip, 'timestamp': timestamp } producer.send('7k_intpower', msg(res_dict)) producer.close() else: print('收发接口数量不一致!')关于kafka集群、pg和grafana搭建与部署内容暂时省略。
各模块作用如下: 采集程序:定期采集数据并进行结构化处理,数据生产者。
kafka:数据流传输平台,提供统一的数据传输接口。具有一定的数据持久化功能。
消费者程序:从kafka消费数据,并写入pg。
pg数据库:数据持久化仓库,数据源。
grafana:数据图形化展示平台。前端,用户直接看到的内容。
简要说明: 数据结构化:观察采集的“show”信息的规律,将端口编号、发送功率、接收功率分别整理为list数据结构。之后,针对每个端口,生成一条json数据,发送至kafka集群。
多进程或多线程:如果设备较多,可针对每台设备开辟一条进程或线程进行操作,提高效率。
PG:数据库建表时,设备ip数据类型为“inet”,时间为“timestamp”。
Grafana:查询数据库时会用到联结查询。
有问题欢迎讨论交流!私信、微信、邮件都可以。个人微信: 邮箱:episode5763@163.com