import { Injectable, OnDestroy } from '@angular/core';
import { RxStompService } from '@stomp/ng2-stompjs';
import { Subscription, Observable, Subject } from 'rxjs';
import { AuthService, UserObject } from '../../core/services/auth.service';
import { mqRxStompConfig } from './mq-rx-stomp.config';
import { Message, StompHeaders } from '@stomp/stompjs';

import { getRandomString } from 'src/app/core/helpers/getRandomString';
import { delay, filter, first } from 'rxjs/operators';

@Injectable({
  providedIn: 'root',
})
export class MQService implements OnDestroy {
  userObjectSubscription: Subscription | undefined;
  userTopicSubscription: Subscription | undefined;
  userChannel = new Subject<Message>();
  private userChannelID: string = '';

  projectSubscription: Subscription | undefined;

  public clientID: string = '';

  constructor(
    private authService: AuthService,
    private rxStompService: RxStompService
  ) {
    this.initUserSubscription();
  }

  ngOnDestroy(): void {
    this.userObjectSubscription?.unsubscribe();
    this.userTopicSubscription?.unsubscribe();
    this.userChannelID = undefined;
  }

  initUserSubscription() {
    this.userObjectSubscription = this.authService.user$.subscribe((user) => {
      if (user) {
        this.connect(user);

        this.userChannelID = 'label-' + user.userLabelID;
        this.subscribeToUserChannel();
      } else {
        this.disconnect();
        this.userChannelID = undefined;
        this.userTopicSubscription?.unsubscribe();
      }
    });
  }

  private subscribeToUserChannel() {
    // NOTE: this should work even without the timeout, according to ng2-stompjs docs
    // this.userTopicSubscription = this.topic(this.userChannelID).subscribe(message => { this.userChannel.next(message) })

    // subscribe to the user channel only after connecting + wait 1000ms
    this.rxStompService.connected$
      .pipe(
        filter((state) => state == 1),
        first(),
        delay(1000)
      )
      .subscribe((s) => {
        this.userTopicSubscription = this.topic(this.userChannelID).subscribe(
          (message) => {
            this.userChannel.next(message);
          }
        );
      });
  }

  connect(user: UserObject) {
    this.clientID = user.email + ':' + getRandomString(6);

    var c = mqRxStompConfig;
    c.connectHeaders = {
      login: user.email,
      passcode: user.accessToken,
      'client-id': this.clientID,
    };

    this.rxStompService.configure(c);
    this.rxStompService.activate();
  }

  disconnect() {
    this.rxStompService.deactivate();
    this.projectSubscription?.unsubscribe();
  }

  // topic subscription
  topic(channel: string): Observable<Message> {
    const destination = '/topic/' + channel;
    const subscriptionID = channel;
    const subHeaders: StompHeaders = {
      id: String(subscriptionID),
      ack: 'client-individual',
    };

    return this.rxStompService.watch(destination, subHeaders);
  }

  sendMessage(message: string, channel: string) {
    this.rxStompService.publish({
      destination: '/topic/' + channel,
      body: message,
    });
  }

  publish(message: any) {
    this.rxStompService.publish(message);
  }

  publishOnUserChannel(body: string) {
    this.rxStompService.publish({
      destination: '/topic/' + this.userChannelID,
      body: body,
    });
  }
}
