package org.zbus.mq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.zbus.kit.log.Logger;
import org.zbus.kit.log.LoggerFactory;
import org.zbus.mq.Protocol;
import org.zbus.mq.disk.MessageQueue;
import org.zbus.net.Session;
import org.zbus.net.http.Message;

/* loaded from: classes3.dex */
/**
 * Pull-mode queue: messages waiting in {@code msgQ} are handed, one at a time, to
 * consumers that have an outstanding pull request queued in {@link #pullQ}.
 *
 * <p>{@link #pullSessions} tracks every session that has issued a pull (keyed by
 * session id); {@link #pullQ} holds only the pull requests that are still waiting
 * for a message.
 */
public class MQ extends AbstractMQ {
    private static final Logger log = LoggerFactory.getLogger(MQ.class);

    /** Pending pull requests, served in FIFO order by {@link #dispatch()}. */
    protected final BlockingQueue<PullSession> pullQ;

    /** Sessions that have pulled from this queue, keyed by {@code Session.id()}. */
    protected final Map<String, Session> pullSessions;

    /**
     * Creates a queue with no registered consumers.
     *
     * @param str          passed through to the {@code AbstractMQ} constructor
     * @param messageQueue passed through to the {@code AbstractMQ} constructor
     */
    public MQ(String str, MessageQueue messageQueue) {
        super(str, messageQueue);
        this.pullSessions = new ConcurrentHashMap<>();
        this.pullQ = new LinkedBlockingQueue<>();
    }

    /**
     * Drops every pending pull whose session is no longer active, and forgets
     * that session in {@link #pullSessions}.
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession() {
        Iterator<PullSession> it = this.pullQ.iterator();
        while (it.hasNext()) {
            PullSession pullSession = it.next();
            if (!pullSession.session.isActive()) {
                this.pullSessions.remove(pullSession.session.id());
                it.remove();
            }
        }
    }

    /**
     * Forgets the given session and removes its pending pull, if one is queued.
     *
     * @param session the session to remove; matched by identity against queued pulls
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession(Session session) {
        this.pullSessions.remove(session.id());
        Iterator<PullSession> it = this.pullQ.iterator();
        while (it.hasNext()) {
            if (session == it.next().session) {
                it.remove();
                // consume() keeps at most one queued pull per session, so stop here.
                return;
            }
        }
    }

    /**
     * Drains {@link #pullQ} and closes the session of every pending pull.
     * A failure to close one session is logged and does not stop the drain.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        PullSession pull;
        while ((pull = this.pullQ.poll()) != null) {
            try {
                pull.session.close();
            } catch (IOException e) {
                log.warn(e.getMessage(), e);
            }
        }
    }

    /**
     * Registers a pull request from {@code session} and then tries to dispatch.
     *
     * <p>If the session already has a pull waiting in {@link #pullQ}, that entry's
     * pull message is replaced; otherwise a new entry is appended.
     *
     * @param message the consumer's pull request
     * @param session the consumer's session
     * @throws IOException propagated from {@link #dispatch()}
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public void consume(Message message, Session session) throws IOException {
        if (!this.pullSessions.containsKey(session.id())) {
            this.pullSessions.put(session.id(), session);
        }
        for (PullSession pullSession : this.pullQ) {
            if (pullSession.getSession() == session) {
                pullSession.setPullMessage(message);
                dispatch();
                return;
            }
        }
        this.pullQ.offer(new PullSession(session, message));
        dispatch();
    }

    /**
     * @return the number of sessions currently recorded in {@link #pullSessions}
     */
    public int consumerOnlineCount() {
        return this.pullSessions.size();
    }

    /**
     * Pairs queued messages with pending pulls until either side runs out.
     *
     * <p>For each round, while holding this object's monitor: a pull whose session
     * is inactive is discarded; otherwise one message is taken from {@code msgQ}.
     * A message whose {@code "expire"} header parses to a time earlier than now is
     * logged and dropped; an unparsable header is ignored. The pull is then taken
     * off {@link #pullQ}.
     *
     * <p>Outside the monitor, a body-less copy of the message is written to the
     * consumer's session with the consumer's request id and the original message id
     * as origin id. If anything in that step throws, the error is logged and the
     * message is offered back to {@code msgQ}; the pull itself is not re-queued.
     *
     * @throws IOException declared by the overridden method
     */
    @Override // org.zbus.mq.server.AbstractMQ
    void dispatch() throws IOException {
        while (this.pullQ.peek() != null && this.msgQ.size() > 0) {
            PullSession pull;
            Message msg;
            synchronized (this) {
                PullSession head = this.pullQ.peek();
                if (head == null) {
                    continue;
                }
                if (!head.getSession().isActive()) {
                    this.pullQ.poll();
                    continue;
                }

                msg = (Message) this.msgQ.poll();
                if (msg == null) {
                    break;
                }

                String expire = msg.getHead("expire");
                if (expire != null) {
                    try {
                        long expireAt = Long.valueOf(expire).longValue();
                        if (expireAt < System.currentTimeMillis()) {
                            // Expired: already removed from msgQ, so it is simply dropped.
                            log.info("Remove expired message: \n" + msg);
                            continue;
                        }
                    } catch (Exception ignored) {
                        // Unparsable expire header: treat the message as not expired.
                    }
                }

                // NOTE(review): cleanSession() is not synchronized, so this poll could
                // presumably return null or a different entry than 'head'; a null is
                // caught by the catch (Exception) below and the message is re-offered.
                pull = this.pullQ.poll();
            }

            this.lastUpdateTime = System.currentTimeMillis();
            try {
                Message pullMsg = pull.getPullMessage();
                Message writeMsg = Message.copyWithoutBody(msg);
                writeMsg.setOriginId(msg.getId());
                // Reply carries the id of the consumer's pull request.
                writeMsg.setId(pullMsg.getId());
                if (writeMsg.getStatus() == null) {
                    if (!"/".equals(writeMsg.getUrl())) {
                        writeMsg.setOriginUrl(writeMsg.getUrl());
                    }
                    writeMsg.setStatus(200);
                }
                pull.getSession().write(writeMsg);
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
                // Delivery failed: put the message back so a later pull can receive it.
                this.msgQ.offer(msg);
            }
        }
    }

    /**
     * Builds a snapshot describing this queue: name, last update time, creator,
     * mode, number of unconsumed messages, number of known consumer sessions, and
     * the consumer info of every pull currently waiting in {@link #pullQ}.
     *
     * @return a newly created {@code Protocol.MqInfo}
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public Protocol.MqInfo getMqInfo() {
        Protocol.MqInfo mqInfo = new Protocol.MqInfo();
        mqInfo.name = this.name;
        mqInfo.lastUpdateTime = this.lastUpdateTime;
        mqInfo.creator = getCreator();
        mqInfo.mode = this.mode;
        mqInfo.unconsumedMsgCount = this.msgQ.size();
        mqInfo.consumerCount = this.pullSessions.size();
        mqInfo.consumerInfoList = new ArrayList<>();
        for (PullSession pull : this.pullQ) {
            mqInfo.consumerInfoList.add(pull.getConsumerInfo());
        }
        return mqInfo;
    }

    /**
     * @return a short description containing the queue name and creator
     */
    @Override
    public String toString() {
        return "MQ [name=" + this.name + ", creator=" + getCreator() + "]";
    }
}
