package org.zbus.mq;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.zbus.broker.Broker;
import org.zbus.kit.log.Logger;
import org.zbus.kit.log.LoggerFactory;
import org.zbus.mq.Protocol;
import org.zbus.net.Sync;
import org.zbus.net.http.Message;

/* loaded from: classes4.dex */
/**
 * Message consumer bound to one MQ entry of a {@link Broker}.
 *
 * <p>Messages can be pulled explicitly with {@link #take()} / {@link #take(int)}, or pushed to a
 * {@link ConsumerHandler} by a background thread started with {@link #start()}. When
 * {@link #setConsumerHandlerRunInPool(boolean)} is enabled, the background thread hands every
 * message to a {@link ThreadPoolExecutor} instead of invoking the handler itself.
 *
 * <p>NOTE(review): this file is decompiler output; {@link #take(int)} could not be decompiled and
 * is only a placeholder here.
 */
public class Consumer extends MqAdmin implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Consumer.class);

    /** Timeout argument passed to the client by {@link #invokeSync(Message)}; presumably milliseconds — confirm. */
    private static final int CONSUMER_SYNC_INVOKE_TIMEOUT = 10000;

    /** Lazily created invoker; created under this object's monitor by {@link #ensureConsumerClient()}. */
    private Message.MessageInvoker client;
    /** Timeout argument handed to {@link #take(int)} by {@link #take()}. */
    private int consumeTimeout = 120000;
    private volatile ConsumerExceptionHandler consumerExceptionHandler;
    private volatile ConsumerHandler consumerHandler;
    private ThreadPoolExecutor consumerHandlerExecutor;
    private int consumerHandlerPoolSize = 64;
    private boolean consumerHandlerRunInPool = false;
    private volatile Thread consumerThread = null;
    /** Capacity of the work queue of the internally created handler pool. */
    private int inFlightMessageCount = 64;
    /** True only while {@link #consumerHandlerExecutor} was created by this consumer (and must be shut down by it). */
    private boolean ownConsumerHandlerExecutor = false;
    private String topic = null;

    /** Body of the background consumer thread; shared by every thread created in {@link #start()}. */
    private final Runnable consumerTask = new Runnable() {
        @Override
        public void run() {
            runConsumeLoop();
        }
    };

    /** Callback notified about exceptions raised while the background thread consumes messages. */
    public interface ConsumerExceptionHandler {
        /**
         * @param exc      the exception raised by the consume loop
         * @param consumer the consumer whose loop raised it
         */
        void onException(Exception exc, Consumer consumer);
    }

    /** Callback receiving every message taken by the background thread. */
    public interface ConsumerHandler {
        /**
         * @param message  the message that was taken
         * @param consumer the consumer that took it
         * @throws IOException if handling fails; the consume loop logs it and continues
         */
        void handle(Message message, Consumer consumer) throws IOException;
    }

    /**
     * Creates a consumer; all three arguments are forwarded unchanged to the {@code MqAdmin}
     * constructor.
     *
     * @param broker    broker used to obtain and release invokers
     * @param str       forwarded to {@code MqAdmin} (presumably the MQ name — confirm)
     * @param mqModeArr forwarded to {@code MqAdmin}
     */
    public Consumer(Broker broker, String str, Protocol.MqMode... mqModeArr) {
        super(broker, str, mqModeArr);
    }

    /**
     * Creates a consumer from a configuration object and adopts the topic it carries.
     *
     * @param mqConfig configuration forwarded to {@code MqAdmin}; its topic becomes this consumer's topic
     */
    public Consumer(MqConfig mqConfig) {
        super(mqConfig);
        this.topic = mqConfig.getTopic();
    }

    /** Builds the broker hint (entry = this consumer's MQ) used when requesting an invoker. */
    private Broker.BrokerHint brokerHint() {
        Broker.BrokerHint brokerHint = new Broker.BrokerHint();
        brokerHint.setEntry(this.mq);
        return brokerHint;
    }

    /**
     * Returns the invoker, requesting one from the broker on first use.
     *
     * @return the (possibly just created) invoker
     * @throws IOException if the broker cannot provide an invoker
     */
    private synchronized Message.MessageInvoker ensureConsumerClient() throws IOException {
        if (this.client == null) {
            this.client = this.broker.getInvoker(brokerHint());
        }
        return this.client;
    }

    /**
     * Background loop: takes messages until interrupted and forwards each one to the handler.
     *
     * <p>The three nested {@code try} levels are intentional: exceptions raised while closing
     * after an interrupt, or by the re-thrown {@code MqException}, must still reach the outer
     * handlers exactly as nested here.
     */
    private void runConsumeLoop() {
        initConsumerHandlerPoolIfNeeded();
        while (true) {
            try {
                try {
                    try {
                        dispatchToConsumerHandler(take());
                    } catch (InterruptedException interrupted) {
                        // Interruption is the stop signal: release resources and end the thread.
                        close();
                        return;
                    }
                } catch (MqException mqFailure) {
                    ConsumerExceptionHandler onError = this.consumerExceptionHandler;
                    if (onError == null) {
                        throw mqFailure;
                    }
                    onError.onException(mqFailure, this);
                    return;
                }
            } catch (IOException ioFailure) {
                ConsumerExceptionHandler onError = this.consumerExceptionHandler;
                if (onError != null) {
                    onError.onException(ioFailure, this);
                } else {
                    log.error(ioFailure.getMessage(), ioFailure);
                }
            }
        }
    }

    /**
     * Hands one message to the registered handler, inline or through the handler pool.
     *
     * <p>Handler and executor are read once into locals so a concurrent {@link #stop()} or setter
     * cannot null them between the check and the call.
     */
    private void dispatchToConsumerHandler(final Message message) {
        final ConsumerHandler handler = this.consumerHandler;
        if (handler == null) {
            log.warn("Missing consumerHandler, call onMessage first");
            return;
        }
        ThreadPoolExecutor executor = this.consumerHandlerExecutor;
        if (!this.consumerHandlerRunInPool || executor == null) {
            try {
                handler.handle(message, this);
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
            return;
        }
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    handler.handle(message, Consumer.this);
                } catch (IOException | RuntimeException e) {
                    // submit() would otherwise bury runtime failures in a Future nobody reads.
                    log.error(e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Creates the internal handler pool when pooled handling is enabled and no executor is set.
     *
     * <p>The pool has {@code consumerHandlerPoolSize} core and maximum threads, a bounded queue of
     * {@code inFlightMessageCount} entries and a caller-runs rejection policy. Kept public because
     * the decompiled class exposes it that way (the decompiler notes it was originally private).
     */
    public void initConsumerHandlerPoolIfNeeded() {
        if (!this.consumerHandlerRunInPool || this.consumerHandlerExecutor != null) {
            return;
        }
        int poolSize = this.consumerHandlerPoolSize;
        this.consumerHandlerExecutor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                120L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(this.inFlightMessageCount),
                new ThreadPoolExecutor.CallerRunsPolicy());
        this.ownConsumerHandlerExecutor = true;
    }

    /** Equivalent to {@link #stop()}. */
    @Override
    public void close() throws IOException {
        stop();
    }

    /** @return the executor currently used for pooled handling, or {@code null} if none is set */
    public ThreadPoolExecutor getConsumeExecutor() {
        return this.consumerHandlerExecutor;
    }

    /** @return the thread count used when the internal handler pool is created */
    public int getConsumerHandlerPoolSize() {
        return this.consumerHandlerPoolSize;
    }

    /** @return the queue capacity used when the internal handler pool is created */
    public int getInFlightMessageCount() {
        return this.inFlightMessageCount;
    }

    /** @return the topic set through the configuration or {@link #setTopic(String)}, or {@code null} */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Sends a message asynchronously through this consumer's invoker, creating it if necessary.
     * The whole call runs under this object's monitor.
     */
    @Override
    protected void invokeAsync(Message message, Sync.ResultCallback<Message> resultCallback) throws IOException {
        synchronized (this) {
            ensureConsumerClient().invokeAsync(message, resultCallback);
        }
    }

    /**
     * Sends a message and waits for the reply, creating the invoker if necessary.
     * The whole call, including the wait, runs under this object's monitor.
     *
     * @return whatever the invoker returns for the request
     */
    @Override
    protected Message invokeSync(Message message) throws IOException, InterruptedException {
        synchronized (this) {
            return ensureConsumerClient().invokeSync(message, CONSUMER_SYNC_INVOKE_TIMEOUT);
        }
    }

    /** @return whether the background thread hands messages to the handler pool */
    public boolean isConsumeHandlerRunInPool() {
        return this.consumerHandlerRunInPool;
    }

    /** Registers the callback notified about exceptions raised by the background consume loop. */
    public void onException(ConsumerExceptionHandler consumerExceptionHandler) {
        this.consumerExceptionHandler = consumerExceptionHandler;
    }

    /** Registers the handler that receives messages taken by the background thread. */
    public void onMessage(ConsumerHandler consumerHandler) throws IOException {
        this.consumerHandler = consumerHandler;
    }

    /**
     * Marks the message as a non-acknowledged {@code Route} command and sends it asynchronously.
     * A response message is first converted to a request, keeping its status as origin status.
     *
     * <p>The invoker is created on demand, so this no longer fails with a
     * {@code NullPointerException} when called before any other broker interaction. The monitor
     * is only taken when the invoker does not exist yet.
     *
     * @param message the message to route; it is modified in place
     * @throws IOException if the invoker cannot be obtained or the send fails
     */
    public void routeMessage(Message message) throws IOException {
        message.setCmd(Protocol.Route);
        message.setAck(false);
        if (message.isResponse()) {
            message.setOriginStatus(message.getStatus());
            message.asRequest();
        }
        Message.MessageInvoker invoker = this.client;
        if (invoker == null) {
            invoker = ensureConsumerClient();
        }
        invoker.invokeAsync(message, null);
    }

    /** Sets the timeout argument that {@link #take()} passes to {@link #take(int)}. */
    public void setConsumeTimeout(int i) {
        this.consumeTimeout = i;
    }

    /**
     * Replaces the executor used for pooled handling.
     *
     * <p>A pool previously created by this consumer is shut down. The supplied executor is
     * recorded as externally owned, so {@link #stop()} never shuts it down. Passing the executor
     * that is already installed is a no-op.
     *
     * @param threadPoolExecutor the executor to use, or {@code null} to clear it
     */
    public void setConsumerHandlerExecutor(ThreadPoolExecutor threadPoolExecutor) {
        ThreadPoolExecutor previous = this.consumerHandlerExecutor;
        if (previous == threadPoolExecutor) {
            return;
        }
        if (previous != null && this.ownConsumerHandlerExecutor) {
            previous.shutdown();
        }
        this.consumerHandlerExecutor = threadPoolExecutor;
        // Whatever was installed is not ours; without this reset stop() would shut it down.
        this.ownConsumerHandlerExecutor = false;
    }

    /** Sets the thread count used when the internal handler pool is created. */
    public void setConsumerHandlerPoolSize(int i) {
        this.consumerHandlerPoolSize = i;
    }

    /** Enables or disables handing messages to the handler pool. */
    public void setConsumerHandlerRunInPool(boolean z) {
        this.consumerHandlerRunInPool = z;
    }

    /** Sets the queue capacity used when the internal handler pool is created. */
    public void setInFlightMessageCount(int i) {
        this.inFlightMessageCount = i;
    }

    /**
     * Sets the topic of this consumer.
     *
     * @throws IllegalStateException if the PubSub mode is not enabled
     */
    public void setTopic(String str) {
        if (!Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub)) {
            throw new IllegalStateException("topic require PubSub mode");
        }
        this.topic = str;
    }

    /**
     * Starts the background consumer thread unless one is already running.
     *
     * <p>A fresh thread is created whenever there is none or the previous one has terminated;
     * a terminated {@link Thread} cannot be started again.
     */
    public synchronized void start() throws IOException {
        Thread existing = this.consumerThread;
        if (existing != null && existing.isAlive()) {
            return;
        }
        Thread worker = new Thread(this.consumerTask);
        worker.setName("ConsumerThread");
        this.consumerThread = worker;
        worker.start();
    }

    /** Registers the handler and then starts the background consumer thread. */
    public synchronized void start(ConsumerHandler consumerHandler) throws IOException {
        onMessage(consumerHandler);
        start();
    }

    /**
     * Interrupts the background thread, shuts down the handler pool if this consumer created it,
     * and hands the invoker back to the broker. Failures while closing the invoker are logged.
     *
     * <p>Deliberately not synchronized: {@link #invokeSync(Message)} holds the monitor while it
     * waits, and stopping must not queue behind it.
     */
    public void stop() {
        Thread worker = this.consumerThread;
        if (worker != null) {
            worker.interrupt();
            this.consumerThread = null;
        }
        ThreadPoolExecutor executor = this.consumerHandlerExecutor;
        if (this.ownConsumerHandlerExecutor && executor != null) {
            executor.shutdown();
            this.consumerHandlerExecutor = null;
            this.ownConsumerHandlerExecutor = false;
        }
        try {
            Message.MessageInvoker invoker = this.client;
            if (invoker != null) {
                this.broker.closeInvoker(invoker);
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Sends an asynchronous {@code Consume} request for the given topic.
     *
     * @param str the topic to subscribe to
     * @throws IllegalStateException if the PubSub mode is not enabled
     * @throws IOException           if the invoker cannot be obtained or the send fails
     */
    public void subscribe(String str) throws IOException {
        if (!Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub)) {
            throw new IllegalStateException("subscribe require PubSub mode");
        }
        Message.MessageInvoker invoker = ensureConsumerClient();
        Message request = new Message();
        request.setCmd(Protocol.Consume);
        request.setMq(this.mq);
        request.setHead("token", this.accessToken);
        request.setTopic(str);
        invoker.invokeAsync(request, null);
    }

    /**
     * Calls {@link #take(int)} with the configured consume timeout until it yields a message.
     *
     * @return the first non-null message returned by {@link #take(int)}
     */
    public Message take() throws InterruptedException, IOException {
        Message message = take(this.consumeTimeout);
        while (message == null) {
            message = take(this.consumeTimeout);
        }
        return message;
    }

    /*
        NOTE(review): the decompiler could not reconstruct this method (215 bytecode instructions
        were skipped). The body below is only the decompiler's placeholder and always throws;
        the real logic has to be recovered from the instruction dump or the original sources.
    */
    public org.zbus.net.http.Message take(int r5) throws java.io.IOException, java.lang.InterruptedException {
        throw new UnsupportedOperationException("Method not decompiled: org.zbus.mq.Consumer.take(int):org.zbus.net.http.Message");
    }
}
