网络编程-非阻塞IO实现netcat

    技术2022-07-11  85

    简介

    阻塞和非阻塞:阻塞是指IO操作需要彻底完成后才返回到用户空间;非阻塞相反。 同步和异步:同步是指线程发起之后要等返回才继续下一步。 非阻塞IO应该是网络库该解决的问题,应用程序不需要去管IO,只需要了解应用。 thread方式比非阻塞IO好理解。 理解同步/异步/阻塞/非阻塞IO区别

    服务端

    #include "thread/Atomic.h" #include "datetime/Timestamp.h" #include "Acceptor.h" #include "InetAddress.h" #include "TcpStream.h" #include <string.h> #include <thread> muduo::AtomicInt64 g_bytes; std::string getMessage() { std::string line; for (int i = 33; i < 127; ++i) { line.push_back(char(i)); } line += line; std::string message; for (size_t i = 0; i < 127-33; ++i) { message += line.substr(i, 72) + '\n'; } return message; } void measure() { muduo::Timestamp start = muduo::Timestamp::now(); while (true) { struct timespec ts = { 1, 0 }; ::nanosleep(&ts, NULL); // unfortunately, those two assignments are not atomic int64_t bytes = g_bytes.getAndSet(0); muduo::Timestamp end = muduo::Timestamp::now(); double elapsed = timeDifference(end, start); start = end; if (bytes) { printf("%.3f MiB/s\n", bytes / (1024.0 * 1024) / elapsed); } } } void chargen(TcpStreamPtr stream) { std::string message = getMessage(); while (true) { #客户端没有read会卡住 int nw = stream->sendAll(message.data(), message.size()); g_bytes.add(nw); if (nw < static_cast<int>(message.size())) { break; } } } // a thread-per-connection current chargen server and client int main(int argc, char* argv[]) { if (argc < 3) { printf("Usage:\n %s hostname port\n %s -l port\n", argv[0], argv[0]); return 0; } std::thread(measure).detach(); int port = atoi(argv[2]); if (strcmp(argv[1], "-l") == 0) { InetAddress listenAddr(port); Acceptor acceptor(listenAddr); printf("Accepting... Ctrl-C to exit\n"); int count = 0; while (true) { TcpStreamPtr tcpStream = acceptor.accept(); printf("accepted no. %d client\n", ++count); std::thread thr(chargen, std::move(tcpStream)); thr.detach(); } } else { InetAddress addr; const char* hostname = argv[1]; if (InetAddress::resolve(hostname, port, &addr)) { TcpStreamPtr stream(TcpStream::connect(addr)); if (stream) { chargen(std::move(stream)); } else { printf("Unable to connect %s\n", addr.toIpPort().c_str()); perror(""); } } else { printf("Unable to resolve %s\n", hostname); } } }

    客户端

    #!/usr/bin/python import errno import fcntl import os import select import socket import sys def setNonBlocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) def nonBlockingWrite(fd, data): try: nw = os.write(fd, data) return nw except OSError as e: if e.errno == errno.EWOULDBLOCK: return -1 def relay(sock): socketEvents = select.POLLIN poll = select.poll() poll.register(sock, socketEvents) poll.register(sys.stdin, select.POLLIN) setNonBlocking(sock) # setNonBlocking(sys.stdin) # setNonBlocking(sys.stdout) done = False socketOutputBuffer = '' while not done: events = poll.poll(10000) # 10 seconds for fileno, event in events: if event & select.POLLIN: if fileno == sock.fileno(): data = sock.recv(8192) if data: nw = sys.stdout.write(data) # stdout does support non-blocking write, though else: done = True else: assert fileno == sys.stdin.fileno() data = os.read(fileno, 8192) if data: assert len(socketOutputBuffer) == 0 nw = nonBlockingWrite(sock.fileno(), data) if nw < len(data): if nw < 0: nw = 0 socketOutputBuffer = data[nw:] socketEvents |= select.POLLOUT poll.register(sock, socketEvents) poll.unregister(sys.stdin) else: sock.shutdown(socket.SHUT_WR) poll.unregister(sys.stdin) if event & select.POLLOUT: if fileno == sock.fileno(): assert len(socketOutputBuffer) > 0 nw = nonBlockingWrite(sock.fileno(), socketOutputBuffer) if nw < len(socketOutputBuffer): assert nw > 0 socketOutputBuffer = socketOutputBuffer[nw:] else: socketOutputBuffer = '' socketEvents &= ~select.POLLOUT poll.register(sock, socketEvents) poll.register(sys.stdin, select.POLLIN) def main(argv): if len(argv) < 3: binary = argv[0] print "Usage:\n %s -l port\n %s host port" % (argv[0], argv[0]) print (sys.stdout.write) return port = int(argv[2]) if argv[1] == "-l": # server server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', port)) server_socket.listen(5) (client_socket, client_address) = server_socket.accept() server_socket.close() relay(client_socket) else: # client sock = socket.create_connection((argv[1], port)) relay(sock) if __name__ == "__main__": main(sys.argv)
    Processed: 0.014, SQL: 9