import { Injectable } from "@angular/core";
import Pusher, { Channel } from "pusher-js";
import { BehaviorSubject, merge, Observable, Subject } from "rxjs";
import { bufferCount, distinct, filter, groupBy, map, mergeMap, tap } from "rxjs/operators";
import { environment } from "src/environments/environment";
import { E_MessageKind } from "@backend/graph/_services/e-message-kind";
import { I_DownForMaintenanceMessage } from "@backend/common/interfaces/down-for-maintenance-message.interface";
import { PusherNames } from "@backend/common/pusher-utils/pusher-channel-names";
import { LocationService } from "./location.service";

@Injectable({
  providedIn: "root",
})
export class PushUpdatesService {
  private _pusher: Pusher;
  private _isConnected = false;

  constructor(private _locationService: LocationService) {}

  public get onConnected(): Observable<boolean> {
    return this._onConnected.pipe(filter((isConnected) => isConnected));
  }

  public downForMaintenanceSubject = new Subject<I_DownForMaintenanceMessage>();
  private _onConnected = new BehaviorSubject<boolean>(false);

  public connect(Authorization: string) {
    if (this._isConnected) {
      console.log("Already connected to Pusher");
      return;
    }

    try {
      this._pusher = new Pusher(environment.PATIENT_APP_PUSHER_KEY, {
        cluster: environment.PATIENT_APP_PUSHER_CLUSTER,
        forceTLS: true,
        authEndpoint: this._authEndpoint,
        auth: {
          headers: {
            Authorization,
            "Content-Type": "application/json",
          },
        },
      });

      this._createStageSpecificObservable().subscribe((data) => {
        console.log("Received data from Pusher", data);
        let items: Array<any>;
        if (Array.isArray(data.message_content)) {
          if (data.message_content.length > 0 && Array.isArray(data.message_content[0])) {
            items = data.message_content.flat();
          } else {
            items = data.message_content;
          }
        } else {
          items = [data.message_content];
        }

        for (const item of items) {
          // Message received
          switch (data.message_kind) {
            case E_MessageKind.DownForMaintenance:
              console.log("Down for maintenance", item);
              this.downForMaintenanceSubject.next(item);
              break;
            default:
              console.log("Unknown message kind", data.message_kind);
              break;
          }
        }
      });

      this._onConnected.next(true);
    } catch (error) {
      console.error("Failed to connect to Pusher", error);
    }
  }

  public createPrivateChannelObservable(channelName: string): Observable<any> {
    return this._subscribeToChannel(PusherNames.Patient.getPrivateChannelName(channelName));
  }

  /**
   * Creates an Observable based on the Pusher subscription which will combine partial messages before emitting them
   */
  private _createStageSpecificObservable(): Observable<any> {
    const channelName = PusherNames.Patient.getStageSpecificChannelName(environment.STAGE);

    return this._subscribeToChannel(channelName);
  }

  private _channels: Array<Channel> = [];

  public disconnect() {
    for (const channel of this._channels) {
      channel.disconnect();
    }
  }

  private _subscribeToChannel(channelName: string): Observable<any> {
    const partsByUid = {};
    const subject = new Subject<any>();
    const channel = this._pusher.subscribe(channelName);
    if (channel.subscribed) {
      this._channels.push(channel);
    }
    channel.bind(environment.PATIENT_APP_PUSHER_EVENT, (data) => {
      subject.next(data);
    });

    return merge(
      subject.pipe(filter((message) => message._meta?.mode !== "split")),
      subject.pipe(
        filter((message) => message._meta?.mode === "split"),
        // Store the number of parts for each unique split message
        tap((item) => {
          partsByUid[item._meta.uid] = item._meta.parts;
        }),
        // Group the parts by the unique ID so they can be buffered and then combined
        groupBy((item) => item._meta.uid),
        // Buffer the parts until we have received all parts
        mergeMap((group) =>
          group.pipe(
            distinct((part) => part._meta.part),
            bufferCount(partsByUid[group.key])
          )
        ),
        // Combine the parts to form the original JSON string and then parse to an object
        map((parts) => {
          delete partsByUid[parts[0]._meta.uid];

          return JSON.parse(
            parts
              .sort((a, b) => a._meta.part - b._meta.part)
              .map((part) => part.data)
              .join("")
          );
        })
      )
    );
  }

  private get _authEndpoint(): string {
    if (["development", "production"].includes(environment.STAGE)) {
      return `${environment.GLOBAL_URL}/v1/pusher/auth`; // Fallback to global which we should not be using
    }

    return `${environment.REST_URL}/api/pusher/auth`;
  }
}
