package io.karatelabs.kafka;

import com.intuit.karate.FileUtils;
import com.intuit.karate.Json;
import com.intuit.karate.core.ChannelSession;
import com.intuit.karate.graal.JsArray;
import com.intuit.karate.graal.JsEngine;
import com.intuit.karate.graal.JsFunction;
import com.intuit.karate.graal.JsValue;
import com.intuit.karate.graal.Methods;
import com.intuit.karate.shell.Command;
import io.karatelabs.avro.AvroDeserializer;
import io.karatelabs.avro.AvroUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.graalvm.polyglot.Value;
import org.graalvm.polyglot.proxy.ProxyObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/karatelabs/kafka/KarateKafkaConsumer.class */
public class KarateKafkaConsumer implements ChannelSession, ProxyObject {
    private static final Logger logger = LoggerFactory.getLogger(KarateKafkaConsumer.class);
    private static final String TOPIC = "topic";
    private static final String FILTER = "filter";
    private static final String COUNT = "count";
    private static final String COLLECT = "collect";
    private static final String SCHEMA = "schema";
    private static final String HEADER = "header";
    private static final String HEADERS = "headers";
    private static final String START = "start";
    private static final String[] KEYS = {TOPIC, FILTER, COUNT, COLLECT, SCHEMA, HEADER, HEADERS, START};
    private static final Set<String> KEY_SET = new HashSet(Arrays.asList(KEYS));
    private static final JsArray KEY_ARRAY = new JsArray(KEYS);
    private final KafkaConsumer<String, Object> kafka;
    private JsFunction.Executable function;
    private String schemaName;
    private String topic = null;
    private int messageCount = 1;
    private Map<String, String> headers = new LinkedHashMap();
    private final CompletableFuture<Boolean> partitionFuture = new CompletableFuture<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List results = new ArrayList();
    public final CompletableFuture<List> resultsFuture = new CompletableFuture<>();
    private final Methods.FunVar HEADER_FUNCTION = objArr -> {
        header(objArr[0].toString(), objArr[1].toString());
        return this;
    };

