Source: client/protocol/base.js

const { formatRequest, formatError } = require("../../util/format");
const { ERR_CODES, ERR_MSGS } = require("../../util/constants");
const MessageBuffer = require("../../util/buffer");
const logging = require("../../util/logger");

/**
 * Creates an instance of the base client protocol class.
 * This is the class that all other client protocols inherit from.
 */
class JsonRpcClientProtocol {
  /**
   * JsonRpcClientProtocol contructor
   * @param {class} factory Instance of [JsonRpcClientFactory]{@link JsonRpcClientFactory}
   * @param {(1|2)} version JSON-RPC version to make requests with
   * @param {string} delimiter Delimiter to use for message buffer
   * @property {class} factory Instance of [JsonRpcClientFactory]{@link JsonRpcClientFactory}
   * @property {class} connector The socket instance for the client
   * @property {(1|2)} version JSON-RPC version to use
   * @property {string} delimiter Delimiter to use for message buffer
   * @property {number} message_id Current message ID
   * @property {number} serving_message_id Current message ID. Used for external functions to hook into
   * @property {Object} pendingCalls Key value pairs for pending message IDs to promise resolve/reject objects
   * @property {Object.<string|number, JSON>} responseQueue Key value pairs for outstanding message IDs to response object
   * @property {Object} server Server host and port object {host: "x.x.x.x", port: xxxx}
   * @property {class} messageBuffer Instance of [MessageBuffer]{@link MessageBuffer}
   *
   */
  constructor(factory, version, delimiter) {
    if (!(this instanceof JsonRpcClientProtocol)) {
      return new JsonRpcClientProtocol(factory, version, delimiter);
    }
    this.factory = factory;
    this.connector = undefined;
    this.listener = undefined;
    this.delimiter = delimiter;
    this.version = version;
    this.message_id = 1;
    this.serving_message_id = 1;
    this.pendingCalls = {};
    this.responseQueue = {};
    this.server = this.factory.server;
    this._connectionTimeout = undefined;
    this.messageBuffer = new MessageBuffer(this.delimiter);
  }

  /**
   * Set the `connector` attribute for the protocol instance.
   * The connector is essentially the socket or connection instance for the client.
   *
   * @abstract
   *
   */
  setConnector() {
    throw Error("function must be overwritten in subclass");
  }

  /**
   * Make the connection to the server.
   *
   * Calls [setConnector]{@link JsonRpcClientProtocol#setConnector} to establish the client connection.
   *
   * Calls [listen]{@link JsonRpcClientProtocol#listen} if connection was successful, and will resolve the promise.
   *
   * Will retry connection on the `connectionTimeout` interval.
   * Number of connection retries is based on `remainingRetries`.
   *
   * If `null` is set for number of retries, then connections will attempt indefinitely.
   *
   * Will reject the promise if connect or re-connect attempts fail and there are no remaining retries.
   *
   * @returns Promise
   *
   */
  connect() {
    return new Promise((resolve, reject) => this._retryConnection(resolve, reject));
  }

  /**
   *
   * Manage the connection attempts for the client.
   *
   * @param {Promise.resolve} resolve `Promise.resolve` passed from [connect]{@link JsonRpcClientProtocol#connect}
   * @param {Promise.reject} reject `Promise.reject` passed from [connect]{@link JsonRpcClientProtocol#connect}
   *
   * @returns Promise
   *
   * @private
   */
  _retryConnection(resolve, reject) {
    this.setConnector();
    this.connector.connect(this.server);
    this.connector.setEncoding("utf8");
    this.connector.on("connect", () => {
      this.factory.remainingRetries = this.factory.options.retries;
      this.listener = this.connector;
      this.listen();
      resolve(this.server);
    });
    this.connector.on("error", error => this._onConnectionFailed(error, resolve, reject));
    this.connector.on("close", (hadError) => {
      if (!hadError) {
        this.factory.emit("serverDisconnected");
      }
    });
  }

  /**
   *
   * Handle connection attempt errors. Log failures and
   * retry if required.
   *
   *
   * @param {Error} error `node.net` system error (https://nodejs.org/api/errors.html#errors_common_system_errors)
   * @param {Promise.resolve} resolve `Promise.resolve` passed from [connect]{@link JsonRpcClientProtocol#connect}
   * @param {Promise.reject} reject `Promise.reject` passed from [connect]{@link JsonRpcClientProtocol#connect}
   *
   * @returns Promise
   *
   * @private
   */
  _onConnectionFailed(error, resolve, reject) {
    if (this.factory.remainingRetries > 0) {
      this.factory.remainingRetries -= 1;
      logging
        .getLogger()
        .error(
          `Failed to connect. Address [${this.server.host}:${this.server.port}]. Retrying. ${this.factory.remainingRetries} attempts left.`
        );
    } else if (this.factory.remainingRetries === 0) {
      this.factory.pcolInstance = undefined;
      return reject(error);
    } else {
      logging
        .getLogger()
        .error(
          `Failed to connect. Address [${this.server.host}:${this.server.port}]. Retrying.`
        );
    }
    this._connectionTimeout = setTimeout(() => {
      this._retryConnection(resolve, reject);
    }, this.factory.connectionTimeout);
  }

