WebSocket协议实时通信技术原理

    技术2022-07-11  92

    HTML5 WebSocket协议实时通讯机制 Ajax长轮询过程中,客户端通过频繁地向服务器发送HTTP请求的方式与服务器保持这一种虚拟的连接,此连接的方式属于循环连接而不属于长连接。

    相对于HTTP协议这一非持久连接的特点来说,为避免HTTP轮询的滥用,2011年,由IETF(互联网工程任务组)制定并规范了WebSocket通信协议。

    WebSocket通信协议是HTML5支持浏览器与服务器进行多路复用全双工(Full-Duplex)通信的技术,是一个持久化协议,允许服务器主动发送信息给客户端。客户端通过JavaScript实现相应的API与服务器建立WebSocket连接后,客户端发送的Request请求信息当中不再带有请求头Head的部分信息,与Ajax长轮询通信对比,WebSocket通讯,不仅能降低服务器的压力而且保证了数据的实时传输。

    WebSocket协议实时通信技术原理 最喜欢拆东西,解剖技术原理了。做事多问个为什么,凭什么要这样实现,这样做的作用是什么。

    WebSocket协议是基于TCP协议并遵从HTTP协议的握手规范的一种通讯协议,其通过发送连接请求,握手,验证握手信息这三个步骤与服务器建立WebSocket连接。

    发送连接请求 客户端通过一个格式为:ws://host:port/的请求地址发起WebSocket连接请求,并由JavaScript实现WebSocket API与服务器建立WebSocket连接,其中host为服务器主机IP地址或域名,port为端口。为了让本客服系统能够在不同浏览器建立WebSocket连接,在发送连接请求前,需要开启SockJS的支持,创建一个全双工的信道。

    WebSocket请求头信息:

    相关字段说明:

    字段名    说明 Connection:Upgrade    标识该HTTP请求是一个协议升级请求 Upgrade: WebSocket    协议升级为WebSocket协议 Sec-WebSocket-Version: 13    客户端支持WebSocket的版本 Sec-WebSocket-Key:jONIMu4nFOf0iwNnc2cihg==    客户端采用base64编码的24位随机字符序列。 Sec-WebSocket-Extensions    协议扩展类型 HTTP协议和WebSocket协议关系图:

    可以看出WebSocket请求是HTTP协议进行升级的,即使请求格式为ws://,其本质也是一个HTTP请求,借用了HTTP的部分设施兼容了客户端的握手规则。

    握手 当服务器收到请求后,会解析请求头信息,根据升级后的协议判断该请求为WebSocket请求,并取出请求信息中的Sec-WebSocket-Key字段的数据按照某种算法重新生成一个新的字符串序列放入响应头Sec-WebSocket-Accept中。

    WebSocket服务器响应头信息:

    相关字段说明: Sec-WebSocket-Accept:服务器接受客户端HTTP协议升级的证明。

    WebSocket建立连接 客户端接收服务器的响应后,同样会解析请求头信息,取出请求信息中的Sec-WebSocket-Accept字段,并用服务器内部处理Sec-WebSocket-Key字段的算法处理之前发送的Sec-WebSocket-Key,把处理得到的结果与Sec-WebSocket-Accept进行对比,数据相同则表示客户端与服务器成功建立WebSocket连接,反之失败。

    WebSocket通信协议的数据传输 WebSocket通讯协议的数据传输格式是以帧的形式传输的,其帧格式如图:

    相关字段说明:

    根据数据帧的设计可以看出,WebSocket通讯协议是通过心跳检查PING-PONG帧来实现WebSocket长连接。当WebSocket连接建立后,PING帧和PONG帧都会不携带数据地进行来回传输,当连接发生变化时,相应数据信息会被植入PING帧,PONG帧作为响应帧返回结果。

    建立连接后,可在客户端使用JavaScript实现相关的WebSocket API。相关实现接口如下表:

    实现方式    说明 New WebSocket(“ws://host:port/”);    发起与服务器建立WebSocket连接的对象 websocket.onopen()=function(){}    接收连接成功建立的响应函数 websocket.onerror()=function(){}    接收异常信息的响应函数 websocket.onmessage()=functionm(event){}    接收服务器返回的消息函数 websocket.onclose()=function(){}    接收连接关闭的响应函数 sendMessage(event.data)=function(data){}    发送消息函数 websocket.close()=function(){}    连接关闭函数 ..

    从目前各Web技术领域来看,各大浏览器均支持WebSocket协议的实现,但WebSocket协议所支持服务器较少,如:NodeJS、Tomcat7.0等,相信未来会得到普及。

    Spring-WebSocket源码剖析 上面已经通过解析数据包阐述了WebSocket的基本原理。由于Spring 4.0更新后直接增加了对WebSocket的支持,下文将借此通过Spring 框架提供的源代码进行分析Spring 框架是如何实现在接收到客户端的连接请求后与服务器建立连接的。

    以下代码片段摘自官方下载的源代码:spring-websocket-4.0.6.RELEASE-sources 下载地址:https://spring.io/blog/2014/07/08/spring-framework-4-0-6-released

    (1)封装来自客户端发送的WebSocket连接请求信息:

    public class WebSocketHttpHeaders extends HttpHeaders {

        public static final String SEC_WEBSOCKET_ACCEPT = "Sec-WebSocket-Accept";

        public static final String SEC_WEBSOCKET_EXTENSIONS = "Sec-WebSocket-Extensions";

        public static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key";

        public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

        public static final String SEC_WEBSOCKET_VERSION = "Sec-WebSocket-Version";

        private static final long serialVersionUID = -6644521016187828916L;

        private final HttpHeaders headers;

        public WebSocketHttpHeaders() {         this(new HttpHeaders(), false);     }

        public WebSocketHttpHeaders(HttpHeaders headers) {         this(headers, false);     }

        private WebSocketHttpHeaders(HttpHeaders headers, boolean readOnly) {         this.headers = readOnly ? HttpHeaders.readOnlyHttpHeaders(headers) : headers;     }

        public static WebSocketHttpHeaders readOnlyWebSocketHttpHeaders(WebSocketHttpHeaders headers){         return new WebSocketHttpHeaders(headers, true);     }

        ***     **     *     * }(2)判断请求的协议类型,获取请求信息进入握手环节: 注意相关注释!!

     

    public abstract class AbstractWebSocketClient implements WebSocketClient {

        protected final Log logger = LogFactory.getLog(getClass());

        private static final Set<String> specialHeaders = new HashSet<String>();

        static {         specialHeaders.add("cache-control");         specialHeaders.add("connection");         specialHeaders.add("host");         specialHeaders.add("sec-websocket-extensions");         specialHeaders.add("sec-websocket-key");         specialHeaders.add("sec-websocket-protocol");         specialHeaders.add("sec-websocket-version");         specialHeaders.add("pragma");         specialHeaders.add("upgrade");     }

        @Override     public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,String uriTemplate, Object... uriVars) {..

        @Override     public final ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,             WebSocketHttpHeaders headers, URI uri) {

            Assert.notNull(webSocketHandler, "webSocketHandler must not be null");         Assert.notNull(uri, "uri must not be null");

            String scheme = uri.getScheme();

            /*判断协议类型*/         Assert.isTrue(((scheme != null) && ("ws".equals(scheme) || "wss".equals(scheme))), "Invalid scheme: " + scheme);

            if (logger.isDebugEnabled()) {             logger.debug("Connecting to " + uri);

            /*封装响应信息*/         HttpHeaders headersToUse = new HttpHeaders();         if (headers != null) {             for (String header : headers.keySet()) {                 if (!specialHeaders.contains(header.toLowerCase())) {                     headersToUse.put(header, headers.get(header));                 }             }         }

            List<String> subProtocols = ((headers != null) && (headers.getSecWebSocketProtocol() != null)) ?                 headers.getSecWebSocketProtocol() : Collections.<String>emptyList();

            List<WebSocketExtension> extensions = ((headers != null) && (headers.getSecWebSocketExtensions() != null)) ?                 headers.getSecWebSocketExtensions() : Collections.<WebSocketExtension>emptyList();

            /*进入握手环节*/         return doHandshakeInternal(webSocketHandler, headersToUse, uri, subProtocols, extensions,                 Collections.<String, Object>emptyMap());     }

        protected abstract ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,             HttpHeaders headers, URI uri, List<String> subProtocols, List<WebSocketExtension> extensions,             Map<String, Object> attributes);

    }

    (3)握手处理: 注意相关注释!!

     

    public class StandardWebSocketClient extends AbstractWebSocketClient {

        private final WebSocketContainer webSocketContainer;

        private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

        public StandardWebSocketClient() {         this.webSocketContainer = ContainerProvider.getWebSocketContainer();     }

        public StandardWebSocketClient(WebSocketContainer webSocketContainer) {         Assert.notNull(webSocketContainer, "WebSocketContainer must not be null");         this.webSocketContainer = webSocketContainer;     }

        public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {..

        public AsyncListenableTaskExecutor getTaskExecutor() {..

        /*握手处理,生成连接会话句柄Session*/     @Override     protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,             HttpHeaders headers, final URI uri, List<String> protocols,             List<WebSocketExtension> extensions, Map<String, Object> attributes) {

            int port = getPort(uri);         InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);         InetSocketAddress remoteAddress = new InetSocketAddress(uri.getHost(), port);

            final StandardWebSocketSession session = new StandardWebSocketSession(headers,                 attributes, localAddress, remoteAddress);

            final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create();         configBuilder.configurator(new StandardWebSocketClientConfigurator(headers));         configBuilder.preferredSubprotocols(protocols);         configBuilder.extensions(adaptExtensions(extensions));         final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

            Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {             @Override             public WebSocketSession call() throws Exception {                 webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);                 return session;             }         };

            if (this.taskExecutor != null) {             return this.taskExecutor.submitListenable(connectTask);         }         else {             ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);             task.run();             return task;         }     }

        private static List<Extension> adaptExtensions(List<WebSocketExtension> extensions){..

        private InetAddress getLocalHost() {..     private int getPort(URI uri) {..     private class StandardWebSocketClientConfigurator extends Configurator {         private final HttpHeaders headers;         public StandardWebSocketClientConfigurator(HttpHeaders headers) {..         @Override         public void beforeRequest(Map<String, List<String>> requestHeaders) {..         @Override         public void afterResponse(HandshakeResponse response) {..

    }

    (4)建立WebSocket连接,开始通讯: 注意相关注释!!

     

    public abstract class ConnectionManagerSupport implements SmartLifecycle {

        protected final Log logger = LogFactory.getLog(getClass());

        private final URI uri;

        private boolean autoStartup = false;

        private boolean isRunning = false;

        private int phase = Integer.MAX_VALUE;

        private final Object lifecycleMonitor = new Object();

        public ConnectionManagerSupport(String uriTemplate, Object... uriVariables) {         this.uri = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(                 uriVariables).encode().toUri();     }

        public void setAutoStartup(boolean autoStartup) {..     @Override     public boolean isAutoStartup() {..     public void setPhase(int phase) {..     @Override     public int getPhase() {..     protected URI getUri() {..     @Override     public boolean isRunning() {..

        @Override     public final void start() {         synchronized (this.lifecycleMonitor) {             if (!isRunning()) {                 startInternal();             }         }     }

        /*打开连接,synchronized处理并发连接*/     protected void startInternal() {         synchronized (lifecycleMonitor) {             if (logger.isDebugEnabled()) {                 logger.debug("Starting " + this.getClass().getSimpleName());             }             this.isRunning = true;             openConnection();         }     }

        protected abstract void openConnection();

        @Override     public final void stop() {..

        /*关闭连接*/     protected void stopInternal() throws Exception {         if (isConnected()) {             closeConnection();         }     }

        protected abstract boolean isConnected();

        protected abstract void closeConnection() throws Exception;

        @Override     public final void stop(Runnable callback) {         synchronized (this.lifecycleMonitor) {             this.stop();             callback.run();         }     }

    }

    Processed: 0.011, SQL: 9