LAN video transmission

    技术2026-02-14  19

    对于局域网通信无非就是传输视频或者进行信息交互,今天介绍一下局域网视频传输的构建。

    For LAN communication is nothing more than transmission of video or information exchange, today to introduce the construction of LAN video transmission.

    首先,看看我的程序目录:

    First, my program directory:

     config.ini:

    [server] port = 12340 host = 127.0.0.1 [client] port = 12340 host = 127.0.0.1

    init.py

    import configparser import os , sys def get_address(section): config = configparser.ConfigParser() config.read(os.path.join(sys.path[0],r'config.ini')) return (config.get(section=section, option='host'), int(config.get(section=section, option='port')))

    client.py

    import socket import sys import time from init import * from camera import * jpeg_quality = 80 server_address = get_address('server') client_address = get_address('client') buffersize = 65507 if __name__ == '__main__': grabber1 = VideoGrabber(jpeg_quality) grabber1.setDaemon(True) grabber1.start() running = True sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # sk.bind(client_address) tot_frame = 0 time_sta = time.time() while(running): time.sleep(0.001) buffer = grabber1.get_buffer() if buffer is None: continue # print(len(buffer)) if len(buffer) > 65507: print("The message is too large to be sent within a single UDP datagram. We do not handle splitting the message in multiple datagrams") sk.sendto(b"FAIL",server_address) continue sk.sendto(buffer.tobytes(), server_address) tot_frame +=1 if tot_frame % 100 ==0 : print("{:.2f}fps".format(tot_frame/(time.time()-time_sta))) print("Quitting..") grabber1.stop() grabber1.join() sk.close()

    server.py

    import socket import cv2 import numpy as np import sys import time from init import * sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_address = get_address('server') client_address = get_address('client') buffersize = 65507 sk.bind(server_address) tot_frame = 0 time_sta = time.time() while(True): data, client = sk.recvfrom(buffersize) # print("接受成功") if data == b"FAIL": print("buffersize is too small or lose the packet") continue array = np.frombuffer(data, dtype=np.uint8) img = cv2.imdecode(array, 1) #解码 cv2.imshow("test_server_receive", img) tot_frame +=1 if tot_frame % 100 ==0 : print("{:.2f}fps".format(tot_frame/(time.time()-time_sta))) if cv2.waitKey(1) & 0xFF == ord('q'): break print("The server is quitting. ")

    camera.py

    from threading import Thread, Lock import cv2 class VideoGrabber(Thread): """ A threaded video grabber Attributes: encode_params (): cap (str): attr2 (:obj:`int`, optional): Description of `attr2`. """ def __init__(self, jpeg_quality): """Constructor. Args: jpeg_quality (:obj:`int`): Quality of JPEG encoding, in 0, 100. """ Thread.__init__(self) self.encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] self.cap = cv2.VideoCapture(0) self.running = True self.buffer = None self.lock = Lock() def stop(self): self.running = False def get_buffer(self): """Method to access the encoded buffer. Returns: np.ndarray: the compressed image if one has been acquired. None otherwise. """ if self.buffer is not None: self.lock.acquire() cpy = self.buffer.copy() self.lock.release() return cpy # ndarray def run(self): # 一直读取 while self.running: success, img = self.cap.read() if not success: continue #cv2.imshow("test_server_send", img) self.lock.acquire() result, self.buffer = cv2.imencode('.jpg', img, self.encode_param) self.lock.release()

       I hope I can help you,If you have any questions, please  comment on this blog or send me a private message. I will reply in my free time. 

    Processed: 0.008, SQL: 9