  /**
   * Ends connection to the server.
   *
   * Sets `JsonRpcClientFactory.pcolInstance` to `undefined`
   *
   * Clears the connection timeout
   *
   * @param {function} cb Called when connection is sucessfully closed
   */
  end(cb) {
    clearTimeout(this._connectionTimeout);
    this.factory.pcolInstance = undefined;
    this.connector.end(cb);
  }

  /**
   * Setup `this.listner.on("data")` event to listen for data coming into the client.
   *
   * Pushes received data into `messageBuffer` and calls
   * [_waitForData]{@link JsonRpcClientProtocol#_waitForData}
   */
  listen() {
    this.listener.on("data", (data) => {
      this.messageBuffer.push(data);
      this._waitForData();
    });
  }

  /**
   * Accumulate data while [MessageBuffer.isFinished]{@link MessageBuffer#isFinished} is returning false.
   *
   * If the buffer returns a message it will be passed to [verifyData]{@link JsonRpcClientProtocol#verifyData}
   *
   * @private
   *
   */
  _waitForData() {
    while (!this.messageBuffer.isFinished()) {
      const message = this.messageBuffer.handleData();
      try {
        this.verifyData(message);
      } catch (e) {
        this.gotError(e);
      }
    }
  }

  /**
   * Verify the incoming data returned from the `messageBuffer`
   *
   * Throw an error if its not a valid JSON-RPC object.
   *
   * Call [gotNotification]{@link JsonRpcClientProtocol#gotNotification} if the message a notification.
   *
   * Call [gotBatch]{@link JsonRpcClientProtocol#gotBatch} if the message is a batch request.
   *
   * @param {string} chunk Data to verify
   */
  verifyData(chunk) {
    try {
      // will throw an error if not valid json
      const message = JSON.parse(chunk);
      if (Array.isArray(message)) {
        this.gotBatch(message);
      } else if (!(message === Object(message))) {
        // error out if it cant be parsed
        throw SyntaxError();
      } else if (!("id" in message)) {
        // no id, so assume notification
        this.gotNotification(message);
      } else if (message.error) {
        // got an error back so reject the message
        const { id } = message;
        const { code, data } = message.error;
        const errorMessage = message.error.message;
        this._raiseError(errorMessage, code, id, data);
      } else if ("result" in message) {
        // Got a result, so must be a response
        this.gotResponse(message);
      } else {
        const code = ERR_CODES.unknown;
        const errorMessage = ERR_MSGS.unknown;
        this._raiseError(errorMessage, code, null);
      }
    } catch (e) {
      if (e instanceof SyntaxError) {
        const code = ERR_CODES.parseError;
        const errorMessage = `Unable to parse message: '${chunk}'`;
        this._raiseError(errorMessage, code, null);
      } else {
        throw e;
      }
    }
  }

  /**
   * Called when the received `message` is a notification.
   * Emits an event using `message.method` as the name.
   * The data passed to the event is the `message`.
   *
   * @param {JSON} message A valid JSON-RPC message object
   */
  gotNotification(message) {
    this.factory.emit(message.method, message);
  }

  /**
   * Called when the received message is a batch.
   *
   * Calls [gotNotification]{@link JsonRpcClientProtocol#gotNotification} for every
   * notification in the batch.
   *
   * Calls [gotBatchResponse]{@link JsonRpcClientProtocol#gotBatchResponse} otherwise.
   *
   * @param {JSON[]} message A valid JSON-RPC batch message
   */
  gotBatch(message) {
    // possible batch request
    message.forEach((res) => {
      if (res && res.method && !res.id) {
        this.gotNotification(res);
      }
    });
    this.gotBatchResponse(message);
  }

  /**
   * Called when the received message is a response object from the server.
   *
   * Gets the response using [getResponse]{@link JsonRpcClientProtocol#getResponse}.
   *
   * Resolves the message and removes it from the `responseQueue`. Cleans up any
   * outstanding timers.
   *
   * @param {JSON} message A valid JSON-RPC message object
   */
  gotResponse(message) {
    this.serving_message_id = message.id;
    this.responseQueue[message.id] = message;
    try {
      const response = this.getResponse(message.id);
      this.pendingCalls[message.id].resolve(response);
      delete this.responseQueue[message.id];
      this.factory.cleanUp(message.id);
    } catch (e) {
      if (e instanceof TypeError) {
        // response id likely not in the queue
        logging
          .getLogger()
          .error(
            `Message has no outstanding calls: ${JSON.stringify(message)}`
          );
      }
    }
  }

