import { connect as connectMqtt, Client, QoS, MqttClient } from 'mqtt';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { map } from 'rxjs/operators';
import { MqttQos } from './mqtt-qos';
import { MqttStorage } from './persistence/mqtt-storage';
import { sleep } from '@weavix/utils/src/sleep';
import { MqttTransformationType } from './mqtt-transformation-type';
import { MqttStore } from './persistence/mqtt-store';

export interface TopicPayload<T> {
    topic: string; // full topic string
    replacements: string[]; // Regex group matches (replacements[0] is full topic string, replacements[1] is first wildcard, replacements[2] is second wildcard, etc.)
    payload: T;
    dup: boolean;
}

export interface TopicSubscription<T> {
    observable: ObservableSubscribe<TopicPayload<T>>;
    qos: MqttQos;
    resolve?: Function;
    reject?: Function;
}

export type ObservableSubscribe<T> = Observable<T> & { subscribed: Promise<boolean> };

const REFRESH_TIME = 1800000; // Subscriptions error out and the client starts a fresh connection after this long
const timeout = async () => {
    const error = new Error('timeout');
    await sleep(10000);
    throw error;
}

export type MqttServiceOptions = {
    url: string,
    ssl?: boolean,
    timeout?: number,
    port?: number,
    source: string, // tells what kind of mqtt client is connecting for communication reports
    token?: string,
    platform?: string,
    storage: MqttStorage,
    messageIdProvider?: any
}

interface Packet {
    topic: string;
    qos: QoS | -1;
    date: number;
}

export class MqttService {
    private queued: Array<Packet> = [];
    private transit: Array<Packet> = [];

    private working = false;

    private client: Client;
    private clientResolve;
    private clientPromise = new Promise<Client>(resolve => this.clientResolve = resolve);

    private subscriptions: { [topic: string]: TopicSubscription<any> } = {};
    public message$: Subject<{ topic: string; payload: Buffer; dup: boolean; }> = new Subject();

    public connected$ = new BehaviorSubject<boolean>(false);
    public socket$ = new BehaviorSubject<any>(null);
    private storage: MqttStore;
    private lastConnected: number;
    private lastSubscriptionRefresh: number;
    private refreshingSubscriptions = false;
    public clientId: string;
    private clientPrefix = `${this.options.platform}_${Math.random()}`;
    private connectCount = 0;
    private randomlyReconnecting = false;

    public constructor(private options: MqttServiceOptions) {
        this.storage = new MqttStore('outgoing', options.storage);
        if (!options.platform) options.platform = 'unknown';
    }

    async randomlyReconnect() {
        if (this.randomlyReconnecting) return;
        this.randomlyReconnecting = true;
        while (this.randomlyReconnecting) {
            await sleep(500 + Math.random() * 5000);
            this.client?.end(true);
        }
    }

    /**
     * Connects/reconnects to the mqtt server.
     * This should be called whenever a user logs in.
     *
     * @param {string} token The bearer token for authentication
     * @memberof MqttService
     */
    connect() {
        this.clientId = `${this.clientPrefix}_${this.connectCount++}`;
        this.disconnect();
        this.newConnection();
    }

    reconnect() {
        // if we are disconnected, we will retry connection on our own
        if (!this.client || !this.connected$.value) return;
        console.log('MQTT: force reconnect');
        const client = this.client;
        this.client?.end(true);
        setTimeout(() => {
            if (client === this.client) this.newConnection();
        }, 100);
    }

    private newConnection() {
        console.log(`MQTT: options: ${this.options.url} ${this.options.port} ${this.clientId}`);
        if (this.checkForStaleConnection()) {
            return;
        }

        const port = Number(this.options.port ?? '443');
        const client = connectMqtt({
            hostname: this.options.url,
            port,
            path: `/mqtt?source=${this.options.source}`,
            protocol: (port === 443 || this.options.ssl) ? 'wss' : 'ws',
            clean: false,
            keepalive: this.options.timeout ?? 240,
            clientId: this.clientId,
            username: '',
            password: this.options.token ?? '',
            reconnectPeriod: 0,
            messageIdProvider: this.options.messageIdProvider
        });
        this.socket$.next((client as any).stream.socket);

        ['connect', 'disconnect', 'error', 'close', 'end', 'offline', 'outgoingEmpty'].forEach(evt => {
            client.on(evt, (...args: any[]) => console.log(`MQTT: ${evt}`, ...args));
        });
        client.on('reconnect', () => console.log(`MQTT: reconnect`, client.options));
        client.on('packetreceive', () => {
            if (this.client !== client) return;

            // might receive erroneous packets from sleeping / waking computer, so catch that here
            if (this.checkForStaleConnection()) {
                return;
            }

            // Using packet receive since if there are no publish/subscribe messages, this gets ping acks
            this.lastConnected = new Date().getTime();
        });

        client.on('message', (topic: string, payload: Buffer, packet) => {
            console.log(`MQTT: Received message ${topic} ${payload.length}`);
            this.message$.next({ topic, payload, dup: packet.dup });
        });

        let refreshTimeout;
        const refreshSubscriptions = () => {
            if(client !== this.client) return;

            Object.keys(this.subscriptions).forEach(topic => {
                this.queued.push({ topic, qos: this.subscriptions[topic].qos, date: new Date().getTime() });
            });
            this.refreshingSubscriptions = true;
            this.work();
            refreshTimeout = setTimeout(refreshSubscriptions, REFRESH_TIME);
        };

        client.on('close', () => {
            console.log('MQTT: disconnected');
            this.connected$.next(false);
            if (refreshTimeout) clearTimeout(refreshTimeout);
            setTimeout(() => {
                console.log(`MQTT: reconnecting ${this.client === client}`);
                if (this.client === client) this.newConnection();
            }, 1000);
        });
        client.on('connect', () => {
            console.log(`MQTT: connected`);
            this.connected$.next(true);
            this.reset();
            if (!this.lastSubscriptionRefresh) this.lastSubscriptionRefresh = new Date().getTime();
            const timeSinceRefresh = new Date().getTime() - this.lastSubscriptionRefresh;
            refreshTimeout = setTimeout(refreshSubscriptions, Math.max(1, REFRESH_TIME - timeSinceRefresh));
        });

        this.client = client;
        if (this.clientResolve) this.clientResolve(client);
        this.clientPromise = new Promise(resolve => this.clientResolve = resolve)

        console.log('MQTT: done initializing');
    }

