package org.hobbit.core.components;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.hobbit.core.Constants;
import org.hobbit.core.rabbit.DataHandler;
import org.hobbit.core.rabbit.DataReceiver;
import org.hobbit.core.rabbit.DataReceiverImpl;
import org.hobbit.core.rabbit.DataSender;
import org.hobbit.core.rabbit.DataSenderImpl;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.utils.EnvVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/components/AbstractSystemAdapter.class */
public abstract class AbstractSystemAdapter extends AbstractPlatformConnectorComponent implements GeneratedDataReceivingComponent, TaskReceivingComponent {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSystemAdapter.class);
    private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES = 100;
    private Semaphore terminateMutex;
    private Exception cause;
    private Semaphore causeMutex;
    private final int maxParallelProcessedMsgs;
    protected DataReceiver dataGenReceiver;
    protected DataReceiver taskGenReceiver;
    protected DataSender sender2EvalStore;
    protected Model systemParamModel;

    public AbstractSystemAdapter() {
        this(100);
    }

    public AbstractSystemAdapter(int i) {
        this.terminateMutex = new Semaphore(0);
        this.causeMutex = new Semaphore(1);
        this.maxParallelProcessedMsgs = i;
        this.defaultContainerType = "system";
    }

    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, org.hobbit.core.components.Component
    public void init() throws Exception {
        super.init();
        this.systemParamModel = EnvVariables.getModel(Constants.SYSTEM_PARAMETERS_MODEL_KEY, (Supplier<Model>) () -> {
            return ModelFactory.createDefaultModel();
        }, LOGGER);
        this.dataGenReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(this.maxParallelProcessedMsgs).queue(this.incomingDataQueueFactory, generateSessionQueueName(Constants.DATA_GEN_2_SYSTEM_QUEUE_NAME)).dataHandler(new DataHandler() { // from class: org.hobbit.core.components.AbstractSystemAdapter.1
            @Override // org.hobbit.core.rabbit.DataHandler
            public void handleData(byte[] bArr) {
                AbstractSystemAdapter.this.receiveGeneratedData(bArr);
            }
        }).build();
        this.taskGenReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(this.maxParallelProcessedMsgs).queue(this.incomingDataQueueFactory, generateSessionQueueName(Constants.TASK_GEN_2_SYSTEM_QUEUE_NAME)).dataHandler(new DataHandler() { // from class: org.hobbit.core.components.AbstractSystemAdapter.2
            @Override // org.hobbit.core.rabbit.DataHandler
            public void handleData(byte[] bArr) {
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                AbstractSystemAdapter.this.receiveGeneratedTask(RabbitMQUtils.readString(wrap), RabbitMQUtils.readByteArray(wrap));
            }
        }).build();
        this.sender2EvalStore = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), generateSessionQueueName(Constants.SYSTEM_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME)).build();
    }

    @Override // org.hobbit.core.components.Component
    public void run() throws Exception {
        sendToCmdQueue((byte) 1);
        this.terminateMutex.acquire();
        try {
            this.causeMutex.acquire();
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while waiting to set the termination cause.");
        }
        if (this.cause != null) {
            throw this.cause;
        }
        this.causeMutex.release();
        this.dataGenReceiver.closeWhenFinished();
        this.taskGenReceiver.closeWhenFinished();
    }

    @Override // org.hobbit.core.components.AbstractPlatformConnectorComponent, org.hobbit.core.components.CommandReceivingComponent
    public void receiveCommand(byte b, byte[] bArr) {
        if (b == 15) {
            terminate(null);
        }
        super.receiveCommand(b, bArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendResultToEvalStorage(String str, byte[] bArr) throws IOException {
        byte[] bytes = str.getBytes(Charsets.UTF_8);
        ByteBuffer allocate = ByteBuffer.allocate(8 + bytes.length + bArr.length);
        allocate.putInt(bytes.length);
        allocate.put(bytes);
        allocate.putInt(bArr.length);
        allocate.put(bArr);
        this.sender2EvalStore.sendData(allocate.array());
    }

    protected synchronized void terminate(Exception exc) {
        if (exc != null) {
            try {
                this.causeMutex.acquire();
                this.cause = exc;
                this.causeMutex.release();
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted while waiting to set the termination cause.");
            }
        }
        this.terminateMutex.release();
    }

    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeQuietly(this.dataGenReceiver);
        IOUtils.closeQuietly(this.taskGenReceiver);
        this.sender2EvalStore.closeWhenFinished();
        super.close();
    }
}