  /**
   * Get the outstanding request object for the given ID.
   *
   * @param {string|number} id ID of outstanding request
   */
  getResponse(id) {
    return this.responseQueue[id];
  }

  /**
   * Send a message to the server
   *
   * @param {string} request Stringified JSON-RPC message object
   * @param {function=} cb Callback function to be called when message has been sent
   */
  write(request, cb) {
    this.connector.write(request, cb);
  }

  /**
   * Generate a stringified JSON-RPC message object.
   *
   * @param {string} method Name of the method to use in the request
   * @param {Array|JSON} params Params to send
   * @param {boolean=} id If true it will use instances `message_id` for the request id, if false will generate a notification request
   * @example
   * client.message("hello", ["world"]) // returns {"jsonrpc": "2.0", "method": "hello", "params": ["world"], "id": 1}
   * client.message("hello", ["world"], false) // returns {"jsonrpc": "2.0", "method": "hello", "params": ["world"]}
   */
  message(method, params, id = true) {
    const request = formatRequest({
      method,
      params,
      id: id ? this.message_id : undefined,
      version: this.version,
      delimiter: this.delimiter
    });
    if (id) {
      this.message_id += 1;
    }
    return request;
  }

  /**
   * Send a notification to the server.
   *
   * Promise will resolve if the request was sucessfully sent, and reject if
   * there was an error sending the request.
   *
   * @param {string} method Name of the method to use in the notification
   * @param {Array|JSON} params Params to send
   * @return Promise
   * @example
   * client.notify("hello", ["world"])
   */
  notify(method, params) {
    return new Promise((resolve, reject) => {
      const request = this.message(method, params, false);
      try {
        this.write(request, () => {
          resolve(request);
        });
      } catch (e) {
        // this.connector is probably undefined
        reject(e);
      }
    });
  }

  /**
   * Send a request to the server
   *
   * Promise will resolve when a response has been received for the request.
   *
   * Promise will reject if the server responds with an error object, or if
   * the response is not received within the set `requestTimeout`
   *
   * @param {string} method Name of the method to use in the request
   * @param {Array|JSON} params Params to send
   * @returns Promise
   * @example
   * client.send("hello", {"foo": "bar"})
   */
  send(method, params) {
    return new Promise((resolve, reject) => {
      const request = this.message(method, params);
      this.pendingCalls[JSON.parse(request).id] = { resolve, reject };
      try {
        this.write(request);
      } catch (e) {
        // this.connector is probably undefined
        reject(e);
      }
      this._timeoutPendingCalls(JSON.parse(request).id);
    });
  }

  /**
   * Method used to call [message]{@link JsonRpcClientProtocol#message}, [notify]{@link JsonRpcClientProtocol#notify} and [send]{@link JsonRpcClientProtocol#send}
   *
   * @returns Object
   * @example
   * client.request().send("hello", ["world"])
   * client.request().notify("foo")
   * client.request().message("foo", ["bar"])
   */
  request() {
    const self = this;
    return {
      message: this.message.bind(self),
      send: this.send.bind(self),
      notify: this.notify.bind(self)
    };
  }

  /**
   * Used to send a batch request to the server.
   *
   * Recommend using [message]{@link JsonRpcClientProtocol#message} to construct the message objects.
   *
   * Will use the IDs for the requests in the batch in an array as the keys for `pendingCalls`.
   *
   * How a client should associate batch responses to their requests is not in the spec, so this is the solution.
   *
   * @param {Array} requests An array of valid JSON-RPC message objects
   * @returns Promise
   * @example
   * client.batch([client.message("foo", ["bar"]), client.message("hello", [], false)])
   */
  batch(requests) {
    return new Promise((resolve, reject) => {
      const batchIds = [];
      const batchRequests = [];
      for (const request of requests) {
        const json = JSON.parse(request);
        batchRequests.push(json);
        if (json.id) {
          batchIds.push(json.id);
        }
      }
      this.pendingCalls[String(batchIds)] = { resolve, reject };
      const request = JSON.stringify(batchRequests);
      try {
        this.write(request + this.delimiter);
      } catch (e) {
        // this.connector is probably undefined
        reject(e);
      }
      this._timeoutPendingCalls(String(batchIds));
    });
  }

