package org.hobbit.sparql_snb.systems.neptune;

import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.profile.internal.ProfileKeyConstants;
import com.amazonaws.services.neptune.model.DBClusterRoleAlreadyExistsException;
import com.google.common.io.CharStreams;
import com.google.gson.JsonObject;
import com.jcraft.jsch.ChannelExec;
import com.nurkiewicz.asyncretry.backoff.BoundedMaxBackoff;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import jdk.nashorn.internal.objects.Global;
import jdk.nashorn.internal.parser.JSONParser;
import jdk.nashorn.internal.runtime.Context;
import jdk.nashorn.internal.runtime.ErrorManager;
import jdk.nashorn.internal.runtime.options.Options;
import jdk.nashorn.internal.scripts.JO;
import org.aksw.jena_sparql_api.core.utils.UpdateRequestUtils;
import org.aksw.jena_sparql_api.http.QueryExecutionFactoryHttp;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.jena.atlas.json.io.JSWriter;
import org.apache.jena.rdf.model.NodeIterator;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.ResIterator;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.sparql.sse.Tags;
import org.apache.jena.vocabulary.RDF;
import org.codehaus.plexus.configuration.processor.ConfigurationResourceHandler;
import org.hobbit.awscontroller.AWSController;
import org.hobbit.awscontroller.SSH.HSession;
import org.hobbit.awscontroller.SSH.SshConnector;
import org.hobbit.awscontroller.SSH.SshTunnelsProvider;
import org.hobbit.core.components.AbstractComponent;
import org.hobbit.sdk.JenaKeyValue;
import org.hobbit.sparql_snb.Constants;
import org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter;
import org.hobbit.vocab.HOBBIT;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/sparql_snb/systems/neptune/NeptuneSystemAdapter.class */
public class NeptuneSystemAdapter extends TripleStoreSystemAdapter {
    private static final String[] paramKeys = {SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR, SDKGlobalConfiguration.SECRET_KEY_ENV_VAR, "AWS_ROLE_ARN", SDKGlobalConfiguration.AWS_REGION_ENV_VAR};
    NeptuneClusterManager neptuneClusterManager;
    String bucketName;
    JenaKeyValue parameters;
    AWSController awsController;
    SshConnector sshConnector;
    String neptuneClusterIp;
    CloseableHttpClient httpclient;
    Context context;
    private String sparqlHostAndPort;
    String instanceType;
    Semaphore waitingMutex = new Semaphore(0);
    String clusterName = "neptune";
    String bastionHostUser = "ec2-user";
    String keyfilepath = "sshkeys/hobbit_2.pem";
    String neptuneClusterPort = "8182";
    int attemptsLimit = 12;

    @Override // org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter, org.hobbit.core.components.AbstractSystemAdapter, org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, org.hobbit.core.components.Component
    public void init() throws Exception {
        this.LOGGER = LoggerFactory.getLogger(NeptuneSystemAdapter.class);
        super.init();
    }

