Source

client/core/modules/websocket.js

/**
 *
 * ## A simple client WebSocket wrapper
 *
 * **This module is 100% specific and only works in combination with KissJS server.**
 * The websocket wrapper uses [kiss.session](kiss.session.html) to authenticate the user with the server.
 *
 * @namespace
 */
kiss.websocket = {
	// The connection object
	connection: {},

	// Avoids collision in websocket config
	RECONNECTION_SYMBOL: Symbol("reconnection"),

	// State flags
	reconnecting: false,
	closing: false,

	/**
	 * Init the WebSocket connection
	 *
	 * @async
	 * @param {object} config
	 * @param {string} [config.clientMode] - optional client mode to use (insecure/secure/both). Default = "secure"
	 * @param {string} [config.socketHost] - optional socket host to use
	 * @param {string} [config.port] - optional socket non-secure port to use. Default = 80
	 * @param {string} [config.sslPort] - optional socket secure port to use. Default = 443
	 * @param {object} [config.reconnection] - optional reconnection config
	 * @param {boolean} [config.reconnection.enabled=true] - optional enable/disable reconnection
	 * @param {number} [config.reconnection.delay=5000] - optional reconnection delay in ms
	 * @param {number} [config.reconnection.delta=2] - optional reconnection delta to avoid DDOS on server reboot
	 * @param {number} [config.reconnection.maxAttempts=10] - optional max reconnection in a row. Reset each time the connection is established.
	 * @param {object} [config.heartbeat] - optional heartbeat config
	 * @param {boolean} [config.heartbeat.enabled=true] - optional enable/disable heartbeat
	 * @param {number} [config.heartbeat.delay=10000] - optional heartbeat frequency in ms
	 * @param {number} [config.heartbeat.timeout=35000] - optional heartbeat timeout before considering the socket as
	 *                                                    closed. Should be set several time greater than heartbeat delay
	 *                                                    to be sure pings are not answered by the server, because it may be
	 *                                                    busy or may forget to send the pong response.
	 * @param {function} [config.onopen] - Hook to the onopen event
	 * @param {function} [config.onmessage] - Hook to the onmessage event
	 * @param {function} [config.onclose] - Hook to the onclose event
	 * @returns {Promise} Resolves when the connection is established, rejects on connection error
	 * 
	 * @example
	 * await kiss.websocket.init()
	 * kiss.websocket.send("something")
	 *
	 * // More complex case:
	 * await kiss.websocket.init({
	 * 	socketHost: "wss://ws.airprocess.com",
	 *
	 * 	onopen: () => {
	 * 		kiss.websocket.send({
	 * 			type: "SUBSCRIBE",
	 * 			subscriptions: [{
	 * 				event: "AGGREGATED_ORDERBOOK_UPDATE",
	 * 				pairs: ["BTCZAR"]
	 * 			}]
	 * 		})
	 * 	},
	 *
	 * 	onmessage: (message) => {
	 * 		console.log("Message received: ", message)
	 *  },
	 *
	 * 	onclose: () => {
	 * 		console.log("Socket closed!")
	 * 	}
	 * })
	 */
	async init(config = {}) {
		// If a socket is already open, we return immediately as we don't want several connection opened at the same time
		if (
			// Check if the connection have been initialised first
			(
				this.connection.readyState !== undefined &&
				this.connection.readyState !== WebSocket.CLOSED
			) || kiss.websocket.closing ||
			kiss.context.ws === "no"

		) {
			return
		}

		// Race conditions may occur, and they are hard to track down if there is no id
		const logPrefix = `kiss.websocket - ${kiss.tools.uid()}`
		const ws = window.WebSocket || window.MozWebSocket
		let socketUrl

		let {
			clientMode = "secure",
			socketHost,
			port = 80,
			sslPort = 443,
			reconnection: {
				enabled: autoReconnect = true,
				delay: reconnectionDelay = 5000,
				delta: reconnectionDelta = 2,
				maxAttempts = 10
			} = {},
			heartbeat: {
				enabled: heartbeatEnabled = true,
				delay: heartbeatDelay = 10000,
				timeout: heartbeatTimeout = 35000
			} = {}
		} = config

		if (!socketHost) socketHost = kiss.session.getWebsocketHost()
					
		// Connect to WS or WSS depending on the current client mode
		if (clientMode === "secure") {
			socketUrl = `wss://${socketHost}:${sslPort}/?token=`
		}
		else {
			socketUrl = `ws://${socketHost}:${port}/?token=`
		}

		log(`${logPrefix} - Connecting to ${socketUrl}`)
		const connection = new ws(socketUrl + kiss.session.getToken())

		// Will allow us to leverage the current function asynchronicity despite WebSocket callbacks nature
		const connectionResolver = {}

		connectionResolver.promise = new Promise((resolve, reject) => {
			Object.assign(connectionResolver, {
				resolve,
				reject
			})
		})
			.then(() => connectionResolver.succeeded = true)
			.catch(() => connectionResolver.succeeded = false)

		let heartbeatHandle, heartbeatResponseHandle

		//
		// OPEN
		//
		connection.addEventListener("open", e => {

			log(`${logPrefix} - Connected`)
			connectionResolver.resolve()

			if (kiss.websocket.RECONNECTION_SYMBOL in config || kiss.websocket.reconnecting) {
				if (!(kiss.websocket.RECONNECTION_SYMBOL in config)) {
					config[kiss.websocket.RECONNECTION_SYMBOL] = {
						attempt: 0
					}
				} else {
					config[kiss.websocket.RECONNECTION_SYMBOL].attempt = 0
				}

				kiss.pubsub.publish("EVT_RECONNECTED")
			} else {
				kiss.pubsub.publish("EVT_CONNECTED")
			}

			// Hook
			if (typeof config.onopen === "function") {
				try {
					config.onopen()
				} catch (err) {
					log(`${logPrefix} - Could not execute 'onopen' hook properly - Error:`, 4, err)
				}
			}

			if (!heartbeatEnabled) return

			heartbeatHandle = setInterval(() => {
				// We only set the timeout if it is not defined or have been reset.
				// If we are already waiting for a ping response, we just resend it.
				// The server may have been busy or may have missed the first one, we want to be
				// certain that it is not reachable by sending several pings.
				if (!heartbeatResponseHandle) {
					heartbeatResponseHandle = setTimeout(() => {
						connection.close(4000, "SERVER_NOT_RESPONDING")
					}, heartbeatTimeout)
				}

				connection.send("ping")
			}, heartbeatDelay)
		})

		//
		// CLOSE
		//

		connection.addEventListener("close", async e => {
			// We check errors there, since the close event is ALWAYS emitted, even on connection error.
			// In fact, we do not need websocket error handler at all.
			log(`${logPrefix} - Closed ${e.reason} (${e.code})`)

			// We enter closing state. No external code must try a reconnection during this process.
			kiss.websocket.closing = true

			// Do not need the heartbeat anymore
			clearInterval(heartbeatHandle)

			// Hook
			if (typeof config.onclose === "function") {
				try {
					config.onclose()
				} catch (err) {
					log(`${logPrefix} - Could not execute 'onclose' hook properly - Error:`, 4, err)
				}
			}

			if (e.code === 4002) {
				// To many user sockets, we actually WANT to block the UI. And we REALLY don't want to reconnect.
				// Consequence would be to indefinitely loop from disconnection to reconnection across all tabs... not very fun for the user.
				log(`${logPrefix} - Connection locked by server (too many sockets have been opened by this user).`)

				// TODO : since there we want to do something special, EVT_CONNECTION_LOCK may be more accurate ?
				kiss.pubsub.publish("EVT_CONNECTION_LOST")
				return

			} else if (e.code === 4003) {
				// We have a definitive close. We will not try to reconnect.
				log(`${logPrefix} - Connection definitively closed.`)
				kiss.pubsub.publish("EVT_CONNECTION_CLOSED")
				return

			} else if (!e.reason && e.code === 1006) {
				// The websocket specification forbid the client to check the status code.
				// If the socket is rejected because invalid token, it will send the 1006 code. With no message.
				// So... if we want this reconnection algorithm to stop stupidly trying to reconnect with a wrong JWT,
				// we need to force the client to logout and to reauthenticate.
				// @see https://websockets.spec.whatwg.org//#feedback-from-the-protocol

				if (!await kiss.session.checkTokenValidity(true)) {
					log(`${logPrefix} - Can't reconnect. No valid token available, and current token can't be renewed !`)
					kiss.pubsub.publish("EVT_UNUSABLE_TOKEN")
					return kiss.session.logout()
				}
			} else if (e.code === 1001) {
				// TODO : Server goes away (restart/shutdown)
				// we may want to display a maintenance screen, and increase the reconnection time
				log(`${logPrefix} - Server gone.`)
				kiss.pubsub.publish("EVT_SERVER_GONE")

			}

			// Configuring the reconnection process. May or may not be used
			if (!config[kiss.websocket.RECONNECTION_SYMBOL]) {
				config[kiss.websocket.RECONNECTION_SYMBOL] = {
					attempt: 1
				}
			} else {
				config[kiss.websocket.RECONNECTION_SYMBOL].attempt++
			}

			// A real disconnection, since we were connected, so the pub/sub must be informed
			if (connectionResolver.succeeded) {
				kiss.pubsub.publish("EVT_DISCONNECTED")
			} else {
				connectionResolver.reject()
			}

			// Socket closed, from there no matter who tries to reconnect, it's ok.
			kiss.websocket.closing = false

			if (!autoReconnect) return

			if (config[kiss.websocket.RECONNECTION_SYMBOL].attempt <= maxAttempts) {
				let delay = reconnectionDelay || 5000

				// 1006 is the code for abnormal closure
				// 1001 is the code for Going Away
				// If the serveur is brutally stopped or restarted, we don't want all clients
				// to reconnect at the exact same millisecond.
				// It would be a disaster, as it may become an auto DDOS :(. Not very glorious way to die again and again.
				// So we delay randomly each reconnection in a window between the configured reconnection delay
				// and N times the reconnection delay, N being an arbitrary factor called reconnectionDelta
				if ([1001, 1006].includes(e.code) && reconnectionDelta) {
					// Draw a number between 0 and delay * reconnectionDelta
					delay += Math.floor(Math.random() * ((delay * reconnectionDelta) + 1))
				}

				log(`${logPrefix} - Will try to reconnect to the server in ${delay} ms...`)

				setTimeout(async () => {
					// This one is tricky. When the first connection attempt fails because session token must
					// be regenerated, a reconnection procedure starts.
					// But as soon as the token is automatically regenerated by kiss, init() is called a second
					// time, resulting in the opening of two connections as a result of a race condition.
					// If a connection is open, opening or closing, we abort the current call to avoid this.
					if (
						"readyState" in kiss.websocket.connection &&
						kiss.websocket.connection.readyState !== WebSocket.CLOSED
					) {
						log(`${logPrefix} - Reconnection aborted`)
						return
					}

					log(
						`${logPrefix} - Trying to reconnect (attempt ${
							config[kiss.websocket.RECONNECTION_SYMBOL].attempt
						}/${maxAttempts})`
					)

					try {
						// One may want to await the reconnection process to succeed or ack its error, so we carry along the init promise.
						const promise = kiss.websocket.init(config)
						kiss.pubsub.publish(
							"EVT_RECONNECTING",
							promise
						)

						kiss.websocket.reconnecting = true
					} catch (err) {
						kiss.websocket.reconnecting = false
						log(`${logPrefix} - Unable to reconnect: `, 4, err)
					}
				}, delay)
			} else {
				log(`${logPrefix} - Connection lost (max attempts (${maxAttempts}) reached !)`)
				kiss.pubsub.publish("EVT_CONNECTION_LOST")
			}
		})

		//
		// MESSAGE
		//
		connection.addEventListener("message", message => {
			if (message.data === "pong") {
				// Received in time, we have to clean the timeout
				clearTimeout(heartbeatResponseHandle)
				heartbeatResponseHandle = null
				return
			}

			try {
				log(`${logPrefix} - onmessage - Data:`, 1, message.data)

				let json = JSON.parse(message.data)

				// Mark the message as coming from a websocket
				json.websocket = true
				kiss.pubsub.publish(json.channel, json)

				// Hook
				if (typeof config.onmessage === "function") {
					try {
						config.onmessage(message)
					} catch (err) {
						log(`${logPrefix} - Could not execute 'onmessage' hook properly - Error:`, 4, err)
					}
				}
			} catch (err) {
				log(`${logPrefix} - onmessage - Error:`, 4, err)
			}
		})

		kiss.websocket.connection = connection

		return connectionResolver.promise
	},

	close() {
		// Check if the connection have been initialised
		if (typeof this.connection.close === "function") {
			this.connection.close(4003, "DEFINITIVE_CLOSE")
		} else throw new Error("No connection to close: kiss.websocket.init has not been called.")
	},

	/**
	 * Send a message to the server via WebSocket
	 *
	 * @param {object} jsonData - Any valid JSON
	 *
	 * @example
	 * kiss.data.send({
	 *  userId: "john.doe@airprocess.com",
	 *  text: "Hello, how are you?"
	 * })
	 */
	send(jsonData) {
		// Check if the connection have been initialised
		if (typeof this.connection.send === "function") {
			const message = JSON.stringify(jsonData)
			this.connection.send(message)
		} else throw new Error("No connection to opened: kiss.websocket.init has not been called.")
	}
}