博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
流控思路——多消费者定量生产(第100篇)
阅读量:6719 次
发布时间:2019-06-25

本文共 2241 字,大约阅读时间需要 7 分钟。

  hot3.png

多线程消费队列到指定个数时触发一个生产线程往队列中补充元素,保证队列中有足够的数据供消费,不至于使消费线程等待,也不至于在队列中堆得过多。假设10人消费,先放2个篮子,每个篮子10个(篮子得够大,怎么也得够在场的人分一次),吃完一篮子赶紧叫人再提一篮子来,谁负责叫人?吃篮子里最后一个的,或吃另外一篮第一个的,这样得知道哪个是最后一个,哪个是第一个。另外一个方法在篮子底部放个托盘,谁拿到托盘谁负责叫人,全部消费完时,篮子里不放托盘了,按人数在篮子里放甜点,每人一份,吃完收工。

final long startTime = System.currentTimeMillis();//开始时间int index = 0;//模拟处理索引final int person = 10;//模拟消费者个数  final LinkedBlockingQueue queue = new LinkedBlockingQueue(1000);/**托盘*/class Salver{};/**甜点*/class Dessert{};/**模拟启动程序*/void start() {    /* 1、先来两篮子 */    for (int k = 0; k < 2; k++) {        for (int i = 0; i < person; i++) {            queue.add(++index);            queue.add(new Salver());        }    }     /* 2、启动消费者 */    for (int i = 0; i < person; i++) {        new Thread(new Consumer()).start();    }}/**消费者*/class Consumer implements Runnable{    @Override    public void run() {        try {            while (true) {                Object thing = queue.take();                                if (thing instanceof Salver) {//拿到托盘,叫人再来一篮子,接着取下一个                    new Thread(new Producer()).start();                    continue;                } else if (thing instanceof Dessert) {//吃完甜点收工                    break;                }                                /*模拟实际处理*/                System.out.println(thing);                Thread.sleep(1000);            }        } catch (Exception e) {            e.printStackTrace();        }    }}/**生产者*/class Producer implements Runnable {    @Override    public void run() {        try {            synchronized(Producer.class){//避免没拿来消费完同时拿                /* 消费1分钟停止,根据实际情况调整,比如库里没有待处理数据或不足一篮子 */                if (System.currentTimeMillis() - startTime > 60 * 1000) {                    /* 人均一份甜点 */                    for (int i = 0; i < person; i++) {                        queue.put(new Dessert());                    }                } else {                    /* 加一篮子 */                    for (int i = 0; i < person; i++) {                        queue.put(index++);                    }                    /* 放一托盘 */                    queue.put(new Salver());                }            }        } catch (Exception e) {            e.printStackTrace();        }    }}

ps:

转载于:https://my.oschina.net/h2do/blog/524605

你可能感兴趣的文章
Akka笔记之消息传递
查看>>
《企业大数据系统构建实战:技术、架构、实施与应用》一1.3 本章小结
查看>>
为什么不能用memcached存储Session?
查看>>
《C++编程风格(修订版)》——2.2 明确定义的状态
查看>>
页面加载显示进度条
查看>>
Logstash 日志搜集处理框架 安装配置
查看>>
Manifest.xml 入门基础(一) 概述与&lt;manifest&gt;标签
查看>>
2016全球最强数据库大盘点
查看>>
可视化与领域驱动设计
查看>>
数据结构实践——字符串加密
查看>>
其他转移指令(0904)
查看>>
《卸甲笔记》-多表查询之一
查看>>
安装部署nvm、npm、nodejs之前先了解清楚三者之间关系
查看>>
linux 磁盘管理下(LVM逻辑卷创建和管理,磁盘配额设置方法以及小技巧)
查看>>
NFS Volume Provider(Part III) - 每天5分钟玩转 OpenStack(64)
查看>>
MySQL 安装详解
查看>>
使用Express + Socket.io + MongoDB实现简单的聊天室
查看>>
【cocos2d-x】横向滚屏射击游戏②----虚拟控制手柄
查看>>
Docker 之 容器网络管理
查看>>
基于时间点的恢复
查看>>