    @Override // org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter
    protected void initStage1() throws Exception {
        this.parameters = new JenaKeyValue.Builder().buildFrom(this.systemParamModel);
        this.bucketName = "data-storage-benchmark-neptune-system-adapter";
        this.httpclient = HttpClients.createDefault();
        ResIterator listResourcesWithProperty = this.systemParamModel.listResourcesWithProperty(RDF.type, (RDFNode) HOBBIT.Parameter);
        Property property = this.systemParamModel.getProperty("http://w3id.org/hobbit/vocab#defaultValue");
        while (listResourcesWithProperty.hasNext()) {
            Property property2 = this.systemParamModel.getProperty(((Resource) listResourcesWithProperty.next()).getURI());
            NodeIterator listObjectsOfProperty = this.systemParamModel.listObjectsOfProperty(property2, property);
            while (listObjectsOfProperty.hasNext()) {
                String string = listObjectsOfProperty.next().asLiteral().getString();
                if (!this.parameters.containsKey("http://w3id.org/bench#" + property2.getLocalName())) {
                    this.parameters.put("http://w3id.org/bench#" + property2.getLocalName(), string);
                }
            }
        }
        ArrayList arrayList = new ArrayList();
        for (String str : paramKeys) {
            if (!this.parameters.containsKey("http://w3id.org/bench#" + str)) {
                arrayList.add(str);
            }
        }
        if (arrayList.size() > 0) {
            throw new Exception("Missing params: " + String.join(JSWriter.ArraySep, (CharSequence[]) arrayList.toArray(new String[0])));
        }
        String stringValueFor = this.parameters.getStringValueFor("http://w3id.org/bench#AWS_ACCESS_KEY_ID");
        String stringValueFor2 = this.parameters.getStringValueFor("http://w3id.org/bench#AWS_SECRET_KEY");
        String stringValueFor3 = this.parameters.getStringValueFor("http://w3id.org/bench#AWS_ROLE_ARN");
        String stringValueFor4 = this.parameters.getStringValueFor("http://w3id.org/bench#AWS_REGION");
        if (this.parameters.containsKey("http://w3id.org/bench#neptuneInstanceType")) {
            this.instanceType = this.parameters.getStringValueFor("http://w3id.org/bench#neptuneInstanceType");
        }
        this.awsController = new AWSController(stringValueFor, stringValueFor2, stringValueFor3, stringValueFor4);
        try {
            this.awsController.init();
        } catch (Exception e) {
            this.LOGGER.error("Failed to init aws controller: {}", e.getLocalizedMessage());
        }
        initClusterManager();
        Options options = new Options("nashorn");
        options.set("anon.functions", true);
        options.set("parse.only", true);
        options.set("scripting", true);
        this.context = new Context(options, new ErrorManager(), Thread.currentThread().getContextClassLoader());
        this.queryEndpoint = "http://" + this.sparqlHostAndPort + "/sparql";
        this.updateEndpoint = this.queryEndpoint;
    }

    public void initClusterManager() throws Exception {
        HashMap hashMap = new HashMap();
        if (this.instanceType != null) {
            hashMap.put("DbInstanceType", this.instanceType);
        }
        this.neptuneClusterManager = new NeptuneClusterManager(this.awsController, this.clusterName, Constants.SSH_KEY_NAME, hashMap);
        this.neptuneClusterManager.createCluster();
        String dBClusterEndpoint = this.neptuneClusterManager.getDBClusterEndpoint();
        this.LOGGER.info("Resolving neptune cluster host {}", dBClusterEndpoint);
        final String str = "getent hosts " + dBClusterEndpoint + " | awk '{print $1}'";
        String publicIpAddress = this.neptuneClusterManager.getBastion().getPublicIpAddress();
        HSession hSession = new HSession(this.bastionHostUser, publicIpAddress, 22, this.keyfilepath);
        this.sshConnector = SshConnector.getInstance();
        this.sshConnector.openTunnel(hSession, new Function<HSession, String>() { // from class: org.hobbit.sparql_snb.systems.neptune.NeptuneSystemAdapter.1
            @Override // java.util.function.Function
            public String apply(HSession hSession2) {
                try {
                    ChannelExec channelExec = (ChannelExec) hSession2.getSession().openChannel(Tags.tagExec);
                    channelExec.setCommand(str);
                    channelExec.setInputStream(null);
                    InputStream inputStream = channelExec.getInputStream();
                    channelExec.connect();
                    NeptuneSystemAdapter.this.neptuneClusterIp = CharStreams.toString(new InputStreamReader(inputStream)).trim();
                    NeptuneSystemAdapter.this.waitingMutex.release();
                    channelExec.disconnect();
                    return null;
                } catch (Exception e) {
                    NeptuneSystemAdapter.this.LOGGER.error("Failed to resolve hostname: {}", e.getMessage());
                    return null;
                }
            }
        }, (Function<HSession, String>) null);
        this.waitingMutex.acquire();
        this.LOGGER.info("Neptune cluster host was resolved to {}", this.neptuneClusterIp);
        SshTunnelsProvider.init(new HSession(this.bastionHostUser, publicIpAddress, 22, this.keyfilepath, new String[]{this.neptuneClusterIp + ":" + this.neptuneClusterPort}, null), new Function<HSession, String>() { // from class: org.hobbit.sparql_snb.systems.neptune.NeptuneSystemAdapter.2
            @Override // java.util.function.Function
            public String apply(HSession hSession2) {
                Map<Integer, Integer> forwardings = hSession2.getForwardings();
                NeptuneSystemAdapter.this.LOGGER.info("SSH connection to {} established. Ports forwardings: {}", hSession2.getHost(), forwardings.toString());
                NeptuneSystemAdapter.this.sparqlHostAndPort = "localhost:" + String.valueOf(forwardings.get(8182));
                NeptuneSystemAdapter.this.LOGGER.info("Sparql endpoint: {}", "http://" + NeptuneSystemAdapter.this.sparqlHostAndPort + "/sparql");
                NeptuneSystemAdapter.this.queryExecFactory = new QueryExecutionFactoryHttp("http://" + NeptuneSystemAdapter.this.sparqlHostAndPort + "/sparql");
                NeptuneSystemAdapter.this.waitingMutex.release();
                return null;
            }
        }, new Function<HSession, String>() { // from class: org.hobbit.sparql_snb.systems.neptune.NeptuneSystemAdapter.3
            @Override // java.util.function.Function
            public String apply(HSession hSession2) {
                NeptuneSystemAdapter.this.LOGGER.info("Ssh connection lost");
                return null;
            }
        });
        SshTunnelsProvider.newSshTunnel(null, null);
        this.waitingMutex.acquire();
    }