    private checkForStaleConnection() {
        if (this.lastConnected && new Date().getTime() - this.lastConnected > REFRESH_TIME
            || this.lastSubscriptionRefresh && new Date().getTime() - this.lastSubscriptionRefresh > 2 * REFRESH_TIME - 5 * 60_000) {
            this.connect();
            return true;
        }
        return false;
    }

    /**
     * Publishes data to the given topic
     *
     * @param {string} topic The resolved topic (mqtt wildcard characters should not be present)
     * @param {unknown} data The payload
     * @param {MqttQos} [qos=1] The delivery guarantee
     * @memberof MqttService
     */
    publish(topic: string, data: unknown, qos: MqttQos = 1, expirationMs?: number) {
        const stringified = typeof data === 'string' ? data : JSON.stringify(data || {});
        return this.publishRaw(topic, stringified, qos, expirationMs);
    }

    /**
     * Publishes data to the given topic
     *
     * @param {string} topic The resolved topic (mqtt wildcard characters should not be present)
     * @param {unknown} data The payload
     * @param {MqttQos} [qos=1] The delivery guarantee
     * @memberof MqttService
     */
     async publishRaw(topic: string, data: string | Buffer, qos: MqttQos = 1, expirationMs?: number) {
        console.log(`MQTT: Publishing message ${topic}`, data.length);
        const messageId = `${Math.random()}`.substring(2);
        const packet = { messageId, topic, qos, payload: data };
        if (qos > 0) this.storage.put(packet as any, expirationMs);
        return this.publishImpl(messageId, topic, data, qos);
    }

    private async publishImpl(messageId: string, topic: string, data: string | Buffer, qos: MqttQos = 1, dup = false) {
        if (!this.connected$.value) return;

        const client = await this.waitForClient();
        const promise = new Promise((resolve, reject) => {
            client.publish(topic, data, { qos: Math.abs(qos) as any, dup }, (err) => {
                if (err) {
                    reject(err);
                    console.error(err);
                    return;
                }
                if (qos > 0) this.storage.del({ messageId } as any);
                resolve(null);
            });
        });
        await Promise.race([timeout(), promise]);
    }

    /**
     * Subscribes to the given topic.
     *
     * @template T The type of payload that will be delivered on this topic
     * @param {string} topic The resolved topic (mqtt wildcard characters should not be present)
     * @param {QoS} [qos=1] The receipt guarantee
     * @return {*}  {ObservableSubscribe<T>}
     * @memberof MqttService
     */
    subscribe<T>(topic: string, qos: QoS = 1, transformation: MqttTransformationType = MqttTransformationType.json): ObservableSubscribe<T> {
        const obs = this.subscribeWithTopic<T>(topic, qos, transformation);
        const piped: Observable<T> = obs.pipe(map(x => x.payload));
        const pipedWithSubscribe = Object.assign(piped, {
            subscribed: obs.subscribed
        });
        return pipedWithSubscribe;
    }


