由买买提看人间百态

boards

本页内容为未名空间相应帖子的节选和存档,一周内的贴子最多显示50字,超过一周显示500字 访问原贴
JobHunting版 - 再问一个blockingqueue的问题
相关主题
这个Java blocking queue实现是不是有问题?这个用stack实现queue
thread safe blocking queue问题求救: 打印binary tree
thread-safe blockingqueue如何用JAVA中的circular array of queue 解决Josephus problem? (转载)
Java Blocking Queue问题问个题:get max value from Queue, with O(1)?
question 2: o(1) euque and dequeue?面试题
请教一个系统设计问题 (转载)一道很难的面试题
如何实现binary tree的从下到上的分层打印?Two programming questions...
share 面试题A家电面
相关话题的讨论汇总
话题: queue话题: thread话题: enque
进入JobHunting版参与讨论
1 (共1页)
c********t
发帖数: 5706
1
blockingqueue已经讨论过不少次了。我想问一下有可能执行enque的thread 和deque的
thread同时在wait吗?还有就是如何保证 enque顺序呢,比如三个enque threads都先
后wait, 唤醒的时候能保证唤醒的是第一个thread吗?Thread scheduling并不能保证
吧?那么不久违反了queueFIFO的性质吗?
r*******h
发帖数: 315
2
1. 要看你讨论是哪里实现的blockingqueue。general的定义只要求queue空和满的时候
wait
2. 在java的specs中,blocking queue必须是thread safe,那么就有可能同时wait
3. 在java中这个顺序是不保证的,不过可以理解为enqueque还未成功,从queue的角度
谈不上谁先谁后
c********t
发帖数: 5706
3
多谢!1,3明白了。2 我的意思是会不会同时 执行deque的thread 进入 while(queue.
isEmpty()) { wait();}, 而另一个执行enque的thread进入while(queue.size()==
limit){ wait();} ? limit capacity大于等于1。
我觉得应该不会吧。

【在 r*******h 的大作中提到】
: 1. 要看你讨论是哪里实现的blockingqueue。general的定义只要求queue空和满的时候
: wait
: 2. 在java的specs中,blocking queue必须是thread safe,那么就有可能同时wait
: 3. 在java中这个顺序是不保证的,不过可以理解为enqueque还未成功,从queue的角度
: 谈不上谁先谁后

D**C
发帖数: 6754
4
我认为不可能

queue.

【在 c********t 的大作中提到】
: 多谢!1,3明白了。2 我的意思是会不会同时 执行deque的thread 进入 while(queue.
: isEmpty()) { wait();}, 而另一个执行enque的thread进入while(queue.size()==
: limit){ wait();} ? limit capacity大于等于1。
: 我觉得应该不会吧。

c********t
发帖数: 5706
5
多谢!
我想明白了。我原来想如果deque 和enque thread不会同时等待的话,synchronized +
notify不就行吗?(不是notify all) 后来想到由于synchronized, 他们其实会同时等
待,光notify就不行了。
还有一个问题就是 为什么要用循环array来做,而不用一个local的queue呢?

【在 D**C 的大作中提到】
: 我认为不可能
:
: queue.

z*****6
发帖数: 16
6
jave 有reentrant lock,有实现算法可以保证线程之间fair,但是会造成performance
的degradation。想想也正常吧,no free lunch。
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/
ReentrantLock.html
c********t
发帖数: 5706
7
多谢!

performance

【在 z*****6 的大作中提到】
: jave 有reentrant lock,有实现算法可以保证线程之间fair,但是会造成performance
: 的degradation。想想也正常吧,no free lunch。
: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/
: ReentrantLock.html

k****r
发帖数: 807
8
不知为什么, 感觉这种情况似乎用condition更合适:
class BoundedBuffer {
final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件

final Object[] items = new Object[100];//缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)//如果队列满了
notFull.await();//阻塞写线程
items[putptr] = x;//赋值
if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一
个位置了,那么置为0
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)//如果队列为空
notEmpty.await();//阻塞读线程
Object x = items[takeptr];//取值
if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后
一个位置了,那么置为0
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}
c********t
发帖数: 5706
9
你说得对,因为只需要唤醒一个。但是为什么要用循环数组,直接用LinkedList或
ArrayDeque不行吗?



