import { Space } from '@ably/spaces';
import { makeWatcherFromStream, validateZodSchema, ZodValidationError } from '@packages/lib/effect';
import { InboundMessage as AblyInboundMessage, RealtimeChannel as AblyRealtimeChannel, messageCallback } from 'ably';
import { Chunk, Deferred, Effect, Option, Scope, Stream, StreamEmit } from 'effect';
import { TimeoutException } from 'effect/Cause';
import { z } from 'zod';

import { RealtimeClientService } from './RealtimeClientService';

/**
 * Builds a stream of every message received on `channel` and wraps it with
 * `makeWatcherFromStream`.
 *
 * Each inbound message is emitted as a single-element chunk. If the call to
 * `channel.subscribe` rejects, the stream is ended (`Option.none()` is the
 * end-of-stream signal of `Stream.async`).
 *
 * The subscription is released both when a run of the stream finishes or is
 * interrupted, and when the surrounding `Scope` closes, so the effect must be
 * run inside a scope.
 *
 * @param channel Ably realtime channel to listen on.
 * @returns Whatever `makeWatcherFromStream` produces for the stream; callers
 * in this file read its `stream` property.
 */
export function makeMessageStream(channel: AblyRealtimeChannel) {
	return Effect.gen(function* () {
		yield* Effect.logDebug('makeMessageWatcher: START');

		// Every callback currently registered on the channel. Tracking the exact
		// function objects matters: `unsubscribe` only removes the callback it is
		// given, so it must be the same reference that was passed to `subscribe`.
		const listeners = new Set<messageCallback<AblyInboundMessage>>();

		function cleanup() {
			for (const listener of listeners) {
				channel.unsubscribe(listener);
			}
			listeners.clear();
		}

		const stream = Stream.async((emit: StreamEmit.Emit<never, never, AblyInboundMessage, void>) => {
			const listener: messageCallback<AblyInboundMessage> = (message) => {
				void emit(Effect.succeed(Chunk.of(message)));
			};
			listeners.add(listener);

			// Subscribe the tracked callback itself so it can be removed later.
			void channel.subscribe(listener).catch(() => emit(Effect.fail(Option.none())));

			// Finalizer for this run of the stream: it has to *execute* the
			// unsubscribe, hence `Effect.sync` rather than `Effect.succeed`.
			return Effect.sync(() => {
				channel.unsubscribe(listener);
				listeners.delete(listener);
			});
		});

		// Safety net: drop anything still subscribed when the scope closes.
		yield* Effect.addFinalizer(() => Effect.sync(cleanup));

		const $stream = yield* makeWatcherFromStream(stream);

		yield* Effect.logDebug('makeMessageWatcher: END');

		return $stream;
	});
}

/**
 * Description of an outbound realtime message, as consumed by `sendMessage`.
 */
export type IMessageEnvelope = {
	/** Client id of the recipient; `sendMessage` uses it to build the target channel name. */
	receiverClientId: string;
	/** Data to publish. `sendMessage` publishes a copy with an extra `__r` field when a message id is supplied. */
	payload: Record<string, any>;
	/** Optional event name; when set, `sendMessage` publishes the payload under it. */
	topic?: string;
};

/**
 * Publishes `envelope.payload` on the channel
 * `org:<organizationId>:<receiverClientId>::$space`.
 *
 * The effect first waits for the realtime connection to reach the
 * `connected` state and then publishes. Both steps go through
 * `Effect.promise`, so a rejected promise surfaces as a defect rather than a
 * typed error.
 *
 * WARN: do not use topics !
 *
 * @param envelope Receiver, payload and optional topic (event name).
 * @param organizationId Organization segment of the channel name.
 * @param messageId Optional correlation id; when given it is added to the
 * published payload as `__r` (the envelope's own payload is not mutated).
 */
export const sendMessage = (envelope: IMessageEnvelope, organizationId: string, messageId?: string) =>
	Effect.gen(function* () {
		yield* Effect.logDebug('send message');
		const realtimeClient = yield* RealtimeClientService;
		const channel = realtimeClient.client.channels.get(`org:${organizationId}:${envelope.receiverClientId}::$space`);

		const payload = messageId ? { ...envelope.payload, __r: messageId } : envelope.payload;

		// Publishing only makes sense once the connection is up, so the two
		// steps run strictly one after the other.
		yield* Effect.promise(() => realtimeClient.client.connection.whenState('connected'));
		yield* Effect.promise(() =>
			envelope.topic ? channel.publish(envelope.topic, payload) : channel.publish({ data: payload }),
		);
	});

/**
 * Sends `envelope` and waits for a reply carrying the same correlation id.
 *
 * A random id is generated and published with the payload as `__r`. At the
 * same time the sender's own space channel is watched; the first inbound
 * message whose `data.__r` equals that id is taken as the reply. If no reply
 * arrives within 60 seconds the effect fails with `TimeoutException`.
 *
 * @param envelope Message to send.
 * @param organizationId Organization segment of the receiver's channel name.
 * @param senderSpace Space whose channel is watched for the reply.
 * @param replySchema Optional zod schema. When given, the reply's `data` is
 * validated with `validateZodSchema` and the validated value is returned;
 * otherwise the raw inbound message is returned.
 */
