// HINT: this one should also have access to the database, so that it can push new/updated orders into the database
// through the paginated synchronizer stream
import { and, eq } from 'drizzle-orm';
import { Context, Duration, Effect, Layer, Option, Queue, Schedule, SubscriptionRef } from 'effect';
import { makeLatch } from 'effect/Effect';
import { tables } from '../../../db/drizzle-pg-local-schema';
import { UserActivityService } from '../../../effect-services/UserActivityService';
import { MarketCapability } from '../../../effect-services/warehouse/types';
import { getMarketId } from '../../../effect-services/warehouse/utils';
import { RequestConfigService } from '../../../effect/request';
import { WarehouseGlobalsService } from './warehouse-globals-service';
import { WarehouseRegistryService } from './warehouse-registry';
/**
 * Context tag (key `'MarketConnection'`) under which the connection a
 * `MarketConnectionService` instance works on is provided.
 *
 * The service reads `marketId` and `merchantId` from the provided value.
 */
export class MarketConnection extends Context.Tag('MarketConnection')() {}
/** Base spacing, in seconds, between two order synchronization runs (the schedule adds jitter). */
const REFRESH_DURATION_SECONDS = 60;
/**
 * MarketConnectionService is our generic market service.
 *
 * It will get the corresponding MarketServiceX depending on its marketId.
 *
 * MarketServiceX will then be used to
 * - synchronize orders
 * - fulfill orders
 * etc.
 */
