简介
IO复用:同步,复用线程,事件循环驱动
阻塞IO非阻塞IO
客户端
import os
import select
import socket
import sys
def relay(sock
):
poll
= select
.poll
()
poll
.register
(sock
, select
.POLLIN
)
poll
.register
(sys
.stdin
, select
.POLLIN
)
done
= False
while not done
:
events
= poll
.poll
(10000)
for fileno
, event
in events
:
if event
& select
.POLLIN
:
if fileno
== sock
.fileno
():
data
= sock
.recv
(8192)
if data
:
sys
.stdout
.write
(data
)
else:
done
= True
else:
assert fileno
== sys
.stdin
.fileno
()
data
= os
.read
(fileno
, 8192)
if data
:
sock
.sendall
(data
)
else:
sock
.shutdown
(socket
.SHUT_WR
)
poll
.unregister
(sys
.stdin
)
def main(argv
):
if len(argv
) < 3:
binary
= argv
[0]
print "Usage:\n %s -l port\n %s host port" % (argv
[0], argv
[0])
return
port
= int(argv
[2])
if argv
[1] == "-l":
server_socket
= socket
.socket
(socket
.AF_INET
, socket
.SOCK_STREAM
)
server_socket
.setsockopt
(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
server_socket
.bind
(('', port
))
server_socket
.listen
(5)
(client_socket
, client_address
) = server_socket
.accept
()
server_socket
.close
()
relay
(client_socket
)
else:
sock
= socket
.create_connection
((argv
[1], port
))
relay
(sock
)
if __name__
== "__main__":
main
(sys
.argv
)
服务端
#include "thread/Atomic.h"
#include "datetime/Timestamp.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "TcpStream.h"
#include <string.h>
#include <thread>
muduo
::AtomicInt64 g_bytes
;
std
::string
getMessage()
{
std
::string line
;
for (int i
= 33; i
< 127; ++i
)
{
line
.push_back(char(i
));
}
line
+= line
;
std
::string message
;
for (size_t i
= 0; i
< 127-33; ++i
)
{
message
+= line
.substr(i
, 72) + '\n';
}
return message
;
}
void measure()
{
muduo
::Timestamp start
= muduo
::Timestamp
::now();
while (true)
{
struct timespec ts
= { 1, 0 };
::nanosleep(&ts
, NULL);
int64_t bytes
= g_bytes
.getAndSet(0);
muduo
::Timestamp end
= muduo
::Timestamp
::now();
double elapsed
= timeDifference(end
, start
);
start
= end
;
if (bytes
)
{
printf("%.3f MiB/s\n", bytes
/ (1024.0 * 1024) / elapsed
);
}
}
}
void chargen(TcpStreamPtr stream
)
{
std
::string message
= getMessage();
while (true)
{
#客户端没有read会卡住
int nw
= stream
->sendAll(message
.data(), message
.size());
g_bytes
.add(nw
);
if (nw
< static_cast<int>(message
.size()))
{
break;
}
}
}
int main(int argc
, char* argv
[])
{
if (argc
< 3)
{
printf("Usage:\n %s hostname port\n %s -l port\n", argv
[0], argv
[0]);
return 0;
}
std
::thread(measure
).detach();
int port
= atoi(argv
[2]);
if (strcmp(argv
[1], "-l") == 0)
{
InetAddress
listenAddr(port
);
Acceptor
acceptor(listenAddr
);
printf("Accepting... Ctrl-C to exit\n");
int count
= 0;
while (true)
{
TcpStreamPtr tcpStream
= acceptor
.accept();
printf("accepted no. %d client\n", ++count
);
std
::thread
thr(chargen
, std
::move(tcpStream
));
thr
.detach();
}
}
else
{
InetAddress addr
;
const char* hostname
= argv
[1];
if (InetAddress
::resolve(hostname
, port
, &addr
))
{
TcpStreamPtr
stream(TcpStream
::connect(addr
));
if (stream
)
{
chargen(std
::move(stream
));
}
else
{
printf("Unable to connect %s\n", addr
.toIpPort().c_str());
perror("");
}
}
else
{
printf("Unable to resolve %s\n", hostname
);
}
}
}
测试
服务器:./chargen -l 1234 客户端1:nc localhost 1234 > /dev/null # 测试正常 客户端2:nc localhost 1234 < /dev/zero >/dev/null # 阻塞住,因为客户端阻塞在write上
转载请注明原文地址:https://ipadbbs.8miu.com/read-16417.html