网络编程-阻塞IO复用实现netcat

    技术2022-07-11  78

    简介

    IO复用:同步,复用线程,事件循环驱动

    阻塞IO与非阻塞IO

    客户端

    #!/usr/bin/python
    """Minimal netcat built on poll(): relays bytes between stdin/stdout and one TCP connection.

    Usage:
        netcat.py -l port      listen on port and serve the first connection
        netcat.py host port    connect to host:port

    The code runs unchanged on Python 2 and Python 3.
    """

    import os
    import select
    import socket
    import sys


    def relay(sock):
        """Copy stdin to sock and sock to stdout until the peer closes the connection.

        Both descriptors are multiplexed with a single poll() loop.  On stdin
        EOF the write side of the socket is shut down and stdin is dropped
        from the poll set; the loop keeps draining the socket until recv()
        reports EOF.

        Args:
            sock: a connected stream socket.
        """
        poller = select.poll()
        poller.register(sock, select.POLLIN)
        poller.register(sys.stdin, select.POLLIN)

        # Socket data is bytes.  Python 3 exposes the byte stream as
        # sys.stdout.buffer, while Python 2's sys.stdout takes bytes directly.
        out = getattr(sys.stdout, "buffer", sys.stdout)

        # poll() reports POLLHUP/POLLERR even though only POLLIN was requested.
        # Treating them as readable lets the read below observe EOF (or raise
        # the pending error) instead of the loop spinning on an ignored event.
        readable = select.POLLIN | select.POLLHUP | select.POLLERR

        done = False
        while not done:
            events = poller.poll(10000)  # timeout in milliseconds: 10 seconds
            for fileno, event in events:
                if not event & readable:
                    continue
                if fileno == sock.fileno():
                    data = sock.recv(8192)
                    if data:
                        out.write(data)
                        # Flush so relayed data shows up immediately.
                        out.flush()
                    else:
                        # Empty read: the peer closed the connection.
                        done = True
                else:
                    assert fileno == sys.stdin.fileno()
                    data = os.read(fileno, 8192)
                    if data:
                        sock.sendall(data)
                    else:
                        # stdin hit EOF: half-close so the peer sees EOF, but
                        # keep reading whatever it still sends back.
                        sock.shutdown(socket.SHUT_WR)
                        poller.unregister(sys.stdin)


    def main(argv):
        """Parse the command line and run as server ("-l port") or client ("host port").

        Prints a usage message and returns when fewer than two arguments
        follow the program name.
        """
        if len(argv) < 3:
            print("Usage:\n %s -l port\n %s host port" % (argv[0], argv[0]))
            return
        port = int(argv[2])
        if argv[1] == "-l":
            # server: accept exactly one client, then stop listening
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(('', port))
            server_socket.listen(5)
            (client_socket, client_address) = server_socket.accept()
            server_socket.close()
            relay(client_socket)
        else:
            # client
            sock = socket.create_connection((argv[1], port))
            relay(sock)


    if __name__ == "__main__":
        main(sys.argv)

    服务端

    #include "thread/Atomic.h"
    #include "datetime/Timestamp.h"
    #include "Acceptor.h"
    #include "InetAddress.h"
    #include "TcpStream.h"

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <time.h>

    #include <string>
    #include <thread>
    #include <utility>

    // Bytes sent since the last throughput report; updated by every chargen()
    // caller and drained by measure().
    muduo::AtomicInt64 g_bytes;

    // Builds the chargen payload: 94 lines, each a 72-character window sliding
    // over the printable ASCII characters 33..126, terminated by '\n'.
    std::string getMessage()
    {
      std::string line;
      for (int i = 33; i < 127; ++i)
      {
        line.push_back(static_cast<char>(i));
      }
      // Doubling the alphabet lets every 72-character window be taken with a
      // plain substr() instead of wrapping around by hand.
      line += line;

      std::string message;
      for (size_t i = 0; i < 127-33; ++i)
      {
        message += line.substr(i, 72) + '\n';
      }
      return message;
    }

    // Runs forever: once per second, takes and resets the byte counter and
    // prints the throughput in MiB/s when anything was sent.
    void measure()
    {
      muduo::Timestamp start = muduo::Timestamp::now();
      while (true)
      {
        struct timespec ts = { 1, 0 };
        ::nanosleep(&ts, nullptr);
        // unfortunately, those two assignments are not atomic
        int64_t bytes = g_bytes.getAndSet(0);
        muduo::Timestamp end = muduo::Timestamp::now();
        double elapsed = timeDifference(end, start);
        start = end;
        if (bytes)
        {
          printf("%.3f MiB/s\n", bytes / (1024.0 * 1024) / elapsed);
        }
      }
    }

    // Sends the chargen payload over `stream` in a loop and adds the number of
    // bytes written to g_bytes. Returns after the first short write.
    void chargen(TcpStreamPtr stream)
    {
      std::string message = getMessage();
      while (true)
      {
        // This call gets stuck if the client never reads.
        int nw = stream->sendAll(message.data(), message.size());
        g_bytes.add(nw);
        if (nw < static_cast<int>(message.size()))
        {
          break;
        }
      }
    }

    // a thread-per-connection concurrent chargen server and client
    //
    //   chargen -l port         accept connections, one sender thread per client
    //   chargen hostname port   connect and send to that server
    int main(int argc, char* argv[])
    {
      if (argc < 3)
      {
        printf("Usage:\n %s hostname port\n %s -l port\n", argv[0], argv[0]);
        return 0;
      }

      // The throughput reporter runs for the lifetime of the process.
      std::thread(measure).detach();

      int port = atoi(argv[2]);
      if (strcmp(argv[1], "-l") == 0)
      {
        InetAddress listenAddr(port);
        Acceptor acceptor(listenAddr);
        printf("Accepting... Ctrl-C to exit\n");
        int count = 0;
        while (true)
        {
          TcpStreamPtr tcpStream = acceptor.accept();
          printf("accepted no. %d client\n", ++count);

          // Hand the stream over to a detached thread that serves this client.
          std::thread thr(chargen, std::move(tcpStream));
          thr.detach();
        }
      }
      else
      {
        InetAddress addr;
        const char* hostname = argv[1];
        if (InetAddress::resolve(hostname, port, &addr))
        {
          TcpStreamPtr stream(TcpStream::connect(addr));
          if (stream)
          {
            chargen(std::move(stream));
          }
          else
          {
            printf("Unable to connect %s\n", addr.toIpPort().c_str());
            perror("");
          }
        }
        else
        {
          printf("Unable to resolve %s\n", hostname);
        }
      }
    }

    测试

    服务器:./chargen -l 1234
    客户端1:nc localhost 1234 > /dev/null # 测试正常
    客户端2:nc localhost 1234 < /dev/zero > /dev/null # 阻塞住:服务端只发送、不读取,客户端阻塞在write上

    Processed: 0.012, SQL: 9