import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

/** Handler that receives a single message of type `T`. */
type Callback<T> = (message: T) => void;
/** Handler that receives an error value of unknown shape. */
type ErrorCallback = (err: unknown) => void;

/** Delay, in milliseconds, handed to `setTimeout` before a scheduled reconnect runs. */
const reconnectingDelayMs = 4000;

/**
 * Keeps a registry of keyed subscriptions on top of a single `WebSocketSubject`.
 *
 * The socket is opened lazily by the first `subscribe` call, closed once the last
 * subscription key has been removed, and re-created after `reconnectingDelayMs`
 * when the stream reports an error. Every incoming message is forwarded to the
 * `handleMessage` function given to the constructor; this class never invokes the
 * stored `callbacks` itself, and stored error callbacks are only invoked through
 * the function returned by `errorCallback`.
 */
export default class SocketClient<T> {
  private readonly url: string;

  /** Handle of the pending reconnect timer; `undefined` when no reconnect is scheduled. */
  private reconnecting?: ReturnType<typeof setTimeout>;

  private readonly handleMessage: (message: T) => void;

  private webSocket?: WebSocketSubject<T>;

  /**
   * Registry of active subscriptions by key.
   *
   * `callbacks[i]` and `errorCallbacks[i]` belong to the same subscriber.
   * `errorCallbacks` may contain holes for subscribers that registered no error
   * handler. `payload` is the message that is (re)sent to the server for this key.
   */
  public subscriptions: { [key: string]: {
    callbacks: Callback<any>[],
    errorCallbacks: ErrorCallback[],
    payload: T
  } } = {};

  /**
   * @param options.url - Address passed to `webSocket()` whenever the socket is opened.
   * @param options.handleMessage - Receives every message emitted by the socket.
   */
  constructor({
    url,
    handleMessage,
  }: {
    url: string,
    handleMessage: (message: T) => void,
  }) {
    this.url = url;
    this.handleMessage = handleMessage;
  }

  /**
   * Builds a function that forwards an error to the registered error callbacks.
   *
   * The callbacks are collected when this method is called, not when the returned
   * function runs, so later (un)subscriptions do not affect an already built function.
   *
   * @param key - Restrict to one subscription key. An unknown key yields a no-op
   *   function; an omitted (or empty) key selects every subscription.
   * @returns A function that calls each collected error callback with the given error.
   */
  public errorCallback(key?: string): (error: unknown) => void {
    let keys: string[];

    if (key) {
      keys = this.subscriptions[key] ? [key] : [];
    } else {
      keys = Object.keys(this.subscriptions);
    }

    const errorCallbacks: ErrorCallback[] = [];
    keys.forEach((subscriptionKey) => {
      // `forEach` skips holes; the truthiness check also drops explicit `undefined` slots.
      this.subscriptions[subscriptionKey]?.errorCallbacks.forEach((callback) => {
        if (callback) errorCallbacks.push(callback);
      });
    });

    return (error) => {
      errorCallbacks.forEach((callback) => callback(error));
    };
  }

  /**
   * Sends `payload` over the socket if one is currently open; otherwise does nothing.
   */
  public ping(payload: T) {
    this.webSocket?.next(payload);
  }

  /**
   * Registers `callback` (and optionally `onError`) under `key`.
   *
   * Opens the socket if there is none. `payload` is sent, and remembered for
   * reconnects, only when `key` has no subscription yet; for an existing key the
   * payload argument is ignored.
   *
   * @param key - Subscription identifier.
   * @param payload - Message sent to the server for a new key.
   * @param callback - Stored in `subscriptions[key].callbacks`.
   * @param onError - Stored at the same index in `subscriptions[key].errorCallbacks`.
   */
  public subscribe(
    key: string,
    payload: T,
    callback: Callback<any>,
    onError?: ErrorCallback,
  ) {
    if (!this.webSocket) {
      this.openSocket();
    }

    let subscription = this.subscriptions[key];
    if (!subscription) {
      this.webSocket?.next(payload);
      subscription = { callbacks: [], errorCallbacks: [], payload };
      this.subscriptions[key] = subscription;
    }

    // Keep the error callback at the same index as its message callback.
    const subscriptionIndex = subscription.callbacks.push(callback) - 1;
    if (onError) subscription.errorCallbacks[subscriptionIndex] = onError;
  }

  /**
   * Removes `callback` from `key`.
   *
   * When it was the last callback for that key, the key is dropped and `payload`
   * is sent over the socket. When no keys remain at all, any pending reconnect is
   * cancelled and the socket is closed.
   *
   * @param key - Subscription identifier; unknown keys are ignored.
   * @param payload - Message sent once the key has no callbacks left.
   * @param callback - The exact function instance passed to `subscribe`.
   */
  public unsubscribe(key: string, payload: T, callback: Callback<any>) {
    const subscription = this.subscriptions[key];
    if (!subscription) return;

    const subscriptionIndex = subscription.callbacks.indexOf(callback);
    if (subscriptionIndex === -1) return;

    subscription.callbacks.splice(subscriptionIndex, 1);
    subscription.errorCallbacks.splice(subscriptionIndex, 1);

    if (subscription.callbacks.length !== 0) return;

    delete this.subscriptions[key];
    this.webSocket?.next(payload);

    if (Object.keys(this.subscriptions).length) return;

    // Nothing is subscribed any more: a scheduled reconnect must not reopen the socket.
    this.cancelReconnect();
    this.closeSocket();
  }

  /**
   * Schedules a single reconnect after `reconnectingDelayMs`.
   *
   * Calls made while a reconnect is already scheduled are ignored. When the timer
   * fires, the current socket is closed; if subscriptions still exist, a new socket
   * is opened and every stored subscription payload is sent again.
   */
  public reconnect() {
    if (this.reconnecting !== undefined) return;

    this.reconnecting = setTimeout(() => {
      // Cleared before reopening so that an error reported while reopening can
      // schedule a further attempt instead of being swallowed by the guard above.
      this.reconnecting = undefined;

      this.closeSocket();

      // Every subscriber left during the delay: stay closed rather than open a
      // socket that nothing would ever close.
      if (!Object.keys(this.subscriptions).length) return;

      this.openSocket();
      Object.values(this.subscriptions).forEach(({ payload }) => {
        this.webSocket?.next(payload);
      });
    }, reconnectingDelayMs);
  }

  /** Cancels the scheduled reconnect, if there is one. */
  private cancelReconnect() {
    if (this.reconnecting === undefined) return;

    clearTimeout(this.reconnecting);
    this.reconnecting = undefined;
  }

  /** Creates the socket subject for `url` and routes messages and errors. */
  private openSocket() {
    this.webSocket = webSocket(this.url);
    this.webSocket.subscribe({
      next: this.handleMessage,
      error: () => this.reconnect(),
    });
  }

  /** Completes the current socket subject, if any, and forgets it. */
  private closeSocket() {
    this.webSocket?.complete();
    this.webSocket = undefined;
  }
}
