import * as mqtt from 'mqtt';
import { MqttClient } from 'mqtt';
import { Observable } from 'rxjs';
import { share, filter, map } from 'rxjs/operators';
import { tokenDecoder } from '../utils/tokenDecoder';

/**
 * RxJS wrapper around a single, lazily created MQTT connection.
 *
 * The connection is opened when the first topic stream is subscribed to and
 * closed again once the last subscriber of the shared message stream goes
 * away. Credentials are read from the injected `Storage` under the
 * `auth_token` key at connection time.
 */
export class MqttService {
    /** Active MQTT connection, or `null` while no connection is open. */
    private client: MqttClient | null = null;

    /**
     * Cache of the shared stream created for each topic.
     *
     * A `Map` is used instead of a plain object so that topic names which
     * collide with `Object.prototype` members (`constructor`, `toString`,
     * `__proto__`, ...) are cached and looked up correctly.
     */
    private readonly topicObservables = new Map<string, Observable<any>>();

    /**
     * Stream of every `[topic, payload]` pair received on the connection.
     *
     * Subscribing obtains (and if necessary opens) the client and attaches a
     * `message` listener; payloads are converted with `toString()`. Because of
     * `share()`, a single listener serves all subscribers, and the teardown
     * (detach listener, close client) runs when the last one unsubscribes.
     */
    private readonly messagesObservable = new Observable<[string, string]>(subscriber => {
        const client = this.getClient();
        const onMessage = (receivedTopic: string, payload: { toString(): string }): void => {
            subscriber.next([receivedTopic, payload.toString()]);
        };

        client.on('message', onMessage);

        return () => {
            // Detach first so nothing is forwarded while the client shuts down.
            client.removeListener('message', onMessage);
            this.closeClient();
        };
    }).pipe(share());

    /**
     * @param url     Broker URL handed to `mqtt.connect`.
     * @param tenant  Tenant name; used as the prefix of the MQTT username.
     * @param storage Storage from which the `auth_token` entry is read.
     */
    constructor(private url: string, private tenant: string, private storage: Storage) { }

    /**
     * Returns the shared stream of messages for `topic`, creating and caching
     * it on first use. Repeated calls with the same topic return the same
     * Observable instance.
     *
     * @param topic Topic to subscribe to on the broker.
     * @returns Observable emitting the payload (as a string) of each message
     *          whose topic equals `topic`.
     */
    onMessages(topic: string): Observable<any> {
        let observable = this.topicObservables.get(topic);
        if (observable === undefined) {
            observable = this.createMessageObservableForTopic(topic);
            this.topicObservables.set(topic, observable);
        }

        return observable;
    }

    /**
     * Builds the shared stream for one topic: subscribes on the broker with
     * QoS 1 and forwards the payloads of matching messages.
     *
     * NOTE(review): matching is exact string equality on the received topic,
     * so a wildcard filter (`+` / `#`) passed as `topic` would be subscribed
     * on the broker but would not match any delivered message here.
     */
    private createMessageObservableForTopic(topic: string): Observable<any> {
        return new Observable(subscriber => {
            const client = this.getClient();
            client.subscribe(topic, { qos: 1 });

            const subscription = this.messagesObservable
                .pipe(
                    filter(([receivedTopic]) => receivedTopic === topic),
                    map(([, message]) => message)
                ).subscribe(subscriber);

            return () => {
                // Unsubscribe on the broker BEFORE releasing the shared message
                // stream: releasing it may end the client, and an unsubscribe
                // issued on an already-ended client is never sent. With
                // `clean: false` the broker would then keep the subscription
                // in the persistent session.
                client.unsubscribe(topic);
                subscription.unsubscribe();
            };
        }).pipe(share());
    }

    /**
     * Returns the current client, connecting first if there is none.
     *
     * The client id is the `userId` decoded from the auth token, the session
     * is persistent (`clean: false`) and the reconnect period is 10 seconds.
     *
     * @throws Error if storage holds no `auth_token` entry.
     */
    private getClient(): MqttClient {
        if (this.client === null) {
            const token = this.storage.getItem('auth_token');
            if (token === null) {
                throw new Error("MqttService: no 'auth_token' found in storage; cannot connect");
            }

            this.client = mqtt.connect(this.url, {
                clientId: tokenDecoder(token).userId,
                clean: false,
                username: `${this.tenant}:auth_token`,
                password: token,
                reconnectPeriod: 10 * 1000,
            });
        }

        return this.client;
    }

    /** Ends the current connection, if any, and forgets the client. */
    private closeClient(): void {
        if (this.client !== null) {
            this.client.end();
            this.client = null;
        }
    }
}