import {DoWork, runWorker} from 'observable-webworker';
import {Observable} from 'rxjs/internal/Observable';
import {StompConnect, StompEvent, StompEventType} from './stomp-event';
import {Subject} from 'rxjs/internal/Subject';
import {marshallStompFrame, StompFrame, unmarshallSingleStompFrame} from '../ws/stomp-frame';
import {WebSocketSubject, WebSocketSubjectConfig} from 'rxjs/webSocket';
import {Subscription} from 'rxjs/internal/Subscription';
import * as moment from 'moment';
import {tap} from 'rxjs/operators';

/**
 * Worker-side bridge between a stream of {@link StompEvent}s and a WebSocket.
 *
 * Incoming events (see {@link work}) drive the connection:
 * - `OPEN`  opens a `WebSocketSubject` to `connect.url`,
 * - `FRAME` forwards `frame` to the socket,
 * - `CLOSE` closes the socket and stops listening for further input.
 *
 * Outgoing events report socket life-cycle changes (`OPEN`, `CLOSE`), frames
 * received from the server (`FRAME`) and failures (`ERROR`).
 *
 * While a socket is open a once-per-second timer enforces the heartbeat
 * interval supplied with the `OPEN` event: the connection is closed when
 * nothing was received for too long, and an empty frame is sent when nothing
 * was sent for too long.
 */
export class StompWorker implements DoWork<StompEvent, StompEvent> {

    /** Period of the heartbeat watchdog timer, in milliseconds. */
    private static readonly HEARTBEAT_CHECK_PERIOD_MS = 1000;

    /** Active socket, or `null` when no connection exists. */
    private client: WebSocketSubject<StompFrame> | null = null;
    /** Subscription to {@link client}; delivers frames, errors and completion. */
    private socketSubscription: Subscription | null = null;
    /** Output stream handed back to the caller of {@link work}. */
    private readonly subject = new Subject<StompEvent>();
    /** Epoch milliseconds of the last frame handed to the socket (reset when the socket opens). */
    private lastSent = 0;
    /** Epoch milliseconds of the last frame received (reset when the socket opens). */
    private lastReceived = 0;
    /** Heartbeat interval in milliseconds taken from the `OPEN` event; `0` disables heartbeats. */
    private heartbeatInterval = 0;
    /** An outgoing heartbeat is sent this many milliseconds before the interval elapses. */
    private readonly heartbeatMargin: number = 1500;
    /** Extra milliseconds of silence tolerated beyond the interval before the socket is closed. */
    private readonly heartbeatReceiveMargin: number = this.heartbeatMargin + 8000;
    /** Handle of the watchdog timer, or `null` when it is not running. */
    private heartbeatIntervalId: ReturnType<typeof setInterval> | null = null;
    /** `true` from the moment a socket is created until it opens, fails or closes. */
    private isConnecting = false;
    /** Subscription to the input stream; `null` once a `CLOSE` event has been processed. */
    private inputSubscription: Subscription | null = null;

    /** Converts a raw WebSocket message into a {@link StompFrame}. */
    private static onDeserialize(e: MessageEvent): StompFrame {
        return unmarshallSingleStompFrame(e.data);
    }

    /**
     * Converts a {@link StompFrame} into its wire representation.
     *
     * @throws whatever `marshallStompFrame` throws; the failure is logged first.
     */
    private static onSerialize(e: StompFrame): string {
        try {
            return marshallStompFrame(e);
        } catch (marshalError) {
            console.error(marshalError);
            throw marshalError;
        }
    }

    /** Invoked when the socket observable completes. */
    private static onComplete(): void {
        console.debug('WebSocketSubject complete');
    }

    /** Current time in epoch milliseconds. */
    private static now(): number {
        return moment().valueOf();
    }

    /**
     * Renders an arbitrary error value as a JSON string.
     *
     * `Error` instances have no enumerable own properties, so they are reported
     * through their `name` and `message`; values that cannot be stringified
     * (e.g. circular structures) fall back to their `String()` form.
     */
    private static describeError(error: unknown): string {
        if (error instanceof Error) {
            return JSON.stringify({name: error.name, message: error.message});
        }
        try {
            // JSON.stringify yields undefined for undefined/functions at runtime.
            return JSON.stringify(error) ?? JSON.stringify(String(error));
        } catch {
            return JSON.stringify(String(error));
        }
    }

    /** No part of an outgoing event is transferred; everything is copied. */
    selectTransferables(output: StompEvent): Transferable[] {
        return [];
    }

    /**
     * Starts processing events.
     *
     * @param input$ events controlling the connection (`OPEN`, `FRAME`, `CLOSE`).
     * @returns events emitted by the connection; completes after a `CLOSE`
     *          request once the socket has closed (or immediately when there
     *          is no live socket).
     */
    work(input$: Observable<StompEvent>): Observable<StompEvent> {
        this.inputSubscription = input$.pipe(
            tap((stompEvent) => {
                // Debug hook: uncomment to trace incoming events.
                // console.log('received event', stompEvent);
            })
        ).subscribe((stompEvent) => this.onInput(stompEvent));
        return this.subject.asObservable().pipe(
            tap((stompEvent) => {
                // Debug hook: uncomment to trace outgoing events.
                // console.log('sent event', stompEvent);
            })
        );
    }

    /** Dispatches one incoming event to the matching action. */
    private onInput(stompEvent: StompEvent): void {
        switch (stompEvent.type) {
            case StompEventType.OPEN:
                if (stompEvent.connect) {
                    this.connect(stompEvent.connect);
                }
                break;
            case StompEventType.CLOSE:
                this.shutdown();
                break;
            case StompEventType.FRAME:
                if (stompEvent.frame) {
                    this.send(stompEvent.frame);
                }
                break;
            default:
                // Other event types carry no instruction for the worker.
                break;
        }
    }

