/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamsConfig
extends AbstractConfig {
    private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);
    private static final ConfigDef CONFIG;
    private final boolean eosEnabled = "exactly_once".equals(this.getString("processing.guarantee"));
    private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
    private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
    public static final int DUMMY_THREAD_INDEX = 1;
    public static final String TOPIC_PREFIX = "topic.";
    public static final String CONSUMER_PREFIX = "consumer.";
    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
    public static final String PRODUCER_PREFIX = "producer.";
    public static final String ADMIN_CLIENT_PREFIX = "admin.";
    public static final String NO_OPTIMIZATION = "none";
    public static final String OPTIMIZE = "all";
    public static final String UPGRADE_FROM_0100 = "0.10.0";
    public static final String UPGRADE_FROM_0101 = "0.10.1";
    public static final String UPGRADE_FROM_0102 = "0.10.2";
    public static final String UPGRADE_FROM_0110 = "0.11.0";
    public static final String UPGRADE_FROM_10 = "1.0";
    public static final String UPGRADE_FROM_11 = "1.1";
    public static final String AT_LEAST_ONCE = "at_least_once";
    public static final String EXACTLY_ONCE = "exactly_once";
    public static final String APPLICATION_ID_CONFIG = "application.id";
    private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
    public static final String APPLICATION_SERVER_CONFIG = "application.server";
    private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
    private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
    public static final String CLIENT_ID_CONFIG = "client.id";
    private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.";
    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor. (Note, if <code>processing.guarantee</code> is set to <code>exactly_once</code>, the default value is <code>100</code>, otherwise the default value is <code>30000</code>.";
    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
    private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.";
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
    private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
    public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
    public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
    public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
    public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well";
    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well";
    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
    private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface.";
    public static final String POLL_MS_CONFIG = "poll.ms";
    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
    public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>. Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";
    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    public static final String RETRIES_CONFIG = "retries";
    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>org.apache.kafka.streams.state.RocksDBConfigSetter</code> interface";
    public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least <code>state.cleanup.delay.ms</code> will be removed";
    public static final String STATE_DIR_CONFIG = "state.dir";
    private static final String STATE_DIR_DOC = "Directory location for state store.";
    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
    private static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. When upgrading from 1.2 to a newer version it is not required to specify this config.Default is null. Accepted values are \"0.10.0\", \"0.10.1\", \"0.10.2\", \"0.11.0\", \"1.0\", \"1.1\" (for upgrading from the corresponding old version).";
    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
    private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
    private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS;
    private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS;
    private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS;
    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;

    /**
     * Prefixes a property name with {@link #CONSUMER_PREFIX} ({@code "consumer."}).
     * Properties carrying this prefix are read back when the shared consumer
     * configuration is assembled.
     *
     * @param consumerProp the consumer property name to prefix
     * @return {@code CONSUMER_PREFIX + consumerProp}
     */
    public static String consumerPrefix(String consumerProp) {
        return CONSUMER_PREFIX + consumerProp;
    }

    /**
     * Prefixes a property name with {@link #MAIN_CONSUMER_PREFIX} ({@code "main.consumer."}).
     * Properties carrying this prefix are applied only when the main consumer
     * configuration is built.
     *
     * @param consumerProp the consumer property name to prefix
     * @return {@code MAIN_CONSUMER_PREFIX + consumerProp}
     */
    public static String mainConsumerPrefix(String consumerProp) {
        return MAIN_CONSUMER_PREFIX + consumerProp;
    }

    /**
     * Prefixes a property name with {@link #RESTORE_CONSUMER_PREFIX} ({@code "restore.consumer."}).
     * Properties carrying this prefix are applied only when the restore consumer
     * configuration is built.
     *
     * @param consumerProp the consumer property name to prefix
     * @return {@code RESTORE_CONSUMER_PREFIX + consumerProp}
     */
    public static String restoreConsumerPrefix(String consumerProp) {
        return RESTORE_CONSUMER_PREFIX + consumerProp;
    }

    /**
     * Prefixes a property name with {@link #GLOBAL_CONSUMER_PREFIX} ({@code "global.consumer."}).
     * Properties carrying this prefix are applied only when the global consumer
     * configuration is built.
     *
     * @param consumerProp the consumer property name to prefix
     * @return {@code GLOBAL_CONSUMER_PREFIX + consumerProp}
     */
    public static String globalConsumerPrefix(String consumerProp) {
        return GLOBAL_CONSUMER_PREFIX + consumerProp;
    }

    /**
     * Prefixes a property name with {@link #PRODUCER_PREFIX} ({@code "producer."}).
     * Properties carrying this prefix are read back when the producer
     * configuration is assembled.
     *
     * @param producerProp the producer property name to prefix
     * @return {@code PRODUCER_PREFIX + producerProp}
     */
    public static String producerPrefix(String producerProp) {
        return PRODUCER_PREFIX + producerProp;
    }

    /**
     * Prefixes a property name with {@link #ADMIN_CLIENT_PREFIX} ({@code "admin."}).
     * Properties carrying this prefix are read back when the admin client
     * configuration is assembled.
     *
     * @param adminClientProp the admin client property name to prefix
     * @return {@code ADMIN_CLIENT_PREFIX + adminClientProp}
     */
    public static String adminClientPrefix(String adminClientProp) {
        return ADMIN_CLIENT_PREFIX + adminClientProp;
    }

    /**
     * Prefixes a property name with {@link #TOPIC_PREFIX} ({@code "topic."}).
     * Properties carrying this prefix are forwarded, prefix included, as part of
     * the main consumer configuration.
     *
     * @param topicProp the topic property name to prefix
     * @return {@code TOPIC_PREFIX + topicProp}
     */
    public static String topicPrefix(String topicProp) {
        return TOPIC_PREFIX + topicProp;
    }

    /**
     * Returns the configuration definition of this class.
     *
     * @return a new {@link ConfigDef} constructed from the shared static
     *         {@code CONFIG} definition, so callers never receive the static
     *         instance itself
     */
    public static ConfigDef configDef() {
        return new ConfigDef(CONFIG);
    }

    /**
     * Creates a new {@code StreamsConfig} from the given properties.
     * Delegates to {@link #StreamsConfig(Map, boolean)} with {@code doLog} set to {@code true}.
     *
     * @param props the properties to parse against the Streams config definition
     */
    public StreamsConfig(Map<?, ?> props) {
        this(props, true);
    }

    /**
     * Creates a new {@code StreamsConfig}, passing the static {@code CONFIG}
     * definition, the given properties and the logging flag to the
     * {@link AbstractConfig} constructor.
     *
     * @param props the properties to parse against the Streams config definition
     * @param doLog forwarded unchanged to the superclass constructor
     *              (presumably controls whether the parsed values are logged; confirm against AbstractConfig)
     */
    protected StreamsConfig(Map<?, ?> props, boolean doLog) {
        super(CONFIG, props, doLog);
    }

    /**
     * Computes updates to apply on top of the parsed configuration.
     *
     * <p>Starts from the reconnect-backoff adjustments produced by
     * {@link CommonClientConfigs#postProcessReconnectBackoffConfigs}. If
     * {@code processing.guarantee} was parsed as {@code exactly_once} and the user
     * did not supply {@code commit.interval.ms}, the commit interval is set to 100 ms.
     *
     * @param parsedValues the configuration values after parsing
     * @return the map of configuration updates
     */
    protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsedValues) {
        Map<String, Object> updates = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);

        // Derived from the parsed values rather than from the eosEnabled instance field.
        // NOTE(review): presumably the field is not initialised yet when this hook runs; confirm.
        boolean exactlyOnce = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG));
        if (!exactlyOnce) {
            return updates;
        }
        if (this.originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {
            // An explicit user setting always wins over the exactly-once default.
            return updates;
        }

        log.debug("Using {} default value of {} as exactly once is enabled.", COMMIT_INTERVAL_MS_CONFIG, 100L);
        updates.put(COMMIT_INTERVAL_MS_CONFIG, 100L);
        return updates;
    }

    /**
     * Builds the configuration shared by every consumer created from this config.
     *
     * <p>Layers are applied in increasing precedence: the Streams consumer
     * overrides (exactly-once or default flavour), the custom client properties,
     * the user-supplied consumer properties (after non-configurable ones have
     * been checked and possibly dropped), and finally {@code bootstrap.servers}
     * taken from the original properties.
     *
     * @return a new mutable map holding the common consumer configuration
     */
    private Map<String, Object> getCommonConsumerConfigs() {
        Map<String, Object> userConsumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

        // Both checks may remove entries from userConsumerProps.
        checkIfUnexpectedUserSpecifiedConsumerConfig(userConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
        checkIfUnexpectedUserSpecifiedConsumerConfig(userConsumerProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

        Map<String, Object> streamsOverrides;
        if (this.eosEnabled) {
            streamsOverrides = CONSUMER_EOS_OVERRIDES;
        } else {
            streamsOverrides = CONSUMER_DEFAULT_OVERRIDES;
        }

        Map<String, Object> merged = new HashMap<>(streamsOverrides);
        merged.putAll(getClientCustomProps());
        merged.putAll(userConsumerProps);
        merged.put(BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
        return merged;
    }

    /**
     * Validates user-supplied client properties against settings that Streams controls itself.
     *
     * <p>Despite its name this method is applied to producer properties as well.
     * It does two things:
     * <ol>
     *   <li>When exactly-once is enabled and {@code max.in.flight.requests.per.connection}
     *       is present, the value must be an {@code Integer} or a parsable {@code String}
     *       and must not exceed 5.</li>
     *   <li>For each name in {@code nonConfigurableConfigs} that the user supplied with a
     *       value different from the Streams override, a warning is logged and the user
     *       entry is removed from {@code clientProvidedProps}.</li>
     * </ol>
     *
     * @param clientProvidedProps    user-supplied client properties; modified in place
     * @param nonConfigurableConfigs names of the properties the user may not override
     * @throws ConfigException if {@code max.in.flight.requests.per.connection} has an
     *                         unsupported type, cannot be parsed, or exceeds 5 under exactly-once
     */
    private void checkIfUnexpectedUserSpecifiedConsumerConfig(Map<String, Object> clientProvidedProps, String[] nonConfigurableConfigs) {
        if (this.eosEnabled) {
            Object rawMaxInFlight = clientProvidedProps.get("max.in.flight.requests.per.connection");
            if (rawMaxInFlight != null) {
                int maxInFlight;
                if (rawMaxInFlight instanceof Integer) {
                    maxInFlight = (Integer) rawMaxInFlight;
                } else if (rawMaxInFlight instanceof String) {
                    try {
                        maxInFlight = Integer.parseInt(((String) rawMaxInFlight).trim());
                    } catch (NumberFormatException e) {
                        throw new ConfigException("max.in.flight.requests.per.connection", rawMaxInFlight, "String value could not be parsed as 32-bit integer");
                    }
                } else {
                    throw new ConfigException("max.in.flight.requests.per.connection", rawMaxInFlight, "Expected value to be a 32-bit integer, but it was a " + rawMaxInFlight.getClass().getName());
                }
                if (maxInFlight > 5) {
                    throw new ConfigException("max.in.flight.requests.per.connection", maxInFlight, "Can't exceed 5 when exactly-once processing is enabled");
                }
            }
        }

        final String eosMessage = "processing.guarantee is set to exactly_once. Hence, ";
        final String nonConfigurableConfigMessage = "Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ";

        for (String config : nonConfigurableConfigs) {
            if (!clientProvidedProps.containsKey(config)) {
                continue;
            }

            // Pick the override table that governs this property, plus the wording for the warning.
            final Map<String, Object> governingOverrides;
            final String clientKind;
            final String reason;
            if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
                governingOverrides = CONSUMER_DEFAULT_OVERRIDES;
                clientKind = "consumer";
                reason = "";
            } else if (!this.eosEnabled) {
                // The remaining override tables only apply under exactly-once.
                continue;
            } else if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
                governingOverrides = CONSUMER_EOS_OVERRIDES;
                clientKind = "consumer";
                reason = eosMessage;
            } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
                governingOverrides = PRODUCER_EOS_OVERRIDES;
                clientKind = "producer";
                reason = eosMessage;
            } else {
                continue;
            }

            Object userValue = clientProvidedProps.get(config);
            Object streamsValue = governingOverrides.get(config);
            if (userValue.equals(streamsValue)) {
                // The user asked for exactly what Streams would set anyway; nothing to report.
                continue;
            }
            log.warn(String.format(nonConfigurableConfigMessage, clientKind, config, reason, userValue, streamsValue));
            clientProvidedProps.remove(config);
        }
    }

    /**
     * Returns the main consumer configuration for the given group and client id.
     *
     * @param groupId  the consumer group id
     * @param clientId the client id
     * @return the result of {@link #getMainConsumerConfigs(String, String, int)} called
     *         with a fixed thread index of 1 (the value of {@link #DUMMY_THREAD_INDEX})
     * @deprecated this method only delegates to
     *             {@link #getMainConsumerConfigs(String, String, int)}; call that method directly
     */
    @Deprecated
    public Map<String, Object> getConsumerConfigs(String groupId, String clientId) {
        return this.getMainConsumerConfigs(groupId, clientId, 1);
    }

    /**
     * Builds the configuration for the main consumer of a stream thread.
     *
     * <p>Starts from the common consumer configuration and then, in order:
     * <ul>
     *   <li>applies properties supplied under the {@code main.consumer.} prefix;</li>
     *   <li>sets {@code application.id} and {@code group.id} to {@code groupId} and
     *       {@code client.id} to {@code clientId};</li>
     *   <li>if {@code group.instance.id} is present, appends {@code "-" + threadIdx} to it;</li>
     *   <li>copies {@code upgrade.from}, {@code replication.factor}, {@code application.server},
     *       {@code num.standby.replicas} and the window store changelog retention from this
     *       config, and sets {@code partition.assignment.strategy} to
     *       {@link StreamsPartitionAssignor};</li>
     *   <li>adds the admin client {@code retries} and {@code retry.backoff.ms} values under
     *       the {@code admin.} prefix;</li>
     *   <li>adds all {@code topic.}-prefixed properties.</li>
     * </ul>
     *
     * @param groupId   the consumer group id, also stored as the application id
     * @param clientId  the client id for the consumer
     * @param threadIdx index appended to {@code group.instance.id} when one is configured
     * @return the main consumer configuration
     * @throws IllegalArgumentException if a configured topic {@code segment.bytes} is smaller
     *                                  than the configured producer {@code batch.size}
     * @throws NumberFormatException    if either of those two values cannot be parsed as an int
     */
    public Map<String, Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx) {
        Map<String, Object> consumerProps = this.getCommonConsumerConfigs();

        // Main-consumer specific settings take precedence over the shared consumer settings.
        Map<String, Object> mainConsumerProps = this.originalsWithPrefix(MAIN_CONSUMER_PREFIX);
        consumerProps.putAll(mainConsumerProps);

        consumerProps.put(APPLICATION_ID_CONFIG, groupId);
        consumerProps.put("group.id", groupId);
        consumerProps.put(CLIENT_ID_CONFIG, clientId);

        // A configured group.instance.id gets the thread index appended to it.
        String groupInstanceId = (String) consumerProps.get("group.instance.id");
        if (groupInstanceId != null) {
            consumerProps.put("group.instance.id", groupInstanceId + "-" + threadIdx);
        }

        consumerProps.put(UPGRADE_FROM_CONFIG, this.getString(UPGRADE_FROM_CONFIG));
        consumerProps.put(REPLICATION_FACTOR_CONFIG, this.getInt(REPLICATION_FACTOR_CONFIG));
        consumerProps.put(APPLICATION_SERVER_CONFIG, this.getString(APPLICATION_SERVER_CONFIG));
        consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, this.getInt(NUM_STANDBY_REPLICAS_CONFIG));
        consumerProps.put("partition.assignment.strategy", StreamsPartitionAssignor.class.getName());
        consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, this.getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

        // Resolve the admin client retry settings (including defaults) and pass them along under the admin prefix.
        AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(this.getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
        consumerProps.put(StreamsConfig.adminClientPrefix(RETRIES_CONFIG), adminClientDefaultConfig.getInt(RETRIES_CONFIG));
        consumerProps.put(StreamsConfig.adminClientPrefix(RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(RETRY_BACKOFF_MS_CONFIG));

        // Topic properties are looked up below by their prefixed name, so the prefix is kept here.
        Map<String, Object> topicProps = this.originalsWithPrefix(TOPIC_PREFIX, false);
        Map<String, Object> producerProps = this.getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());

        // A producer batch larger than a topic segment could never be appended to that topic.
        String segmentBytesKey = StreamsConfig.topicPrefix("segment.bytes");
        if (topicProps.containsKey(segmentBytesKey) && producerProps.containsKey("batch.size")) {
            int segmentSize = Integer.parseInt(topicProps.get(segmentBytesKey).toString());
            int batchSize = Integer.parseInt(producerProps.get("batch.size").toString());
            if (segmentSize < batchSize) {
                throw new IllegalArgumentException(String.format("Specified topic segment size %d is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic", segmentSize, batchSize));
            }
        }

        consumerProps.putAll(topicProps);
        return consumerProps;
    }

    /**
     * Builds the configuration for the restore consumer.
     *
     * <p>Starts from the common consumer configuration, applies properties supplied
     * under the {@code restore.consumer.} prefix, removes {@code group.id}, sets
     * {@code client.id} to {@code clientId} and forces {@code auto.offset.reset}
     * to {@code "none"}.
     *
     * @param clientId the client id for the restore consumer
     * @return the restore consumer configuration
     */
    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
        Map<String, Object> baseConsumerProps = this.getCommonConsumerConfigs();

        // Restore-consumer specific settings take precedence over the shared consumer settings.
        Map<String, Object> restoreConsumerProps = this.originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
        baseConsumerProps.putAll(restoreConsumerProps);

        // The restore consumer is configured without a consumer group.
        baseConsumerProps.remove("group.id");
        baseConsumerProps.put(CLIENT_ID_CONFIG, clientId);
        // Consumer offset-reset policy "none"; deliberately a literal and not the
        // unrelated topology-optimization constant that happens to share the value.
        baseConsumerProps.put("auto.offset.reset", "none");
        return baseConsumerProps;
    }

    /**
     * Builds the configuration for the global consumer.
     *
     * <p>Starts from the common consumer configuration, applies properties supplied
     * under the {@code global.consumer.} prefix, removes {@code group.id}, sets
     * {@code client.id} to {@code clientId + "-global-consumer"} and forces
     * {@code auto.offset.reset} to {@code "none"}.
     *
     * @param clientId the client id the global consumer id is derived from
     * @return the global consumer configuration
     */
    public Map<String, Object> getGlobalConsumerConfigs(String clientId) {
        Map<String, Object> baseConsumerProps = this.getCommonConsumerConfigs();

        // Global-consumer specific settings take precedence over the shared consumer settings.
        Map<String, Object> globalConsumerProps = this.originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
        baseConsumerProps.putAll(globalConsumerProps);

        // The global consumer is configured without a consumer group.
        baseConsumerProps.remove("group.id");
        baseConsumerProps.put(CLIENT_ID_CONFIG, clientId + "-global-consumer");
        // Consumer offset-reset policy "none"; deliberately a literal and not the
        // unrelated topology-optimization constant that happens to share the value.
        baseConsumerProps.put("auto.offset.reset", "none");
        return baseConsumerProps;
    }

    /**
     * Builds the configuration for the producer.
     *
     * <p>Layers are applied in increasing precedence: the Streams producer overrides
     * (exactly-once or default flavour), the custom client properties, the
     * user-supplied producer properties (after non-configurable ones have been checked
     * and possibly dropped), then {@code bootstrap.servers} from the original
     * properties and the given {@code client.id}.
     *
     * @param clientId the client id for the producer
     * @return a new mutable map holding the producer configuration
     * @throws ConfigException if {@code max.in.flight.requests.per.connection} is invalid
     *                         while exactly-once is enabled
     */
    public Map<String, Object> getProducerConfigs(String clientId) {
        Map<String, Object> userProducerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());

        // May remove entries from userProducerProps.
        checkIfUnexpectedUserSpecifiedConsumerConfig(userProducerProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);

        Map<String, Object> streamsOverrides;
        if (this.eosEnabled) {
            streamsOverrides = PRODUCER_EOS_OVERRIDES;
        } else {
            streamsOverrides = PRODUCER_DEFAULT_OVERRIDES;
        }

        Map<String, Object> producerProps = new HashMap<>(streamsOverrides);
        producerProps.putAll(getClientCustomProps());
        producerProps.putAll(userProducerProps);
        producerProps.put(BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
        producerProps.put(CLIENT_ID_CONFIG, clientId);
        return producerProps;
    }

    /**
     * Builds the configuration for the admin client.
     *
     * <p>Custom client properties form the base, user-supplied admin properties
     * are layered on top, and {@code client.id} is set last.
     *
     * @param clientId the client id for the admin client
     * @return a new mutable map holding the admin client configuration
     */
    public Map<String, Object> getAdminConfigs(String clientId) {
        Map<String, Object> userAdminProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());

        Map<String, Object> adminProps = new HashMap<>(getClientCustomProps());
        adminProps.putAll(userAdminProps);
        adminProps.put(CLIENT_ID_CONFIG, clientId);
        return adminProps;
    }

    /**
     * Collects the user-supplied properties relevant to one kind of client.
     *
     * <p>First takes every original property whose name is in {@code configNames},
     * then overlays the properties supplied under {@code prefix}; prefixed values
     * win because they are applied second.
     *
     * @param prefix      the client prefix, e.g. {@code "consumer."}
     * @param configNames the property names the client understands
     * @return a new mutable map of the selected properties
     */
    private Map<String, Object> getClientPropsWithPrefix(String prefix, Set<String> configNames) {
        Map<String, Object> selected = this.clientProps(configNames, this.originals());
        // NOTE(review): the single-argument originalsWithPrefix presumably strips the prefix from keys; confirm.
        Map<String, Object> prefixedOverrides = this.originalsWithPrefix(prefix);
        selected.putAll(prefixedOverrides);
        return selected;
    }

    /**
     * Returns the original properties that are not known configuration names.
     *
     * <p>Removed are all names defined by the Streams {@code CONFIG}, by the consumer,
     * producer and admin client configs, and every property carrying the
     * {@code consumer.}, {@code producer.} or {@code admin.} prefix.
     *
     * @return the remaining, custom properties
     */
    private Map<String, Object> getClientCustomProps() {
        // NOTE(review): mutating this map assumes originals() hands out a fresh copy; confirm against AbstractConfig.
        Map<String, Object> customProps = this.originals();
        Set<String> remainingKeys = customProps.keySet();

        // Names defined by Streams itself and by the three embedded clients.
        remainingKeys.removeAll(CONFIG.names());
        remainingKeys.removeAll(ConsumerConfig.configNames());
        remainingKeys.removeAll(ProducerConfig.configNames());
        remainingKeys.removeAll(AdminClientConfig.configNames());

        // Client-prefixed properties, matched with their prefix kept.
        remainingKeys.removeAll(this.originalsWithPrefix(CONSUMER_PREFIX, false).keySet());
        remainingKeys.removeAll(this.originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
        remainingKeys.removeAll(this.originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet());
        return customProps;
    }

    /**
     * Creates and configures an instance of the configured {@code default.key.serde} class.
     *
     * <p>The serde is configured with the original properties and {@code isKey = true}.
     *
     * @return the configured key serde
     * @throws StreamsException if the serde cannot be instantiated or configured;
     *                          the underlying exception is preserved as the cause
     */
    public Serde defaultKeySerde() {
        // Read up front so the failure message can name the offending setting.
        Object configuredKeySerde = this.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        Serde keySerde;
        try {
            keySerde = this.getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
            keySerde.configure(this.originals(), true);
        } catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure key serde %s", configuredKeySerde), e);
        }
        return keySerde;
    }

    /**
     * Creates and configures an instance of the configured {@code default.value.serde} class.
     *
     * <p>The serde is configured with the original properties and {@code isKey = false}.
     *
     * @return the configured value serde
     * @throws StreamsException if the serde cannot be instantiated or configured;
     *                          the underlying exception is preserved as the cause
     */
    public Serde defaultValueSerde() {
        // Read up front so the failure message can name the offending setting.
        Object configuredValueSerde = this.get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
        Serde valueSerde;
        try {
            valueSerde = this.getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
            valueSerde.configure(this.originals(), false);
        } catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure value serde %s", configuredValueSerde), e);
        }
        return valueSerde;
    }

    /**
     * Creates an instance of the class configured under {@code default.timestamp.extractor}.
     *
     * @return the configured {@link TimestampExtractor} instance
     */
    public TimestampExtractor defaultTimestampExtractor() {
        return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    }

    /**
     * Creates an instance of the class configured under
     * {@code default.deserialization.exception.handler}.
     *
     * @return the configured {@link DeserializationExceptionHandler} instance
     */
    public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
        return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    }

    /**
     * Creates an instance of the class configured under
     * {@code default.production.exception.handler}.
     *
     * @return the configured {@link ProductionExceptionHandler} instance
     */
    public ProductionExceptionHandler defaultProductionExceptionHandler() {
        return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    }

    /**
     * Selects from {@code originals} the entries whose key appears in {@code configNames}.
     *
     * <p>Keys mapped to {@code null} are kept. Each selected value is fetched with
     * {@code originals.get(...)} on purpose: the lookup itself may be observed by the
     * map implementation, so the access pattern is part of the behaviour.
     *
     * @param configNames the property names to select
     * @param originals   the source properties; not modified
     * @return a new mutable map containing only the selected entries
     */
    private Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals) {
        Map<String, Object> selected = new HashMap<>();
        for (String name : configNames) {
            if (originals.containsKey(name)) {
                selected.put(name, originals.get(name));
            }
        }
        return selected;
    }

    /**
     * Prints the HTML table rendering of the Streams {@code CONFIG} definition to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println(CONFIG.toHtmlTable());
    }

    static {
        NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[]{"enable.auto.commit"};
        NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[]{"isolation.level"};
        NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[]{"enable.idempotence", "max.in.flight.requests.per.connection"};
        CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, APPLICATION_ID_DOC).define(BOOTSTRAP_SERVERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).").define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.INT, (Object)1, ConfigDef.Importance.HIGH, REPLICATION_FACTOR_DOC).define(STATE_DIR_CONFIG, ConfigDef.Type.STRING, (Object)"/tmp/kafka-streams", ConfigDef.Importance.HIGH, STATE_DIR_DOC).define(CACHE_MAX_BYTES_BUFFERING_CONFIG, ConfigDef.Type.LONG, (Object)0xA00000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, CACHE_MAX_BYTES_BUFFERING_DOC).define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, CLIENT_ID_DOC).define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)LogAndFailExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_KEY_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)Serdes.ByteArraySerde.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_KEY_SERDE_CLASS_DOC).define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)DefaultProductionExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, 
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)FailOnInvalidTimestamp.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC).define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)Serdes.ByteArraySerde.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC).define(NUM_STANDBY_REPLICAS_CONFIG, ConfigDef.Type.INT, (Object)0, ConfigDef.Importance.MEDIUM, NUM_STANDBY_REPLICAS_DOC).define(NUM_STREAM_THREADS_CONFIG, ConfigDef.Type.INT, (Object)1, ConfigDef.Importance.MEDIUM, NUM_STREAM_THREADS_DOC).define(MAX_TASK_IDLE_MS_CONFIG, ConfigDef.Type.LONG, (Object)0L, ConfigDef.Importance.MEDIUM, MAX_TASK_IDLE_MS_DOC).define(PROCESSING_GUARANTEE_CONFIG, ConfigDef.Type.STRING, (Object)AT_LEAST_ONCE, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{AT_LEAST_ONCE, EXACTLY_ONCE}), ConfigDef.Importance.MEDIUM, PROCESSING_GUARANTEE_DOC).define(SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, (Object)"PLAINTEXT", ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).define(TOPOLOGY_OPTIMIZATION, ConfigDef.Type.STRING, (Object)NO_OPTIMIZATION, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{NO_OPTIMIZATION, OPTIMIZE}), ConfigDef.Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC).define(APPLICATION_SERVER_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, APPLICATION_SERVER_DOC).define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, ConfigDef.Type.INT, (Object)1000, ConfigDef.Importance.LOW, BUFFERED_RECORDS_PER_PARTITION_DOC).define(COMMIT_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, (Object)30000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, COMMIT_INTERVAL_MS_DOC).define(CONNECTIONS_MAX_IDLE_MS_CONFIG, ConfigDef.Type.LONG, (Object)540000L, ConfigDef.Importance.LOW, "Close idle connections after the number of milliseconds specified by this 
config.").define(METADATA_MAX_AGE_CONFIG, ConfigDef.Type.LONG, (Object)300000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.").define(METRICS_NUM_SAMPLES_CONFIG, ConfigDef.Type.INT, (Object)2, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)1), ConfigDef.Importance.LOW, "The number of samples maintained to compute metrics.").define(METRIC_REPORTER_CLASSES_CONFIG, ConfigDef.Type.LIST, (Object)"", ConfigDef.Importance.LOW, "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.").define(METRICS_RECORDING_LEVEL_CONFIG, ConfigDef.Type.STRING, (Object)Sensor.RecordingLevel.INFO.toString(), (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()}), ConfigDef.Importance.LOW, "The highest recording level for metrics.").define(METRICS_SAMPLE_WINDOW_MS_CONFIG, ConfigDef.Type.LONG, (Object)30000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The window of time a metrics sample is computed over.").define(PARTITION_GROUPER_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)DefaultPartitionGrouper.class.getName(), ConfigDef.Importance.LOW, PARTITION_GROUPER_CLASS_DOC).define(POLL_MS_CONFIG, ConfigDef.Type.LONG, (Object)100L, ConfigDef.Importance.LOW, POLL_MS_DOC).define(RECEIVE_BUFFER_CONFIG, ConfigDef.Type.INT, (Object)32768, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.LOW, "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. 
If the value is -1, the OS default will be used.").define(RECONNECT_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, (Object)50L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.").define(RECONNECT_BACKOFF_MAX_MS_CONFIG, ConfigDef.Type.LONG, (Object)1000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.").define(RETRIES_CONFIG, ConfigDef.Type.INT, (Object)0, (ConfigDef.Validator)ConfigDef.Range.between((Number)0, (Number)Integer.MAX_VALUE), ConfigDef.Importance.LOW, "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.").define(RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, (Object)100L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.").define(REQUEST_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)40000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The configuration controls the maximum amount of time the client will wait for the response of a request. 
If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, ROCKSDB_CONFIG_SETTER_CLASS_DOC).define(SEND_BUFFER_CONFIG, ConfigDef.Type.INT, (Object)131072, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.LOW, "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.").define(STATE_CLEANUP_DELAY_MS_CONFIG, ConfigDef.Type.LONG, (Object)600000L, ConfigDef.Importance.LOW, STATE_CLEANUP_DELAY_MS_DOC).define(UPGRADE_FROM_CONFIG, ConfigDef.Type.STRING, null, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11}), ConfigDef.Importance.LOW, UPGRADE_FROM_DOC).define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, ConfigDef.Type.LONG, (Object)86400000L, ConfigDef.Importance.LOW, WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC);
        HashMap<Object, Object> tempProducerDefaultOverrides = new HashMap<String, String>();
        tempProducerDefaultOverrides.put("linger.ms", "100");
        PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
        tempProducerDefaultOverrides = new HashMap<String, Object>(PRODUCER_DEFAULT_OVERRIDES);
        tempProducerDefaultOverrides.put("delivery.timeout.ms", Integer.MAX_VALUE);
        tempProducerDefaultOverrides.put("enable.idempotence", true);
        PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
        HashMap<Object, Object> tempConsumerDefaultOverrides = new HashMap<String, Object>();
        tempConsumerDefaultOverrides.put("max.poll.records", "1000");
        tempConsumerDefaultOverrides.put("auto.offset.reset", "earliest");
        tempConsumerDefaultOverrides.put("enable.auto.commit", "false");
        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
        CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
        tempConsumerDefaultOverrides = new HashMap<String, Object>(CONSUMER_DEFAULT_OVERRIDES);
        tempConsumerDefaultOverrides.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
    }

    /**
     * Holder for the names of internal configuration entries.
     *
     * <p>Both values are wrapped in double underscores, unlike the dotted config names
     * (e.g. {@code "processing.guarantee"}) used elsewhere in this class; presumably that
     * marks them as keys the library sets itself rather than user-facing settings —
     * TODO confirm against the call sites.
     */
    public static class InternalConfig {
        // NOTE(review): judging by the name, the entry stored under this key carries the task
        // manager instance intended for the partition assignor — verify against the code that reads it.
        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
        // NOTE(review): judging by the name, the entry stored under this key carries an error code
        // produced during assignment; the value's type is not visible here — verify at the usage site.
        public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
    }
}

