/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class ElementsStream {
    private static <V> void take(Callable<RFuture<V>> factory, FluxSink<V> emitter, AtomicLong counter, AtomicReference<RFuture<V>> futureRef) {
        RFuture<Object> future;
        try {
            future = factory.call();
        }
        catch (Exception e2) {
            emitter.error(e2);
            return;
        }
        futureRef.set(future);
        future.onComplete((res, e) -> {
            if (e != null) {
                emitter.error((Throwable)e);
                return;
            }
            emitter.next(res);
            if (counter.decrementAndGet() == 0L) {
                emitter.complete();
            }
            ElementsStream.take(factory, emitter, counter, futureRef);
        });
    }

    public static <V> Flux<V> takeElements(Callable<RFuture<V>> callable) {
        return Flux.create(emitter -> emitter.onRequest(n -> {
            AtomicLong counter = new AtomicLong(n);
            AtomicReference futureRef = new AtomicReference();
            ElementsStream.take(callable, emitter, counter, futureRef);
            emitter.onDispose(() -> ((RFuture)futureRef.get()).cancel(true));
        }));
    }
}

