producer-consumer 模式

生产者负责创建消息
消费者负责消费
本文使用 java 实现,主要使用 BlockingQueue

关于 BlockingQueue

BlockingQueue--阻塞队列,
当消息满的时候尝试再放入消息时会阻塞,
当消息空的时候尝试取消息时也会阻塞。

测试代码

public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);

        for (int i=0; i<8; i++) {
            LOGGER.info("i=" +i);
            queue.put(i + "th");
        }

        for (int i=0; i<7; i++) {
            String result = queue.take();
            System.out.println(result);
        }
}
21:49:30.823 [main] INFO com.carl.producer_consumer.TaskQueue - i=0
21:49:30.827 [main] INFO com.carl.producer_consumer.TaskQueue - i=1
21:49:30.827 [main] INFO com.carl.producer_consumer.TaskQueue - i=2
21:49:30.827 [main] INFO com.carl.producer_consumer.TaskQueue - i=3
21:49:30.827 [main] INFO com.carl.producer_consumer.TaskQueue - i=4
21:49:30.827 [main] INFO com.carl.producer_consumer.TaskQueue - i=5

可以看到当尝试 put 第 6 个数据的时候,队列阻塞了
所以日常工作中生产者与消费者在不同的系统中或在不同的线程中工作

package com.carl.producer_consumer;

public class Task {

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    private String name;
    private String status;

    Task(String name, String status) {
        this.name = name;
        this.status = status;
    }
}

package com.carl.producer_consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskQueue {

    private BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>(3);

    public TaskQueue() {

    }

    public Task take() throws InterruptedException {
        return queue.take();
    }

    public void put(Task task) throws InterruptedException {
        queue.put(task);
    }
}

package com.carl.producer_consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskProducer.class);
    private TaskQueue queue;
    TaskProducer(TaskQueue taskQ) {
        this.queue = taskQ;
    }

    public void producer(Task task) throws InterruptedException {
        LOGGER.debug(task.getName() + "," + task.getStatus());
        queue.put(task);;
    }
}
package com.carl.producer_consumer;

import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskConsumer {

    private static Logger LOGGER = LoggerFactory.getLogger(TaskConsumer.class);

    private TaskQueue queue;
    TaskConsumer(TaskQueue queue) {
        this.queue = queue;
    }

    public void consumer() throws InterruptedException {
        Task task = queue.take();
        LOGGER.debug(task.getName() + " is consume " + task.getStatus());
    }
}
package com.carl.producer_consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

    static TaskQueue taskQ = new TaskQueue();
    static ExecutorService threadPool = Executors.newFixedThreadPool(6);
    static int i=0;
    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        TaskProducer producer = new TaskProducer(taskQ);
        threadPool.submit(() ->{
            while(true) {
                i++;
                producer.producer(new Task("the "+i+" queue:", "create finish"));
                Thread.sleep(1000);
            }
        });

        TaskConsumer consumer = new TaskConsumer(taskQ);
        threadPool.submit(() ->{
            while(true) {
                consumer.consumer();
                Thread.sleep(3000);
            }
        });
    }
}
08:11:47.949 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 1 queue:,create finish
08:11:47.952 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 1 queue: is consume create finish
08:11:48.952 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 2 queue:,create finish
08:11:49.953 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 3 queue:,create finish
08:11:50.953 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 2 queue: is consume create finish
08:11:50.953 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 4 queue:,create finish
08:11:51.953 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 5 queue:,create finish
08:11:52.953 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 6 queue:,create finish
08:11:53.953 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 3 queue: is consume create finish
08:11:54.953 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 7 queue:,create finish
08:11:56.953 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 4 queue: is consume create finish
08:11:57.963 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 8 queue:,create finish
08:11:59.954 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 5 queue: is consume create finish
08:12:00.954 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 9 queue:,create finish
08:12:02.954 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 6 queue: is consume create finish
08:12:03.954 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 10 queue:,create finish
08:12:05.954 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 7 queue: is consume create finish
08:12:06.954 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 11 queue:,create finish
08:12:08.954 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 8 queue: is consume create finish
08:12:09.955 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 12 queue:,create finish
08:12:11.955 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 9 queue: is consume create finish
08:12:12.955 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 13 queue:,create finish
08:12:14.955 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 10 queue: is consume create finish
08:12:15.955 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 14 queue:,create finish
08:12:17.955 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 11 queue: is consume create finish
08:12:18.955 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 15 queue:,create finish
08:12:20.956 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 12 queue: is consume create finish
08:12:21.956 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 16 queue:,create finish
08:12:23.956 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 13 queue: is consume create finish
08:12:24.956 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 17 queue:,create finish
08:12:26.956 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 14 queue: is consume create finish
08:12:27.956 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 18 queue:,create finish
08:12:29.957 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 15 queue: is consume create finish
08:12:30.957 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 19 queue:,create finish
08:12:32.957 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 16 queue: is consume create finish
08:12:33.957 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 20 queue:,create finish
08:12:35.957 [pool-1-thread-2] DEBUG com.carl.producer_consumer.TaskConsumer - the 17 queue: is consume create finish
08:12:36.957 [pool-1-thread-1] DEBUG com.carl.producer_consumer.TaskProducer - the 21 queue:,create finish

我的理解

虽然生产者和消费者在时间延迟上不一致,但由于阻塞的关系,它会等到有消息被取出之后再生产,
这可能是一种 “背压(backpressure)” 说可能是因为还不了解背压的真正概念
背压的概念是在 RXJAVA 模式中了解到的,等写出 demo 的时候再说明吧


↙↙↙阅读原文可查看相关链接,并与作者交流