    /**
     * Subscribes to the given topic.
     * Use this endpoint when you need metadata about the topic returned in
     * addition to the payload data.
     *
     * @template T The type of payload that will be delivered on this topic
     * @param {string} topic The resolved topic (mqtt wildcard characters should not be present)
     * @param {QoS} [qos=1] The receipt guarantee
     * @return {*}  {ObservableSubscribe<TopicPayload<T>>}
     * @memberof MqttService
     */
    subscribeWithTopic<T>(topic: string, qos: QoS = 1, transformation: MqttTransformationType = MqttTransformationType.json): ObservableSubscribe<TopicPayload<T>> {
        let observable: ObservableSubscribe<TopicPayload<T>> = this.subscriptions[topic]?.observable;
        if (!observable) {
            const regex = new RegExp(`^${topic.replace(/#/g, '(.*)').replace(/(\^|\+)/g, '([^/]+)')}$`);
            let count = 0;
            let timeout: any;
            observable = this.message$.pipe(obs => {
                return new Observable(observer => {
                    const subscription = obs.subscribe({
                        next: v => {
                            const replacements = v.topic.match(regex);
                            if (!replacements) return;
                            const payload = transformation === MqttTransformationType.json ? JSON.parse(v.payload.toString('utf8')) : v.payload;
                            try {
                                observer.next({ replacements, topic: v.topic, payload, dup: v.dup });
                            } catch (e) {
                                console.error(e);
                            }
                        },
                        error: err => observer.error(err),
                    });
                    count++;
                    clearTimeout(timeout);
                    return () => {
                        subscription.unsubscribe();
                        count--;
                        if (count <= 0 && this.subscriptions[topic]?.observable === observable) {
                            delete this.subscriptions[topic];
                            this.queued.push({ topic, qos: -1, date: new Date().getTime() });
                            this.work();
                        }
                    };
                });
            }) as ObservableSubscribe<TopicPayload<T>>;
            observable.subscribed = new Promise((resolve, reject) => this.subscriptions[topic] = ({
                observable,
                qos,
                resolve,
                reject
            }));
            this.queued.push({ topic, qos, date: new Date().getTime() });
            this.work();
        }
        return observable;
    }

    disconnect() {
        console.log(`MQTT: shutting down client`);
        const client = this.client;
        const subscriptions = this.subscriptions;
        const message$ = this.message$;
        this.client = null;
        this.lastConnected = null;
        this.lastSubscriptionRefresh = null;
        this.refreshingSubscriptions = false;
        this.message$ = new Subject();
        this.subscriptions = {};
        this.queued = [];
        this.transit = [];
        client?.end();
        this.connected$.next(false);
        Object.values(subscriptions).forEach(x => {
            x.reject('mqtt client closed');
        });
        message$.error('disconnected'); // Run last so old client stuff is not picked up
    }

    clearStorage() {
        this.storage.clear();
    }

    private reset() {
        // Anything in transit goes back to the front of the queue, other queued stuff still remains
        this.queued = [...this.transit, ...this.queued];
        this.transit = [];

        // Send any unconfirmed publishes
        const stream = this.storage.createStream();
        while (true) {
            const packet = stream.read(1);
            if(!packet) break;
            this.publishImpl(packet.messageId, packet.topic, packet.payload, packet.qos, true);
        }
        this.work(true);
    }

    private async work(immediate = false) {
        if (this.working) return;
        this.working = true;

        try {
            if (!this.connected$.value) return; // Don't send anything til connected
            if (!immediate) await sleep(50); // Give some time to batch subscription requests

            const client = await this.waitForClient();
            const subs: { [topic: string]: { qos: QoS } } = {};
            const unsubs: { [topic: string]: -1 } = {};
            const transit = this.queued;
            this.queued = [];
            transit.forEach(x => {
                if (x.qos === -1) {
                    delete subs[x.topic];
                    unsubs[x.topic] = x.qos;
                } else {
                    delete unsubs[x.topic];
                    subs[x.topic] = { qos: x.qos };
                }
            });
            this.transit.push(...transit);
            if (Object.keys(subs).length) {
                client.subscribe(subs, (err, granted) => {
                    if (err) {
                        return console.error(err);
                    }

                    if (client !== this.client) return;

                    if (this.checkForStaleConnection()) {
                        return;
                    }

                    if (this.refreshingSubscriptions) {
                        this.refreshingSubscriptions = false;
                        this.lastSubscriptionRefresh = new Date().getTime();
                    }

                    // Only remove transit added from here (future transit could have unsubbed the same topic)
                    this.transit = this.transit.filter(x => !transit.some(y => y === x) || !subs[x.topic]);
                    
                    console.log(`MQTT: Subscribed to ${Object.keys(subs).join(', ')}`);
                    granted?.forEach(x => {
                        if (x.qos === 128) console.warn(`MQTT: Subscribe not authorized to ${x.topic}`);
                    });

                    Object.keys(subs).forEach(topic => {
                        this.subscriptions[topic]?.resolve();
                    });
                });
            }
            if (Object.keys(unsubs).length) {
                client.unsubscribe(Object.keys(unsubs), (err) => {
                    if (err) {
                        return console.error(err);
                    }

                    if (client !== this.client) return;

                    // Only remove transit added from here (future transit could have subbed the same topic);
                    this.transit = this.transit.filter(x => !transit.some(y => y === x) || !unsubs[x.topic]);
                    
                    console.log(`MQTT: Unsubscribed from ${Object.keys(unsubs).join(', ')}`);
                });
            }
        } catch (e) {
            console.error(e);
        } finally {
            this.working = false;
        }
    }

    private async waitForClient(): Promise<MqttClient> {
        /*
            wait a few seconds to cover out-of-order things during startup.
            buf if it takes too long, maybe we've logged out or something
        */ 
        if (this.client) return this.client;
        return Promise.race([
            timeout(),
            this.clientPromise
        ]);
    }
}
