AutoResetEvent 和 ManualResetEvent 是 C# 中常用的线程同步类,在 Java 中可以模拟实现。AutoResetEvent 用 Semaphore 模拟:信号量管理的是许可证数量,程序里最多只保留一个许可证,一旦这个许可证被某个等待线程取走,事件就会自动回到未触发(阻塞)状态。相反,ManualResetEvent 用 CountDownLatch 模拟:它管理的是“latch”,也就是障碍,或者门闩;当障碍解除时,所有线程都可以运行而不被阻塞,如果要让线程重新同步(阻塞),就必须 manual reset,也就是手动重新加上 latch。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Java emulation of the C# {@code AutoResetEvent}, built on a {@link Semaphore}.
 *
 * <p>A permit in the semaphore means "signalled". {@link #set()} never lets the
 * permit count rise above one, and every successful wait consumes the permit,
 * so each signal lets exactly one waiter through before the event is
 * unsignalled again.
 */
public class AutoResetEvent
{
    /** Holds zero permits (unsignalled) or one permit (signalled). */
    private final Semaphore event;

    /**
     * Dedicated monitor that serialises {@link #set()} calls. A plain
     * {@code Object} is used because boxed values such as {@code Integer} are
     * value-based classes and must not be used as locks.
     */
    private final Object mutex = new Object();

    /**
     * Creates the event.
     *
     * @param signalled {@code true} to start signalled (one permit available),
     *                  {@code false} to start unsignalled
     */
    public AutoResetEvent(boolean signalled)
    {
        event = new Semaphore(signalled ? 1 : 0);
    }

    /**
     * Signals the event. If it is already signalled the call has no effect,
     * so repeated calls never accumulate more than one permit.
     */
    public void set()
    {
        // The check and the release must be atomic with respect to other
        // set() calls, otherwise two callers could both release a permit.
        synchronized (mutex)
        {
            if (event.availablePermits() == 0)
            {
                event.release();
            }
        }
    }

    /** Puts the event into the unsignalled state by discarding any permit. */
    public void reset()
    {
        event.drainPermits();
    }

    /**
     * Blocks until the event is signalled, then consumes the signal.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitOne() throws InterruptedException
    {
        event.acquire();
    }

    /**
     * Waits up to the given time for the event to be signalled.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the signal was consumed, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitOne(int timeout, TimeUnit unit) throws InterruptedException
    {
        return event.tryAcquire(timeout, unit);
    }

    /**
     * Reports whether a permit is currently available, without consuming it.
     *
     * @return {@code true} if the event is signalled at the moment of the call
     */
    public boolean isSignalled()
    {
        return event.availablePermits() > 0;
    }

    /**
     * Waits up to {@code timeout} milliseconds for the event to be signalled.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the signal was consumed, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitOne(int timeout) throws InterruptedException
    {
        return waitOne(timeout, TimeUnit.MILLISECONDS);
    }
}
MSDN 上 AutoResetEvent 的示例程序见 http://msdn.microsoft.com/en-us/library/system.threading.autoresetevent.aspx
我们可以把它改写成 Java 版本,使用的是上面实现的 Java 版 AutoResetEvent:
import java.util.Date;
import java.util.Random;
/**
 * Mutable bag of values handed from the main thread to a term-calculation
 * worker. All fields are public and are read and written directly.
 */
class TermInfo
{
    /** Event the worker waits on before it reads {@link #baseValue}. */
    public AutoResetEvent trigger;

    /** Value published by the main thread before it signals {@link #trigger}. */
    public long baseValue;

    /** Index into {@link #terms} where the worker stores its result. */
    public int order;

    /** Result array shared by all workers; one slot per term. */
    public long[] terms;
}
/**
 * Demo driver: computes {@code numTerms} terms, one worker thread per term.
 * Each worker blocks on a shared {@link AutoResetEvent} until the main thread
 * has published a base value, then stores its term in the shared array.
 */
public class AutoResetEventTest
{
    private final static int numTerms = 3;

    /**
     * Runs the demo and prints the sum of all computed terms.
     *
     * @param args0 unused
     * @throws InterruptedException if the main thread is interrupted while
     *                              sleeping or joining a worker
     */
    public static void main(String[] args0) throws InterruptedException
    {
        AutoResetEvent trigger = new AutoResetEvent(false);
        TermInfo tinfo = new TermInfo();
        long[] terms = new long[numTerms];
        // Accumulate in a long: the terms are long values, and "int += long"
        // would silently narrow each addend.
        long result = 0;
        tinfo.terms = terms;
        tinfo.trigger = trigger;
        for (int i = 0; i < numTerms; i++)
        {
            tinfo.order = i;
            // Create and start the term calc thread.
            Thread termThread = new Thread(new TermThreadProc(tinfo));
            termThread.start();
            // simulate a number crunching delay
            Thread.sleep(1000);
            // Base value is the millisecond part (0-999) of the current time.
            tinfo.baseValue = System.currentTimeMillis() % 1000;
            // Release the worker, then wait for it so terms[i] is final
            // before it is added and before tinfo is reused for the next term.
            trigger.set();
            termThread.join();
            result += terms[i];
        }
        System.out.format("Result = %d", result);
        System.out.println();
    }
}
/**
 * Worker that computes a single term. It does a precalculation, waits on
 * {@code termInfo.trigger} until the base value is published, and then writes
 * its result into {@code termInfo.terms[termInfo.order]}.
 */
class TermThreadProc implements Runnable
{
    /** Shared parameters and result slots for this worker. */
    public TermInfo termInfo;

    /**
     * @param termInfo shared state the worker reads its inputs from and
     *                 writes its result to
     */
    public TermThreadProc(TermInfo termInfo)
    {
        this.termInfo = termInfo;
    }

    /**
     * Computes the term. If the thread is interrupted while waiting for the
     * base value, the interrupt status is restored and no term is written.
     */
    @Override
    public void run()
    {
        TermInfo tinfo = termInfo;
        System.out.format("Term[%d] is starting...", tinfo.order);
        System.out.println();
        // set the precalculation: millisecond part (0-999) of the current
        // time, offset by this term's index
        long preValue = System.currentTimeMillis() % 1000 + tinfo.order;
        // wait for base value to be ready
        try
        {
            tinfo.trigger.waitOne();
        }
        catch (InterruptedException e)
        {
            // The base value was never published, so a term computed now
            // would be meaningless. Preserve the interrupt for whoever owns
            // this thread and stop.
            Thread.currentThread().interrupt();
            System.err.format("Term[%d] was interrupted before the base value was ready%n", tinfo.order);
            return;
        }
        Random rnd = new Random(tinfo.baseValue);
        tinfo.terms[tinfo.order] = preValue * rnd.nextInt(10000);
        System.out.format("Term[%d] has finished with a value of: %d", tinfo.order, tinfo.terms[tinfo.order]);
        System.out.println();
    }
}
//ManualResetEvent 的Java实现
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Java emulation of the C# {@code ManualResetEvent}, built on a
 * {@link CountDownLatch}.
 *
 * <p>A latch whose count is zero means "signalled": every waiter passes
 * without blocking until {@link #reset()} installs a fresh latch with a
 * count of one.
 */
public class ManualResetEvent
{
    /**
     * Current latch. Volatile so that a latch installed by {@link #reset()}
     * is visible to threads calling the unsynchronised methods.
     */
    private volatile CountDownLatch event;

    /**
     * Dedicated monitor that serialises {@link #reset()} calls. A plain
     * {@code Object} is used because boxed values such as {@code Integer} are
     * value-based classes and must not be used as locks.
     */
    private final Object mutex = new Object();

    /**
     * Creates the event.
     *
     * @param signalled {@code true} to start signalled (latch count 0),
     *                  {@code false} to start unsignalled (latch count 1)
     */
    public ManualResetEvent(boolean signalled)
    {
        event = new CountDownLatch(signalled ? 0 : 1);
    }

    /**
     * Signals the event by counting the current latch down. Threads waiting
     * on that latch are released; calling it again while signalled has no effect.
     */
    public void set()
    {
        event.countDown();
    }

    /**
     * Returns the event to the unsignalled state. A new latch is installed
     * only if the current one is open, so a latch that threads may still be
     * blocked on is never replaced.
     */
    public void reset()
    {
        synchronized (mutex)
        {
            if (event.getCount() == 0)
            {
                event = new CountDownLatch(1);
            }
        }
    }

    /**
     * Blocks until the event is signalled. The signal is not consumed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitOne() throws InterruptedException
    {
        event.await();
    }

    /**
     * Waits up to the given time for the event to be signalled.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the event was signalled, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitOne(int timeout, TimeUnit unit) throws InterruptedException
    {
        return event.await(timeout, unit);
    }

    /**
     * Reports whether the current latch is open.
     *
     * @return {@code true} if the event is signalled at the moment of the call
     */
    public boolean isSignalled()
    {
        return event.getCount() == 0;
    }

    /**
     * Waits up to {@code timeout} milliseconds for the event to be signalled.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the event was signalled, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitOne(int timeout) throws InterruptedException
    {
        return waitOne(timeout, TimeUnit.MILLISECONDS);
    }
}
MSDN地址:http://msdn.microsoft.com/en-us/library/system.threading.manualresetevent.aspx
Java测试:
import java.util.Scanner;
import java.io.IOException;
/**
 * Interactive demo of {@link ManualResetEvent}: starts threads that block on
 * the event, releases them all with one {@code set()}, shows that later
 * waiters pass straight through while the event stays signalled, and then
 * shows that {@code reset()} makes waiters block again.
 */
public class ManualResetEventTest
{
    // mre is used to block and release threads manually. It is
    // created in the unsignaled state.
    // It must be a ManualResetEvent: an auto-reset event would let only one
    // waiting thread through per set() call.
    static ManualResetEvent mre = new ManualResetEvent(false);

    /**
     * Runs the demo, pausing for the Enter key between steps.
     *
     * @param arg0 unused
     * @throws IOException          declared for callers; not thrown by the visible code
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] arg0) throws IOException, InterruptedException
    {
        System.out.println("\nStart 3 named threads that block on a ManualResetEvent:\n");
        Scanner keyIn = new Scanner(System.in);
        System.out.print("Press the enter key to continue");
        keyIn.nextLine();
        for (int i = 0; i <= 2; i++)
        {
            threadProc threadProc = new threadProc();
            Thread t = new Thread(threadProc);
            t.setName("Thread_" + i);
            t.start();
        }
        // Give the threads time to start and print before prompting.
        Thread.sleep(500);
        System.out.println("\nWhen all three threads have started, press Enter to call Set()"
                + "\nto release all the threads.\n");
        keyIn.nextLine();
        mre.set();
        Thread.sleep(500);
        System.out.println("\nWhen a ManualResetEvent is signaled, threads that call WaitOne()"
                + "\ndo not block. Press Enter to show this.\n");
        keyIn.nextLine();
        for (int i = 3; i <= 4; i++)
        {
            threadProc threadProc = new threadProc();
            Thread t = new Thread(threadProc);
            t.setName("Thread_" + i);
            t.start();
        }
        Thread.sleep(500);
        System.out.println("\nPress Enter to call Reset(), so that threads once again block"
                + "\nwhen they call WaitOne().\n");
        keyIn.nextLine();
        mre.reset();
        // Start a thread that waits on the ManualResetEvent.
        threadProc threadProc = new threadProc();
        Thread t5 = new Thread(threadProc);
        t5.setName("Thread_5");
        t5.start();
        Thread.sleep(500);
        System.out.println("\nPress Enter to call Set() and conclude the demo.");
        keyIn.nextLine();
        mre.set();
    }
}
/**
 * Worker for the demo: announces itself, waits on
 * {@code ManualResetEventTest.mre}, then announces that it is done.
 */
class threadProc implements Runnable
{
    /**
     * Blocks on the shared event. If the thread is interrupted while
     * waiting, the interrupt status is restored before the method finishes.
     */
    @Override
    public void run()
    {
        String name = Thread.currentThread().getName();
        System.out.println(name + " starts and calls mre.WaitOne()");
        try
        {
            ManualResetEventTest.mre.waitOne();
        }
        catch (InterruptedException e)
        {
            // Do not swallow the interruption: put the flag back so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println(name + " ends.");
    }
}
0
顶
0
踩
分享到:
2011-04-07 16:39
浏览 2539
评论