123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- var requestLib = require('./request');
- var wxTunnel = require('./wxTunnel');
- /**
- * 当前打开的信道,同一时间只能有一个信道打开
- */
- var currentTunnel = null;
- // 信道状态枚举
- var STATUS_CLOSED = Tunnel.STATUS_CLOSED = 'CLOSED';
- var STATUS_CONNECTING = Tunnel.STATUS_CONNECTING = 'CONNECTING';
- var STATUS_ACTIVE = Tunnel.STATUS_ACTIVE = 'ACTIVE';
- var STATUS_RECONNECTING = Tunnel.STATUS_RECONNECTING = 'RECONNECTING';
- // 错误类型枚举
- var ERR_CONNECT_SERVICE = Tunnel.ERR_CONNECT_SERVICE = 1001;
- var ERR_CONNECT_SOCKET = Tunnel.ERR_CONNECT_SOCKET = 1002;
- var ERR_RECONNECT = Tunnel.ERR_RECONNECT = 2001;
- var ERR_SOCKET_ERROR = Tunnel.ERR_SOCKET_ERROR = 3001;
- // 包类型枚举
- var PACKET_TYPE_MESSAGE = 'message';
- var PACKET_TYPE_PING = 'ping';
- var PACKET_TYPE_PONG = 'pong';
- var PACKET_TYPE_TIMEOUT = 'timeout';
- var PACKET_TYPE_CLOSE = 'close';
- // 断线重连最多尝试 5 次
- var DEFAULT_MAX_RECONNECT_TRY_TIMES = 5;
- // 每次重连前,等待时间的增量值
- var DEFAULT_RECONNECT_TIME_INCREASE = 1000;
- function Tunnel(serviceUrl) {
- if (currentTunnel && currentTunnel.status !== STATUS_CLOSED) {
- throw new Error('当前有未关闭的信道,请先关闭之前的信道,再打开新信道');
- }
- currentTunnel = this;
- // 等确认微信小程序全面支持 ES6 就不用那么麻烦了
- var me = this;
- //=========================================================================
- // 暴露实例状态以及方法
- //=========================================================================
- this.serviceUrl = serviceUrl;
- this.socketUrl = null;
- this.status = null;
- this.open = openConnect;
- this.on = registerEventHandler;
- this.emit = emitMessagePacket;
- this.close = close;
- this.isClosed = isClosed;
- this.isConnecting = isConnecting;
- this.isActive = isActive;
- this.isReconnecting = isReconnecting;
- //=========================================================================
- // 信道状态处理,状态说明:
- // closed - 已关闭
- // connecting - 首次连接
- // active - 当前信道已经在工作
- // reconnecting - 断线重连中
- //=========================================================================
- function isClosed() { return me.status === STATUS_CLOSED; }
- function isConnecting() { return me.status === STATUS_CONNECTING; }
- function isActive() { return me.status === STATUS_ACTIVE; }
- function isReconnecting() { return me.status === STATUS_RECONNECTING; }
- function setStatus(status) {
- var lastStatus = me.status;
- if (lastStatus !== status) {
- me.status = status;
- }
- }
- // 初始为关闭状态
- setStatus(STATUS_CLOSED);
- //=========================================================================
- // 信道事件处理机制
- // 信道事件包括:
- // connect - 连接已建立
- // close - 连接被关闭(包括主动关闭和被动关闭)
- // reconnecting - 开始重连
- // reconnect - 重连成功
- // error - 发生错误,其中包括连接失败、重连失败、解包失败等等
- // [message] - 信道服务器发送过来的其它事件类型,如果事件类型和上面内置的事件类型冲突,将在事件类型前面添加前缀 `@`
- //=========================================================================
- var preservedEventTypes = 'connect,close,reconnecting,reconnect,error'.split(',');
- var eventHandlers = [];
- /**
- * 注册消息处理函数
- * @param {string} messageType 支持内置消息类型("connect"|"close"|"reconnecting"|"reconnect"|"error")以及业务消息类型
- */
- function registerEventHandler(eventType, eventHandler) {
- if (typeof eventHandler === 'function') {
- eventHandlers.push([eventType, eventHandler]);
- }
- }
- /**
- * 派发事件,通知所有处理函数进行处理
- */
- function dispatchEvent(eventType, eventPayload) {
- eventHandlers.forEach(function (handler) {
- var handleType = handler[0];
- var handleFn = handler[1];
- if (handleType === '*') {
- handleFn(eventType, eventPayload);
- } else if (handleType === eventType) {
- handleFn(eventPayload);
- }
- });
- }
- /**
- * 派发事件,事件类型和系统保留冲突的,事件名会自动加上 '@' 前缀
- */
- function dispatchEscapedEvent(eventType, eventPayload) {
- if (preservedEventTypes.indexOf(eventType) > -1) {
- eventType = '@' + eventType;
- }
- dispatchEvent(eventType, eventPayload);
- }
- //=========================================================================
- // 信道连接控制
- //=========================================================================
- var isFirstConnection = true;
- var isOpening = false;
- /**
- * 连接信道服务器,获取 WebSocket 连接地址,获取地址成功后,开始进行 WebSocket 连接
- */
- function openConnect() {
- if (isOpening) return;
- isOpening = true;
- // 只有关闭状态才会重新进入准备中
- setStatus(isFirstConnection ? STATUS_CONNECTING : STATUS_RECONNECTING);
- requestLib.request({
- url: serviceUrl,
- method: 'GET',
- success: function (response) {
- if (+response.statusCode === 200 && response.data && response.data.data.connectUrl) {
- console.log('通知服务端获准备开始连接,并成功取信道通讯地址', response.data.data.connectUrl)
- openSocket(me.socketUrl = response.data.data.connectUrl);
- } else {
- dispatchConnectServiceError(response);
- }
- },
- fail: dispatchConnectServiceError,
- complete: () => isOpening = false,
- });
- function dispatchConnectServiceError(detail) {
- if (isFirstConnection) {
- setStatus(STATUS_CLOSED);
- dispatchEvent('error', {
- code: ERR_CONNECT_SERVICE,
- message: '连接信道服务失败,网络错误或者信道服务没有正确响应',
- detail: detail || null,
- });
- } else {
- startReconnect(detail);
- }
- }
- }
- /**
- * 打开 WebSocket 连接,打开后,注册微信的 Socket 处理方法
- */
- function openSocket(url) {
- wxTunnel.listen({
- onOpen: handleSocketOpen,
- onMessage: handleSocketMessage,
- onClose: handleSocketClose,
- onError: handleSocketError,
- });
- //jacksplwxy:
- //wx.connectSocket({ url: url });
- wx.connectSocket({
- url: url,
- success(){
- console.log('开始尝试信道连接')
- }
- });
- isFirstConnection = false;
- }
- //=========================================================================
- // 处理消息通讯
- //
- // packet - 数据包,序列化形式为 `${type}` 或者 `${type}:${content}`
- // packet.type - 包类型,包括 message, ping, pong, close
- // packet.content? - 当包类型为 message 的时候,会附带 message 数据
- //
- // message - 消息体,会使用 JSON 序列化后作为 packet.content
- // message.type - 消息类型,表示业务消息类型
- // message.content? - 消息实体,可以为任意类型,表示消息的附带数据,也可以为空
- //
- // 数据包示例:
- // - 'ping' 表示 Ping 数据包
- // - 'message:{"type":"speak","content":"hello"}' 表示一个打招呼的数据包
- //=========================================================================
- // 连接还没成功建立的时候,需要发送的包会先存放到队列里
- var queuedPackets = [];
- /**
- * WebSocket 打开之后,更新状态,同时发送所有遗留的数据包
- */
- function handleSocketOpen() {
- /* istanbul ignore else */
- if (isConnecting()) {
- dispatchEvent('connect');
- console.log('监听到信道连接成功')
- }
- else if (isReconnecting()) {
- dispatchEvent('reconnect');
- resetReconnectionContext();
- }
- setStatus(STATUS_ACTIVE);
- emitQueuedPackets();
- nextPing();
- }
- /**
- * 收到 WebSocket 数据包,交给处理函数
- */
- function handleSocketMessage(message) {
- resolvePacket(message.data);
- }
- /**
- * 发送数据包,如果信道没有激活,将先存放队列
- */
- function emitPacket(packet) {
- if (isActive()) {
- sendPacket(packet);
- } else {
- queuedPackets.push(packet);
- }
- }
- /**
- * 数据包推送到信道
- */
- function sendPacket(packet) {
- var encodedPacket = [packet.type];
- if (packet.content) {
- encodedPacket.push(JSON.stringify(packet.content));
- }
- wx.sendSocketMessage({
- data: encodedPacket.join(':'),
- fail: handleSocketError,
- });
- }
- function emitQueuedPackets() {
- queuedPackets.forEach(emitPacket);
- // empty queued packets
- queuedPackets.length = 0;
- }
- /**
- * 发送消息包
- */
- function emitMessagePacket(messageType, messageContent) {
- var packet = {
- type: PACKET_TYPE_MESSAGE,
- content: {
- type: messageType,
- content: messageContent,
- },
- };
- emitPacket(packet);
- }
- /**
- * 发送 Ping 包
- */
- function emitPingPacket() {
- emitPacket({ type: PACKET_TYPE_PING });
- }
- /**
- * 发送关闭包
- */
- function emitClosePacket() {
- emitPacket({ type: PACKET_TYPE_CLOSE });
- }
- /**
- * 解析并处理从信道接收到的包
- */
- function resolvePacket(raw) {
- var packetParts = raw.split(':');
- var packetType = packetParts.shift();
- var packetContent = packetParts.join(':') || null;
- var packet = { type: packetType };
- if (packetContent) {
- try {
- packet.content = JSON.parse(packetContent);
- } catch (e) { }
- }
- switch (packet.type) {
- case PACKET_TYPE_MESSAGE:
- handleMessagePacket(packet);
- break;
- case PACKET_TYPE_PONG:
- handlePongPacket(packet);
- break;
- case PACKET_TYPE_TIMEOUT:
- handleTimeoutPacket(packet);
- break;
- case PACKET_TYPE_CLOSE:
- handleClosePacket(packet);
- break;
- default:
- handleUnknownPacket(packet);
- break;
- }
- }
- /**
- * 收到消息包,直接 dispatch 给处理函数
- */
- function handleMessagePacket(packet) {
- var message = packet.content;
- dispatchEscapedEvent(message.type, message.content);
- }
- //=========================================================================
- // 心跳、断开与重连处理
- //=========================================================================
- /**
- * Ping-Pong 心跳检测超时控制,这个值有两个作用:
- * 1. 表示收到服务器的 Pong 相应之后,过多久再发下一次 Ping
- * 2. 如果 Ping 发送之后,超过这个时间还没收到 Pong,断开与服务器的连接
- * 该值将在与信道服务器建立连接后被更新
- */
- let pingPongTimeout = 15000;
- let pingTimer = 0;
- let pongTimer = 0;
- /**
- * 信道服务器返回 Ping-Pong 控制超时时间
- */
- function handleTimeoutPacket(packet) {
- var timeout = packet.content * 1000;
- /* istanbul ignore else */
- if (!isNaN(timeout)) {
- pingPongTimeout = timeout;
- ping();
- }
- }
- /**
- * 收到服务器 Pong 响应,定时发送下一个 Ping
- */
- function handlePongPacket(packet) {
- nextPing();
- }
- /**
- * 发送下一个 Ping 包
- */
- function nextPing() {
- clearTimeout(pingTimer);
- clearTimeout(pongTimer);
- pingTimer = setTimeout(ping, pingPongTimeout);
- }
- /**
- * 发送 Ping,等待 Pong
- */
- function ping() {
- /* istanbul ignore else */
- if (isActive()) {
- emitPingPacket();
- // 超时没有响应,关闭信道
- pongTimer = setTimeout(handlePongTimeout, pingPongTimeout);
- }
- }
- /**
- * Pong 超时没有响应,信道可能已经不可用,需要断开重连
- */
- function handlePongTimeout() {
- startReconnect('服务器已失去响应');
- }
- // 已经重连失败的次数
- var reconnectTryTimes = 0;
- // 最多允许失败次数
- var maxReconnectTryTimes = Tunnel.MAX_RECONNECT_TRY_TIMES || DEFAULT_MAX_RECONNECT_TRY_TIMES;
- // 重连前等待的时间
- var waitBeforeReconnect = 0;
- // 重连前等待时间增量
- var reconnectTimeIncrease = Tunnel.RECONNECT_TIME_INCREASE || DEFAULT_RECONNECT_TIME_INCREASE;
- var reconnectTimer = 0;
- function startReconnect(lastError) {
- if (reconnectTryTimes >= maxReconnectTryTimes) {
- close();
- dispatchEvent('error', {
- code: ERR_RECONNECT,
- message: '重连失败',
- detail: lastError,
- });
- }
- else {
- wx.closeSocket();
- waitBeforeReconnect += reconnectTimeIncrease;
- setStatus(STATUS_RECONNECTING);
- reconnectTimer = setTimeout(doReconnect, waitBeforeReconnect);
- }
- if (reconnectTryTimes === 0) {
- dispatchEvent('reconnecting');
- }
- reconnectTryTimes += 1;
- }
- function doReconnect() {
- openConnect();
- }
- function resetReconnectionContext() {
- reconnectTryTimes = 0;
- waitBeforeReconnect = 0;
- }
- /**
- * 收到服务器的关闭请求
- */
- function handleClosePacket(packet) {
- close();
- }
- function handleUnknownPacket(packet) {
- // throw away
- }
- var isClosing = false;
- /**
- * 收到 WebSocket 断开的消息,处理断开逻辑
- */
- function handleSocketClose() {
- /* istanbul ignore if */
- if (isClosing) return;
- /* istanbul ignore else */
- if (isActive()) {
- // 意外断开的情况,进行重连
- startReconnect('链接已断开');
- }
- }
- function close() {
- isClosing = true;
- closeSocket();
- setStatus(STATUS_CLOSED);
- resetReconnectionContext();
- isFirstConnection = false;
- clearTimeout(pingTimer);
- clearTimeout(pongTimer);
- clearTimeout(reconnectTimer);
- dispatchEvent('close');
- isClosing = false;
- }
- function closeSocket(emitClose) {
- if (isActive() && emitClose !== false) {
- emitClosePacket();
- }
- wx.closeSocket();
- }
- //=========================================================================
- // 错误处理
- //=========================================================================
- /**
- * 错误处理
- */
- function handleSocketError(detail) {
- switch (me.status) {
- case Tunnel.STATUS_CONNECTING:
- dispatchEvent('error', {
- code: ERR_SOCKET_ERROR,
- message: '连接信道失败,网络错误或者信道服务不可用',
- detail: detail,
- });
- break;
- }
- }
- }
- module.exports = Tunnel;
|