export function sendMessageAck(
	envelope: IMessageEnvelope,
	organizationId: string,
	senderSpace: Space,
): Effect.Effect<AblyInboundMessage, TimeoutException, Scope.Scope | RealtimeClientService>;
export function sendMessageAck<TSchema extends z.ZodSchema>(
	envelope: IMessageEnvelope,
	organizationId: string,
	senderSpace: Space,
	replySchema: TSchema,
): Effect.Effect<z.infer<TSchema>, ZodValidationError | TimeoutException, Scope.Scope | RealtimeClientService>;
export function sendMessageAck<TSchema extends z.ZodSchema>(
	envelope: IMessageEnvelope,
	organizationId: string,
	senderSpace: Space,
	replySchema?: TSchema,
) {
	return Effect.gen(function* () {
		const messageId = crypto.randomUUID();
		const deferred = yield* Deferred.make<AblyInboundMessage, TimeoutException>();
		const $stream = yield* makeMessageStream(senderSpace.channel);

		// The listener, the send and the wait run concurrently so that the
		// listener is already consuming the channel while the message goes out.
		const [, , message] = yield* Effect.all(
			[
				$stream.stream.pipe(
					Stream.runForEachWhile((inbound) =>
						Effect.gen(function* () {
							yield* Effect.logDebug('reply received', inbound);
							// `data` is untyped and may be null/undefined for unrelated
							// messages on the channel; those must be skipped, not crash.
							if (inbound.data?.__r !== messageId) {
								return true;
							}
							yield* Deferred.succeed(deferred, inbound);
							// Returning false stops consuming the stream.
							return false;
						}),
					),
					/* messages will be kept in ably's queues for 120 seconds before they are lost, so a timeout of 60 seconds is great */
					Effect.timeout('60 seconds'),
					Effect.catchAll((timeout) =>
						Effect.gen(function* () {
							yield* Effect.logDebug('fail our deferred');
							// Propagate the timeout to whoever awaits the deferred.
							yield* Deferred.fail(deferred, timeout);
						}),
					),
					// NOTE(review): if the stream ends before a matching reply arrives,
					// nothing completes the deferred and the await below would not
					// return — confirm whether the stream can end early in practice.
				),
				sendMessage(envelope, organizationId, messageId),
				Deferred.await(deferred),
			],
			{ concurrency: 'unbounded' },
		);

		if (replySchema) {
			const validated = yield* validateZodSchema(replySchema, message.data);

			// eslint-disable-next-line @typescript-eslint/no-unsafe-return
			return validated as z.infer<TSchema>;
		}

		return message;
	});
}

/**
 * Sends `envelope`, waits for the reply carrying the same correlation id and
 * returns the reply's `data` validated against `replySchema`.
 *
 * Works like `sendMessageAck` with a schema, but only waits 10 seconds for
 * the reply before failing with `TimeoutException`.
 *
 * @param envelope Message to send.
 * @param organizationId Organization segment of the receiver's channel name.
 * @param senderSpace Space whose channel is watched for the reply.
 * @param replySchema zod schema the reply's `data` is validated against via
 * `validateZodSchema`.
 */
export function sendMessageAckSchema<TSchema extends z.ZodSchema>(
	envelope: IMessageEnvelope,
	organizationId: string,
	senderSpace: Space,
	replySchema: TSchema,
): Effect.Effect<z.infer<TSchema>, ZodValidationError | TimeoutException, Scope.Scope | RealtimeClientService> {
	return Effect.gen(function* () {
		const messageId = crypto.randomUUID();
		const deferred = yield* Deferred.make<AblyInboundMessage, TimeoutException>();
		const $stream = yield* makeMessageStream(senderSpace.channel);

		// The listener, the send and the wait run concurrently so that the
		// listener is already consuming the channel while the message goes out.
		const [, , message] = yield* Effect.all(
			[
				$stream.stream.pipe(
					Stream.runForEachWhile((inbound) =>
						Effect.gen(function* () {
							yield* Effect.logDebug('reply received', inbound);
							// `data` is untyped and may be null/undefined for unrelated
							// messages on the channel; those must be skipped, not crash.
							if (inbound.data?.__r !== messageId) {
								return true;
							}
							yield* Deferred.succeed(deferred, inbound);
							// Returning false stops consuming the stream.
							return false;
						}),
					),
					Effect.timeout('10 seconds'),
					Effect.catchAll((timeout) =>
						Effect.gen(function* () {
							yield* Effect.logDebug('fail our deferred');
							// Propagate the timeout to whoever awaits the deferred.
							yield* Deferred.fail(deferred, timeout);
						}),
					),
				),
				sendMessage(envelope, organizationId, messageId),
				Deferred.await(deferred),
			],
			{ concurrency: 'unbounded' },
		);

		// `replySchema` is required here, so the reply is always validated.
		const validated = yield* validateZodSchema(replySchema, message.data);

		// eslint-disable-next-line @typescript-eslint/no-unsafe-return
		return validated as z.infer<TSchema>;
	});
}
