import { Observable, ObservedValueOf, Subject, concat, forkJoin, throwError, timer } from "rxjs";
import { catchError, finalize, map, switchMap, tap } from "rxjs/operators";

export function batch<T = any>(all: T[], batchSize: number = 5) {
    let batches = all.reduce((acc: T[][], iss: T) => {
        let last: T[] = acc[acc.length - 1];
        if (last.length < batchSize) {
            last.push(iss);
        }
        else {
            acc.push([iss]);
        }
        return acc;
    }, [[]]);

    if (batches[0].length == 0) return []
    return batches.filter(i => i.length);
}


export function packConcat<T = any>(source$: Observable<T>): Observable<T[]> {
    const content: T[] = []
    const returnObservable: Subject<T[]> = new Subject<T[]>();
    return source$.pipe(
        tap(item => {
            content.push(item)
        }),
        finalize(() => {
            returnObservable.next(content);
            returnObservable.complete()
        }),
        switchMap(
            _ => returnObservable
        ),
        catchError(err => {
            returnObservable.error(err);
            return throwError(err)
        }));
}

export function concatJoin<T, K extends keyof T>(sourcesObject: { [K in keyof T]: Observable<T[K]>; }): Observable<{ [K in keyof T]?: T[K]; }> {
    const result: { [K in keyof T]?: T[K] } = {}
    const queries: Observable<T[Extract<keyof T, string>]>[] = []
    for (const k in sourcesObject) {
        queries.push(sourcesObject[k].pipe(tap(kr => result[k] = kr)));
    }
    return concat(...queries).pipe(packConcat, map(_ => result))

}