    /**
     * Handles a `CLOSE` request: stops listening for input and closes the socket.
     * With a live socket the output completes from {@link onClose}; without one
     * no close notification will ever arrive, so the output is completed here.
     */
    private shutdown(): void {
        this.inputSubscription?.unsubscribe();
        // onClose() uses the null value as the signal to complete the output.
        this.inputSubscription = null;
        if (this.isConnected()) {
            this.client?.complete();
            return;
        }
        this.isConnecting = false;
        this.stopHeartbeat();
        this.subject.complete();
    }

    /** `true` when a socket exists and neither it nor its subscription has been stopped or closed. */
    private isConnected(): boolean {
        return !this.isNotConnected();
    }

    /** Logical negation of {@link isConnected}. */
    private isNotConnected(): boolean {
        return !this.client || this.client.isStopped || this.client.closed || !this.socketSubscription || this.socketSubscription.closed;
    }

    /**
     * Opens a socket to `stompConnect.url` unless one is already open or being
     * opened, and starts the heartbeat watchdog.
     */
    private connect(stompConnect: StompConnect): void {
        if (this.isConnecting || this.isConnected()) {
            return;
        }
        // Set before subscribing: a synchronous failure reaches onError(), which clears it again.
        this.isConnecting = true;
        this.heartbeatInterval = stompConnect.heartbeat ?? 0;
        const wsc: WebSocketSubjectConfig<StompFrame> = {
            url: stompConnect.url,
            serializer: (e) => StompWorker.onSerialize(e),
            deserializer: (e) => StompWorker.onDeserialize(e),
            closeObserver: {
                next: (closeEvent: CloseEvent) => this.onClose(closeEvent)
            },
            openObserver: {
                next: (event: Event) => this.onOpen(event)
            }
        };
        this.client = new WebSocketSubject(wsc);
        this.socketSubscription = this.client.subscribe(
            (frame) => this.onFrame(frame),
            (error) => this.onError(error),
            () => StompWorker.onComplete());
        this.stopHeartbeat();
        if (!this.socketSubscription.closed) {
            this.heartbeatIntervalId = setInterval(
                () => this.heartbeatCheck(), StompWorker.HEARTBEAT_CHECK_PERIOD_MS);
        }
    }

    /** Stops the watchdog timer if it is running. */
    private stopHeartbeat(): void {
        if (this.heartbeatIntervalId !== null) {
            clearInterval(this.heartbeatIntervalId);
            this.heartbeatIntervalId = null;
        }
    }

    /**
     * Socket closed: releases the socket and timer, reports the closure and,
     * if a `CLOSE` request was processed earlier, completes the output.
     */
    private onClose(closeEvent: CloseEvent): void {
        this.isConnecting = false;
        this.client?.unsubscribe();
        this.client = null;
        this.socketSubscription?.unsubscribe();
        this.socketSubscription = null;
        this.stopHeartbeat();
        this.subject.next({
            type: StompEventType.CLOSE,
            close: {
                code: closeEvent.code,
                reason: closeEvent.reason,
                wasClean: closeEvent.wasClean
            }
        });
        if (!this.inputSubscription) {
            this.subject.complete();
        }
    }

    /** Hands a frame to the socket and records the send time; ignored when not connected. */
    private send(frame: StompFrame): void {
        if (this.isConnected()) {
            this.lastSent = StompWorker.now();
            this.client?.next(frame);
        }
    }

    /**
     * Watchdog tick. Does nothing while heartbeats are disabled, the socket is
     * still opening, or there is no connection. Otherwise closes the socket
     * when the server has been silent for longer than
     * `heartbeatInterval + heartbeatReceiveMargin`, or sends an empty frame
     * when nothing has been sent for longer than
     * `heartbeatInterval - heartbeatMargin`.
     */
    private heartbeatCheck(): void {
        if (this.heartbeatInterval <= 0 || this.isConnecting || !this.isConnected()) {
            return;
        }
        const now = StompWorker.now();
        const receivedInterval = this.heartbeatInterval + this.heartbeatReceiveMargin;
        const sentInterval = this.heartbeatInterval - this.heartbeatMargin;
        if (now - this.lastReceived > receivedInterval) {
            console.log(`this.client?.complete()`);
            this.client?.complete();
            return;
        }
        if (now - this.lastSent > sentInterval) {
            // Empty frame — presumably marshalled as a STOMP heart-beat; verify against marshallStompFrame.
            this.send({});
        }
    }

    /**
     * Socket opened: restarts both heartbeat clocks so that timestamps from a
     * previous connection cannot trigger an immediate timeout, then reports `OPEN`.
     */
    private onOpen(event: Event): void {
        if (event.type === 'open') {
            this.isConnecting = false;
            const now = StompWorker.now();
            this.lastSent = now;
            this.lastReceived = now;
            this.subject.next({
                type: StompEventType.OPEN
            });
        }
    }

    /** Frame received: records the receive time and forwards the frame. */
    private onFrame(frame: StompFrame): void {
        this.lastReceived = StompWorker.now();
        this.subject.next({
            type: StompEventType.FRAME,
            frame: frame
        });
    }

    /** Socket error: logs it, lets a later `OPEN` retry, and reports `ERROR`. */
    private onError(error: unknown): void {
        console.error(error);
        // A failed attempt must not keep blocking connect().
        this.isConnecting = false;
        this.subject.next({
            type: StompEventType.ERROR,
            error: StompWorker.describeError(error)
        });
    }
}

// Worker entry point: hands the StompWorker class to observable-webworker's
// runWorker. Per that library's documentation it instantiates the class and
// bridges the worker's message channel to work() — confirm against the
// installed version.
runWorker(StompWorker);
