Source: server/index.js

const EventEmitter = require("events");
const { formatResponse } = require("../util/format");
const JsonRpcServerProtocol = require("./protocol/base");

/**
 * Base class for JSON-RPC servers.
 *
 * Holds the registered methods, the list of connected clients and the server
 * options. Transport-specific subclasses must implement
 * [setServer]{@link JsonRpcServerFactory#setServer} and
 * [buildProtocol]{@link JsonRpcServerFactory#buildProtocol}.
 *
 * @extends EventEmitter
 */
class JsonRpcServerFactory extends EventEmitter {
  /**
   * @param {object} [options] Overrides for the default server options
   * @param {String} [options.host="127.0.0.1"] Host IP to open server with
   * @param {Number} [options.port=8100] Host port to open server with
   * @param {Number} [options.version=2] JSON-RPC version to use (1|2)
   * @param {String} [options.delimiter="\n"] Delimiter to use for [JsonRpcServerProtocol]{@link JsonRpcServerProtocol}
   * @param {Boolean} [options.exclusive=false] Disallow port sharing
   * @property {object} options Defaults merged with the given `options`
   * @property {object} methods Key value pairs of server method to function call
   * @property {array} clients List of client connections which are instances of [JsonRpcServerProtocol]{@link JsonRpcServerProtocol}
   * @property {boolean} listening Indicates if the server is currently listening
   * @property {Object} protocol The [JsonRpcServerProtocol]{@link JsonRpcServerProtocol} class to use for client connections
   */
  constructor(options) {
    super();

    const defaults = {
      host: "127.0.0.1",
      port: 8100,
      exclusive: false,
      version: 2,
      delimiter: "\n"
    };

    // Spreading `undefined`/`null` is a no-op, so a missing `options` simply
    // leaves the defaults in place.
    this.options = {
      ...defaults,
      ...options
    };

    this.methods = {};
    this.clients = [];
    this.listening = false;
    this.protocol = JsonRpcServerProtocol;
  }

  /**
   * Start listening for client connections to server.
   *
   * Calls [setServer]{@link JsonRpcServerFactory#setServer} and, once the
   * server emits `listening`, [buildProtocol]{@link JsonRpcServerFactory#buildProtocol}.
   *
   * Establishes `error` and `close` listeners.
   *
   * @returns {Promise} Resolves `{host, port}` of the bound server address.
   * Rejects if the server is already listening or the server emits `error`.
   */
  listen() {
    return new Promise((resolve, reject) => {
      if (this.listening) {
        // not having this caused MaxEventListeners error
        return reject(new Error("server already listening"));
      }
      const { host, port, exclusive } = this.options;
      this.setServer();
      this.server.listen({ host, port, exclusive });
      this._setupListeners(reject);
      this.server.on("listening", () => {
        this.listening = true;
        this.buildProtocol();
        const address = this.server.address();
        resolve({
          host: address.address,
          port: address.port
        });
      });
    });
  }

  /**
   * Establishes the client connection using the protocol instance
   * and adds the newly connected client to `this.clients`.
   *
   * Registers the event to call `clientDisconnected` when a client closes the connection
   *
   * @abstract
   * @example
   * const pcol = JsonRpcServerProtocol()
   * pcol.clientConnected()
   * this.clients.push(pcol)
   */
  buildProtocol() {
    throw new Error("function must be overwritten in subclass");
  }

  /**
   * Set the `server` property for the server factory
   * @abstract
   * @example
   * this.server = new net.Server()
   */
  setServer() {
    throw new Error("function must be overwritten in subclass");
  }

  /**
   * Setup the `error` and `close` events for the server.
   *
   * Calls [_removeClients]{@link JsonRpcServerFactory#_removeClients} when server closes
   * and sets `listening` to false
   *
   * @param {function} cb Callback to invoke if server receives an `error` event
   * @private
   */
  _setupListeners(cb) {
    this.server.on("error", cb);
    this.server.on("close", () => {
      this.listening = false;
      this._removeClients();
    });
  }

