stompjs下载,(不要用npm安装stomp)
下载地址
rabbitmq的连接配置
基于Taro,stomp的RabbitMQ消费者实现
import config
from './config'
import {Stomp
as stompjs
} from './stomp'
import Taro
from '@tarojs/taro'
import { isString
} from 'lodash';
export default class MqConsumer{
serverUrl
=null;
ws
=null;
client
=null;
queue
=null;
onMsg
=null;
count
=0;
t
=null;
MAX=200;
socketOpen
=false;
sendMsgQueue
= []
constructor(queue
,onMsg
){
this.queue
=queue
;
this.onMsg
=onMsg
;
this.serverUrl
= config
.hostname
+":"+config
.port
;
this.connect();
}
send(msg
){
let that
=this;
if(msg
){
if (that
.socketOpen
) {
that
.client
.send(that
.queue
,{},isString(msg
)?msg
:JSON.stringify(msg
));
} else {
that
.sendMsgQueue
.push(msg
)
}
}
}
connect(){
this.socketOpen
= false
let that
=this;
function sendSocketMessage(msg
) {
if(msg
){
if (that
.socketOpen
) {
Taro
.sendSocketMessage({
data
: msg
})
} else {
that
.sendMsgQueue
.push(msg
)
}
}
}
this.ws
= { send
: sendSocketMessage
, onopen
: null, onmessage
: null }
Taro
.connectSocket({
url
: `ws://${this.serverUrl}/ws`
}).then(st
=>{
this.ws
.st
=st
;
})
Taro
.onSocketOpen(function (res
) {
console
.log('WebSocket连接已打开!')
that
.socketOpen
= true
for (var i
= 0; i
< that
.sendMsgQueue
.length
; i
++) {
sendSocketMessage(that
.sendMsgQueue
[i
])
}
that
.sendMsgQueue
= []
that
.ws
.onopen
&& that
.ws
.onopen()
})
Taro
.onSocketMessage(function (res
) {
console
.log('ws消息:')
that
.ws
.onmessage
&& that
.ws
.onmessage(res
)
})
Taro
.onSocketError(function (res
) {
console
.log('ws异常!',res
)
that
.socketOpen
= false
that
.reConnect(that
);
})
Taro
.onSocketClose(function (res
) {
console
.log('ws断开!',res
)
that
.socketOpen
= false
that
.reConnect(that
);
})
this.setClient();
}
setClient(){
stompjs
.setInterval = function () { }
stompjs
.clearInterval = function () { }
this.client
= stompjs
.over(this.ws
);
this.client
.heartbeat
.outgoing
= 0
this.client
.heartbeat
.incoming
= 0
let onConnect = () => {
console
.log('连接MQ成功')
var headers
={};
this.client
.subscribe(this.queue
, this.onMsg
||(function(data
) {
var msg
= data
.body
;
console
.log("MQ消息:" + msg
);
}),headers
);
}
let that
=this;
let _onError =(err
) => {
console
.log("mq异常",err
)
that
.reConnect(that
);
}
let clientInfo
= {
login
: config
.user
,
passcode
: config
.password
,
host
:'/',
}
this.client
.connect(clientInfo
, onConnect
, _onError
)
}
reConnect(that
){
that
.count
++;
console
.log("ws重连...【" + that
.count
+ "】");
if ( that
.ws
.st
.readyState
=== 1) {
clearTimeout(that
.t
);
that
.t
=null;
that
.count
=0;
} else if(that
.count
>= that
.MAX){
alert("重连失败超过设定次数...");
}else {
if (that
.ws
.st
.readyState
=== 3) {
that
.connect();
}
that
.t
&&clearTimeout(that
.t
);
that
.t
= setTimeout(function() {that
.reConnect(that
);}, 1000*10);
}
}
static subscribe_queue(queue
,onMsg
){
return new MqConsumer('/queue/'+queue
,onMsg
);
}
static subscribe_topic(queue
,onMsg
){
return new MqConsumer('/topic/'+queue
,onMsg
);
}
}
总结
1,可以断线重连,一段时间后连接会自动断掉,原因我知道 2,queue使用正常,topic收不到消息不能正常使用,可能是我哪里设置有问题
转载请注明原文地址:https://ipadbbs.8miu.com/read-15433.html