import type { Row, ShapeStreamInterface } from '@electric-sql/client';

import { Effect, Schedule, Stream, SubscriptionRef } from 'effect';

import type { IMakeShapeStreamOptions } from './types';

import { makeShapeStream } from './make-shape-stream';
import { clearYearMonthFromTableStatement, getElectricShapesStatement } from './sql';
import { getYearMonthPreviousAndCurrent } from './utils';

export const makeContinuousShape = (options: Omit<IMakeShapeStreamOptions, 'yearMonth'>) =>
	Effect.gen(function* () {
		const shapesRef = yield* SubscriptionRef.make<
			Record<string, { stream: ShapeStreamInterface<Row<unknown>>; isSynchronized: boolean }>
		>({});

		const isSynchronizedRef = yield* SubscriptionRef.make<boolean>(false);

		yield* shapesRef.changes.pipe(
			Stream.runForEach((state) =>
				Effect.gen(function* () {
					const isSynchronized = Object.values(state).every((shape) => shape.isSynchronized);
					yield* SubscriptionRef.update(isSynchronizedRef, (prev) => isSynchronized);
				}),
			),
			Effect.forkScoped,
		);

		// const shapesByYearMonth: Record<string, { stream: ShapeStreamInterface<Row<unknown>>, isSynchronized: boolean }> = {};

		const abortController = new AbortController();

		/** ----------------------------------------------------------------------------------------------
		 * program to cleanup shapes (remove subscription in electric schema and delete data in table)
		 * _______________________________________________________________________________________________ */
		const cleanupShape = (table: string, yearMonth: string) =>
			Effect.gen(function* () {
				yield* Effect.tryPromise({
					catch: (error) => {
						console.error(`Failed to delete subscription for ${table} and ${yearMonth}`, error);
					},
					try: () => options.pg.electric.deleteSubscription(table + ':' + yearMonth),
				});

				yield* Effect.tryPromise({
					catch: (error) => {
						console.error(`Failed to delete data for ${table} and ${yearMonth}`, error);
					},
					try: () =>
						options.pg.query(clearYearMonthFromTableStatement(table, yearMonth)).catch((error) => {
							console.error(`Failed to delete data for ${table} and ${yearMonth}`, error);
							throw error;
						}),
				});
			});

		/** ----------------------------------------------------------------------------------------------
		 * program to watch our partition keys, trigger cleanups and shape creation
		 * _______________________________________________________________________________________________ */
		const program = Effect.gen(function* () {
			if (options.userActivityLatch) {
				yield* options.userActivityLatch.await;
			}

			const requiredYearMonths = yield* getYearMonthPreviousAndCurrent;

			// get existing shapes in electric schema
			const existingShapesInElectricSchema = yield* Effect.tryPromise({
				catch: (error) => {
					console.error('Failed to fetch existing shapes:', error);
				},
				try: () =>
					options.pg
						.query<{
							key: string;
							shape_metadata: any;
							last_lsn: string;
						}>(getElectricShapesStatement(options.table))
						.catch((error) => {
							if (
								error instanceof Error &&
								error.message.includes('relation "electric.subscriptions_metadata" does not exist')
							) {
								console.info('electric schema does not yet exist');
								return { rows: [] };
							} else {
								throw error;
							}
						}),
			});

			// filter only those that have a yearMonth as second part of the key (this will be obsolete as soon as we've migrated to the new conitinuous shapes)
			const yearMonthsInElectricSchema = existingShapesInElectricSchema.rows
				.filter((shape) => shape.key.split(':').length > 1)
				.map((shape) => shape.key.split(':')[1]);

			const yearMonthsObsolete = yearMonthsInElectricSchema.filter((key) => !requiredYearMonths.includes(key));

			// now check if the obsolete ones have active shape streams and end them and clean up

			for (const yearMonth of yearMonthsObsolete) {
				yield* Effect.logDebug(`💿 Removing shape stream for table ${options.table} and yearMonth ${yearMonth}`);

				const shape = (yield* shapesRef)[yearMonth];
				if (shape) {
					shape.stream.unsubscribeAll();
					yield* SubscriptionRef.update(shapesRef, (prev) => {
						const newState = { ...prev };
						delete newState[yearMonth];
						return newState;
					});
				}

				yield* cleanupShape(options.table, yearMonth);
			}

			// now create new shapes if necessary

			for (const yearMonth of requiredYearMonths) {
				if ((yield* shapesRef)[yearMonth]) {
					continue;
				}

				yield* Effect.logDebug(`💿 Setting up shape stream for table ${options.table} and yearMonth ${yearMonth}`);

				const result = yield* makeShapeStream({
					...options,
					onIsSynchronizedChanged: (isSynchronized) => {
						Effect.runPromise(
							SubscriptionRef.update(shapesRef, (prev) => {
								const newState = { ...prev };
								newState[yearMonth] = {
									...newState[yearMonth],
									isSynchronized,
								};
								return newState;
							}),
						).catch((err) =>
							console.error(`Failed to update shape stream for ${options.table} and yearMonth ${yearMonth}`, err),
						);
					},
					signal: abortController.signal,
					yearMonth,
				});

				yield* SubscriptionRef.update(shapesRef, (prev) => {
					const newState = { ...prev };
					newState[yearMonth] = {
						isSynchronized: false,
						stream: result.stream,
					};
					return newState;
				});
			}
		});

		// add a forked effect with a finalizer that ends the streams !

		const cleanup = Effect.gen(function* () {
			yield* Effect.addFinalizer((exit) =>
				Effect.gen(function* () {
					console.log('🔴 cleanup here ---');

					abortController.abort();

					for (const yearMonth of Object.keys(yield* shapesRef)) {
						(yield* shapesRef)[yearMonth].stream.unsubscribeAll();
					}

					return Effect.void;
				}),
			);
		});

		// fork the finalizer program

		yield* Effect.forkScoped(cleanup);

		// fork main program that will be repeated with a schedule

		yield* Effect.forkScoped(
			program.pipe(Effect.repeat({ schedule: Schedule.spaced('10 seconds') }), Effect.interruptible),
		);

		return {
			isSynchronizedRef,
		};
	});
