/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.asyncquery;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.ArrayList;
import java.util.Optional;
import lombok.Generated;
import org.json.JSONObject;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.functions.response.DefaultSparkSqlFunctionResponseHandle;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;

public class AsyncQueryExecutorServiceImpl
implements AsyncQueryExecutorService {
    private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;
    private SparkQueryDispatcher sparkQueryDispatcher;
    private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
    private Boolean isSparkJobExecutionEnabled;

    public AsyncQueryExecutorServiceImpl() {
        this.isSparkJobExecutionEnabled = Boolean.FALSE;
    }

    public AsyncQueryExecutorServiceImpl(AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService, SparkQueryDispatcher sparkQueryDispatcher, SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) {
        this.isSparkJobExecutionEnabled = Boolean.TRUE;
        this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService;
        this.sparkQueryDispatcher = sparkQueryDispatcher;
        this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier;
    }

    @Override
    public CreateAsyncQueryResponse createAsyncQuery(CreateAsyncQueryRequest createAsyncQueryRequest) {
        this.validateSparkExecutionEngineSettings();
        SparkExecutionEngineConfig sparkExecutionEngineConfig = this.sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
        DispatchQueryResponse dispatchQueryResponse = this.sparkQueryDispatcher.dispatch(new DispatchQueryRequest(sparkExecutionEngineConfig.getApplicationId(), createAsyncQueryRequest.getQuery(), createAsyncQueryRequest.getDatasource(), createAsyncQueryRequest.getLang(), sparkExecutionEngineConfig.getExecutionRoleARN(), sparkExecutionEngineConfig.getClusterName(), sparkExecutionEngineConfig.getSparkSubmitParameters()));
        this.asyncQueryJobMetadataStorageService.storeJobMetadata(new AsyncQueryJobMetadata(sparkExecutionEngineConfig.getApplicationId(), dispatchQueryResponse.getJobId(), dispatchQueryResponse.isDropIndexQuery(), dispatchQueryResponse.getResultIndex()));
        return new CreateAsyncQueryResponse(dispatchQueryResponse.getJobId());
    }

    @Override
    public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
        this.validateSparkExecutionEngineSettings();
        Optional<AsyncQueryJobMetadata> jobMetadata = this.asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
        if (jobMetadata.isPresent()) {
            JSONObject jsonObject = this.sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
            if (JobRunState.SUCCESS.toString().equals(jsonObject.getString("status"))) {
                DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle = new DefaultSparkSqlFunctionResponseHandle(jsonObject);
                ArrayList<ExprValue> result = new ArrayList<ExprValue>();
                while (sparkSqlFunctionResponseHandle.hasNext()) {
                    result.add(sparkSqlFunctionResponseHandle.next());
                }
                return new AsyncQueryExecutionResponse(JobRunState.SUCCESS.toString(), sparkSqlFunctionResponseHandle.schema(), result, null);
            }
            return new AsyncQueryExecutionResponse(jsonObject.optString("status", JobRunState.FAILED.toString()), null, null, jsonObject.optString("error", ""));
        }
        throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
    }

    @Override
    public String cancelQuery(String queryId) {
        Optional<AsyncQueryJobMetadata> asyncQueryJobMetadata = this.asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
        if (asyncQueryJobMetadata.isPresent()) {
            return this.sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata.get());
        }
        throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
    }

    private void validateSparkExecutionEngineSettings() {
        if (!this.isSparkJobExecutionEnabled.booleanValue()) {
            throw new IllegalArgumentException(String.format("Async Query APIs are disabled as %s is not configured in cluster settings. Please configure the setting and restart the domain to enable Async Query APIs", Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue()));
        }
    }

    @Generated
    public AsyncQueryExecutorServiceImpl(AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService, SparkQueryDispatcher sparkQueryDispatcher, SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier, Boolean isSparkJobExecutionEnabled) {
        this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService;
        this.sparkQueryDispatcher = sparkQueryDispatcher;
        this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier;
        this.isSparkJobExecutionEnabled = isSparkJobExecutionEnabled;
    }
}

