import { Effect, ExecutionStrategy, Exit, Scope, Stream, SubscriptionRef } from 'effect';
import { getMarketId } from '../../../effect-services/warehouse/utils';
import { makeWatcher } from '../../../effect/utils';
import { setMarkets } from './set-markets';
export class WarehouseService extends Effect.Service()('WarehouseService', {
    dependencies: [],
    // HINT: this must be scoped, because we want to clear all resources created within this service when the WarehouseService exits
    scoped: Effect.gen(function* () {
        const marketsSubscriptionRef = yield* SubscriptionRef.make({});
        const scope = yield* Effect.scope;
        yield* Effect.yieldNow();
        let streamScope = yield* Scope.fork(scope, ExecutionStrategy.parallel);
        const $markets = yield* makeWatcher({});
        /** ----------------------------------------------------------------------------------------------
         * watch our MarketConnectionServices and propagate store changes into our aggregated $markets object
         * _______________________________________________________________________________________________ */
        yield* marketsSubscriptionRef.changes.pipe(Stream.runForEach((markets) => Effect.gen(function* () {
            yield* Scope.close(streamScope, Exit.void);
            streamScope = yield* Scope.fork(scope, ExecutionStrategy.parallel);
            let statusStream = Stream.empty;
            for (const market of Object.values(markets)) {
                // we need to merge, because concat would wait for the first stream to end before going into the next stream
                statusStream = Stream.merge(statusStream, market.storeSubscriptionRef.changes);
            }
            yield* statusStream
                .pipe(Stream.runForEach((market) => Effect.gen(function* () {
                const id = getMarketId(market.connection);
                yield* SubscriptionRef.update($markets.ref, (v) => ({ ...v, [id]: market }));
                yield* Effect.logDebug(`🛍️ ${id.padEnd(24, ' ')} | Status: ${market.status} | Sync: ${market.syncStatus}`);
            })))
                .pipe(Effect.interruptible, Effect.forkIn(streamScope));
            // console.log('⭐️ markets changed', Array.from(Object.keys(markets)).join(' | '));
        })), Effect.interruptible, Effect.forkScoped);
        return {
            $markets,
            setMarkets: (connections) => 
            // tie the scope to the WarehouseService's scope
            setMarkets(marketsSubscriptionRef, connections).pipe(Scope.extend(scope)),
        };
    }),
}) {
}
