import mqtt from 'mqtt';
import { TopicDispatcher } from './topic-dispatcher';
import { MqttMessageCallback } from './mqtt-message-callback';
import { AuthStore, useAuthStore } from '../auth/AuthStore';
import { makeClientId } from './client-id';

export interface MqttMessageHandle {
  // `qos`: 0, 1 or 2; default: 1
  qos?: 0 | 1 | 2;
  topic: string;
}

export interface MqttMessage extends MqttMessageHandle {
  content: object | string;
}

export type UnsubscribeFunction = () => void;

const DEFAULT_QOS = 1;

class SubscriptionHandle {
  private nSubscribers: number;

  constructor(readonly mqttOnSubscribed: Promise<void>) {
    this.nSubscribers = 1;
  }

  incrementSubscribers() {
    ++this.nSubscribers;
  }

  decrementSubscribers() {
    --this.nSubscribers;
  }

  hasSubscribers() {
    return this.nSubscribers > 0;
  }
}

export class MqttService {
  private readonly topicDispatcher = new TopicDispatcher();
  private readonly subscriptionHandlesByTopic = new Map<
    string,
    SubscriptionHandle
  >();

  constructor(private readonly client: mqtt.MqttClient) {
    this.client.on('connect', this.onMqttConnect.bind(this));
    this.client.on('disconnect', this.onMqttDisconnect.bind(this));
    this.client.on('error', this.onMqttError.bind(this));
    this.client.on('end', this.onMqttEnd.bind(this));
    this.client.on('reconnect', this.onMqttReconnect.bind(this));
    this.client.on('message', this.onMttMessage.bind(this));
  }

  async close() {
    await new Promise<void>((resolve, reject) => {
      this.client.end(true, undefined, (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  publish(message: MqttMessage) {
    console.log(`Publishing message into topic: [${message.topic}]`);
    const qos = message.qos ?? DEFAULT_QOS;
    const content = Buffer.isBuffer(message.content)
      ? message.content
      : typeof message.content === 'object'
        ? JSON.stringify(message.content)
        : message.content;

    this.client.publish(
      message.topic,
      content,
      {
        qos: qos,
      },
      (err) => {
        if (err) {
          console.error(`failed to publish message: ${err}`);
        }
      },
    );
  }

  async subscribe(
    handle: MqttMessageHandle,
    messageCallback: MqttMessageCallback,
  ): Promise<UnsubscribeFunction> {
    const topic = handle.topic;
    const qos = handle.qos ?? DEFAULT_QOS;

    const unsubscribeDispatcher = this.topicDispatcher.subscribe(
      topic,
      messageCallback,
    );

    let subscriptionHandle = this.subscriptionHandlesByTopic.get(topic);
    if (subscriptionHandle === undefined) {
      subscriptionHandle = new SubscriptionHandle(
        new Promise((resolve) => {
          this.client.subscribe(topic, { qos: qos }, (err) => {
            if (err) {
              console.error(
                `subscribe error; topic: "${topic}"; error: ${err}`,
              );
              //Do not reject() since mqtt client uses mqtt client auto-resubscribe
            }

            resolve();
          });
        }),
      );

      this.subscriptionHandlesByTopic.set(topic, subscriptionHandle);
    } else {
      subscriptionHandle.incrementSubscribers();
    }

    await subscriptionHandle.mqttOnSubscribed;

    let isUnsubscribed = false;
    return () => {
      if (isUnsubscribed) {
        return;
      }

      isUnsubscribed = true;

      unsubscribeDispatcher();

      if (subscriptionHandle) {
        subscriptionHandle.decrementSubscribers();
        if (!subscriptionHandle.hasSubscribers()) {
          this.client.unsubscribe(topic);
          this.subscriptionHandlesByTopic.delete(topic);
        }
      }
    };
  }

  private onMqttConnect() {
    console.log('mqtt connected');
  }

  private onMqttReconnect() {
    console.log('mqtt reconnected');
  }

  private onMqttDisconnect() {
    console.log('mqtt disconnected');
  }

  private onMqttEnd() {
    console.log('mqtt end');
  }

  private onMqttError(err: Error) {
    console.error(`mqtt error: ${err}`);
  }

  private onMttMessage(topic: string, payload: Buffer) {
    try {
      this.topicDispatcher.dispatch(topic, payload);
    } catch (e) {
      console.error(
        `failed to dispatch mqtt message;  topic: ${topic}, error: ${e};`,
      );
    }
  }
}

function getApiTokenSubject(token: string): string | undefined {
  try {
    const tokenPayloadBase64 = token.split('.')[1];
    if (!tokenPayloadBase64) {
      return;
    }

    const tokenPayloadString = atob(tokenPayloadBase64);
    const tokenPayload = JSON.parse(tokenPayloadString);
    return tokenPayload.sub;
  } catch (e: unknown) {
    console.error('failed to parse token', e);
  }
}

export const mqttService = new Promise<MqttService>((resolve) => {
  let created = false;

  const create = (auth: AuthStore) => {
    if (created) {
      return;
    }

    try {
      if (!auth.isAuthenticated || !auth.token) {
        return;
      }
      const clientId = makeClientId();
      const mqttUsername = getApiTokenSubject(auth.token) ?? clientId;
      resolve(
        new MqttService(
          mqtt.connect(
            (process.env.REACT_APP_MQTT_URL ??
              'wss://mqtt-sit.liftmanager05.io:8084') as string,
            {
              clientId: clientId,
              username: mqttUsername,
              password: auth.token,
              clean: true,
              path: '/mqtt',
            },
          ),
        ),
      );
      created = true;
    } catch (e: unknown) {
      console.error('failed to create mqtt', e);
    }
  };

  useAuthStore.subscribe((state) => {
    create(state);
  });

  create(useAuthStore.getState());
});
