package io.karatelabs.karate.kafka;

import com.intuit.karate.FileUtils;
import com.intuit.karate.Json;
import io.karatelabs.karate.avro.AvroUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/karatelabs/karate/kafka/KarateKafkaConsumer.class */
/**
 * Wraps a {@link KafkaConsumer} and collects the records of a single topic on a
 * background thread.
 *
 * <p>{@link #listen(String, KafkaUtils)} subscribes to the topic and starts the poll loop;
 * records accepted by the session are accumulated and handed over through
 * {@link #resultsFuture} once the session's expected message count is reached.
 *
 * <p>Threading: after {@code listen} has started the loop, the wrapped consumer is only
 * touched by the single executor thread. {@link #close()} therefore stops the loop first
 * (via a flag plus {@code wakeup()}) and only then closes the consumer.
 * NOTE(review): {@code listen} looks intended to be called at most once per instance —
 * confirm against callers.
 */
public class KarateKafkaConsumer {
    static final Logger logger = LoggerFactory.getLogger(KarateKafkaConsumer.class);

    /** Maximum time a single {@code poll} call blocks. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);
    /** Pause between checks while no partition has been assigned yet. */
    private static final long PARTITION_WAIT_MILLIS = 1000L;
    /** How long {@link #close()} waits for the poll loop to stop. */
    private static final long CLOSE_TIMEOUT_SECONDS = 5L;

    private final KafkaConsumer<String, Object> kafka;
    private final CompletableFuture<Boolean> partitionFuture = new CompletableFuture<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // only mutated by the poll thread; published to other threads through resultsFuture
    private final List<Map<String, Object>> results = new ArrayList<>();
    /**
     * Completes with the collected records once enough messages were consumed, or
     * completes exceptionally if the poll loop fails or the consumer is closed first.
     */
    public final CompletableFuture<List> resultsFuture = new CompletableFuture<>();
    /** Set by {@link #close()} to ask the poll loop to stop. */
    private volatile boolean closed;
    /** True while the poll loop is running on the executor thread. */
    private volatile boolean polling;

    /**
     * Creates the underlying Kafka consumer.
     *
     * @param properties configuration passed straight to {@link KafkaConsumer}
     */
    public KarateKafkaConsumer(Properties properties) {
        this.kafka = new KafkaConsumer<>(properties);
    }

    /**
     * Converts a consumer record into a map with the keys {@code offset}, {@code headers},
     * {@code key} and {@code value}.
     *
     * <p>Header values are converted with {@code FileUtils.toString}. A {@link GenericRecord}
     * value is converted through {@code AvroUtils.toJson}, a {@code byte[]} value through
     * {@code FileUtils.toString}; both are then passed through {@code Json.of(...).value()}.
     * A {@code null} value (e.g. a tombstone record) is kept as {@code null}.
     *
     * @param consumerRecord the record to convert
     * @return a new mutable map describing the record
     * @throws RuntimeException if the value is neither {@code null}, a {@link GenericRecord}
     *         nor a {@code byte[]}
     */
    public static Map<String, Object> toMap(ConsumerRecord<String, Object> consumerRecord) {
        Map<String, Object> result = new HashMap<>();
        result.put("offset", consumerRecord.offset());
        Map<String, Object> headers = new HashMap<>();
        result.put("headers", headers);
        for (Header header : consumerRecord.headers().toArray()) {
            headers.put(header.key(), FileUtils.toString(header.value()));
        }
        result.put("key", consumerRecord.key());
        Object value = consumerRecord.value();
        if (value == null) {
            // previously this fell through to value.getClass() and failed with an NPE
            result.put("value", null);
        } else if (value instanceof GenericRecord) {
            result.put("value", Json.of(AvroUtils.toJson((GenericRecord) value)).value());
        } else if (value instanceof byte[]) {
            result.put("value", Json.of(FileUtils.toString((byte[]) value)).value());
        } else {
            String message = "unknown type after deserialization: " + value.getClass();
            logger.error(message);
            throw new RuntimeException(message);
        }
        return result;
    }