  /**
   * Closes the server connection and kicks all connected clients.
   *
   * Sets `listening` property to `false`.
   *
   * @returns {Promise} Resolves once the server has closed; rejects with the
   * error passed to the server's `close` callback, if any
   */
  close() {
    this.listening = false;
    return new Promise((resolve, reject) => {
      this._removeClients();
      this.server.close((error) => {
        if (error) {
          reject(error);
          return;
        }
        resolve();
      });
    });
  }

  /**
   * Kicks all connected clients.
   *
   * Calls `destroy()` on the `client` of every entry and then removes all
   * entries from `this.clients`
   *
   * @private
   */
  _removeClients() {
    for (const pcol of this.clients) {
      pcol.client.destroy();
    }
    this.clients = [];
  }

  /**
   * Register a method and associated function with the server.
   *
   * The function will be called when a client makes a request to this method.
   * Registering the same name again replaces the previous function.
   *
   * @param {string} name Name of method
   * @param {function} cb Function to call when client makes request to method
   */
  method(name, cb) {
    this.methods[name] = cb;
  }

  /**
   * Call function when notification with event name comes in.
   *
   * @param {string} method Method name to listen for notification
   * @param {function} cb Name of callback function fired when method event comes in
   *
   * @example
   * function world(){
   *  return 'foo'
   * }
   * server.onNotify("hello", world)
   */
  onNotify(method, cb) {
    this.on(method, cb);
  }

  /**
   * Remove function name from listening for notifications.
   *
   * @param {string} method Method name to remove
   * @param {function} cb Name of the callback function to remove
   *
   * @example
   * function world(){
   *  return 'foo'
   * }
   * server.removeOnNotify("hello", world)
   */
  removeOnNotify(method, cb) {
    this.removeListener(method, cb);
  }

  /**
   * Remove all functions listening for notification.
   *
   * @param {string} method Method name to remove events for
   */
  removeAllOnNotify(method) {
    this.removeAllListeners(method);
  }

  /**
   * Send one or more notifications to every connected client.
   *
   * A single notification is sent as one message; several notifications are
   * sent as one batch (a JSON array) followed by the configured delimiter.
   *
   * @param {Array.<Array.<string, Array|object>>} notifications Array of `[method, params]` notifications
   * @returns {Array} One entry per connected client: the value returned by
   * [sendNotification]{@link JsonRpcServerFactory#sendNotification} (the result of `client.write`),
   * or the Error that was thrown while sending to that client.
   * Returns `[Error("No clients connected")]` if there are no clients.
   * @throws {Error} "Invalid arguments" if `notifications` is not a non-empty array
   *
   * @example
   * server.notify([
   *    ["hello", ["world"]],
   *    ["foo", {"bar": "baz"}]
   * ])
   */
  notify(notifications) {
    // Check the type first so that `undefined`/`null` produce the intended
    // error instead of a TypeError from reading `.length`.
    if (!Array.isArray(notifications) || notifications.length === 0) {
      throw new Error("Invalid arguments");
    }
    const responses = this._getNotificationResponses(notifications);
    let response;
    if (responses.length === 1) {
      response = formatResponse(responses[0]);
    } else {
      // batch notification responses: join the formatted messages into a JSON
      // array, then re-serialize it and terminate with the delimiter
      const batch = `[${responses.map((res) => formatResponse(res)).join(",")}]`;
      response = JSON.stringify(JSON.parse(batch)) + this.options.delimiter;
    }
    if (this.clients.length === 0) {
      return [Error("No clients connected")];
    }
    return this.clients.map((pcol) => {
      try {
        return this.sendNotification(pcol.client, response);
      } catch (e) {
        // possibly client disconnected
        return e;
      }
    });
  }

  /**
   * Send notification to client
   *
   * @param {object} client Client instance; must expose `write(data)`
   * @param {string} response Stringified JSON-RPC message to send to client
   * @returns {*} Whatever `client.write(response)` returns
   * @throws Will throw an error if client is not defined
   */
  sendNotification(client, response) {
    return client.write(response);
  }

