package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.TailerState;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/* loaded from: input_file:org/nuxeo/lib/stream/log/chronicle/ChronicleLogTailer.class */
public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
    protected static final long POLL_INTERVAL_MS = 100;
    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap());
    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);
    protected final String basePath;
    protected final ExcerptTailer cqTailer;
    protected final ChronicleLogOffsetTracker offsetTracker;
    protected final LogPartitionGroup id;
    protected final LogPartition partition;
    protected final Codec<M> codec;
    protected volatile boolean closed = false;

    public ChronicleLogTailer(Codec<M> codec, String str, ExcerptTailer excerptTailer, LogPartition logPartition, String str2, ChronicleRetentionDuration chronicleRetentionDuration) {
        Objects.requireNonNull(str2);
        this.codec = codec;
        this.basePath = str;
        this.cqTailer = excerptTailer;
        this.partition = logPartition;
        this.id = new LogPartitionGroup(str2, logPartition.name(), logPartition.partition());
        registerTailer();
        this.offsetTracker = new ChronicleLogOffsetTracker(str, logPartition.partition(), str2, chronicleRetentionDuration);
        toLastCommitted();
    }

    protected void registerTailer() {
        if (!tailersId.add(this.id)) {
            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + this.id);
        }
    }

    protected void unregisterTailer() {
        tailersId.remove(this.id);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogRecord<M> read(Duration duration) throws InterruptedException {
        LogRecord<M> read = read();
        if (read != null) {
            return read;
        }
        long millis = duration.toMillis();
        long currentTimeMillis = System.currentTimeMillis() + millis;
        long min = Math.min(POLL_INTERVAL_MS, millis);
        while (read == null && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(min);
            read = read();
        }
        return read;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LogRecord<M> read() {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        ArrayList arrayList = new ArrayList(1);
        long index = this.cqTailer.index();
        if (NoCodec.NO_CODEC.equals(this.codec)) {
            try {
                if (!this.cqTailer.readDocument(wireIn -> {
                    arrayList.add((Externalizable) wireIn.read(ChronicleLogAppender.MSG_KEY).object());
                })) {
                    return null;
                }
            } catch (ClassCastException e) {
                throw new IllegalArgumentException(e);
            }
        } else if (!this.cqTailer.readDocument(wireIn2 -> {
            arrayList.add(this.codec.decode(wireIn2.read().bytes()));
        })) {
            return null;
        }
        return new LogRecord<>((Externalizable) arrayList.get(0), new LogOffsetImpl(this.partition, index));
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogOffset commit(LogPartition logPartition) {
        if (!this.partition.equals(logPartition)) {
            throw new IllegalArgumentException("Cannot commit this partition: " + logPartition + " from " + this.id);
        }
        long index = this.cqTailer.index();
        this.offsetTracker.commit(index);
        if (log.isTraceEnabled()) {
            log.trace(String.format("Commit %s:+%d", this.id, Long.valueOf(index)));
        }
        return new LogOffsetImpl(logPartition, index);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void commit() {
        commit(this.partition);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toEnd() {
        log.debug(String.format("toEnd: %s", this.id));
        this.cqTailer.toEnd();
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toStart() {
        log.debug(String.format("toStart: %s", this.id));
        this.cqTailer.toStart();
        if (this.cqTailer.state().equals(TailerState.FOUND_CYCLE)) {
            return;
        }
        log.info("Unable to move to start because the tailer is not initialized, " + this);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toLastCommitted() {
        long lastCommittedOffset = this.offsetTracker.getLastCommittedOffset();
        if (lastCommittedOffset <= 0) {
            log.debug(String.format("toLastCommitted: %s, not found, move toStart", this.id));
            this.cqTailer.toStart();
            return;
        }
        log.debug(String.format("toLastCommitted: %s, found: %d", this.id, Long.valueOf(lastCommittedOffset)));
        if (this.cqTailer.moveToIndex(lastCommittedOffset) || this.cqTailer.index() == lastCommittedOffset) {
            return;
        }
        toStart();
        long index = this.cqTailer.index();
        if (lastCommittedOffset >= index) {
            throw new IllegalStateException("Unable to move to the last committed offset: " + lastCommittedOffset + " for tailer: " + this);
        }
        log.error("The last committed offset: " + lastCommittedOffset + " for tailer: " + this + " points to a record that has been deleted by the retention policy. Records have been lost, continuing from the beginning of the partition offset: " + index);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void seek(LogOffset logOffset) {
        if (!this.partition.equals(logOffset.partition())) {
            throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + logOffset);
        }
        log.debug("Seek to " + logOffset + " from tailer: " + this);
        if (!this.cqTailer.moveToIndex(logOffset.offset()) && this.cqTailer.index() != logOffset.offset()) {
            throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + logOffset);
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void reset() {
        reset(new LogPartition(this.id.name, this.id.partition));
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void reset(LogPartition logPartition) {
        if (!this.partition.equals(logPartition)) {
            throw new IllegalArgumentException("Cannot reset this partition: " + logPartition + " from " + this.id);
        }
        log.info("Reset offset for partition: " + logPartition + " from tailer: " + this);
        this.cqTailer.toStart();
        commit(logPartition);
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogOffset offsetForTimestamp(LogPartition logPartition, long j) {
        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public Collection<LogPartition> assignments() {
        return Collections.singletonList(new LogPartition(this.id.name, this.id.partition));
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public String group() {
        return this.id.group;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer, java.lang.AutoCloseable
    public void close() {
        if (this.closed) {
            return;
        }
        log.debug("Closing: " + toString());
        this.offsetTracker.close();
        unregisterTailer();
        this.closed = true;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public boolean closed() {
        return this.closed;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public Codec<M> getCodec() {
        return this.codec;
    }

    public String toString() {
        return "ChronicleLogTailer{basePath='" + this.basePath + "', id=" + this.id + ", closed=" + this.closed + ", codec=" + this.codec + '}';
    }
}