    /**
     * Subscribes to the given topic, starts the background poll loop and blocks until the
     * consumer has been assigned at least one partition.
     *
     * @param str the topic to subscribe to
     * @param kafkaUtils passed on to the created {@link KafkaSession}
     * @return the session that decides which records are collected and how many are expected
     * @throws RuntimeException if the poll loop fails or the consumer is closed before a
     *         partition is assigned, or if the calling thread is interrupted while waiting
     */
    public KafkaSession listen(String str, KafkaUtils kafkaUtils) {
        this.kafka.subscribe(Collections.singletonList(str));
        logger.debug("kafka consumer subscibed to topic: {}", str);
        com.intuit.karate.Logger karateLogger = KafkaUtils.logger();
        KafkaSession kafkaSession = new KafkaSession(str, kafkaUtils);
        this.executor.submit(() -> pollLoop(kafkaSession, karateLogger));
        try {
            this.partitionFuture.get();
            logger.debug("kafka consumer partition ready");
            return kafkaSession;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the topic until enough records were collected or {@link #close()} is called.
     * Any failure is propagated to both futures so that no waiting caller blocks forever.
     */
    private void pollLoop(KafkaSession kafkaSession, com.intuit.karate.Logger karateLogger) {
        this.polling = true;
        try {
            while (!this.closed) {
                ConsumerRecords<String, Object> records = this.kafka.poll(POLL_TIMEOUT);
                if (records != null) {
                    for (ConsumerRecord<String, Object> record : records) {
                        Map<String, Object> map = toMap(record);
                        if (kafkaSession.test(map)) {
                            this.results.add(map);
                            karateLogger.debug("<< consumed: {}", map);
                        } else {
                            karateLogger.debug("<< ignored: {}", map);
                        }
                    }
                }
                if (!this.partitionFuture.isDone()) {
                    Set<?> assignment = this.kafka.assignment();
                    if (assignment.isEmpty()) {
                        logger.debug("waiting for partition ...");
                        sleepBeforeNextPartitionCheck();
                    } else {
                        logger.debug("partitions: {}", assignment);
                        this.partitionFuture.complete(true);
                    }
                } else if (this.results.size() >= kafkaSession.getMessageCount()) {
                    this.resultsFuture.complete(this.results);
                    return;
                } else {
                    karateLogger.debug("messages collected so far: {}", this.results.size());
                }
            }
            // the loop was left because close() was requested
            failPending(new IllegalStateException("kafka consumer closed before all messages were received"));
        } catch (Throwable failure) {
            // thread boundary: executor.submit() would otherwise swallow the failure silently
            if (this.closed) {
                // close() interrupts a blocking poll via wakeup(), which surfaces here as an exception
                logger.debug("kafka consumer poll loop stopped: {}", failure.toString());
            } else {
                logger.error("kafka consumer poll loop failed", failure);
            }
            failPending(failure);
        } finally {
            this.polling = false;
        }
    }

    /** Sleeps between partition checks, restoring the interrupt flag if interrupted. */
    private static void sleepBeforeNextPartitionCheck() {
        try {
            Thread.sleep(PARTITION_WAIT_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Completes every still-pending future exceptionally; already completed ones are untouched. */
    private void failPending(Throwable cause) {
        this.partitionFuture.completeExceptionally(cause);
        this.resultsFuture.completeExceptionally(cause);
    }

    /**
     * Stops the poll loop (if it is running), waits briefly for it to finish and then
     * closes the underlying consumer.
     */
    public void close() {
        this.closed = true;
        this.executor.shutdown();
        if (this.polling) {
            // wakeup() is the one KafkaConsumer method that may be called from another thread;
            // it makes a blocking (or the next) poll() throw so the loop can exit
            this.kafka.wakeup();
        }
        try {
            if (!this.executor.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("kafka consumer poll loop did not stop within {} seconds", CLOSE_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.kafka.close();
    }
}
