package org.zbus.rpc.mq;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.zbus.broker.Broker;
import org.zbus.kit.log.Logger;
import org.zbus.kit.log.LoggerFactory;
import org.zbus.mq.Consumer;
import org.zbus.mq.MqConfig;
import org.zbus.net.http.Message;

/* loaded from: classes4.dex */
public class Service implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Service.class);
    protected ServiceConfig config;
    private Consumer[][] consumerGroups;
    private ThreadPoolExecutor executor;
    private boolean isStarted = false;

    public Service() {
    }

    public Service(ServiceConfig serviceConfig) {
        this.config = serviceConfig;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Consumer[][] consumerArr = this.consumerGroups;
        if (consumerArr != null) {
            for (Consumer[] consumerArr2 : consumerArr) {
                for (Consumer consumer : consumerArr2) {
                    consumer.stop();
                }
            }
        }
        ThreadPoolExecutor threadPoolExecutor = this.executor;
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdown();
        }
    }

    public void start() throws IOException {
        if (this.isStarted) {
            return;
        }
        ServiceConfig serviceConfig = this.config;
        if (serviceConfig == null) {
            throw new IllegalArgumentException("Missing ServiceConfig");
        }
        if (serviceConfig.getMq() == null || "".equals(this.config.getMq())) {
            throw new IllegalArgumentException("MQ required");
        }
        if (this.config.getMessageProcessor() == null && this.config.getConsumerHandler() == null) {
            throw new IllegalArgumentException("ConsumerHandler or MessageProcessor required");
        }
        if (this.config.isConsumerHandlerRunInPool()) {
            int consumerHandlerPoolSize = this.config.getConsumerHandlerPoolSize();
            this.executor = new ThreadPoolExecutor(consumerHandlerPoolSize, consumerHandlerPoolSize, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(this.config.getInFlightMessageCount()), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        final Message.MessageProcessor messageProcessor = this.config.getMessageProcessor();
        Broker[] brokers = this.config.getBrokers();
        int consumerCount = this.config.getConsumerCount();
        if (brokers.length < 1 || consumerCount < 1) {
            return;
        }
        this.consumerGroups = new Consumer[brokers.length];
        int i = 0;
        while (true) {
            Consumer[][] consumerArr = this.consumerGroups;
            if (i >= consumerArr.length) {
                break;
            }
            Consumer[] consumerArr2 = new Consumer[consumerCount];
            consumerArr[i] = consumerArr2;
            MqConfig mqConfig = new MqConfig();
            mqConfig.setBroker(brokers[i]);
            mqConfig.setMq(this.config.getMq());
            mqConfig.setMode(this.config.getMode());
            mqConfig.setTopic(this.config.getTopic());
            mqConfig.setVerbose(this.config.isVerbose());
            mqConfig.setAccessToken(this.config.getAccessToken());
            mqConfig.setRegisterToken(this.config.getRegisterToken());
            Consumer.ConsumerHandler consumerHandler = this.config.getConsumerHandler();
            for (int i2 = 0; i2 < consumerCount; i2++) {
                Consumer consumer = new Consumer(mqConfig);
                consumerArr2[i2] = consumer;
                if (this.config.isConsumerHandlerRunInPool()) {
                    consumer.setConsumerHandlerExecutor(this.executor);
                    consumer.setConsumerHandlerRunInPool(true);
                }
                if (consumerHandler == null) {
                    consumerHandler = new Consumer.ConsumerHandler() { // from class: org.zbus.rpc.mq.Service.1
                        @Override // org.zbus.mq.Consumer.ConsumerHandler
                        public void handle(Message message, Consumer consumer2) throws IOException {
                            if (Service.this.config.isVerbose()) {
                                Service.log.info("Request:\n" + message);
                            }
                            String mq = message.getMq();
                            String id = message.getId();
                            String sender = message.getSender();
                            Message process = messageProcessor.process(message);
                            if (process != null) {
                                process.setId(id);
                                process.setMq(mq);
                                process.setRecver(sender);
                                if (Service.this.config.isVerbose()) {
                                    Service.log.info("Response:\n" + process);
                                }
                                consumer2.routeMessage(process);
                            }
                        }
                    };
                }
                consumer.onMessage(consumerHandler);
            }
            i++;
        }
        int i3 = 0;
        while (true) {
            Consumer[][] consumerArr3 = this.consumerGroups;
            if (i3 >= consumerArr3.length) {
                this.isStarted = true;
                return;
            }
            for (Consumer consumer2 : consumerArr3[i3]) {
                consumer2.start();
            }
            i3++;
        }
    }
}
