package org.apache.camel.support;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/support/EventDrivenPollingConsumer.class */
public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
    private final BlockingQueue<Exchange> queue;
    private ExceptionHandler interruptedExceptionHandler;
    private Consumer consumer;
    private boolean blockWhenFull;
    private long blockTimeout;
    private final int queueCapacity;

    public EventDrivenPollingConsumer(Endpoint endpoint) {
        this(endpoint, 1000);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, int i) {
        super(endpoint);
        this.blockWhenFull = true;
        this.queueCapacity = i;
        if (i <= 0) {
            this.queue = new LinkedBlockingQueue();
        } else {
            this.queue = new ArrayBlockingQueue(i);
        }
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), (Class<?>) EventDrivenPollingConsumer.class);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> blockingQueue) {
        super(endpoint);
        this.blockWhenFull = true;
        this.queue = blockingQueue;
        this.queueCapacity = blockingQueue.remainingCapacity();
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), (Class<?>) EventDrivenPollingConsumer.class);
    }

    @Override // org.apache.camel.support.PollingConsumerSupport
    public Processor getProcessor() {
        return this;
    }

    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setBlockWhenFull(boolean z) {
        this.blockWhenFull = z;
    }

    public long getBlockTimeout() {
        return this.blockTimeout;
    }

    public void setBlockTimeout(long j) {
        this.blockTimeout = j;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    public Exchange receiveNoWait() {
        return receive(0L);
    }

    public Exchange receive() {
        Exchange take;
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
        }
        while (isRunAllowed()) {
            synchronized (this) {
                try {
                    beforePoll(0L);
                    take = this.queue.take();
                } catch (InterruptedException e) {
                    try {
                        handleInterruptedException(e);
                        afterPoll();
                    } finally {
                        afterPoll();
                    }
                }
            }
            return take;
        }
        LOG.trace("Consumer is not running, so returning null");
        return null;
    }

    public Exchange receive(long j) {
        Exchange poll;
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
        }
        synchronized (this) {
            try {
                try {
                    poll = this.queue.poll(beforePoll(j), TimeUnit.MILLISECONDS);
                    afterPoll();
                } catch (InterruptedException e) {
                    handleInterruptedException(e);
                    afterPoll();
                    return null;
                }
            } catch (Throwable th) {
                afterPoll();
                throw th;
            }
        }
        return poll;
    }

    public void process(Exchange exchange) throws Exception {
        if (!isBlockWhenFull()) {
            this.queue.add(exchange);
            return;
        }
        try {
            if (getBlockTimeout() <= 0) {
                this.queue.put(exchange);
            } else if (!this.queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS)) {
                throw new ExchangeTimedOutException(exchange, getBlockTimeout());
            }
        } catch (InterruptedException e) {
            LOG.debug("Put interrupted, are we stopping? {}", Boolean.valueOf(isStopping() || isStopped()));
        }
    }

    public ExceptionHandler getInterruptedExceptionHandler() {
        return this.interruptedExceptionHandler;
    }

    public void setInterruptedExceptionHandler(ExceptionHandler exceptionHandler) {
        this.interruptedExceptionHandler = exceptionHandler;
    }

    public Consumer getDelegateConsumer() {
        return this.consumer;
    }

    protected void handleInterruptedException(InterruptedException interruptedException) {
        getInterruptedExceptionHandler().handleException(interruptedException);
    }

    protected long beforePoll(long j) {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            try {
                j = this.consumer.beforePoll(j);
            } catch (Exception e) {
                LOG.debug("Error occurred before polling " + this.consumer + ". This exception will be ignored.", e);
            }
        }
        return j;
    }

    protected void afterPoll() {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            try {
                this.consumer.afterPoll();
            } catch (Exception e) {
                LOG.debug("Error occurred after polling " + this.consumer + ". This exception will be ignored.", e);
            }
        }
    }

    protected Consumer getConsumer() {
        return this.consumer;
    }

    protected Consumer createConsumer() throws Exception {
        return getEndpoint().createConsumer(this);
    }

    protected void doStart() throws Exception {
        this.consumer = createConsumer();
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            this.consumer.onInit();
        } else {
            ServiceHelper.startService(this.consumer);
        }
    }

    protected void doStop() throws Exception {
        ServiceHelper.stopService(this.consumer);
    }

    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(this.consumer);
        this.queue.clear();
    }

    public boolean isSingleton() {
        return true;
    }
}
