!!! 事先说明 : ①本文章是以redis作为驱动来解读的。 ② 文章中的代码注释一定看,介绍都在注释中!!! 此图如有侵权,通知我删除 ;
通过上图可以看得出来,laravel广播事件与前端是没有直接交互的。
大概流程就是 :laravel广播事件通过redis的发布订阅功能发布频道,laravel-echo-server链接redis订阅频道获取发布的数据,通过socket主动推送给laravel-echo,前端渲染。
现在我们就跟着这个流程读一下源码吧!
(1)进入广播服务提供者类所在文件夹中 :\vendor\laravel\framework\src\Illuminate\Broadcasting (2) 写一个发送广播事件的demo
use Illuminate\Broadcasting\Channel; use Illuminate\Queue\SerializesModels; use Illuminate\Broadcasting\PrivateChannel; use Illuminate\Broadcasting\PresenceChannel; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Broadcasting\InteractsWithSockets; use Illuminate\Contracts\Broadcasting\ShouldBroadcast; class PrivateMessageEvent implements ShouldBroadcast { use Dispatchable, InteractsWithSockets, SerializesModels; public function __construct(){} /** * 获取广播事件对应的频道。 */ public function broadcastOn() { return new PrivateChannel('PrivateMessage.' . $this->admin_id ); } /** * 自定义广播数据 * @return mixed */ public function broadcastWith() { return $this->data; } } // 发布广播事件 broadcast(new PrivateMessageEvent();(3) 进入 BroadcastEvent.php 广播事件类找 handle 方法
/** * 创建一个新的作业处理程序实例。 * @param mixed $event 广播事件类实例 */ public function __construct($event) { $this->event = $event; } /** * 处理排队的作业。 * @param \Illuminate\Contracts\Broadcasting\Broadcaster $broadcaster */ public function handle(Broadcaster $broadcaster) { // 主要看是否自定义广播事件名称,没有则使用类名 $name = method_exists($this->event, 'broadcastAs') ? $this->event->broadcastAs() : get_class($this->event); // 发布广播数据到redis中 $broadcaster->broadcast( // 参1: 获取广播的频道名 Arr::wrap($this->event->broadcastOn()), // 参2:事件名 $name, // 参3:获取自定义广播数据 / 广播类的公共属性 / socket $this->getPayloadFromEvent($this->event) ); }(4)追一下这个方法:getPayloadFromEvent($this->event) 发现它会返回 (自定义广播数据 / 广播类的公共属性 / socket)
/** * 获取给定事件的有效负载。 * * @param mixed $event 广播事件类对象 * @return array */ protected function getPayloadFromEvent($event) { // 判断事件类中是否有broadcastWith方法 if (method_exists($event, 'broadcastWith')) { // 返回自定义载核 以及 对象中叫socket的属性值 return array_merge( $event->broadcastWith(), ['socket' => data_get($event, 'socket')] ); } $payload = []; foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) { $payload[$property->getName()] = $this->formatProperty($property->getValue($event)); } unset($payload['broadcastQueue']); // 返回所有公共属性 return $payload; }(5)返回刚才的地方再追一下 $broadcaster->broadcast() 方法,然后向下找到我们要使用的redis驱动类。进入!
/** * 广播给定事件。 * * @param array $channels 广播对应频道名称 * @param string $event 广播事件名称 * @param array $payload 广播数据 * @return void */ public function broadcast(array $channels, $event, array $payload = []) { // 建立redis链接 $connection = $this->redis->connection($this->connection); // 写入redis的数据 : 事件名 、 载核 、 socket $payload = json_encode([ 'event' => $event, 'data' => $payload, 'socket' => Arr::pull($payload, 'socket'), ]); foreach ($this->formatChannels($channels) as $channel) { // 发布广播数据到redis频道中 $connection->publish($channel, $payload); } }至此我们的广播数据已经发布到redis中!
(1) 进入laravel-echo-server包里面,可以在 github 中下载看看。建议要看一下。
|____api | |____http-api.ts | |____index.ts |____channels | |____channel.ts | |____index.ts | |____presence-channel.ts | |____private-channel.ts |____cli | |____cli.ts | |____index.ts |____database | |____database-driver.ts | |____database.ts | |____index.ts | |____redis.ts | |____sqlite.ts |____echo-server.ts |____index.ts |____log.ts |____server.ts |____subscribers | |____http-subscriber.ts | |____index.ts | |____redis-subscriber.ts | |____subscriber.tslaravel-echo-server 目录中主要包含:接口 (api)、频道 (channels)、 数据库 (database)、订阅 (subscribers) 等
(2) 当执行laravel-echo-server start的时候就是从 echo-server.js中的run()方法开始的
/* * L A R A V E L E C H O S E R V E R version 1.6.2 ⚠ Starting server in DEV mode... ✔ Running at localhost on port 6001 ✔ Channels are ready. ✔ Listening for http events... ✔ Listening for redis events... Server ready! * !!!!!!!!!!! 上面控制台输出就是run方法执行过程中输出的。 */ EchoServer.prototype.run = function (options) { var _this = this; return new Promise(function (resolve, reject) { // 判断是否自定义配置,没有则用默认的 _this.options = Object.assign(_this.defaultOptions, options); // 输出到控制台 _this.startup(); // 意思是在host:port上面运行服务,准备通道,监听http事件,监听redis事件 _this.server = new server_1.Server(_this.options); // 服务器初始化完毕。 _this.server.init().then(function (io) { _this.init(io).then(function () { log_1.Log.info('\nServer ready!\n'); resolve(_this); }, function (error) { return log_1.Log.error(error); }); }, function (error) { return log_1.Log.error(error); }); }); };(3) 启动过后就处于监听状态了,我们进入监听函数listen中看一下
EchoServer.prototype.listen = function () { var _this = this; return new Promise(function (resolve, reject) { var subscribePromises = _this.subscribers.map(function (subscriber) { // 这里就是订阅redis ,在回调函数中发送到socket上 return subscriber.subscribe(function (channel, message) { return _this.broadcast(channel, message); }); }); Promise.all(subscribePromises).then(function () { return resolve(); }); }); };(4) 在这里我们发现循环了_this.subscribers属性我们追一下,看看这个里面都存了什么东西
我们追到了 init初始化方法中,发现在往_this.subscribers这个属性中push实例 EchoServer.prototype.init = function (io) { var _this = this; return new Promise(function (resolve, reject) { _this.channel = new channels_1.Channel(io, _this.options); _this.subscribers = []; if (_this.options.subscribers.http) _this.subscribers.push(new subscribers_1.HttpSubscriber(_this.server.express, _this.options)); // 因为我们用的redis驱动,直接看这里 if (_this.options.subscribers.redis) _this.subscribers.push(new subscribers_1.RedisSubscriber(_this.options)); _this.httpApi = new api_1.HttpApi(io, _this.channel, _this.server.express, _this.options.apiOriginAllow); _this.httpApi.init(); _this.onConnect(); _this.listen().then(function () { return resolve(); }, function (err) { return log_1.Log.error(err); }); }); }; 我们继续追代码追过去。来到了\dist\subscribers\redis-subscriber.js中。因为(3)使用了这个实例的subscribe 方法,我们看一下这个方法做了什么! /* 通过订阅以 _this._keyPrefix 为前缀的所有频道获取广播数据, 然后调用外层的回调函数发布到 socket 中。 */ RedisSubscriber.prototype.subscribe = function (callback) { var _this = this; return new Promise(function (resolve, reject) { // 监听pmessage , 回调函数 参2: 订阅的广播频道名称 ; 参3 : 广播数据 _this._redis.on('pmessage', function (subscribed, channel, message) { try { message = JSON.parse(message); if (_this.options.devMode) { log_1.Log.info("Channel: " + channel); // 将广播名 输出到控制台 log_1.Log.info("Event: " + message.event); // 将事件名 输出到控制台 } /* 调用外层回调 function (channel, message) { return _this.broadcast(channel, message); } */ callback(channel.substring(_this._keyPrefix.length), message); } catch (e) { if (_this.options.devMode) { log_1.Log.info("No JSON message"); } } }); // 订阅以 _this._keyPrefix 为前缀的所有频道 _this._redis.psubscribe(_this._keyPrefix + "*", function (err, count) { if (err) { reject('Redis could not subscribe.'); } log_1.Log.success('Listening for redis events...'); resolve(); }); }); };(5)我们看一下 (3)的回调函数中_this.broadcast 方法的作用
EchoServer.prototype.broadcast = function (channel, message) { // 判断是否私有 if (message.socket && this.find(message.socket)) { // 发送给已经授权的人 return this.toOthers(this.find(message.socket), channel, message); } else { // 发送给所有 return this.toAll(channel, message); } };至此laravel-echo-server 订阅redis中的广播数据,并发布到socket就完成了。
(1) 实例化laravel-echo传入配置参数,接收socket服务器的公开频道和监听事件
window.Echo = new Echo({ broadcaster: 'socket.io', host: window.location.hostname + ':6001', auth: { headers: { 'authorization': 'Bearer ' + store.getters.token } } }); window.Echo.channel('public_channel') .listen('RssPublicEvent', (e) => { that.names.push(e.name) });(2) 根据new Echo()—> 进入到laravel-echo包中的echo.ts,找到 constructor 构造方法
/** * 创建一个新的类实例。 */ constructor(options: any) { this.options = options; // 初始化配置参数 this.connect(); if (!this.options.withoutInterceptors) { this.registerInterceptors(); } /* this.connect() 对应代码 /** * 创建一个新的连接。 */ connect(): void { if (this.options.broadcaster == 'pusher') { this.connector = new PusherConnector(this.options); } else if (this.options.broadcaster == 'socket.io') { // 此类创建到Socket.io服务器的连接器 this.connector = new SocketIoConnector(this.options); } else if (this.options.broadcaster == 'null') { this.connector = new NullConnector(this.options); } else if (typeof this.options.broadcaster == 'function') { this.connector = new this.options.broadcaster(this.options); } } */ }(3) 追一下 window.Echo.channel('public_channel'),进入 echo.ts中channel() 方法
/** * 通过名称获取通道实例。 */ channel(channel: string): Channel { // 根据广播名称获取通道实例 return this.connector.channel(channel); } /** * 通过名称获取通道实例。 */ channel(name: string): SocketIoChannel { // 判断属性中是否保存了对应的通道实例 if (!this.channels[name]) { // 获取通道实例并保存在属性中 SocketIoChannel(socket链接,通道名称,配置参数) this.channels[name] = new SocketIoChannel(this.socket, name, this.options); } // 返回通道实例 return this.channels[name]; }(4)追一下 new SocketIoChannel() ,进入到 SocketIoChannel类中看一下构造函数中做了什么事
/** * 此类表示Socket.io通道。 */ export class SocketIoChannel extends Channel { /** * 创建一个新的类实例。 构造函数 */ constructor(socket: any, name: string, options: any) { super(); this.name = name; this.socket = socket; this.options = options; this.eventFormatter = new EventFormatter(this.options.namespace); // 订阅Socket.io频道 this.subscribe(); /* 追 subscribe() 查看以下代码 //订阅Socket.io频道。 subscribe(): void { // 发送事件 this.socket.emit('subscribe', { channel: this.name, auth: this.options.auth || {}, }); } */ this.configureReconnector(); } }(6)在(5)中的实例化SocketIoChannel类时向socket发送了事件名叫subscribe的,我们看看laravel-echo-server怎么接收它的。并做了什么事? ① 在 init() 中有 _this.onConnect(); 进入!
EchoServer.prototype.onConnect = function () { var _this = this; this.server.io.on('connection', function (socket) { _this.onSubscribe(socket); //监听客户端 _this.onUnsubscribe(socket); _this.onDisconnecting(socket); _this.onClientEvent(socket); }); };② 进入 onSubscribe 里面发现有在监听 subscribe
EchoServer.prototype.onSubscribe = function (socket) { var _this = this; socket.on('subscribe', function (data) { _this.channel.join(socket, data); }); };③ 进入 join ,因为这里是公共的 ,所以会走else ,从而建立通道
Channel.prototype.join = function (socket, data) { if (data.channel) { // 如果是私有的 if (this.isPrivate(data.channel)) { this.joinPrivate(socket, data); } else { // 建立通道 socket.join(data.channel); // 将通道名输出到控制台中 this.onJoin(socket, data.channel); } } };从.channel()返回了SocketIoChannel类的实例,而在在实例化SocketIoChannel类的构造函数中发送一个事件到socket里,laravel-echo-server接收然后建立通道(所以这个通道的建立实在这个地方)。
(7)继续向下走我们追一下 .listen() 我们进入 SocketIoChannel类中找到 listen方法
// socketio-connector.ts 在通道实例上监听事件。 listen(event: string, callback: Function): SocketIoChannel { this.on(this.eventFormatter.format(event), callback); return this; } /** * 将通道的套接字绑定到事件并存储回调。 */ on(event: string, callback: Function): void { let listener = (channel, data) => { if (this.name == channel) { callback(data); } }; // 监听通道上的事件 this.socket.on(event, listener); this.bind(event, listener); }至此 laravel-echo 完成了与laravel-echo-server的交互。
(2)实例化Echo因为上面有说,这里直接追private()进入方法中看一下
/** * 通过名称获取私有通道实例. */ private(channel: string): Channel { return this.connector.privateChannel(channel); } /** * 通过名称获取私有通道实例。 */ privateChannel(name: string): SocketIoPrivateChannel { // 这里拼接private-然后判断私有通道是否存在 if (!this.channels['private-' + name]) { // 不存在就实例化一个 this.channels['private-' + name] = new SocketIoPrivateChannel(this.socket, 'private-' + name, this.options); } // 返回私有通道 return this.channels['private-' + name]; }(3)最终也追到了 SocketIoChannel类 ,进入到 SocketIoChannel类中,依旧发送事件
/** * 此类表示Socket.io通道。 */ export class SocketIoChannel extends Channel { /** * 创建一个新的类实例。 构造函数 */ constructor(socket: any, name: string, options: any) { super(); this.name = name; this.socket = socket; this.options = options; this.eventFormatter = new EventFormatter(this.options.namespace); // 订阅Socket.io频道 this.subscribe(); /* 追 subscribe() 查看以下代码 //订阅Socket.io频道。 subscribe(): void { // 发送事件 this.socket.emit('subscribe', { channel: this.name,// 拼接了private- auth: this.options.auth || {}, }); } */ this.configureReconnector(); } }(4)在(3)中的实例化SocketIoChannel类时向socket发送了事件名叫subscribe的,我们看看laravel-echo-server怎么接收它的。并做了什么事? ① 在 init() 中有 _this.onConnect(); 进入!
EchoServer.prototype.onConnect = function () { var _this = this; this.server.io.on('connection', function (socket) { _this.onSubscribe(socket); //监听客户端 _this.onUnsubscribe(socket); _this.onDisconnecting(socket); _this.onClientEvent(socket); }); };② 进入 onSubscribe 里面发现有在监听 subscribe
EchoServer.prototype.onSubscribe = function (socket) { var _this = this; socket.on('subscribe', function (data) { _this.channel.join(socket, data); }); };③ 进入 join ,因为这里是私有的 ,所以会走if
Channel.prototype.join = function (socket, data) { if (data.channel) { // 如果是私有的 if (this.isPrivate(data.channel)) { this.joinPrivate(socket, data); } else { // 建立通道 socket.join(data.channel); // 将通道名输出到控制台中 this.onJoin(socket, data.channel); } } };④ 进入 joinPrivate(); ,这里发送了鉴权请求到laravel项目,鉴权成功则建立通道,相比公共的,这里多了个鉴权操作。
Channel.prototype.joinPrivate = function (socket, data) { var _this = this; // 直接向laravel项目发送鉴权请求 this.private.authenticate(socket, data).then(function (res) { // 鉴权成功则 建立通道 socket.join(data.channel); if (_this.isPresence(data.channel)) { var member = res.channel_data; try { member = JSON.parse(res.channel_data); } catch (e) { } _this.presence.join(socket, data.channel, member); } // 输出到控制台 _this.onJoin(socket, data.channel); }, function (error) { if (_this.options.devMode) { log_1.Log.error(error.reason); } _this.io.sockets.to(socket.id) .emit('subscription_error', data.channel, error.status); }); };