  /**
   * Associate the ids in the batch message to their corresponding `pendingCalls`.
   *
   * Will call [_resolveOrRejectBatch]{@link JsonRpcClientProtocol#_resolveOrRejectBatch} when object is determined.
   *
   * @param {Array} batch Array of valid JSON-RPC message objects
   */
  gotBatchResponse(batch) {
    const batchResponseIds = [];
    batch.forEach((message) => {
      if ("id" in message) {
        batchResponseIds.push(message.id);
      }
    });
    if (batchResponseIds.length === 0) {
      // dont do anything here since its basically an invalid response
      return;
    }
    // find the resolve and reject objects that match the batch request ids
    for (const ids of Object.keys(this.pendingCalls)) {
      const arrays = [JSON.parse(`[${ids}]`), batchResponseIds];
      const difference = arrays.reduce((a, b) => a.filter(c => !b.includes(c)));
      if (difference.length === 0) {
        this.factory.cleanUp(ids);
        this._resolveOrRejectBatch(batch, batchResponseIds);
      }
    }
  }

  /**
   * Returns the batch response.
   *
   * Overwrite if class needs to reformat response in anyway (i.e. in [HttpClientProtocol]{@link HttpClientProtocol})
   *
   * @param {Array} batch  Array of valid JSON-RPC message objects
   */
  getBatchResponse(batch) {
    return batch;
  }

  /**
   * Will reject the request associated with the given ID with a JSON-RPC formated error object.
   *
   * Removes the id from `pendingCalls` and deletes outstanding timeouts.
   *
   * @param {string|number} id ID of the request to timeout
   * @private
   */
  _timeoutPendingCalls(id) {
    this.factory.timeouts[id] = setTimeout(() => {
      this.factory.cleanUp(id);
      try {
        const error = JSON.parse(
          formatError({
            jsonrpc: this.version,
            delimiter: this.delimiter,
            id: typeof id === "string" ? null : id,
            code: ERR_CODES.timeout,
            message: ERR_MSGS.timeout
          })
        );
        this.pendingCalls[id].reject(error);
        delete this.pendingCalls[id];
      } catch (e) {
        if (e instanceof TypeError) {
          logging
            .getLogger()
            .error(`Message has no outstanding calls. ID [${id}]`);
        }
      }
    }, this.factory.requestTimeout);
  }

  /**
   * Resolve or reject the given batch request based on the given batch IDs.
   *
   * @param {Array} batch Valid JSON-RPC batch request
   * @param {string} batchIds Stringified list of batch IDs associated with the given batch
   * @private
   */
  _resolveOrRejectBatch(batch, batchIds) {
    const batchResponse = this.getBatchResponse(batch);
    try {
      const invalidBatches = [];
      batch.forEach((message) => {
        if (message.error) {
          // reject the whole message if there are any errors
          this.pendingCalls[batchIds].reject(batchResponse);
          invalidBatches.push(batchIds);
        }
      });
      if (invalidBatches.length !== 0) {
        invalidBatches.forEach((id) => {
          delete this.pendingCalls[id];
        });
      } else {
        this.pendingCalls[batchIds].resolve(batchResponse);
        delete this.pendingCalls[batchIds];
      }
    } catch (e) {
      if (e instanceof TypeError) {
        // no outstanding calls
        logging
          .getLogger()
          .log(
            `Batch response has no outstanding calls. Response IDs [${batchIds}]`
          );
      }
    }
  }

  /**
   * Throws an Error whos message is a JSON-RPC error object
   *
   * @param {string} message Error message
   * @param {number} code Error code
   * @param {string|number=} id ID for error message object
   * @param {*=} data Optional data to include about the error
   * @throws Error
   * @private
   */
  _raiseError(message, code, id, data) {
    const error = formatError({
      jsonrpc: this.version,
      delimiter: this.delimiter,
      id,
      code,
      message,
      data
    });
    throw new Error(error);
  }

  /**
   * Calls [rejectPendingCalls]{@link JsonRpcClientProtocol#rejectPendingCalls} with error object.
   *
   * If the object cannot be parsed, then an unkown error code is sent with a `null` id.
   *
   * @param {string} error Stringified JSON-RPC error object
   */
  gotError(error) {
    let err;
    try {
      err = JSON.parse(error.message);
    } catch (e) {
      err = JSON.parse(
        formatError({
          jsonrpc: this.version,
          delimiter: this.delimiter,
          id: null,
          code: ERR_CODES.unknown,
          message: JSON.stringify(error, Object.getOwnPropertyNames(error))
        })
      );
    }
    this.rejectPendingCalls(err);
  }

  /**
   * Reject the pending call for the given ID is in the error object.
   *
   * If the error object has a null id, then log the message to the logging
        .getLogger().
   *
   * @param {string} error Stringified JSON-RPC error object
   *
   */
  rejectPendingCalls(error) {
    try {
      this.pendingCalls[error.id].reject(error);
      this.factory.cleanUp(error.id);
    } catch (e) {
      if (e instanceof TypeError) {
        // error object id probably not a pending response
        logging
          .getLogger()
          .error(`Message has no outstanding calls: ${JSON.stringify(error)}`);
      }
    }
  }
}

module.exports = JsonRpcClientProtocol;