import { Space, SpaceMember } from '@ably/spaces';
import { makeWatcher } from '@packages/lib/effect';
import { OrganizationUserSchema } from '@packages/lib/schema/user';
import { Chunk, Effect, Stream, SubscriptionRef } from 'effect';
import { isPresent } from 'ts-extras';

import { AuthService } from '@/frachter/effect/global-services/AuthService';
import { IRealtimeMembersStore, RealtimeUserMember } from '@/frachter/types';

import { spaceMemberToRealtimeMember } from './utils';

/**
 * Creates the realtime presence API for the given Ably space.
 *
 * The returned effect is scoped: it forks two background fibers into the
 * surrounding scope and releases the `space.members` subscription when that
 * scope closes.
 *
 * The members store is refreshed
 * - once, before the effect completes,
 * - on every `space.members` update, and
 * - whenever the auth session emits a changed value.
 *
 * @param space - the Ably space whose members are mirrored into the store
 * @returns an effect yielding `{ $members }`, the watcher that holds the current
 *          cloudprint members and user members (`all`, `others`, `self`)
 */
export function makeRealtimePresenceApi(space: Space) {
	return Effect.gen(function* () {
		yield* Effect.logDebug('makeRealtimePresenceApi: START');
		const authService = yield* AuthService;

		// Scope finalizers run in reverse registration order. This one is registered first,
		// so it runs last - after the forked fibers below were interrupted and the
		// space.members listener was removed.
		yield* Effect.addFinalizer(() => Effect.logDebug('makeRealtimePresenceApi: CLOSED'));

		// TODO: the hook should divide into cloudprint members and user members !
		// but better would be to have separate stores for cloudprint members and user members!

		const $members = yield* makeWatcher<IRealtimeMembersStore>({
			cloudprint: [],
			users: { all: [], others: [], self: null },
		});

		// const $userMembers = yield* makeWatcher<{
		// 	self: RealtimeUserMember | null;
		// 	others: RealtimeUserMember[];
		// 	all: RealtimeUserMember[];
		// }>({ all: [], others: [], self: null });

		// const $cloudprintMembers = yield* makeWatcher<Record<string, RealtimeCloudprintMember>>({});

		// TODO: use the change stream to propagate the changes to the $userMembers and $cloudprintMembers stores

		/** Maps raw space members to realtime members and drops every entry that did not resolve to a value. */
		const processMembers = (members: (SpaceMember | null)[], organizationUsers?: OrganizationUserSchema[]) =>
			Stream.fromIterable(members).pipe(
				Stream.mapEffect((member) => spaceMemberToRealtimeMember(member, organizationUsers)),
				Stream.filter(isPresent),
				Stream.runCollect,
				Effect.andThen(Chunk.toArray),
			);

		/** Reads self / others / all from the space and writes one fresh snapshot into the store. */
		const updateMembersTask = Effect.gen(function* () {
			const organizationUsers = (yield* authService.$session.ref)?.organizationUsers;

			// Each getter is best-effort: a rejected promise is treated as "no members".
			const self = Effect.promise(() =>
				space.members
					.getSelf()
					.then((v) => [v])
					.catch(() => [null]),
			).pipe(Effect.andThen((v) => processMembers(v, organizationUsers)));

			const others = Effect.promise(() => space.members.getOthers().catch(() => [] as SpaceMember[])).pipe(
				Effect.andThen((v) => processMembers(v, organizationUsers)),
			);

			const all = Effect.promise(() => space.members.getAll().catch(() => [] as SpaceMember[])).pipe(
				Effect.andThen((v) => processMembers(v, organizationUsers)),
			);

			const result = yield* Effect.all({ all, others, self } as const, { concurrency: 'unbounded' });

			// `result.self` is empty when getSelf() resolved to null or rejected. Fall back to an
			// explicit null so the store never holds `undefined`.
			// NOTE(review): the cast assumes the own member is always a user member - confirm.
			const [selfMember] = result.self;

			yield* SubscriptionRef.set($members.ref, {
				cloudprint: result.all.filter((v) => v.type === 'cloudprint'),
				users: {
					all: result.all.filter((v) => v.type === 'user'),
					others: result.others.filter((v) => v.type === 'user'),
					self: (selfMember ?? null) as RealtimeUserMember | null,
				},
			});
		});

		/** ----------------------------------------------------------------------------------------------
		 * update members when the space members change
		 * _______________________________________________________________________________________________ */
		yield* Stream.async<SpaceMember>((emit) => {
			const onMemberUpdate = (update: SpaceMember) => {
				// HINT: no idea why this is called multiple times
				// HINT: use queueMicrotask because the space members subscription updates happen before the state for the ably member getters have been updated!
				queueMicrotask(() => void emit(Effect.succeed(Chunk.of(update))));
			};

			space.members.subscribe(onMemberUpdate);

			// Cleanup returned to Stream.async: remove exactly this listener when the stream is torn
			// down. An argument-less `space.members.off()` would also drop listeners registered by
			// other consumers of the same space.
			return Effect.sync(() => {
				space.members.unsubscribe(onMemberUpdate);
			});
		}).pipe(
			// Stream.changes, // ⚠️ DO NOT DO THIS! otherwise we might have stale disconnected members!
			Stream.runForEach(() => updateMembersTask),
			Effect.interruptible,
			Effect.forkScoped,
		);

		/** ----------------------------------------------------------------------------------------------
		 * update members when auth session changes (e.g. when a user has changed their profile)
		 * _______________________________________________________________________________________________ */
		yield* authService.$session.stream
			.pipe(
				Stream.changes,
				Stream.runForEach(() => updateMembersTask),
			)
			.pipe(Effect.interruptible, Effect.forkScoped);

		// Initial snapshot, so the store is populated before the API is handed out.
		yield* updateMembersTask;

		yield* Effect.logDebug('makeRealtimePresenceApi: READY');

		// Registered last, so it runs first when the scope closes - before the fibers are stopped.
		yield* Effect.addFinalizer(() => Effect.logDebug('makeRealtimePresenceApi: CLOSING'));

		return {
			$members,
		};
	});
}