    @Override // org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter
    protected void postInit() throws Exception {
        this.LOGGER.info("Deleting all triples before loading");
        try {
            this.updateExecFactory.createUpdateProcessor(UpdateRequestUtils.parse("CLEAR ALL")).execute();
        } catch (Exception e) {
            this.LOGGER.error("Failed to execute update request: {}", e.getLocalizedMessage());
        }
        this.LOGGER.info("Creating bucket {}", this.bucketName);
        this.awsController.createBucket(this.bucketName);
        this.LOGGER.info("Uploading files to bucket {}", this.bucketName);
        int i = 0;
        for (File file : new File(this.datasetFolderName).listFiles()) {
            try {
                this.awsController.putObjectToS3(this.bucketName, file);
                this.LOGGER.info("{} loaded to s3", file.getName());
            } catch (Exception e2) {
                this.LOGGER.error("Could not put {} to S3: {}", file.getName(), e2.getLocalizedMessage());
                i++;
            }
        }
        if (i > 0) {
            throw new Exception("Datasets not loaded to s3. See exceptions");
        }
        try {
            this.neptuneClusterManager.addRoleARN();
        } catch (DBClusterRoleAlreadyExistsException e3) {
            this.LOGGER.info(e3.getLocalizedMessage());
        }
    }

