/*
 * Decompiled with CFR 0.152.
 */
package com.riven.redisson.listener;

import com.riven.redisson.listener.AbstractRedissonListenerContainer;
import com.riven.redisson.listener.BatchRedissonMessageListenerAdapter;
import com.riven.redisson.listener.ContainerProperties;
import com.riven.redisson.message.FastJsonCodec;
import com.riven.redisson.message.RedissonMessage;
import com.riven.redisson.support.ThreadFactoryCreator;
import java.util.Collections;
import java.util.List;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.Assert;

/**
 * Listener container that polls a Redis list and hands the entries to a
 * {@link BatchRedissonMessageListenerAdapter} in batches of at most {@code maxFetch}.
 *
 * <p>A single consumer task is submitted to the container's task executor on start.
 * The task repeatedly runs a Lua script that reads and removes the head of the list
 * in one step, then passes the resulting messages to the configured listener.
 *
 * <p>Note: the script removes the entries from the list before the listener is invoked,
 * and nothing in this class puts them back if the listener throws.
 */
public class BatchRedissonListenerContainer
extends AbstractRedissonListenerContainer {
    /**
     * Lua script: {@code LRANGE KEYS[1] 0 ARGV[1]} reads the head of the list and, when
     * anything was read, {@code LTRIM KEYS[1] ARGV[2] -1} drops the entries before index
     * {@code ARGV[2]}. The caller passes {@code ARGV[1] = maxFetch - 1} (LRANGE's end index
     * is inclusive) and {@code ARGV[2] = maxFetch}.
     */
    private static final String FETCH_SCRIPT = "local expiredValues = redis.call('lrange', KEYS[1], 0, ARGV[1]); if #expiredValues > 0 then redis.call('ltrim', KEYS[1], ARGV[2], -1); end; return expiredValues;";

    /** Each consecutive empty fetch lengthens the pause before the next poll by this much. */
    private static final long EMPTY_FETCH_BACKOFF_STEP_MILLIS = 5L;

    /** Upper bound for any pause between two polls (empty queue or Redis failure). */
    private static final long MAX_WAIT_MILLIS = 100L;

    private final int maxFetch;
    private AsyncMessageProcessingConsumer takeMessageTask;

    /**
     * Creates a container that fetches at most {@code maxFetch} messages per poll.
     *
     * @param containerProperties properties forwarded to the parent container; the queue
     *                            name is read from them on every fetch
     * @param maxFetch            maximum number of messages per batch, must be greater than 0
     * @throws IllegalArgumentException if {@code maxFetch} is not greater than 0
     */
    public BatchRedissonListenerContainer(ContainerProperties containerProperties, int maxFetch) {
        super(containerProperties);
        Assert.isTrue(maxFetch > 0, "maxFetch must be greater than 0");
        this.maxFetch = maxFetch;
        this.setTaskExecutor(new SimpleAsyncTaskExecutor(ThreadFactoryCreator.create("RedissonBatchConsumeThread")));
    }

    /**
     * @return the maximum number of messages fetched per poll
     */
    public int getMaxFetch() {
        return this.maxFetch;
    }

    /** Creates a fresh consumer task and submits it to the task executor. */
    @Override
    protected void doStart() {
        this.takeMessageTask = new AsyncMessageProcessingConsumer();
        this.getTaskExecutor().execute(this.takeMessageTask);
    }

    /** Stops the current consumer task; a no-op when the container was never started. */
    @Override
    protected void doStop() {
        AsyncMessageProcessingConsumer task = this.takeMessageTask;
        if (task != null) {
            task.stop();
        }
    }

    /**
     * Polling loop executed on a task-executor thread. An instance runs at most once:
     * it moves from CREATED to RUNNING when {@link #run()} begins and to STOPPED when
     * it is stopped or its loop terminates.
     */
    private final class AsyncMessageProcessingConsumer
    implements Runnable {
        /** Guards the status/thread hand-over between {@link #run()} and {@link #stop()}. */
        private final Object stateLock = new Object();
        private volatile Thread currentThread = null;
        private volatile AbstractRedissonListenerContainer.ConsumerStatus status = AbstractRedissonListenerContainer.ConsumerStatus.CREATED;

        private AsyncMessageProcessingConsumer() {
        }

        @Override
        public void run() {
            Thread self = Thread.currentThread();
            synchronized (this.stateLock) {
                // The check and the transition must be atomic with respect to stop():
                // a stop() issued before the worker thread gets here has to win.
                if (this.status != AbstractRedissonListenerContainer.ConsumerStatus.CREATED) {
                    System.out.println("consumer currentThread [" + self.getName() + "] will exit, because consumer status is " + this.status + ",expected is CREATED");
                    return;
                }
                this.currentThread = self;
                this.status = AbstractRedissonListenerContainer.ConsumerStatus.RUNNING;
            }
            try {
                this.pollUntilStopped();
            }
            finally {
                synchronized (this.stateLock) {
                    this.status = AbstractRedissonListenerContainer.ConsumerStatus.STOPPED;
                    this.currentThread = null;
                }
                System.out.println("consumer currentThread [" + self.getName() + "] will exit, because of STOPPED status");
            }
        }

        /**
         * Fetches and dispatches batches until the consumer is stopped or the thread is
         * interrupted. Empty fetches back off linearly up to {@code MAX_WAIT_MILLIS};
         * Redis failures pause for {@code MAX_WAIT_MILLIS} so an unavailable server is
         * not hammered in a tight loop.
         */
        private void pollUntilStopped() {
            long emptyFetchTimes = 0L;
            while (this.status != AbstractRedissonListenerContainer.ConsumerStatus.STOPPED) {
                try {
                    List<RedissonMessage> messageList = this.fetch();
                    if (messageList == null || messageList.isEmpty()) {
                        long delay = Math.min(++emptyFetchTimes * EMPTY_FETCH_BACKOFF_STEP_MILLIS, MAX_WAIT_MILLIS);
                        Thread.sleep(delay);
                        continue;
                    }
                    emptyFetchTimes = 0L;
                    BatchRedissonMessageListenerAdapter redissonListener = (BatchRedissonMessageListenerAdapter)BatchRedissonListenerContainer.this.getRedissonListener();
                    redissonListener.onMessage(messageList);
                }
                catch (InterruptedException interrupted) {
                    // Interruption is a request to terminate: keep the flag set and leave.
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (RedisException e) {
                    // NOTE(review): Redisson appears to surface an interrupted blocking call
                    // as a RedisException with the interrupt flag set — confirm. Either way,
                    // a stop request or a pending interrupt ends the loop quietly.
                    if (this.status == AbstractRedissonListenerContainer.ConsumerStatus.STOPPED || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    System.err.println("redis error occurred while take message from redisson: " + e);
                    if (!this.pause(MAX_WAIT_MILLIS)) {
                        return;
                    }
                }
                catch (Exception e) {
                    System.err.println("error occurred while take message from redisson: " + e);
                }
            }
        }

        /**
         * Sleeps for the given time.
         *
         * @return {@code false} if the sleep was interrupted (the interrupt flag is restored)
         */
        private boolean pause(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Runs the fetch script against the configured queue.
         *
         * @return the entries returned by the script, at most {@code maxFetch} of them
         */
        private List<RedissonMessage> fetch() {
            String queue = BatchRedissonListenerContainer.this.getContainerProperties().getQueue();
            RedissonClient redissonClient = BatchRedissonListenerContainer.this.getRedissonClient();
            int fetchCount = BatchRedissonListenerContainer.this.maxFetch;
            // LRANGE's end index is inclusive, so the last index to read is fetchCount - 1.
            int searchEndIndex = fetchCount - 1;
            return redissonClient.getScript(FastJsonCodec.INSTANCE).eval(RScript.Mode.READ_WRITE, FETCH_SCRIPT, RScript.ReturnType.MULTI, Collections.<Object>singletonList(queue), searchEndIndex, fetchCount);
        }

        /**
         * Marks the consumer as STOPPED and interrupts its thread if it is running.
         * The status is recorded even when the worker thread has not started yet, so a
         * later {@link #run()} exits immediately instead of polling forever.
         */
        private void stop() {
            Thread worker;
            synchronized (this.stateLock) {
                this.status = AbstractRedissonListenerContainer.ConsumerStatus.STOPPED;
                worker = this.currentThread;
            }
            if (worker != null) {
                worker.interrupt();
            }
        }
    }
}

