import { firstValueFrom, fromEvent, Observable, shareReplay, Subscription, takeUntil } from 'rxjs';
import { useQuery, useQueryClient, UseQueryResult as UseQueryResultTS } from '@tanstack/react-query';
import { useEffect, useRef } from 'react';

type UseQueryResult<T> = Pick<
	UseQueryResultTS<T | undefined, Error | null>,
	'data' | 'error' | 'isPending' | 'refetch'
>;

export function useWatchObservableQuery<T>({
	queryKey,
	queryFn,
}: {
	queryKey: unknown[];
	queryFn: () => Observable<T> | undefined | null;
}): UseQueryResult<T> {
	const queryClient = useQueryClient();
	const subscription = useRef<Subscription>();

	useEffect(
		() => () => {
			if (subscription.current) {
				subscription.current.unsubscribe();
			}
		},
		[],
	);

	return useQuery({
		queryKey,
		queryFn({ signal }) {
			if (subscription.current) subscription.current.unsubscribe();
			let source$ = queryFn();

			if (source$ == null) return source$;

			source$ = source$.pipe(
				takeUntil(fromEvent(signal, 'abort')),
				shareReplay({
					refCount: true,
					bufferSize: 1,
				}),
			);

			let valueReceivedOnce = false;

			subscription.current = source$.subscribe({
				next(_data) {
					valueReceivedOnce = true;
					queryClient.setQueryData(queryKey, _data);
				},
				error(err) {
					// This should never happen, but if it does, we want to throw the error
					// The first error is handled by firstValueFrom
					// Subsequent errors should not happen because we are listening to the cache
					if (valueReceivedOnce) {
						throw err;
					}
				},
			});

			return firstValueFrom(source$);
		},
	});
}