  /**
   * Generate objects for notifications to send to client
   *
   * Each object carries `method`, `params` and the configured `delimiter`;
   * `jsonrpc: "2.0"` is added when `options.version` is 2.
   *
   * @param {Array.<Array.<string, Array|object>>} notifications Array of `[method, params]` notifications to send to client.
   * @returns {object[]} One notification object per entry in `notifications`
   * @throws {Error} If any notification has no method name
   * @private
   */
  _getNotificationResponses(notifications) {
    return notifications.map(([method, params]) => {
      // A notification without a method name is invalid, with or without params
      if (!method) {
        throw new Error("Unable to generate a response object");
      }
      const response = {
        method,
        params,
        delimiter: this.options.delimiter
      };
      if (this.options.version === 2) {
        response.jsonrpc = "2.0";
      }
      return response;
    });
  }

  /**
   * Called when client receives a `connection` event.
   *
   * @param {JsonRpcServerProtocol} pcol A {@link JsonRpcServerProtocol} instance
   * @returns {JsonRpcServerProtocol.client} Returns a client for a given `JsonRpcServerProtocol` instance
   */
  clientConnected(pcol) {
    return pcol.client;
  }

  /**
   * Called when client disconnects from server.
   *
   * If overwriting, its recommended to call {@link JsonRpcServerFactory._removeFromArray} manually
   * to ensure `this.clients` is cleaned up
   *
   * Calls `this._removeFromArray` and removes disconnected client from `this.clients` list
   *
   * @param {JsonRpcServerProtocol} pcol A {@link JsonRpcServerProtocol} instance
   * @returns {JsonRpcServerProtocol|object} Returns the protocol instance that was removed, or `{error}`
   * if it was not found in `this.clients`
   */
  clientDisconnected(pcol) {
    return this._removeFromArray(pcol, this.clients);
  }

  /**
   * Removes item from the given list (the list is modified in place)
   *
   * @param {*} item Any item in the list
   * @param {array} array The array to remove the item from
   * @returns {*|object} Returns the item that was removed from the array, or
   * `{error}` with a message if the item is not in the array
   *
   */
  _removeFromArray(item, array) {
    const itemIndex = array.findIndex(c => c === item);
    if (itemIndex === -1) {
      let description;
      try {
        description = JSON.stringify(item);
      } catch (e) {
        // JSON.stringify throws on circular structures (and BigInt); fall
        // back to the type tag so an `{error}` is still returned.
        description = Object.prototype.toString.call(item);
      }
      return {
        error: `Unable to remove ${description}`
      };
    }
    const [removedItem] = array.splice(itemIndex, 1);
    return removedItem;
  }

  /**
   * Returns the names of all functions defined directly on the prototype of
   * `toCheck` whose name starts with 'handle'.
   *
   * @param {object} toCheck The object to get prototype function names from
   * @returns {string[]} Matching function names
   * @private
   */
  _getAllFuncs(toCheck) {
    const ownNames = Object.getOwnPropertyNames(Object.getPrototypeOf(toCheck));
    return ownNames.filter(
      (name, idx, names) => name !== names[idx + 1]
        && typeof toCheck[name] === "function"
        && name.startsWith("handle")
    );
  }
}

// Export the base class, then attach the transport-specific server
// constructors to it as static properties.
// NOTE(review): the export is assigned before the transport modules are
// required — presumably because those modules require this file to extend the
// base class (a circular dependency); confirm before reordering these lines.
module.exports = JsonRpcServerFactory;

/**
 * HTTP server constructor
 * @type HttpServerFactory
 * @static
 */
JsonRpcServerFactory.http = require("./http");

/**
 * TCP server constructor
 * @type TcpServerFactory
 * @static
 */
JsonRpcServerFactory.tcp = require("./tcp");

/**
 * WS server constructor
 * @type WsServerFactory
 * @static
 */
JsonRpcServerFactory.ws = require("./ws");