    @Override // org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter
    protected void loadDataset(String str) throws Exception {
        int i = 0;
        File file = new File(this.datasetFolderName);
        String loaderRoleArn = this.neptuneClusterManager.getLoaderRoleArn();
        for (File file2 : file.listFiles()) {
            String str2 = "http://" + this.sparqlHostAndPort + "/loader";
            String str3 = "s3://" + this.bucketName + "/" + file2.getName();
            HttpPost httpPost = new HttpPost(str2);
            httpPost.setHeader("Accept", "application/json");
            httpPost.setHeader("Content-Type", "application/json");
            JsonObject jsonObject = new JsonObject();
            jsonObject.addProperty(ConfigurationResourceHandler.SOURCE, str3);
            jsonObject.addProperty("format", "turtle");
            jsonObject.addProperty("iamRoleArn", loaderRoleArn);
            jsonObject.addProperty(ProfileKeyConstants.REGION, this.awsController.getRegion());
            httpPost.setEntity(new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON));
            int i2 = 0;
            boolean z = false;
            while (!z) {
                HttpEntity httpEntity = null;
                try {
                    this.LOGGER.info("Sending request to {}", str2);
                    httpEntity = this.httpclient.execute((HttpUriRequest) httpPost).getEntity();
                } catch (Exception e) {
                    this.LOGGER.error("Failed to send loader request for {}: {}", str3, e.getMessage());
                    i++;
                }
                String str4 = null;
                if (httpEntity != null) {
                    try {
                        InputStream content = httpEntity.getContent();
                        Scanner useDelimiter = new Scanner(content).useDelimiter("\\A");
                        str4 = useDelimiter.hasNext() ? useDelimiter.next() : "";
                        content.close();
                    } catch (Exception e2) {
                        this.LOGGER.error("Failed to read loader response for {}: {}", str3, e2.getMessage());
                        i++;
                    }
                }
                if (str4 == null || !str4.contains("200 OK")) {
                    this.LOGGER.warn("File not loaded: {}", str4);
                    i2++;
                    if (i2 >= this.attemptsLimit) {
                        break;
                    }
                    this.LOGGER.info("Trying another attempt ({}/{}) in 10s", Integer.valueOf(i2), Integer.valueOf(this.attemptsLimit));
                    Thread.sleep(BoundedMaxBackoff.DEFAULT_MAX_DELAY_MILLIS);
                } else {
                    String obj = ((JO) ((JO) new JSONParser(str4, new Global(this.context), false).parse()).get("payload")).get("loadId").toString();
                    JO loaderStatus = getLoaderStatus(obj);
                    if (String.valueOf(loaderStatus.get("status")).equals("LOAD_IN_PROGRESS")) {
                        this.LOGGER.info("Waiting for LOAD_COMPLETED for {}: {}", str3, "http://" + this.sparqlHostAndPort + "/loader?loadId=" + obj);
                    }
                    while (loaderStatus.get("status").equals("LOAD_IN_PROGRESS")) {
                        loaderStatus = getLoaderStatus(obj);
                        try {
                            Thread.sleep(AbstractComponent.START_WAITING_TIME_BEFORE_RETRY);
                        } catch (InterruptedException e3) {
                            this.LOGGER.debug("Thread sleep exception: {}", e3.getLocalizedMessage());
                            i++;
                        }
                        this.LOGGER.debug("{} records loaded", loaderStatus.get("totalRecords"));
                    }
                    if (loaderStatus.get("status").equals("LOAD_FAILED")) {
                        this.LOGGER.error("Load failed for {}", str3);
                        i++;
                    } else {
                        this.LOGGER.info("{} for {}. Loaded {} records in {} seconds", loaderStatus.get("status"), str3, loaderStatus.get("totalRecords"), loaderStatus.get("totalTimeSpent"));
                        z = true;
                    }
                }
            }
            if (!z) {
                this.LOGGER.error("File not loaded after {} attempts. See exceptions above", Integer.valueOf(this.attemptsLimit));
                i++;
            }
        }
        if (i > 0) {
            throw new Exception("Not all datasets were loaded. See exceptions");
        }
        this.LOGGER.info("Datasets are to loaded");
    }

    public JO getLoaderStatus(String str) {
        String str2 = null;
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.httpclient.execute((HttpUriRequest) new HttpGet("http://" + this.sparqlHostAndPort + "/loader?loadId=" + str)).getEntity().getContent()));
            StringBuffer stringBuffer = new StringBuffer();
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    break;
                }
                stringBuffer.append(readLine);
            }
            str2 = stringBuffer.toString();
        } catch (Exception e) {
            this.LOGGER.error("Failed to get loader status: {}", e.getLocalizedMessage());
        }
        try {
            return (JO) ((JO) ((JO) new JSONParser(str2, new Global(this.context), false).parse()).get("payload")).get("overallStatus");
        } catch (Exception e2) {
            this.LOGGER.error("Failed to parse loader status: {}", e2.getLocalizedMessage());
            return null;
        }
    }

    @Override // org.hobbit.sparql_snb.systems.TripleStoreSystemAdapter, org.hobbit.core.components.AbstractSystemAdapter, org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.LOGGER.warn("Skipping cluster deletion");
        this.LOGGER.info("Closing ssh sessions");
        this.sshConnector.closeSessions();
        this.LOGGER.info("Sessions have been closed");
        super.close();
    }
}
