package org.nuxeo.lib.stream.computation;

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/AbstractBatchComputation.class */
public abstract class AbstractBatchComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(AbstractBatchComputation.class);
    public static final String TIMER_BATCH = "batch";
    protected List<Record> batchRecords;
    protected String currentInputStream;
    protected boolean newBatch;
    protected long thresholdMillis;
    protected boolean removeLastRecordOnRetry;

    public AbstractBatchComputation(String str, int i, int i2) {
        super(str, i, i2);
        this.newBatch = true;
    }

    protected abstract void batchProcess(ComputationContext computationContext, String str, List<Record> list);

    public abstract void batchFailure(ComputationContext computationContext, String str, List<Record> list);

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void init(ComputationContext computationContext) {
        this.thresholdMillis = computationContext.getPolicy().getBatchThreshold().toMillis();
        computationContext.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
        this.batchRecords = new ArrayList(computationContext.getPolicy().batchCapacity);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processTimer(ComputationContext computationContext, String str, long j) {
        if (TIMER_BATCH.equals(str)) {
            if (!this.batchRecords.isEmpty()) {
                batchProcess(computationContext);
            }
            computationContext.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
        }
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        if (!str.equals(this.currentInputStream) && !this.batchRecords.isEmpty()) {
            batchProcess(computationContext);
        }
        if (this.newBatch) {
            this.currentInputStream = str;
            this.newBatch = false;
        }
        this.batchRecords.add(record);
        if (this.batchRecords.size() >= computationContext.getPolicy().getBatchCapacity()) {
            this.removeLastRecordOnRetry = true;
            batchProcess(computationContext);
            this.removeLastRecordOnRetry = false;
        }
    }

    private void batchProcess(ComputationContext computationContext) {
        batchProcess(computationContext, this.currentInputStream, this.batchRecords);
        checkpointBatch(computationContext);
    }

    protected void checkpointBatch(ComputationContext computationContext) {
        computationContext.askForCheckpoint();
        this.batchRecords.clear();
        this.newBatch = true;
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processRetry(ComputationContext computationContext, Throwable th) {
        if (this.removeLastRecordOnRetry) {
            this.batchRecords.remove(this.batchRecords.size() - 1);
            this.removeLastRecordOnRetry = false;
        }
        log.warn(String.format("Computation: %s fails to process batch of %d records, last record: %s, retrying ...", this.metadata.name(), Integer.valueOf(this.batchRecords.size()), computationContext.getLastOffset()), th);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processFailure(ComputationContext computationContext, Throwable th) {
        log.error(String.format("Computation: %s fails to process batch of %d records after retries, last record: %s, policy: %s", this.metadata.name(), Integer.valueOf(this.batchRecords.size()), computationContext.getLastOffset(), computationContext.getPolicy()), th);
        batchFailure(computationContext, this.currentInputStream, this.batchRecords);
        this.batchRecords.clear();
        this.newBatch = true;
    }
}