    public KarateKafkaConsumer(Map<String, Object> map) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:29092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        String uuid = UUID.randomUUID().toString();
        logger.debug("consumer group id: {}", uuid);
        properties.put("group.id", uuid);
        properties.putAll(map);
        if (map.containsKey("schema.registry.url")) {
            properties.put("value.deserializer", AvroDeserializer.class.getName());
            String str = (String) map.get("schema.registry.url");
            if (str != null) {
                Command.waitForHttp(str + "/subjects");
            }
        } else {
            properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        }
        this.kafka = new KafkaConsumer<>(properties);
    }

    public static Map<String, Object> toMap(ConsumerRecord<String, Object> consumerRecord) {
        HashMap hashMap = new HashMap();
        hashMap.put("offset", Long.valueOf(consumerRecord.offset()));
        HashMap hashMap2 = new HashMap();
        hashMap.put(HEADERS, hashMap2);
        for (Header header : consumerRecord.headers().toArray()) {
            hashMap2.put(header.key(), FileUtils.toString(header.value()));
        }
        hashMap.put("key", consumerRecord.key());
        Object value = consumerRecord.value();
        if (value instanceof GenericRecord) {
            hashMap.put("value", Json.of(AvroUtils.toJson((GenericRecord) value)).value());
        } else {
            if (!(value instanceof byte[])) {
                String str = "unknown type after deserialization: " + value.getClass();
                logger.error(str);
                throw new RuntimeException(str);
            }
            hashMap.put("value", Json.of(FileUtils.toString((byte[]) value)).value());
        }
        return hashMap;
    }

    public Void start() {
        this.kafka.subscribe(Collections.singletonList(this.topic));
        logger.debug("subscribed to topic: {}", this.topic);
        com.intuit.karate.Logger logger2 = KafkaChannel.logger();
        this.executor.submit(() -> {
            while (true) {
                ConsumerRecords poll = this.kafka.poll(Duration.ofMillis(1000L));
                if (poll != null && !poll.isEmpty()) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        Map<String, Object> map = toMap((ConsumerRecord) it.next());
                        if (test(map)) {
                            this.results.add(map);
                            logger2.debug("<< consumed: {}", new Object[]{map});
                        } else {
                            logger2.debug("<< ignored: {}", new Object[]{map});
                        }
                    }
                }
                if (!this.partitionFuture.isDone()) {
                    Set assignment = this.kafka.assignment();
                    if (assignment.isEmpty()) {
                        try {
                            logger.debug("waiting for partition ...");
                            Thread.sleep(1000L);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    } else {
                        logger.debug("partitions: {}", assignment);
                        this.partitionFuture.complete(true);
                    }
                } else {
                    if (this.results.size() >= this.messageCount) {
                        this.resultsFuture.complete(this.results);
                        return;
                    }
                    logger2.debug("messages collected so far: {}", new Object[]{Integer.valueOf(this.results.size())});
                }
            }
        });
        try {
            this.partitionFuture.get();
            logger.debug("partition ready");
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        this.kafka.close();
        this.executor.shutdown();
    }

    protected boolean test(Map<String, Object> map) {
        if (this.function == null) {
            return true;
        }
        return JsValue.isTruthy(this.function.execute(JsEngine.global(), new Object[]{map}));
    }

    private void filter(Value value) {
        this.function = value.asProxyObject();
    }

    public void count(int i) {
        this.messageCount = i;
    }

    public List collect() {
        try {
            try {
                List list = this.resultsFuture.get();
                KafkaChannel.logger().debug("<< collect {}", new Object[]{list});
                close();
                return list;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    public void header(String str, String str2) {
        if (str2 == null) {
            this.headers.remove(str);
        } else {
            this.headers.put(str, str2);
        }
    }

    public void headers(Value value) {
        this.headers = (Map) JsValue.toJava(value);
    }

    public void schema(String str) {
        this.schemaName = str;
    }

    public void topic(String str) {
        this.topic = str;
    }

    public Object getMember(String str) {
        boolean z = -1;
        switch (str.hashCode()) {
            case -1274492040:
                if (str.equals(FILTER)) {
                    z = 4;
                    break;
                }
                break;
            case -1221270899:
                if (str.equals(HEADER)) {
                    z = false;
                    break;
                }
                break;
            case -907987551:
                if (str.equals(SCHEMA)) {
                    z = 6;
                    break;
                }
                break;
            case 94851343:
                if (str.equals(COUNT)) {
                    z = 3;
                    break;
                }
                break;
            case 109757538:
                if (str.equals(START)) {
                    z = 7;
                    break;
                }
                break;
            case 110546223:
                if (str.equals(TOPIC)) {
                    z = 5;
                    break;
                }
                break;
            case 795307910:
                if (str.equals(HEADERS)) {
                    z = true;
                    break;
                }
                break;
            case 949444906:
                if (str.equals(COLLECT)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return this.HEADER_FUNCTION;
            case true:
                return this::headers;
            case true:
                return this::collect;
            case true:
                return Integer.valueOf(this.messageCount);
            case true:
                return this::filter;
            case true:
                return this::topic;
            case true:
                return this::schema;
            case true:
                return this::start;
            default:
                logger.warn("no such property: {}", str);
                return null;
        }
    }

    public void putMember(String str, Value value) {
        boolean z = -1;
        switch (str.hashCode()) {
            case -1274492040:
                if (str.equals(FILTER)) {
                    z = 4;
                    break;
                }
                break;
            case -907987551:
                if (str.equals(SCHEMA)) {
                    z = true;
                    break;
                }
                break;
            case 94851343:
                if (str.equals(COUNT)) {
                    z = 3;
                    break;
                }
                break;
            case 110546223:
                if (str.equals(TOPIC)) {
                    z = false;
                    break;
                }
                break;
            case 795307910:
                if (str.equals(HEADERS)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.topic = value.asString();
                return;
            case true:
                this.schemaName = value.asString();
                return;
            case true:
                headers(value);
                return;
            case true:
                count(value.asInt());
                return;
            case true:
                filter(value);
                return;
            default:
                logger.warn("put not supported on session object: {} - {}", str, value);
                return;
        }
    }

    public Object getMemberKeys() {
        return KEY_ARRAY;
    }

    public boolean hasMember(String str) {
        return KEY_SET.contains(str);
    }
}