export class MarketConnectionService extends Effect.Service()('MarketConnectionService', {
    dependencies: [],
    effect: Effect.gen(function* () {
        const connection = yield* MarketConnection;
        const warehouseGlobals = yield* WarehouseGlobalsService;
        // Optional dependency: without a UserActivityService the sync loop is never paused for inactivity.
        const userActivityService = yield* Effect.serviceOption(UserActivityService);
        const registry = yield* WarehouseRegistryService;
        const { drizzle } = warehouseGlobals;
        const marketService = yield* registry.getMarketService(connection.marketId);
        // Hand-over buffer between the synchronizer (producer) and the persistence loop (consumer).
        const ordersQueue = yield* Queue.bounded(100);
        /**
         * we use this latch to pause / resume processes that run
         * for this market service (created closed; `open` / `close` below toggle it)
         */
        const latch = yield* makeLatch(false);
        /** Observable state of this connection: lifecycle `status` and `syncStatus`. */
        const storeSubscriptionRef = yield* SubscriptionRef.make({
            connection,
            status: 'idle',
            syncStatus: 'idle',
        });
        const id = getMarketId(connection);
        /** Updates only the `syncStatus` field of the store. */
        const setSyncStatus = (syncStatus) => SubscriptionRef.update(storeSubscriptionRef, (v) => ({ ...v, syncStatus }));
        yield* Effect.logDebug(`${id} - market service initializing`);
        /** ----------------------------------------------------------------------------------------------
         * Synchronize orders
         * _______________________________________________________________________________________________ */
        /**
         * blocker to prevent fetching new orders while
         * - the market is not open (latch closed)
         * - the user is not actively using the tab (only when a UserActivityService is available)
         */
        const blocker = Effect.gen(function* () {
            yield* latch.await;
            yield* Option.match(userActivityService, {
                onNone: () => Effect.void,
                onSome: (activity) => activity.latch.await,
            });
        });
        if (marketService.warehouse.capabilities.has(MarketCapability.SYNCHRONIZE_ORDERS)) {
            /**
             * Most recently updated locally stored order of this market / merchant.
             * A failing lookup is best-effort: it is logged and treated as "no order known"
             * instead of being swallowed silently.
             */
            const getMostRecentRawOrder = Effect.tryPromise(() => drizzle.query.marketOrdersLocal.findFirst({
                orderBy(fields, { desc }) {
                    return desc(fields.updatedAt);
                },
                where(fields) {
                    return and(eq(fields.marketId, connection.marketId), eq(fields.merchantId, connection.merchantId));
                },
            })).pipe(
                Effect.map(Option.fromNullable),
                Effect.catchAll((error) => Effect.logWarning(`${id} - could not load most recent order, continuing without it`, error).pipe(Effect.as(Option.none()))),
            );
            /* make synchronizer */
            const synchronizer = marketService.warehouse
                .makeSynchronizeOrders(ordersQueue, {
                    connection,
                    getMostRecentRawOrder,
                    maxDays: 14,
                    maxRetries: 10,
                })
                .pipe(Effect.provide(RequestConfigService.Default));
            /** One synchronization run: wait until unblocked, then sync and publish the sync status. */
            const syncProgram = Effect.gen(function* () {
                yield* blocker;
                yield* setSyncStatus('synchronizing');
                yield* synchronizer;
                yield* setSyncStatus('idle');
            });
            /* execute synchronizer */
            yield* Effect.repeat(syncProgram.pipe(Effect.catchTags({
                // Fatal errors end the sync fiber; publish the error state first so the store
                // does not stay on 'synchronizing' forever.
                'frachter/FatalMarketError': (error) => setSyncStatus('error').pipe(Effect.andThen(Effect.dieMessage(`${id} - fatal market error: ${error}`))),
                // Recoverable errors keep the loop alive; the next scheduled run retries.
                // NOTE(review): assumes consumers of storeSubscriptionRef accept 'error' as a syncStatus - confirm.
                // TODO: here we also need the notification service to inform the user
                'frachter/UserRecoverableMarketError': (error) => setSyncStatus('error').pipe(Effect.andThen(Effect.logWarning(`${id} - recoverable market error, retrying on next scheduled run`, error))),
            })), {
                schedule: Schedule.spaced(Duration.seconds(REFRESH_DURATION_SECONDS)).pipe(Schedule.jittered),
                times: Number.POSITIVE_INFINITY,
            }).pipe(Effect.interruptible, Effect.forkScoped);
            /**
             * Normalizes one queued raw order and upserts it (plus its shipping address, if any).
             * The queue item carries the raw `data`; the remaining fields are passed on to the
             * normalizers as the connection.
             */
            const persistOrder = ({ data, ...orderConnection }) => Effect.gen(function* () {
                const normalizedOrder = yield* marketService.warehouse.normalizeOrder(data, orderConnection);
                const normalizedShippingAddress = yield* marketService.warehouse.normalizeShippingAddress(data, orderConnection);
                // tryPromise (not promise): a rejected insert must be a recoverable failure, not a defect
                // that would terminate the whole consumer loop.
                yield* Effect.tryPromise(() => drizzle
                    .insert(tables.marketOrdersLocal)
                    .values(normalizedOrder)
                    .onConflictDoUpdate({
                        set: normalizedOrder,
                        target: [
                            tables.marketOrdersLocal.marketId,
                            tables.marketOrdersLocal.merchantId,
                            tables.marketOrdersLocal.orderId,
                        ],
                    }));
                if (normalizedShippingAddress) {
                    // Address persistence is best-effort: failures are reported but never fail the order.
                    yield* Effect.promise(() => drizzle
                        .insert(tables.marketOrderAddresses)
                        .values(normalizedShippingAddress)
                        .onConflictDoUpdate({
                            set: normalizedShippingAddress,
                            target: [
                                tables.marketOrderAddresses.orderMarketId,
                                tables.marketOrderAddresses.orderMerchantId,
                                tables.marketOrderAddresses.orderId,
                                tables.marketOrderAddresses.type,
                            ],
                        })
                        .catch((err) => {
                            console.error('🔴 error inserting market order address', {
                                address: normalizedShippingAddress,
                                err,
                                order: normalizedOrder,
                            });
                        }));
                }
            });
            /* Handle queue */
            yield* Queue.take(ordersQueue)
                .pipe(
                    // A single order that cannot be normalized or stored is logged and skipped so the
                    // consumer keeps draining the bounded queue.
                    Effect.andThen((item) => persistOrder(item).pipe(Effect.catchAll((error) => Effect.logError(`${id} - failed to persist order, skipping it`, error)))),
                    Effect.interruptible,
                    Effect.forever,
                    Effect.forkScoped,
                );
        }
        /** Opens the market: publishes 'opening', opens the latch (unblocking the sync loop), publishes 'open'. */
        const open = Effect.gen(function* () {
            yield* Effect.logDebug(`${id} - opening market`);
            yield* SubscriptionRef.update(storeSubscriptionRef, (v) => ({ ...v, status: 'opening' }));
            yield* latch.open;
            // TODO: here we need to start the sync process (and then also update the store and use the notifications service)
            yield* SubscriptionRef.update(storeSubscriptionRef, (v) => ({ ...v, status: 'open' }));
        });
        /** Closes the market: publishes 'closing', closes the latch (pausing the sync loop), publishes 'closed'. */
        const close = Effect.gen(function* () {
            yield* Effect.logDebug(`${id} - closing market`);
            yield* SubscriptionRef.update(storeSubscriptionRef, (v) => ({ ...v, status: 'closing' }));
            yield* latch.close;
            yield* SubscriptionRef.update(storeSubscriptionRef, (v) => ({ ...v, status: 'closed' }));
        });
        yield* Effect.logDebug(`${id} - ready`);
        return {
            close,
            connection,
            open,
            storeSubscriptionRef,
        };
    }),
}) {
}
/**
 * Builds a `MarketConnectionService` for the given connection and opens it.
 *
 * The connection is supplied to the service layer through the `MarketConnection` tag.
 *
 * @param connection - value to provide as `MarketConnection`
 * @returns an Effect that yields the opened service
 */
export const acquireMarketConnectionService = (connection) => {
    const connectionLayer = Layer.succeed(MarketConnection, connection);
    const serviceLayer = Layer.provide(MarketConnectionService.Default, connectionLayer);
    return MarketConnectionService.pipe(
        Effect.provide(serviceLayer),
        // run `open` for its effect only and keep the service as the result
        Effect.tap((service) => service.open),
    );
};
/**
 * Releases a market connection service.
 *
 * The service is closed only when its store reports the status `'open'`;
 * in any other state a debug message is logged and nothing else happens.
 *
 * @param marketService - service object exposing `connection`, `close` and `storeSubscriptionRef`
 * @returns an Effect that completes once the release has been handled
 */
export const releaseMarketConnectionService = (marketService) => Effect.gen(function* () {
    const { connection, storeSubscriptionRef } = marketService;
    const store = yield* SubscriptionRef.get(storeSubscriptionRef);
    const id = `${connection.marketId}:${connection.merchantId}`;
    yield* Effect.logDebug('🟢 release market service', id);
    if (store.status !== 'open') {
        yield* Effect.logDebug(`${id} - market service is not open: `, store);
        return;
    }
    yield* marketService.close;
});
