/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.stream.tests;

import java.io.Externalizable;
import java.time.Duration;
import javax.inject.Inject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Deploys;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;
import org.nuxeo.runtime.test.runner.RuntimeFeature;

/**
 * Integration tests for {@link StreamService}, run inside the Nuxeo runtime test harness.
 *
 * <p>The log managers, streams and processors referenced by name in these tests ("default",
 * "import", "input", "output", "s1", "registerProcessor", ...) are presumably declared in the
 * deployed {@code test-stream-contrib.xml} contribution, which is not part of this file.
 */
@RunWith(FeaturesRunner.class)
@Features(RuntimeFeature.class)
@Deploys({@Deploy("org.nuxeo.runtime.stream"), @Deploy("org.nuxeo.runtime.stream:test-stream-contrib.xml")})
public class TestStreamService {
    /** Service under test, injected by the test runner. */
    @Inject
    public StreamService service;

    /**
     * Checks that configured log managers can be retrieved by name, that the names "unknown" and
     * "customDisabled" are rejected with an {@link IllegalArgumentException}, and that the
     * "input" log exists in the "default" manager with a size of 1.
     */
    @Test
    public void testLogManagerAccess() {
        Assert.assertNotNull(service);
        Assert.assertNotNull(service.getLogManager("default"));
        Assert.assertNotNull(service.getLogManager("import"));

        try {
            service.getLogManager("unknown");
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException expected) {
            // expected: this name must be rejected
        }
        try {
            service.getLogManager("customDisabled");
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException expected) {
            // expected: this name must be rejected
        }

        LogManager defaultManager = service.getLogManager("default");
        Assert.assertNotNull(defaultManager);
        // The result of exists() used to be discarded; assert it so the check is effective.
        Assert.assertTrue(defaultManager.exists("input"));
        Assert.assertEquals(1L, defaultManager.size("input"));
    }

    /**
     * Appends one record to the "myLog" log and reads it back through a tailer, checking that the
     * key and the payload round-trip unchanged.
     */
    @Test
    public void testBasicLogUsage() throws Exception {
        LogManager manager = service.getLogManager("default");
        String logName = "myLog";
        String key = "a key";
        String value = "a value";

        LogAppender appender = manager.getAppender(logName);
        appender.append(key, (Externalizable) Record.of(key, value.getBytes("UTF-8")));

        try (LogTailer tailer = manager.createTailer("myGroup", logName)) {
            LogRecord logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull(logRecord);
            Record record = (Record) logRecord.message();
            Assert.assertEquals(key, record.getKey());
            Assert.assertEquals(value, new String(record.getData(), "UTF-8"));
        }
    }

    /**
     * Appends three records to the "input" stream and checks what shows up on the "output" log:
     * first the regular record unchanged, then a record whose key is "changedNow". The
     * "skipMeNow" record is expected not to appear between them.
     */
    @Test
    public void testStreamProcessor() throws Exception {
        LogManager manager = service.getLogManager("default");
        StreamManager streamManager = service.getStreamManager("default");
        String key = "a key";
        String value = "a value";

        // The tailer is created before appending, as in the original flow, and is now closed
        // reliably even when an assertion fails.
        try (LogTailer tailer = manager.createTailer("counter", "output")) {
            streamManager.append("input", Record.of(key, value.getBytes("UTF-8")));
            streamManager.append("input", Record.of("skipMeNow", null));
            streamManager.append("input", Record.of("changeMeNow", null));

            LogRecord logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull("Record not found in output stream", logRecord);
            Record first = (Record) logRecord.message();
            Assert.assertEquals(key, first.getKey());
            Assert.assertEquals(value, new String(first.getData(), "UTF-8"));

            logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull("Record not found in output stream", logRecord);
            Assert.assertEquals("changedNow", ((Record) logRecord.message()).getKey());
        }
    }

    /**
     * Checks that appending to the streams "streamThatDoesNotExist" and "input2" is rejected with
     * an {@link IllegalArgumentException}. Judging by the test name, "input2" presumably belongs
     * to a disabled processor; confirm against the test contribution.
     */
    @Test
    public void testDisabledStreamProcessor() throws Exception {
        StreamManager streamManager = service.getStreamManager("default");
        try {
            streamManager.append("streamThatDoesNotExist", Record.of("key", null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException expected) {
            // expected: appending to this stream must be rejected
        }
        try {
            streamManager.append("input2", Record.of("key", null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException expected) {
            // expected: appending to this stream must be rejected
        }
    }

    /** Checks that the "s1" stream exists in the "default" manager with a size of 2. */
    @Test
    public void testDefaultPartitions() throws Exception {
        LogManager manager = service.getLogManager("default");
        String streamName = "s1";
        Assert.assertTrue(manager.exists(streamName));
        Assert.assertEquals(2L, manager.size(streamName));
    }

    /**
     * Checks that the streams "input3", "output3" and "registerInput" exist, that nothing is
     * produced on "output3" until the "registerProcessor" processor is created and started
     * explicitly, and that the pending record is then delivered.
     */
    @Test
    public void testRegisterOnlyStreamProcessor() throws Exception {
        LogManager manager = service.getLogManager("default");
        Assert.assertTrue(manager.exists("input3"));
        Assert.assertTrue(manager.exists("output3"));
        Assert.assertTrue(manager.exists("registerInput"));

        StreamManager streamManager = service.getStreamManager("default");
        streamManager.append("input3", Record.of("key", null));

        try (LogTailer tailer = manager.createTailer("test", "output3")) {
            // Nothing must be produced before the processor has been started.
            Assert.assertNull(tailer.read(Duration.ofSeconds(1L)));

            StreamProcessor processor = streamManager.createStreamProcessor("registerProcessor");
            Assert.assertNotNull(processor);
            processor.start();
            processor.drainAndStop(Duration.ofSeconds(5L));

            // Fail with a clear message instead of a NullPointerException when no record arrives.
            LogRecord logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull("Record not found in output stream", logRecord);
            Assert.assertEquals("key", ((Record) logRecord.message()).getKey());
        }
    }
}