【在 k****r 的大作中提到】
: 不知为什么, 感觉这种情况似乎用condition更合适:
: class BoundedBuffer {
: final Lock lock = new ReentrantLock();//锁对象
: final Condition notFull = lock.newCondition();//写线程条件
: final Condition notEmpty = lock.newCondition();//读线程条件
:
: final Object[] items = new Object[100];//缓存队列
: int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;
:
: public void put(Object x) throws InterruptedException {

c********t
发帖数: 5706
10
原谅我再执着问一次。synchronized 等待状态是block,不需要唤醒, 和wait是有区别
的。notify是唤醒wait状态的。到底会不
会有enque, deque同时wait的时候呢?如果没有,那notify只会wake对方的莫个thread
, 我写了一个最简单的,没用
notifyAll, 用的notify. 这个也是每次只唤醒一个线程。但如果有enque,
deque同时wait,这代码会有应该有deadlock,但是测试了很多次,上百线程,也没有
发生deadlock. 请大牛们帮看看下面代码有问题吗?
public class BlockingQueue3 {
private Deque queue;
private int limit = 10;
public BlockingQueue3(int pLimit) {
limit = pLimit;
queue = new ArrayDeque<>();
}
public synchronized void put(T item) throws InterruptedException {
while (queue.size() == limit) {
wait();
}
queue.offer(item);
notify();
}
public synchronized T take() throws InterruptedException {
while (queue.size() == 0) {
wait();
}
T e = queue.poll();
notify();
return e;
}
}

+

【在 c********t 的大作中提到】
: 多谢!
:
: performance

相关主题
请教一个系统设计问题 (转载)这个用stack实现queue
如何实现binary tree的从下到上的分层打印?求救: 打印binary tree
share 面试题如何用JAVA中的circular array of queue 解决Josephus problem? (转载)
进入JobHunting版参与讨论
c********t
发帖数: 5706
11
这个是完整测试代码,有兴趣的可以跑跑试试
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
public class BlockingQueue {
private List queue;
private int limit;
public BlockingQueue(int pLimit) {
this.limit = pLimit;
queue = new LinkedList();
}
public synchronized void enqueue(T item) throws InterruptedException {
try {
if (this.queue.size() == this.limit)
System.out.println("wait, enque " + item);
//Thread.sleep(2000);
while (this.queue.size() == this.limit) {
wait();
}
} catch (InterruptedException ex) {
notify();
throw ex;
}
System.out.println("enque " + item);
// if (this.queue.size() == 0) {
System.out.println("wake up, deque");
notify();
// }
this.queue.add(item);
}
public synchronized T dequeue(int index) throws InterruptedException {
try {
if (this.queue.size() == 0)
System.out.println("wait, dequeue " + index);
//Thread.sleep(2000);
while (this.queue.size() == 0) {
wait();
}
} catch (InterruptedException ex) {
notify();
throw ex;
}
T a = queue.remove(0);
System.out.println("deque " + index + ": " + a);
// if (this.queue.size() == this.limit) {
System.out.println("wake up, enque");
notify();
// }
return a;
}
private static class Caller extends Thread {
private BlockingQueue bq;
int index;
public Caller(BlockingQueue bq, int ind) {
this.bq = bq;
index = ind;
}
public void run() {
Random r = new Random();
try {
if (r.nextBoolean())
bq.enqueue(index);
else
bq.dequeue(index);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
BlockingQueue bq = new BlockingQueue<>(1);
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Caller(bq, i);
threads[i].start();
}
try {
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
} catch (InterruptedException e) {
System.out.println("Interrupted..");
}
}
}



【在 k****r 的大作中提到】
: 不知为什么, 感觉这种情况似乎用condition更合适:
: class BoundedBuffer {
: final Lock lock = new ReentrantLock();//锁对象
: final Condition notFull = lock.newCondition();//写线程条件
: final Condition notEmpty = lock.newCondition();//读线程条件
:
: final Object[] items = new Object[100];//缓存队列
: int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;
:
: public void put(Object x) throws InterruptedException {

l*********r
发帖数: 105
12
没必要那么复杂。用一个priority queue, element class implements comparable.
consumer:
每个consumer进来,如果blocking queue里面没有东西,创建一个priority queue的
element,然后加进priority queue, 这个consumer thread就wait on这个element。
producer:
每个producer进来,dequque priority queue,wakeup thread,直到priority queue
empty或者blocking queue empty。
以上步骤synchronize on priority queue或者blocking queue都行。
c********t
发帖数: 5706
13
既然consumer thread wait, producer wake up它, 为什么还要priority queue呢?
是想保证顺序吗?
能给个链接或代码吗?

queue

【在 l*********r 的大作中提到】
: 没必要那么复杂。用一个priority queue, element class implements comparable.
: consumer:
: 每个consumer进来,如果blocking queue里面没有东西,创建一个priority queue的
: element,然后加进priority queue, 这个consumer thread就wait on这个element。
: producer:
: 每个producer进来,dequque priority queue,wakeup thread,直到priority queue
: empty或者blocking queue empty。
: 以上步骤synchronize on priority queue或者blocking queue都行。

k****r
发帖数: 807
14
lz 你最后的code有两个问题:
1. dequeue 中idx没有用。
2. enqueue dequeue在对queue操作前notify不对:
notify();
// }
this.queue.add(item);
c********t
发帖数: 5706
15
1. dequeue 中idx没有用。
index是打印输出thread id用的。
2. enqueue dequeue在对queue操作前notify不对.
我理解顺序应该无所谓,因为唤醒的thread并不会马上执行,而是等这个thread结束后
才开始执行( synchronized).不知道对不对。

【在 k****r 的大作中提到】
: lz 你最后的code有两个问题:
: 1. dequeue 中idx没有用。
: 2. enqueue dequeue在对queue操作前notify不对:
: notify();
: // }
: this.queue.add(item);

l*********r
发帖数: 105
16
这个是给consumer thread排序,按顺序从priority queue wake up thread。
楼主的意思是要实现notify all的时候按priority wakeup,这就是一个简单的实现方
法,log(n)的复杂性,一般也就够用了。

【在 c********t 的大作中提到】
: 既然consumer thread wait, producer wake up它, 为什么还要priority queue呢?
: 是想保证顺序吗?
: 能给个链接或代码吗?
:
: queue

c********t
发帖数: 5706
17
明白了,多谢!如果不用priority queue, 只用一个 queue 不可以吗,先入先出应该
也可以保证顺序吧?

【在 l*********r 的大作中提到】
: 这个是给consumer thread排序,按顺序从priority queue wake up thread。
: 楼主的意思是要实现notify all的时候按priority wakeup,这就是一个简单的实现方
: 法,log(n)的复杂性,一般也就够用了。

l*********r
发帖数: 105
18
用priority queue,高优先权的thread就可以插队, 先wake up。FIFO queue就不能插
队了。
c********t
发帖数: 5706
19
明白了,多谢多谢!

【在 l*********r 的大作中提到】
: 用priority queue,高优先权的thread就可以插队, 先wake up。FIFO queue就不能插
: 队了。

1 (共1页)
进入JobHunting版参与讨论
相关主题
A家电面question 2: o(1) euque and dequeue?
电面两题请教一个系统设计问题 (转载)
怎么练习multi-threading,平常工作都是用Java框架如何实现binary tree的从下到上的分层打印?
what is the internal implementation of Dequeshare 面试题
这个Java blocking queue实现是不是有问题?这个用stack实现queue
thread safe blocking queue问题求救: 打印binary tree
thread-safe blockingqueue如何用JAVA中的circular array of queue 解决Josephus problem? (转载)
Java Blocking Queue问题问个题:get max value from Queue, with O(1)?
相关话题的讨论汇总
话题: queue话题: thread话题: enque