package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.log.LogPartition;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/log/ComputationPool.class */
public class ComputationPool {
    private static final Log log = LogFactory.getLog(ComputationPool.class);
    protected final ComputationMetadataMapping metadata;
    protected final int threads;
    protected final Supplier<Computation> supplier;
    protected final List<List<LogPartition>> defaultAssignments;
    protected final List<ComputationRunner> runners;
    protected final LogStreamManager streamManager;
    protected final ComputationPolicy policy;
    protected ExecutorService threadPool;

    /* loaded from: input_file:org/nuxeo/lib/stream/computation/log/ComputationPool$NamedThreadFactory.class */
    protected static class NamedThreadFactory implements ThreadFactory {
        protected final AtomicInteger count = new AtomicInteger(0);
        protected final String prefix;

        public NamedThreadFactory(String str) {
            this.prefix = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, String.format("%s-%02d", this.prefix, Integer.valueOf(this.count.getAndIncrement())));
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                ComputationPool.log.error("Uncaught exception: " + th.getMessage(), th);
            });
            return thread;
        }
    }

    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping computationMetadataMapping, List<List<LogPartition>> list, LogStreamManager logStreamManager, ComputationPolicy computationPolicy) {
        Objects.requireNonNull(computationPolicy);
        this.supplier = supplier;
        this.metadata = computationMetadataMapping;
        this.threads = list.size();
        this.streamManager = logStreamManager;
        this.defaultAssignments = list;
        this.policy = computationPolicy;
        this.runners = new ArrayList(this.threads);
    }

    public String getComputationName() {
        return this.metadata.name();
    }

    public void start() {
        if (this.threads == 0) {
            log.info(this.metadata.name() + ": Empty pool");
            return;
        }
        log.info(this.metadata.name() + ": Starting pool");
        this.threadPool = Executors.newFixedThreadPool(this.threads, new NamedThreadFactory(this.metadata.name() + "Pool"));
        this.defaultAssignments.forEach(list -> {
            ComputationRunner computationRunner = new ComputationRunner(this.supplier, this.metadata, list, this.streamManager, this.policy);
            this.threadPool.submit(computationRunner);
            this.runners.add(computationRunner);
        });
        this.threadPool.shutdown();
        log.debug(this.metadata.name() + ": Pool started, threads: " + this.threads);
    }

    public boolean isTerminated() {
        if (this.threadPool == null) {
            return true;
        }
        return this.threadPool.isTerminated();
    }

    public boolean waitForAssignments(Duration duration) throws InterruptedException {
        log.info(this.metadata.name() + ": Wait for partitions assignments");
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        Iterator<ComputationRunner> it = this.runners.iterator();
        while (it.hasNext()) {
            if (!it.next().waitForAssignments(duration)) {
                return false;
            }
        }
        return true;
    }

    public boolean drainAndStop(Duration duration) {
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        log.info(this.metadata.name() + ": Draining");
        this.runners.forEach((v0) -> {
            v0.drain();
        });
        boolean awaitPoolTermination = awaitPoolTermination(duration);
        stop(Duration.ofSeconds(1L));
        return awaitPoolTermination;
    }

    public boolean stop(Duration duration) {
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        log.info(this.metadata.name() + ": Stopping");
        this.runners.forEach((v0) -> {
            v0.stop();
        });
        boolean awaitPoolTermination = awaitPoolTermination(duration);
        shutdown();
        return awaitPoolTermination;
    }

    public void shutdown() {
        if (this.threadPool != null && !this.threadPool.isTerminated()) {
            log.info(this.metadata.name() + ": Shutting down");
            this.threadPool.shutdownNow();
            try {
                this.threadPool.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn(this.metadata.name() + ": Interrupted in shutdown");
            }
        }
        this.runners.clear();
        this.threadPool = null;
    }

    protected boolean awaitPoolTermination(Duration duration) {
        try {
            if (this.threadPool.awaitTermination(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                return true;
            }
            log.warn(this.metadata.name() + ": Timeout on wait for pool termination");
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(this.metadata.name() + ": Interrupted while waiting for pool termination");
            return false;
        }
    }

    public long getLowWatermark() {
        Set set = (Set) this.runners.stream().map((v0) -> {
            return v0.getLowWatermark();
        }).filter(watermark -> {
            return watermark.getValue() > 1;
        }).collect(Collectors.toSet());
        long orElse = set.stream().filter(watermark2 -> {
            return !watermark2.isCompleted();
        }).mapToLong((v0) -> {
            return v0.getValue();
        }).min().orElse(0L);
        boolean z = true;
        if (orElse == 0) {
            z = false;
            orElse = set.stream().filter((v0) -> {
                return v0.isCompleted();
            }).mapToLong((v0) -> {
                return v0.getValue();
            }).max().orElse(0L);
        }
        if (log.isTraceEnabled() && orElse > 0) {
            log.trace(this.metadata.name() + ": low: " + orElse + " " + (z ? "Pending" : "Completed"));
        }
        return orElse;
    }
}
