import { BehaviorSubject, concat, of, ReplaySubject, skip, Subject, timer } from 'rxjs';
import { HttpClient } from '@angular/common/http';
import { concatMap, delay, share, takeUntil, tap } from 'rxjs/operators';
import { Injectable, NgZone } from '@angular/core';

/**
 * Base class for services that repeatedly `GET` {@link baseUrl} and publish
 * every response on {@link state$}.
 *
 * The polling pipeline is subscribed outside the Angular zone; emissions,
 * errors and completion are re-entered into the zone before they reach
 * `_stateSource`.
 *
 * Error handling:
 * - An error whose `status` is `0` is treated as a network error: a warning
 *   is logged and the poll is retried after `_pollingTimeInMilliseconds`.
 * - Any other error is forwarded to `state$` as an error notification. The
 *   polling flag is left untouched in that case; a later `startPolling()`
 *   call replaces the errored stream.
 */
@Injectable()
export abstract class PollingAbstractService<T> {
  /** URL requested with `HttpClient.get` on every poll. */
  abstract baseUrl: string;

  /** Backing subject of `state$`; replays the latest value and is replaced by `stopPolling()`. */
  _stateSource = new ReplaySubject<T>(1);
  /** Stream of poll results. Reassigned by `stopPolling()`, so re-read it after a stop. */
  state$ = this._stateSource.asObservable();

  /** Delay between the end of one request and the start of the next, and before a network-error retry. */
  protected _pollingTimeInMilliseconds = 5000;

  /** Emits when the current polling session must end; replaced by `startPolling()`. */
  private _stopPollingSource = new Subject<void>();
  /** Each emission requests one "request + wait" cycle. */
  private _pollSubject$ = new BehaviorSubject('');
  private _isPolling = false;

  protected constructor(protected http: HttpClient, protected ngZone: NgZone) {}

  /**
   * Starts a new polling session, stopping the current one first if needed.
   *
   * @param maxPolls Number of completed request/wait cycles after which
   *   polling stops itself. `0` (the default) polls until `stopPolling()`.
   */
  startPolling(maxPolls = 0): void {
    if (this._isPolling) {
      this.stopPolling();
    }
    this._isPolling = true;
    this._stopPollingSource = new Subject();
    this._poll(maxPolls);
  }

  /**
   * Subscribes the polling pipeline for the current session.
   *
   * @param maxPolls Remaining cycle budget for this subscription (`0` = unlimited).
   */
  private _poll(maxPolls: number): void {
    let iterations = 0;
    // Captured once so that both the pipeline and a pending retry timer are
    // bound to the session that created them, even if `_stopPollingSource`
    // is replaced by a later `startPolling()`.
    const stop$ = this._stopPollingSource;

    this.ngZone.runOutsideAngular(() => {
      // Emits nothing (skip(1) drops its only value). Its purpose is to delay
      // completion of the concat below, so concatMap does not start the next
      // request until the polling interval has elapsed.
      const retry$ = of('').pipe(
        delay(this._pollingTimeInMilliseconds),
        tap(() => {
          // Queue the next cycle; concatMap holds it until this one completes.
          this._pollSubject$.next('');
          if (++iterations === maxPolls) {
            this.stopPolling();
          }
        }),
        skip(1) // skip initial value
      );

      this._pollSubject$
        .pipe(
          concatMap(() => concat(this.http.get<T>(this.baseUrl), retry$)),
          share(),
          takeUntil(stop$)
        )
        .subscribe({
          next: (result: T) => {
            this.ngZone.run(() => this._stateSource.next(result));
          },
          error: (error) => {
            if (error?.status === 0) {
              // Logged up front so that "Retrying in ..." is accurate.
              console.warn(
                `Unable to poll due to network error. Retrying in ${this._pollingTimeInMilliseconds / 1000}s...`
              );
              // Failed requests do not consume the budget, but they must not
              // enlarge it either. `iterations < maxPolls` holds here, so the
              // remainder is at least 1 and never turns into "unlimited" (0).
              const remainingPolls = maxPolls ? maxPolls - iterations : 0;
              // takeUntil cancels the pending retry when the session is
              // stopped, so a stop/start during the wait cannot spawn a
              // second, concurrent polling loop.
              timer(this._pollingTimeInMilliseconds)
                .pipe(takeUntil(stop$))
                .subscribe(() => {
                  this.ngZone.run(() => this._poll(remainingPolls));
                });
            } else {
              this.ngZone.run(() => this._stateSource.error(error));
            }
          },
          complete: () => {
            this.ngZone.run(() => this._stateSource.complete());
          },
        });
    });
  }

  /**
   * Ends the current polling session, completes the current `state$` and
   * installs a fresh `_stateSource` / `state$` pair for the next session.
   */
  stopPolling(): void {
    this._isPolling = false;
    const finishedSource = this._stateSource;
    this._stopPollingSource.next();
    // Normally the pipeline's complete callback has already done this. While
    // waiting for a network-error retry there is no live pipeline, so complete
    // explicitly; this is a no-op on an already terminated subject.
    finishedSource.complete();
    this._stateSource = new ReplaySubject<T>(1);
    this.state$ = this._stateSource.asObservable();
  }

  /** Returns whether a polling session is currently marked as active. */
  getIsPolling(): boolean {
    return this._isPolling;
  }
}
