/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.functions.response;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprBooleanValue;
import org.opensearch.sql.data.model.ExprByteValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprFloatValue;
import org.opensearch.sql.data.model.ExprIntegerValue;
import org.opensearch.sql.data.model.ExprLongValue;
import org.opensearch.sql.data.model.ExprShortValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.spark.functions.response.SparkSqlFunctionResponseHandle;

public class DefaultSparkSqlFunctionResponseHandle
implements SparkSqlFunctionResponseHandle {
    private Iterator<ExprValue> responseIterator;
    private ExecutionEngine.Schema schema;
    private static final Logger logger = LogManager.getLogger(DefaultSparkSqlFunctionResponseHandle.class);

    public DefaultSparkSqlFunctionResponseHandle(JSONObject responseObject) {
        this.constructIteratorAndSchema(responseObject);
    }

    private void constructIteratorAndSchema(JSONObject responseObject) {
        ArrayList<ExprTupleValue> result = new ArrayList<ExprTupleValue>();
        JSONObject items = responseObject.getJSONObject("data");
        logger.info("Spark Application ID: " + items.getString("applicationId"));
        List<ExecutionEngine.Schema.Column> columnList = this.getColumnList(items.getJSONArray("schema"));
        for (int i = 0; i < items.getJSONArray("result").length(); ++i) {
            JSONObject row = new JSONObject(items.getJSONArray("result").get(i).toString().replace("'", "\""));
            LinkedHashMap<String, ExprValue> linkedHashMap = DefaultSparkSqlFunctionResponseHandle.extractRow(row, columnList);
            result.add(new ExprTupleValue(linkedHashMap));
        }
        this.schema = new ExecutionEngine.Schema(columnList);
        this.responseIterator = result.iterator();
    }

    private static LinkedHashMap<String, ExprValue> extractRow(JSONObject row, List<ExecutionEngine.Schema.Column> columnList) {
        LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<String, ExprValue>();
        for (ExecutionEngine.Schema.Column column : columnList) {
            ExprType type = column.getExprType();
            if (type == ExprCoreType.BOOLEAN) {
                linkedHashMap.put(column.getName(), (ExprValue)ExprBooleanValue.of((Boolean)row.getBoolean(column.getName())));
                continue;
            }
            if (type == ExprCoreType.LONG) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprLongValue((Number)row.getLong(column.getName())));
                continue;
            }
            if (type == ExprCoreType.INTEGER) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprIntegerValue((Number)row.getInt(column.getName())));
                continue;
            }
            if (type == ExprCoreType.SHORT) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprShortValue((Number)row.getInt(column.getName())));
                continue;
            }
            if (type == ExprCoreType.BYTE) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprByteValue((Number)row.getInt(column.getName())));
                continue;
            }
            if (type == ExprCoreType.DOUBLE) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprDoubleValue((Number)row.getDouble(column.getName())));
                continue;
            }
            if (type == ExprCoreType.FLOAT) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprFloatValue((Number)Float.valueOf(row.getFloat(column.getName()))));
                continue;
            }
            if (type == ExprCoreType.DATE) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprStringValue(row.getString(column.getName())));
                continue;
            }
            if (type == ExprCoreType.TIMESTAMP) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprTimestampValue(row.getString(column.getName())));
                continue;
            }
            if (type == ExprCoreType.STRING) {
                linkedHashMap.put(column.getName(), (ExprValue)new ExprStringValue(DefaultSparkSqlFunctionResponseHandle.jsonString(row, column.getName())));
                continue;
            }
            throw new RuntimeException("Result contains invalid data type");
        }
        return linkedHashMap;
    }

    private List<ExecutionEngine.Schema.Column> getColumnList(JSONArray schema) {
        ArrayList<ExecutionEngine.Schema.Column> columnList = new ArrayList<ExecutionEngine.Schema.Column>();
        for (int i = 0; i < schema.length(); ++i) {
            JSONObject column = new JSONObject(schema.get(i).toString().replace("'", "\""));
            columnList.add(new ExecutionEngine.Schema.Column(column.get("column_name").toString(), column.get("column_name").toString(), (ExprType)this.getDataType(column.get("data_type").toString())));
        }
        return columnList;
    }

    private ExprCoreType getDataType(String sparkDataType) {
        switch (sparkDataType) {
            case "boolean": {
                return ExprCoreType.BOOLEAN;
            }
            case "long": {
                return ExprCoreType.LONG;
            }
            case "integer": {
                return ExprCoreType.INTEGER;
            }
            case "short": {
                return ExprCoreType.SHORT;
            }
            case "byte": {
                return ExprCoreType.BYTE;
            }
            case "double": {
                return ExprCoreType.DOUBLE;
            }
            case "float": {
                return ExprCoreType.FLOAT;
            }
            case "timestamp": {
                return ExprCoreType.DATE;
            }
            case "date": {
                return ExprCoreType.TIMESTAMP;
            }
            case "string": 
            case "varchar": 
            case "char": {
                return ExprCoreType.STRING;
            }
        }
        return ExprCoreType.UNKNOWN;
    }

    private static String jsonString(JSONObject jsonObject, String key) {
        return jsonObject.has(key) ? jsonObject.getString(key) : "";
    }

    @Override
    public boolean hasNext() {
        return this.responseIterator.hasNext();
    }

    @Override
    public ExprValue next() {
        return this.responseIterator.next();
    }

    @Override
    public ExecutionEngine.Schema schema() {
        return this.schema;
    }
}

