/*
 * Decompiled with CFR 0.152.
 */
package com.riven.redisson.listener;

import com.alibaba.fastjson.JSONObject;
import com.riven.redisson.listener.AbstractRedissonListenerContainer;
import com.riven.redisson.listener.ContainerProperties;
import com.riven.redisson.listener.SimpleRedissonMessageListenerAdapter;
import com.riven.redisson.message.FastJsonCodec;
import com.riven.redisson.message.RedissonMessage;
import java.util.Objects;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.command.CommandAsyncExecutor;

public class SimpleRedissonListenerContainer
extends AbstractRedissonListenerContainer {
    private RedisCommand<Object> LPOP_VALUE = new RedisCommand("LPOP", new ListObjectDecoder(1));
    private AsyncMessageProcessingConsumer takeMessageTask;

    public SimpleRedissonListenerContainer(ContainerProperties containerProperties) {
        super(containerProperties);
    }

    @Override
    protected void doStart() {
        this.takeMessageTask = new AsyncMessageProcessingConsumer();
        this.getTaskExecutor().execute(this.takeMessageTask);
    }

    @Override
    protected void doStop() {
        this.takeMessageTask.stop();
    }

    private final class AsyncMessageProcessingConsumer
    implements Runnable {
        private volatile Thread currentThread = null;
        private volatile AbstractRedissonListenerContainer.ConsumerStatus status = AbstractRedissonListenerContainer.ConsumerStatus.CREATED;

        private AsyncMessageProcessingConsumer() {
        }

        @Override
        public void run() {
            if (this.status != AbstractRedissonListenerContainer.ConsumerStatus.CREATED) {
                System.out.println("consumer currentThread [" + this.currentThread.getName() + "] will exit, because consumer status is " + (Object)((Object)this.status) + ",expected is CREATED");
                return;
            }
            String queue = SimpleRedissonListenerContainer.this.getContainerProperties().getQueue();
            Redisson redisson = (Redisson)SimpleRedissonListenerContainer.this.getRedissonClient();
            RBlockingQueue blockingQueue = redisson.getBlockingQueue(queue, FastJsonCodec.INSTANCE);
            if (blockingQueue == null) {
                System.out.println("error occurred while create blockingQueue for queue [" + queue + "]");
                return;
            }
            CommandAsyncExecutor commandExecutor = redisson.getCommandExecutor();
            this.currentThread = Thread.currentThread();
            this.status = AbstractRedissonListenerContainer.ConsumerStatus.RUNNING;
            long maxWaitMillis = 100L;
            long emptyFetchTimes = 0L;
            do {
                try {
                    RFuture asyncResult = commandExecutor.writeAsync(blockingQueue.getName(), blockingQueue.getCodec(), SimpleRedissonListenerContainer.this.LPOP_VALUE, blockingQueue.getName());
                    Object object = commandExecutor.get(asyncResult);
                    if (Objects.isNull(object)) {
                        Thread.sleep(Math.min(++emptyFetchTimes * 5L, 100L));
                        continue;
                    }
                    RedissonMessage redissonMessage = JSONObject.parseObject(object.toString(), RedissonMessage.class);
                    emptyFetchTimes = 0L;
                    SimpleRedissonMessageListenerAdapter redissonListener = (SimpleRedissonMessageListenerAdapter)SimpleRedissonListenerContainer.this.getRedissonListener();
                    redissonListener.onMessage(redissonMessage);
                }
                catch (InterruptedException | RedisException asyncResult) {
                }
                catch (Exception e) {
                    System.out.println("error occurred while take message from redisson");
                }
            } while (this.status != AbstractRedissonListenerContainer.ConsumerStatus.STOPPED);
            System.out.println("consumer currentThread [" + this.currentThread.getName() + "] will exit, because of STOPPED status");
            this.currentThread = null;
        }

        private void stop() {
            if (this.currentThread != null) {
                this.status = AbstractRedissonListenerContainer.ConsumerStatus.STOPPED;
                this.currentThread.interrupt();
            }
        }
    }
}

