import { subDays } from 'date-fns';
import { Duration, Effect, Match, Option, Stream } from 'effect';
import { z } from 'zod';
import { apis } from '../../../../api';
import { ordersV4FindPartnerOrders } from '../../../../api/providers/otto_market/__generated__/hey-api';
import { zPartnerOrderListOrdersV4 } from '../../../../api/providers/otto_market/__generated__/hey-api/zod.gen';
import { FatalMarketError, MarketValidationError, UserRecoverableMarketError, } from '../../../../effect-services/warehouse/errors';
import { executeRequest, retryRequest } from '../../../../effect/request';
import { sdkToRequest } from '../../../../effect/request/hey-api';
import { updateRequestConfig } from '../../../../effect/request/request-config-service';
import { WarehouseGlobalsService } from '../../warehouse/warehouse-globals-service';
import { zPartnerOrderOrdersV4Fixed } from './schema';
import { formatOttoDate } from './utils';
/**
 * Builds an Effect that pulls updated orders from the Otto Market orders endpoint and offers
 * them, one at a time, to `queue`.
 *
 * Flow:
 *  1. Route all requests through the backend proxy for the connection's merchant.
 *  2. Read the most recent stored raw order (if any) and use its `lastModifiedDate` as the
 *     starting point; otherwise start `options.maxDays` (default 14) days in the past.
 *  3. Page through the endpoint by following the returned `nextcursor` until none is left.
 *  4. Validate every order individually; valid orders are offered to the queue, invalid ones
 *     are reported via `Effect.logError` and skipped.
 *
 * @param queue - Target queue; must expose `offer(item)` returning an Effect.
 * @param options - Synchronization options.
 * @param options.connection - Connection info (`merchantId`, `marketId`, `environment` are read
 *   here); it is also spread into every queued item and into reported errors.
 * @param options.getMostRecentRawOrder - Effect yielding an `Option` of the most recent stored
 *   order; its `raw` property is validated against the order schema.
 * @param [options.maxDays=14] - Look-back window in days, used when no usable stored order exists.
 * @param [options.maxRetries=25] - Maximum number of retries per page request.
 * @returns An Effect that completes once all pages have been processed. Request failures are
 *   mapped to `UserRecoverableMarketError` or `FatalMarketError`.
 */
