private final int batchSize;
private final AtomicInteger counter = new AtomicInteger();
ElementsImpl(int i) {
batchSize = i;
}
public int getBatchSize() {
return batchSize;
}
@Override
public void inc() {
counter.incrementAndGet();
}
@Override
public boolean reserve() {
int val = counter.get();
return val >= batchSize && counter.compareAndSet(val, val -
batchSize);
}
@Override
public void cancel() {
counter.addAndGet(batchSize);
}
@Override
public void releaseQueued(SynchronousQueue
m*****n 发帖数: 204
3
package test;
import java.util.concurrent.Semaphore;
import test.IElements.WaterElements;
public class H2OExtraThread {
private final Semaphore sema = new Semaphore(0);
private volatile boolean isRunning = true;
// Called by the control thread
private void controllerRun() {
package test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import test.IElements.WaterElements;
public class H2OLockFree {
private final AtomicLong sequencer = new AtomicLong();
private final AtomicInteger contention = new AtomicInteger();
public void addO() {
addElements(WaterElements.O);
}
private void addElements(WaterElements ele) {
long id = sequencer.incrementAndGet();
contention.incrementAndGet();
ele.inc();
while (true) {
WaterElements failure = null;
next_ele:
for (WaterElements e : WaterElements.values()) {
while (id == sequencer.get() && contention.get() > 1) {
if (e.reserve()) continue next_ele;
try { Thread.sleep(1); } catch (Exception err) {}
}
failure = e;
break;
}
public class H2O {
public static class ThreadHolder {
Thread thread;
ThreadHolder(Thread thread) {
this.thread = thread;
}
}
protected LinkedList hlist = new LinkedList();
protected LinkedList olist = new LinkedList();
public boolean h() {
ThreadHolder current = new ThreadHolder(Thread.currentThread());
synchronized (current) {
ThreadHolder[] threadsToWakeUp = null;
// updating the queues requires a global lock;
synchronized (this) {
this.hlist.add(current);
threadsToWakeUp = this.getThreadsToWakeUp();
}
// now the wait and wake up is only in the current lock;
boolean notInterrupted = this
.checkWaitAndWakeUp(current, threadsToWakeUp);
if (notInterrupted) {
this.doH();
}
return notInterrupted;
}
}
private ThreadHolder[] getThreadsToWakeUp() {
if (this.hlist.size() >= 2 && this.olist.size() >= 1) {
ThreadHolder[] holders = new ThreadHolder[3];
holders[0] = this.hlist.poll();
holders[1] = this.hlist.poll();
holders[2] = this.olist.poll();
return holders;
}
return null;
}
// return true if the wait is not interrupted.
private boolean checkWaitAndWakeUp(ThreadHolder current,
ThreadHolder[] threadsToWakeUp) {
boolean wait = true;
if (threadsToWakeUp != null) {
for (ThreadHolder threadToWakeUp : threadsToWakeUp) {
if (threadToWakeUp == current) {
wait = false;
} else {
// this lock ensures notify will only happen after the
// thread wait;
synchronized (threadToWakeUp) {
threadToWakeUp.notifyAll();
}
}
}
}
if (wait) {
try {
current.wait();
} catch (InterruptedException e) {
return false;
}
}
return true;
}
// similiar to h();
public boolean o() {
ThreadHolder current = new ThreadHolder(Thread.currentThread());
synchronized (current) {
ThreadHolder[] threadsToWakeUp = null;
// updating the queues requires a global lock;
synchronized (this) {
this.olist.add(current);
threadsToWakeUp = this.getThreadsToWakeUp();
}
// now the wait and wake up is only in the current lock;
boolean notInterrupted = this
.checkWaitAndWakeUp(current, threadsToWakeUp);
if (notInterrupted) {
this.doO();
}
return notInterrupted;
}
}
// Override this to extend the business logic
protected void doH() {
// business logic;
}
// Override this to extend the business logic
protected void doO() {
// business logic
}
}
如果允许我大展身手的话。我会用EIP Aggregator pattern http://www.eaipatterns.com/Aggregator.html
如果要考我java,我就用 new concurrent package, 简单明了
public class H2O {
private BlockingQueue hq = Queues.newArrayBlockingQueue();
private BlockingQueue oq = Queues.newArrayBlockingQueue();
public H2O H() {
CountDownLatch latch = new CountDownLatch(1);
hq.add(latch);
latch.await();
return this;
}
public H2O O() {
CountDownLatch latch = new CountDownLatch(1);
oq.add(latch);
latch.await();
return this;
}
public H2O() {
Executors.newSingleThreadPool().execute(new Runnable() {
void run() {
while(true) {
CountDownLatch o = oq.take();
CountDownLatch h1 =hq.take();
hq.take().countDown();
h1.countDown();
o.countDown();
}
}
}
}
}
f********d 发帖数: 51
15
没有额外thread的版本
public class H2O {
private BlockingQueue hq = Queues.newArrayBlockingQueue();
public H2O H() {
CountDownLatch latch = new CountDownLatch(1);
hq.add(latch);
latch.await();
return this;
}
public H2O O() {
// this while is to make sure there is no such case where
// one O got one H and then being prempted, another O got the new H
// above situation will result 2O and 2H produces no H2O
while(true) {
CountDownLatch first = hq.take();
CountDownLatch second = hq.poll();
if (second == null) {
hq.add(first);
continue;
}
first.countDown();
second.countDown();
return this;
}
}
}
f********d 发帖数: 51
16
in a concurrent implementation, Thread.sleep is always a big NONO
【在 m*****n 的大作中提到】 : package test; : import java.util.concurrent.atomic.AtomicInteger; : import java.util.concurrent.atomic.AtomicLong; : import test.IElements.WaterElements; : public class H2OLockFree { : private final AtomicLong sequencer = new AtomicLong(); : private final AtomicInteger contention = new AtomicInteger(); : public void addO() { : addElements(WaterElements.O); : }
【在 f********d 的大作中提到】 : 没有额外thread的版本 : public class H2O { : private BlockingQueue hq = Queues.newArrayBlockingQueue(); : public H2O H() { : CountDownLatch latch = new CountDownLatch(1); : hq.add(latch); : latch.await(); : return this; : } : public H2O O() {
f********d 发帖数: 51
20
I've passed the unit tests pretty well in different scenarios.
of course you can use ReentrantLock and 2 Conditions. And you need one more
thing which is the CyclicBarrier for the H(2)
Problem came when you implement.
When O came, you normally will signal the Condition for O. but what if there
's no H came before or only one came?
Now you need to await on H's condition.
Now you meet a pretty weird situation of signal first or await first.
normally the strategy to solve is using while(true) and keep trying to avoid
deadlocks.
That's fine the code can work. but can you think of any better way of
handling this and how can it be extended in the future?
First thing come into mind is, why I need that while(true)? because I dont
know whether the other side is waiting by Condition interface. What about
introduce one?
alright, now since you need a collection, what about a concurrent one to
help? BlockingQueue came into the door.
Now i still need condition to do await. now you could do lock and new
condition and await. but the question is, is there any class can do this job
much easier?
That's where the CountDownLatch kicks in.
okay, now back to the extensible question? what if you have C2H4O2? then you
code is pretty messy. using the first approach i gave.
in the process method, The runnable is actually describing how H2O or C2H4O2
. and you could make it better as a DSL and encapsulate and abstract to
Element and ElementDefintion.
in that way, you can loop thru all the ElementDefitions(a formula). now the
constructor can take a DSL of ElementDefinition.
Now to this point, it becomes a trival version of Aggregator in Camel or
Spring integration. now what's left over?
of course, Exception handlings.
There will be more considierations on that front, but this is also a test to
see how sophisticated the interviewee is.
btw, I used the same questioin in my interviews. very few people can go this
far.
while(true) {
CountDownLatch o = oq.take(); // take an O
CountDownLatch h1 =hq.take(); // take an H
hq.take().countDown(); // take another H
h1.countDown(); // count down all the rest
o.countDown();
}
any new formula, you only need to make sure the first 3 lines reflects your
formula well. it becomes very extensible. and you can also do
Collection list = Lists.newArrayList();
for (ElementDefinition ed : defs) {
BlockingQueue queue = ed.getQueue();
for (int i =0; i < ed.getNumber() ; i++) {
list.add(queue.take());
}
}
for (CountDownLAtch latch : list) {
latch.countDown();
}
with DSL, you could
val H2 = new ElementDefinition(2);
val O = new ElementDefinition(1);
defs.add(H2);
defs.add(O);
void H() {
H2.await();
}
class ElementDefinition {
....
public void await() {
CountDownLatch latch = new CountDownLatch(1);
queue.add(latch);
latch.await();
}
....
}
this is synchronized implementaiton. you can also have interface Element to
replace CountDownLAtch.
for synchronized implementation, use ElementSychImpl with CountDownLatch.
for asycn, get a call back.
Now the codes will be quite extensible and easier to understand
more
there
avoid
【在 f********d 的大作中提到】 : I've passed the unit tests pretty well in different scenarios. : of course you can use ReentrantLock and 2 Conditions. And you need one more : thing which is the CyclicBarrier for the H(2) : Problem came when you implement. : When O came, you normally will signal the Condition for O. but what if there : 's no H came before or only one came? : Now you need to await on H's condition. : Now you meet a pretty weird situation of signal first or await first. : normally the strategy to solve is using while(true) and keep trying to avoid : deadlocks.
i*******6 发帖数: 107
22
My 5 cents:
import java.util.concurrent.*;
class MyO implements Runnable{
int index;
CountDownLatch lock;
public MyO (int index, CountDownLatch lock) {
this.index = index;
this.lock = lock;
}
public void run() {
try {
Thread.sleep((long) (Math.random() * 1000));
lock.await();
System.out.println("Take O:" + index + " to form an H2O!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyH implements Runnable{
int index;
CountDownLatch lock;
public MyH(int index, CountDownLatch lock) {
this.index = index;
this.lock = lock;
}
public void run() {
try {
hread.sleep((long) (Math.random() * 1000));
lock.await();
System.out.println("take H:"+index+" to form a H2O!");
} catch(Exception e) {
e.printStackTrace();
}
}
}
public class MyH2O {
private static BlockingQueue hq = new
LinkedBlockingQueue();
private static BlockingQueue oq = new
LinkedBlockingQueue();
public static void main(String args[]){
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new Thread() {
public void run(){
try{
while (true) {
Thread.sleep((long) (Math.random() * 1000));
CountDownLatch firstH = hq.take();
CountDownLatch secondH = hq.poll();
CountDownLatch firstO = oq.take();
if (secondH == null) {
hq.add(firstH);
continue;
}
firstH.countDown();
secondH.countDown();
firstO.countDown();
System.out.println("A H2O created!");
break;
}
} catch(Exception e) {
e.printStackTrace();
}
}
});
for (int i=0;i<100;i++) {
CountDownLatch lock = new CountDownLatch(1);
int type = (int)(Math.random()*2);
if (type == 0){
hq.add(lock);
service.submit(new MyH(i+1,lock));
} else {
oq.add(lock);
service.submit(new MyO(i+1,lock));
}
}
service.shutdown();
}
}
n********r 发帖数: 102
23
mark一下。。学习了
m*****n 发帖数: 204
24
我对CountDownLatch没太多想过。学习了。
);
);
【在 f********d 的大作中提到】 : 如果允许我大展身手的话。我会用EIP Aggregator pattern : http://www.eaipatterns.com/Aggregator.html : 如果要考我java,我就用 new concurrent package, 简单明了 : public class H2O { : private BlockingQueue hq = Queues.newArrayBlockingQueue(); : private BlockingQueue oq = Queues.newArrayBlockingQueue(); : public H2O H() { : CountDownLatch latch = new CountDownLatch(1); : hq.add(latch); : latch.await();
synchronized(ti){
if(ti!=null && !ti.isNotified())
ti.wait();
}
}
-----------------------------
测试结果
o
h
o
h
1 h2o
end .......,Thread[Thread-5,5,main]
end .......,Thread[Thread-3,5,main]
end .......,Thread[Thread-4,5,main]
h
h
1 h2o
end .......,Thread[Thread-2,5,main]
end .......,Thread[Thread-6,5,main]
end .......,Thread[Thread-1,5,main]
-------------------------------
class ThreadInfo{
private boolean isNotified=false;
private Thread t;
private boolean isH;
public Thread getT() {
return t;
}
public void setT(Thread t) {
this.t = t;
}
public boolean isH() {
return isH;
}
public void setH(boolean isH) {
this.isH = isH;
}
public boolean isO() {
return !isH();
}
public void notified(){
this.isNotified = true;
}
public boolean isNotified(){
return this.isNotified;
}
}
private final int batchSize;
private final AtomicInteger counter = new AtomicInteger();
ElementsImpl(int i) {
batchSize = i;
}
public int getBatchSize() {
return batchSize;
}
@Override
public void inc() {
counter.incrementAndGet();
}
@Override
public boolean reserve() {
int val = counter.get();
return val >= batchSize && counter.compareAndSet(val, val -
batchSize);
}
@Override
public void cancel() {
counter.addAndGet(batchSize);
}
@Override
public void releaseQueued(SynchronousQueue queue,
boolean coversSelf) {
for (int i = 0; i < (!coversSelf? batchSize : batchSize - 1); i+
+) {
try {
queue.take();
} catch (InterruptedException ie) {
// Bug!
}
}
if (!coversSelf) {
return;
}
// Starvation prevention: should I wait or go?
if (queue.poll() != null) {
this.enqueue(queue);
}
// Otherwise I'm the last one and should go.
}
@Override
public void enqueue(SynchronousQueue queue) {
queue.offer(this);
}
}
public static enum WaterElements {
H(2), O(1);
private final IElements ele;
private WaterElements(int atomCount) {
this.ele = new ElementsImpl(atomCount);
}
public void inc() {
ele.inc();
}
public boolean reserve() {
return ele.reserve();
}
public void cancel() {
ele.cancel();
}
public void releaseQueued(WaterElements caller) {
ele.releaseQueued(waitQueue, caller == this);
}
public void enqueue() {
ele.enqueue(waitQueue);
}
private static final SynchronousQueue waitQueue =
new SynchronousQueue(true);
public static final int totalAtomsPerMolecule;
static {
int sum = 0;
for (WaterElements we : WaterElements.values()) {
sum += we.ele.getBatchSize();
}
totalAtomsPerMolecule = sum;
}
public static WaterElements reserveAll() {
for (WaterElements we : WaterElements.values()) {
if (!we.reserve()) return we;
}
return null;
}
public static void releaseAll(WaterElements caller) {
for (WaterElements we : WaterElements.values()) {
we.releaseQueued(caller); }
}
public static void cancelAll(WaterElements reserveFailure) {
WaterElements[] values = WaterElements.values();
for (int i = reserveFailure.ordinal() - 1; i >= 0; i--) {
values[i].cancel();
}
}
}
m*****n 发帖数: 204
40
package test;
import java.util.concurrent.Semaphore;
import test.IElements.WaterElements;
public class H2OExtraThread {
private final Semaphore sema = new Semaphore(0);
private volatile boolean isRunning = true;
// Called by the control thread
private void controllerRun() {
package test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import test.IElements.WaterElements;
public class H2OLockFree {
private final AtomicLong sequencer = new AtomicLong();
private final AtomicInteger contention = new AtomicInteger();
public void addO() {
addElements(WaterElements.O);
}
private void addElements(WaterElements ele) {
long id = sequencer.incrementAndGet();
contention.incrementAndGet();
ele.inc();
while (true) {
WaterElements failure = null;
next_ele:
for (WaterElements e : WaterElements.values()) {
while (id == sequencer.get() && contention.get() > 1) {
if (e.reserve()) continue next_ele;
try { Thread.sleep(1); } catch (Exception err) {}
}
failure = e;
break;
}
public class H2O {
public static class ThreadHolder {
Thread thread;
ThreadHolder(Thread thread) {
this.thread = thread;
}
}
protected LinkedList hlist = new LinkedList();
protected LinkedList olist = new LinkedList();
public boolean h() {
ThreadHolder current = new ThreadHolder(Thread.currentThread());
synchronized (current) {
ThreadHolder[] threadsToWakeUp = null;
// updating the queues requires a global lock;
synchronized (this) {
this.hlist.add(current);
threadsToWakeUp = this.getThreadsToWakeUp();
}
// now the wait and wake up is only in the current lock;
boolean notInterrupted = this
.checkWaitAndWakeUp(current, threadsToWakeUp);
if (notInterrupted) {
this.doH();
}
return notInterrupted;
}
}
private ThreadHolder[] getThreadsToWakeUp() {
if (this.hlist.size() >= 2 && this.olist.size() >= 1) {
ThreadHolder[] holders = new ThreadHolder[3];
holders[0] = this.hlist.poll();
holders[1] = this.hlist.poll();
holders[2] = this.olist.poll();
return holders;
}
return null;
}
// return true if the wait is not interrupted.
private boolean checkWaitAndWakeUp(ThreadHolder current,
ThreadHolder[] threadsToWakeUp) {
boolean wait = true;
if (threadsToWakeUp != null) {
for (ThreadHolder threadToWakeUp : threadsToWakeUp) {
if (threadToWakeUp == current) {
wait = false;
} else {
// this lock ensures notify will only happen after the
// thread wait;
synchronized (threadToWakeUp) {
threadToWakeUp.notifyAll();
}
}
}
}
if (wait) {
try {
current.wait();
} catch (InterruptedException e) {
return false;
}
}
return true;
}
// similiar to h();
public boolean o() {
ThreadHolder current = new ThreadHolder(Thread.currentThread());
synchronized (current) {
ThreadHolder[] threadsToWakeUp = null;
// updating the queues requires a global lock;
synchronized (this) {
this.olist.add(current);
threadsToWakeUp = this.getThreadsToWakeUp();
}
// now the wait and wake up is only in the current lock;
boolean notInterrupted = this
.checkWaitAndWakeUp(current, threadsToWakeUp);
if (notInterrupted) {
this.doO();
}
return notInterrupted;
}
}
// Override this to extend the business logic
protected void doH() {
// business logic;
}
// Override this to extend the business logic
protected void doO() {
// business logic
}
}
如果允许我大展身手的话。我会用EIP Aggregator pattern http://www.eaipatterns.com/Aggregator.html
如果要考我java,我就用 new concurrent package, 简单明了
public class H2O {
private BlockingQueue hq = Queues.newArrayBlockingQueue();
private BlockingQueue oq = Queues.newArrayBlockingQueue();
public H2O H() {
CountDownLatch latch = new CountDownLatch(1);
hq.add(latch);
latch.await();
return this;
}
public H2O O() {
CountDownLatch latch = new CountDownLatch(1);
oq.add(latch);
latch.await();
return this;
}
public H2O() {
Executors.newSingleThreadPool().execute(new Runnable() {
void run() {
while(true) {
CountDownLatch o = oq.take();
CountDownLatch h1 =hq.take();
hq.take().countDown();
h1.countDown();
o.countDown();
}
}
}
}
}
f********d 发帖数: 51
52
没有额外thread的版本
public class H2O {
private BlockingQueue hq = Queues.newArrayBlockingQueue();
public H2O H() {
CountDownLatch latch = new CountDownLatch(1);
hq.add(latch);
latch.await();
return this;
}
public H2O O() {
// this while is to make sure there is no such case where
// one O got one H and then being prempted, another O got the new H
// above situation will result 2O and 2H produces no H2O
while(true) {
CountDownLatch first = hq.take();
CountDownLatch second = hq.poll();
if (second == null) {
hq.add(first);
continue;
}
first.countDown();
second.countDown();
return this;
}
}
}
f********d 发帖数: 51
53
in a concurrent implementation, Thread.sleep is always a big NONO
【在 m*****n 的大作中提到】 : package test; : import java.util.concurrent.atomic.AtomicInteger; : import java.util.concurrent.atomic.AtomicLong; : import test.IElements.WaterElements; : public class H2OLockFree { : private final AtomicLong sequencer = new AtomicLong(); : private final AtomicInteger contention = new AtomicInteger(); : public void addO() { : addElements(WaterElements.O); : }
【在 f********d 的大作中提到】 : 没有额外thread的版本 : public class H2O { : private BlockingQueue hq = Queues.newArrayBlockingQueue(); : public H2O H() { : CountDownLatch latch = new CountDownLatch(1); : hq.add(latch); : latch.await(); : return this; : } : public H2O O() {
f********d 发帖数: 51
57
I've passed the unit tests pretty well in different scenarios.
of course you can use ReentrantLock and 2 Conditions. And you need one more
thing which is the CyclicBarrier for the H(2)
Problem came when you implement.
When O came, you normally will signal the Condition for O. but what if there
's no H came before or only one came?
Now you need to await on H's condition.
Now you meet a pretty weird situation of signal first or await first.
normally the strategy to solve is using while(true) and keep trying to avoid
deadlocks.
That's fine the code can work. but can you think of any better way of
handling this and how can it be extended in the future?
First thing come into mind is, why I need that while(true)? because I dont
know whether the other side is waiting by Condition interface. What about
introduce one?
alright, now since you need a collection, what about a concurrent one to
help? BlockingQueue came into the door.
Now i still need condition to do await. now you could do lock and new
condition and await. but the question is, is there any class can do this job
much easier?
That's where the CountDownLatch kicks in.
okay, now back to the extensible question? what if you have C2H4O2? then you
code is pretty messy. using the first approach i gave.
in the process method, The runnable is actually describing how H2O or C2H4O2
. and you could make it better as a DSL and encapsulate and abstract to
Element and ElementDefintion.
in that way, you can loop thru all the ElementDefitions(a formula). now the
constructor can take a DSL of ElementDefinition.
Now to this point, it becomes a trival version of Aggregator in Camel or
Spring integration. now what's left over?
of course, Exception handlings.
There will be more considierations on that front, but this is also a test to
see how sophisticated the interviewee is.
btw, I used the same questioin in my interviews. very few people can go this
far.
f********d 发帖数: 51
58
while(true) {
CountDownLatch o = oq.take(); // take an O
CountDownLatch h1 =hq.take(); // take an H
hq.take().countDown(); // take another H
h1.countDown(); // count down all the rest
o.countDown();
}
any new formula, you only need to make sure the first 3 lines reflects your
formula well. it becomes very extensible. and you can also do
Collection list = Lists.newArrayList();
for (ElementDefinition ed : defs) {
BlockingQueue queue = ed.getQueue();
for (int i =0; i < ed.getNumber() ; i++) {
list.add(queue.take());
}
}
for (CountDownLAtch latch : list) {
latch.countDown();
}
with DSL, you could
val H2 = new ElementDefinition(2);
val O = new ElementDefinition(1);
defs.add(H2);
defs.add(O);
void H() {
H2.await();
}
class ElementDefinition {
....
public void await() {
CountDownLatch latch = new CountDownLatch(1);
queue.add(latch);
latch.await();
}
....
}
this is synchronized implementaiton. you can also have interface Element to
replace CountDownLAtch.
for synchronized implementation, use ElementSychImpl with CountDownLatch.
for asycn, get a call back.
Now the codes will be quite extensible and easier to understand
more
there
avoid
【在 f********d 的大作中提到】 : I've passed the unit tests pretty well in different scenarios. : of course you can use ReentrantLock and 2 Conditions. And you need one more : thing which is the CyclicBarrier for the H(2) : Problem came when you implement. : When O came, you normally will signal the Condition for O. but what if there : 's no H came before or only one came? : Now you need to await on H's condition. : Now you meet a pretty weird situation of signal first or await first. : normally the strategy to solve is using while(true) and keep trying to avoid : deadlocks.
i*******6 发帖数: 107
59
My 5 cents:
import java.util.concurrent.*;
class MyO implements Runnable{
int index;
CountDownLatch lock;
public MyO (int index, CountDownLatch lock) {
this.index = index;
this.lock = lock;
}
public void run() {
try {
Thread.sleep((long) (Math.random() * 1000));
lock.await();
System.out.println("Take O:" + index + " to form an H2O!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyH implements Runnable{
int index;
CountDownLatch lock;
public MyH(int index, CountDownLatch lock) {
this.index = index;
this.lock = lock;
}
public void run() {
try {
hread.sleep((long) (Math.random() * 1000));
lock.await();
System.out.println("take H:"+index+" to form a H2O!");
} catch(Exception e) {
e.printStackTrace();
}
}
}
public class MyH2O {
private static BlockingQueue hq = new
LinkedBlockingQueue();
private static BlockingQueue oq = new
LinkedBlockingQueue();
public static void main(String args[]){
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new Thread() {
public void run(){
try{
while (true) {
Thread.sleep((long) (Math.random() * 1000));
CountDownLatch firstH = hq.take();
CountDownLatch secondH = hq.poll();
CountDownLatch firstO = oq.take();
if (secondH == null) {
hq.add(firstH);
continue;
}
firstH.countDown();
secondH.countDown();
firstO.countDown();
System.out.println("A H2O created!");
break;
}
} catch(Exception e) {
e.printStackTrace();
}
}
});
for (int i=0;i<100;i++) {
CountDownLatch lock = new CountDownLatch(1);
int type = (int)(Math.random()*2);
if (type == 0){
hq.add(lock);
service.submit(new MyH(i+1,lock));
} else {
oq.add(lock);
service.submit(new MyO(i+1,lock));
}
}
service.shutdown();
}
}
synchronized(ti){
if(ti!=null && !ti.isNotified())
ti.wait();
}
}
-----------------------------
测试结果
o
h
o
h
1 h2o
end .......,Thread[Thread-5,5,main]
end .......,Thread[Thread-3,5,main]
end .......,Thread[Thread-4,5,main]
h
h
1 h2o
end .......,Thread[Thread-2,5,main]
end .......,Thread[Thread-6,5,main]
end .......,Thread[Thread-1,5,main]
-------------------------------
class ThreadInfo{
private boolean isNotified=false;
private Thread t;
private boolean isH;
public Thread getT() {
return t;
}
public void setT(Thread t) {
this.t = t;
}
public boolean isH() {
return isH;
}
public void setH(boolean isH) {
this.isH = isH;
}
public boolean isO() {
return !isH();
}
public void notified(){
this.isNotified = true;
}
public boolean isNotified(){
return this.isNotified;
}
}
I think mine is short, not tested.
public class PH2O {
private static final Object PLACE_HOLDER = new Object();
AtomicInteger extraH = new AtomicInteger();
BlockingQueue requiredO = new LinkedBlockingDeque();
BlockingQueue releasedH = new LinkedBlockingDeque();
public void h() throws InterruptedException {
while (true) {
if (extraH.compareAndSet(1, 0)) {
requiredO.add(new Object());
break;
}
if (extraH.compareAndSet(0, 1)) {
break;
}
}
releasedH.take();
}
public void o() throws InterruptedException {
requiredO.take();
releasedH.offer(PLACE_HOLDER);
releasedH.offer(PLACE_HOLDER);
}
}
【在 b**m 的大作中提到】 : I think mine is short, not tested. : public class PH2O { : private static final Object PLACE_HOLDER = new Object(); : AtomicInteger extraH = new AtomicInteger(); : BlockingQueue requiredO = new LinkedBlockingDeque(); : BlockingQueue releasedH = new LinkedBlockingDeque(); : public void h() throws InterruptedException { : while (true) { : if (extraH.compareAndSet(1, 0)) { : requiredO.add(new Object());
x*******i 发帖数: 26
80
O() can release 2 permits while there is only one H thread waiting. The H()
then is unblocked. 不符合题目要求把。