响应式web(二):微服务内部中的响应式、响应式和阻塞式、背压的概念、响应式web应用场景

    技术2024-07-04  83

    上节回顾

    同步与异步命令式编程与响应式编程消息驱动观察者模式Tomcat的NIO异步网络io服务器推技术Servlet3.0与3.1

    本节内容

    响应式系统主流框架概览

    akka

    使用Scala语言开发,后期支持Java,但是在Java社区并不如从前受欢迎

    Vert.x

    Vert.x是Eclipse发行的开源项目,设计初衷是作为Node.js在Java虚拟机上的替代方法,支持非阻塞和事件驱动,在近几年开始受大家关注

    Project Reactor

    Spring5中默认引入的响应式编程机制,出现的比较晚,所以也吸收了前辈的经验。

    RxJava

    ReactiveExtensions,早起应用于.net平台

    微服务内部中的响应式

    我们把响应式开发拆分为两个阶段:

    SpringCloud:前面Controller接入用户请求,内部用Rest进行远程调用,维护了一个线程池(可以受Hytrix保护),去调用远端的服务并阻塞等待返回结果,然后再执行自己后面的业务逻辑,最后返回给用户。这是传统的“命令式编程” controller是单例、多线程:每一个用户请求进来,就创建一个业务线程。

    阻塞式的 响应式的 单一职责原则 在响应式的模型中,用户请求和业务线程就不是一对一的了,处理每一个业务的线程只做自己的事情,可以通过事件驱动的机制,达到线程复用,提高系统性能,适合IO密集度比较高的情况下。

    雪崩效应 雪崩效应形成的原因,是因为不断的重试导致的资源的耗尽。第一次请求没成功返回,耗时很长,后来的请求又来了,造成了请求的积压,所有的请求都没有完成。一个服务不通了,后面的连锁式的反应,导致整个链路不可用,这就是雪崩。

    资源隔离,舱壁模式 解决的是一个服务在调用另外一个服务的时候,会从线程池里拿一个线程回来,发起一个异步调用,如果服务挂了,线程内部就报错了。调不通会产生请求的延迟,需要更长的时间才能返回成功或失败。如果服务B的线程池里的线程都被拿去进行调用了,没有关系,服务C有他自己的线程池,一边满了不影响另一边的线程池,这就是Hytrix实现的舱壁隔离。

    如果按照新的思路,只关心事件,而不关心结果,你爱通不通,不通待会儿再重新调用,这样就不会存在雪崩的问题了。 服务之间的边界清晰的时候,你可以用MQ 如果需要复用,比如我在调用远程服务的时候,存图的服务和存数据库的服务都是远程异步调用的。 如果存图耗时比较长,你可以在存图的时候加一个MQ,反正客户端不需要等待存图的结果。 而对于用户名是否重复的判断,(可能有两个人同时想要使用一个id的时候是不行的),你需要拿存储结果,你可以使用响应式编程

    什么时候需要事件驱动,什么时候不需要事件驱动? 一般微服务里面不用这个。两个服务如果响应时间很快,就没有必要再事件驱动了,毕竟中间要增加一个环节。 即使是雪崩问题,也可以使用Hystrix进行快速失败,解决雪崩问题。 这套技术从14 15年就有了,Node.js也是这套东西,因为之前已经有了解决方案,引入新方案会有新的隐患,所以为啥要用它呢

    富客户端 富因特网应用程序(Rich Internet Applications,RIA)利用具有很强交互性的富客户端技术来为用户提供一个更高和更全方位的网络体验。

    基于TCP连接的一套实现方案:服务器有什么消息,直接推送给我。是一种全双工的交互方式 轮询、长轮询、SSE

    http默认能挂起90秒,比如你可以用sleep,超过90秒需要续租,这是可以用js实现的 h5的特性:websocket是浏览器实现的,ws://开头的协议

    mime 类型不同,浏览器消费方式不同

    有时间看看:浏览器下载文件用的是什么协议?计算机网络 谢希仁 应用层那一张

    响应式web的特点: 啥是背压?解决方案? 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现 如何减少堆积?限流算法,或者增加机器,或者改为主动去取

    代码示例

    类似于一个聊天室

    package com.mashibing.admin; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @RestController @RequestMapping(path = "/sse") public class SseRest { private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); @GetMapping(path = "/subscribe") public SseEmitter subscribe(String id) { // 超时时间设置为1小时 SseEmitter sseEmitter = new SseEmitter(3600000L); sseCache.put(id, sseEmitter); // 超时回调 触发 sseEmitter.onTimeout(() -> sseCache.remove(id)); // 结束之后的回调触发 sseEmitter.onCompletion(() -> System.out.println("完成!!!")); return sseEmitter; } @GetMapping(path = "/push") public String push(String id, String content) throws IOException { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { // 发送消息 sseEmitter.send(content); } return "over"; } @GetMapping(path = "over") public String over(String id) { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { // 执行完毕,断开连接 sseEmitter.complete(); sseCache.remove(id); } return "over"; } @GetMapping(path = "/push-all") public String pushAll(String content) throws IOException { for (String s : sseCache.keySet()) { SseEmitter sseEmitter = sseCache.get(s); if (sseEmitter != null) { // 发送消息 sseEmitter.send(content); } } return "over"; } }

    user1.html

    <!doctype html> <html lang="en"> <head> <meta charset="UTF-8"> <!-- for HTML5 --> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Sse测试文档</title> </head> <body> <div>sse 测试</div> <div id="result"></div> </body> </html> <script> var source = new EventSource('http://localhost/sse/subscribe?id=user1'); source.onmessage = function (event) { text = document.getElementById('result').innerText; text += '\n' + event.data; document.getElementById('result').innerText = text; }; <!-- 添加一个开启回调 --> source.onopen = function (event) { text = document.getElementById('result').innerText; text += '\n 开启: '; console.log(event); document.getElementById('result').innerText = text; }; </script>

    响应式web应用场景

    ajax是只能拉,websocket是全双工,有状态连接,可以推可以拉,可以发送二进制数据,websocket写起来比较麻烦一些。 sse(server sent event)是单向的,是ws的一半,是无状态连接,适合比如股票行情展示,只能服务器给客户端推 下节课我们讲 Reactor,啥叫Flux,Mono,啥叫WebFlux

    响应式宣言

    官方自带中文文档:

    https://www.reactivemanifesto.org/zh-CN

    响应式系统的特点:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)

    即时响应性:

    只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

    回弹性:

    系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。

    弹性:

    系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

    消息驱动:

    反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。


    响应式系统的应用场景

    电商中的响应式富客户端(Gmail、qq邮箱)系统通知股市K线聊天室

    希望上完所有课程,你可以:

    了解响应式编程,对WebFlux有实战开发经验

    拓展:P2P

    P2P:默克尔树,树形结构,种子里存的是Hash值、节点服务器的地址,通过中央节点提供节点之间的相互发现,如果没有中间节点的话,需要广播(UDP)才行。广播地址是互相找的,节点到节点之间怎么找?可以找服务器去找,可以广播。连接之后开始传文件,我想要共享文件,首先文件会被分为固定块大小。种子文件里有个固定的Hash,分为top hash和子hash

    下载到最后会回退的原因:块被篡改了,找不到合适的块。可能由于恶意篡改,或者网路传输中的丢包。

    Processed: 0.020, SQL: 9