export const makeSynchronizeOrders = (queue, options) => Effect.gen(function* () {
    // HINT: with the proxy, we don't need to explicitly provide a `RequestConfigService` to the
    // executeRequest function, as the proxy will require it anyways!
    const proxy = yield* Effect.gen(function* () {
        const warehouseConfigService = yield* WarehouseGlobalsService;
        return updateRequestConfig({
            proxy: {
                enforce: true,
                getAuthorizationHeaderValue: warehouseConfigService.getBackendAuthorizationHeaderValue,
                url: `${warehouseConfigService.backendBaseUrl}/proxy/market/${apis.otto_market.id}/${options.connection.merchantId}`,
            },
        });
    });
    /** ----------------------------------------------------------------------------------------------
     * get most recent raw order (if available)
     * and use it to determine the fromDate for the paginated stream
     * _______________________________________________________________________________________________ */
    const mostRecentOrder = Option.getOrNull(yield* options.getMostRecentRawOrder);
    // Best effort: a stored order that no longer matches the schema must not abort the sync.
    // The failure is reported through the Effect logger (instead of `console.error`) so it takes
    // the same logging path as every other validation failure in this function.
    const parseMostRecentOrder = mostRecentOrder
        ? Effect.tryPromise({
            try: () => zPartnerOrderOrdersV4Fixed.parseAsync(mostRecentOrder.raw),
            catch: (err) => err,
        }).pipe(Effect.catchAll((err) => Effect.logError(`Failed to parse the most recent raw order of market ${options.connection.marketId}:${options.connection.merchantId}; falling back to the default time window.`, err).pipe(Effect.as(null))))
        : Effect.succeed(null);
    const rawParsed = yield* parseMostRecentOrder;
    const earliestDate = subDays(new Date(), options.maxDays ?? 14);
    const lastModified = rawParsed?.lastModifiedDate ? new Date(rawParsed.lastModifiedDate) : null;
    // Guard against an unparseable timestamp: an invalid Date must never reach `formatOttoDate`.
    const fromDate = lastModified && !Number.isNaN(lastModified.getTime()) ? lastModified : earliestDate;
    /** ----------------------------------------------------------------------------------------------
     * create a paginated stream that will be used to fetch all orders from the Otto API
     * since the timestamp. It will use the returned cursor to fetch all items until
     * there are no more left.
     * _______________________________________________________________________________________________ */
    let cycles = 0;
    // The pagination state is either a Date (first request, sent as `fromDate`) or the
    // `nextcursor` string extracted from the previous response.
    const paginatedStream = Stream.paginateEffect(fromDate, (current) => Effect.gen(function* () {
        cycles++;
        const request = yield* sdkToRequest(ordersV4FindPartnerOrders, {
            query: current instanceof Date ? { fromDate: formatOttoDate(current) } : { nextcursor: current },
        });
        // keep path and query of the generated request, but target the base url of the
        // connection's environment
        const requestURL = new URL(request.url);
        const baseUrl = apis.otto_market.baseUrl[options.connection.environment];
        const url = `${baseUrl}${requestURL.pathname}${requestURL.search}`;
        const result = yield* executeRequest(new Request(url, request), {
            200: {
                body: z.object({
                    links: zPartnerOrderListOrdersV4.shape.links,
                    // use this to ensure that we parse orders individually later to know which order might probably not match the schema
                    resources: z.array(z.any()).optional(),
                }),
            },
        }).pipe(
        /* proxy the request */
        proxy, 
        /* retry the request */
        retryRequest({ maxRetries: options.maxRetries ?? 25 }), 
        /* map the error for the warehouse to handle */
        Effect.mapError((error) => Match.value(error).pipe(
            Match.tag('frachter/RequestClientError', (e) => new UserRecoverableMarketError({
                cause: error,
                reason: e.response.status === 401 ? 'unauthorized' : 'other',
            })),
            Match.tag('frachter/ResponseValidationError', (e) => new FatalMarketError({
                cause: e,
                message: 'Otto market response validation failed.',
                ...options.connection,
            })),
            Match.orElse(() => new UserRecoverableMarketError({ cause: error, reason: 'other' })),
        )));
        // Prefer the link explicitly marked as `next`; fall back to the first link (the previous
        // behavior) so that responses without a `rel` attribute keep working.
        // NOTE(review): assumes link objects may carry a `rel` field — confirm against the schema.
        const links = result.data.links ?? [];
        const nextLink = links.find((link) => link?.rel === 'next') ?? links[0];
        const href = nextLink?.href;
        // the href may be relative, hence the dummy base; only the `nextcursor` param is of interest
        const nextCursor = href ? new URL(href, 'https://_').searchParams.get('nextcursor') : null;
        let items = result.data.resources ?? [];
        // THIS IS PROBLEMATIC because we haven't PARSED the items yet
        let skippedOrders = [];
        if (current instanceof Date && rawParsed) {
            // TODO: here we'd also need to log errors when the item is not parseable
            /* this is our first iteration, so we must filter out all items before and including the mostRecentRawOrder */
            const index = items.findIndex((item) => {
                const parsed = zPartnerOrderOrdersV4Fixed.safeParse(item);
                if (!parsed.success) {
                    return false;
                }
                return (parsed.data.salesOrderId === rawParsed.salesOrderId &&
                    parsed.data.lastModifiedDate === rawParsed.lastModifiedDate);
            });
            if (index !== -1) {
                skippedOrders = items.slice(0, index + 1);
                items = items.slice(index + 1);
            }
        }
        yield* Effect.logDebug(`Found ${items.length} updated orders from market ${options.connection.marketId}:${options.connection.merchantId}. [cycles: ${cycles}, skipped: ${skippedOrders.length}]`, {
            current,
            cursor: { lastModifiedDate: rawParsed?.lastModifiedDate, orderNumber: rawParsed?.orderNumber },
            fromDate,
            skippedOrders,
        });
        // an absent (or empty) cursor ends the pagination
        return [items, nextCursor ? Option.some(nextCursor) : Option.none()];
    }));
    /** ----------------------------------------------------------------------------------------------
     * run the stream and process each item
     *
     * this will always be processed for each successful value
     * only when it's done, it will allow the underlying stream
     * to produce new values
     *
     * Therefore it's safe to let the stream error, as it will only
     * ever error when the last items have been processed
     * _______________________________________________________________________________________________ */
    yield* Stream.runForEach(paginatedStream, (items) => Effect.gen(function* () {
        for (const item of items) {
            const parsed = zPartnerOrderOrdersV4Fixed.safeParse(item);
            if (parsed.success) {
                /* put items into the queue 1 by 1 to let the queue control the rate of processing */
                yield* queue.offer({
                    data: parsed.data,
                    ...options.connection,
                });
            }
            else {
                /* log an error that will be posted to sentry via our frachter logger, use original data as cause */
                yield* Effect.logError(new MarketValidationError({ cause: item, zError: parsed.error, ...options.connection }));
            }
        }
        // short pause after each processed page before the next one is requested
        yield* Effect.sleep(Duration.seconds(0.5));
    }).pipe(Effect.interruptible));
});
