import { DataSource } from '@angular/cdk/collections';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class PagedDataSource<T extends { id: string }> implements DataSource<T> {
  private entitesSubject = new BehaviorSubject<T[]>([]);
  private loadingSubject = new BehaviorSubject<boolean>(false);
  private hasDataSubject = new BehaviorSubject<boolean>(false);
  private destroy$: Subject<boolean> = new Subject<boolean>();

  public loading$ = this.loadingSubject.asObservable();
  public hasData$ = this.hasDataSubject.asObservable();
  public hasData = true;
  private hasDataTimeout: any;

  constructor(
    listSelector: Observable<Array<any>>,
    private filter?: (x: any) => any,
  ) {
    listSelector.pipe(takeUntil(this.destroy$)).subscribe((x) => {
      this.entitesSubject.next(filter ? x.filter(filter) : x);
      this.stopLoading();
      if (this.hasDataTimeout) {
        clearTimeout(this.hasDataTimeout);
      }
      const hasData = !!x && x.length > 0;
      if (hasData) {
        this.hasData = hasData;
        this.hasDataSubject.next(this.hasData);
      } else {
        this.hasDataTimeout = setTimeout(() => {
          this.hasData = hasData;
          this.hasDataSubject.next(this.hasData);
        }, 10);
      }
    });
  }

  connect(): Observable<T[]> {
    return this.entitesSubject.asObservable();
  }

  disconnect(): void {
    this.entitesSubject.complete();
    this.loadingSubject.complete();
    this.destroy$.next(true);
    this.destroy$.complete();
  }

  startLoading(): void {
    this.loadingSubject.next(true);
  }

  stopLoading(): void {
    this.loadingSubject.next(false);
  }
}
