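An Introductory Tutorial on Multithreaded Synchronization: Design and Use of Mutex
The Mutex in the previous article is an exclusive lock: only one thread can own it, and even the owning thread blocks if it tries to acquire the Mutex a second time while already holding it. Sometimes we want a lock that behaves like Java's synchronized keyword, where a thread that already owns the lock can enter again without blocking on it. We can adapt the Mutex implementation from the previous article into a re-entrant lock, ReentrantLock. To do so, ReentrantLock must record the current owner of the lock (a thread) and keep an integer counter of how many times that thread has entered.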
public class ReentrantLock implements Sync {
    protected Thread owner_ = null;
    protected long holds_ = 0;
    //......
}
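For comparison, the re-entrant behaviour of synchronized that ReentrantLock imitates can be seen in a small sketch. The class and method names here are made up for illustration and are not part of the concurrent package.

```java
// Hypothetical demo class; the names are illustrative only.
class SynchronizedReentrancyDemo {
    private int depth = 0;

    // Entering a synchronized method takes the monitor of `this`.
    synchronized int outer() {
        depth++;
        // The thread already holds the monitor, so this nested
        // synchronized call enters again instead of blocking.
        return inner();
    }

    synchronized int inner() {
        depth++;
        return depth;
    }

    public static void main(String[] args) {
        SynchronizedReentrancyDemo demo = new SynchronizedReentrancyDemo();
        System.out.println("nesting depth reached: " + demo.outer());
    }
}
```

With a non-re-entrant lock such as the Mutex from the previous article, the equivalent nested acquire would block forever, because the owner would be waiting for itself.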
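When acquiring or releasing the lock, we first check whether the calling thread is the owner. If the current thread already owns the lock, each acquire() increments the count by 1 and each release() decrements it by 1. When the count drops to 0, the owner has fully released the lock and no longer owns it, so the owner is set to null. If the current thread is not the owner, it wait()s on the lock when trying to acquire it. In release(), once the owner has fully released the lock, the owner is cleared and notify() wakes one of the waiting threads.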
public void acquire() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    Thread caller = Thread.currentThread();
    synchronized(this) { // synchronize on this
        if (caller == owner_)
            ++holds_; // re-entry by the owner: just count it
        else {
            try {
                // wait until the current owner has fully released the lock
                while (owner_ != null) wait();
                owner_ = caller;
                holds_ = 1;
            }
            catch (InterruptedException ex) {
                // pass the wake-up on to another waiter before giving up
                notify();
                throw ex;
            }
        }
    }
}
public synchronized void release() { // synchronize on this
    if (Thread.currentThread() != owner_)
        throw new Error("Illegal Lock usage");
    if (--holds_ == 0) {
        // fully released: clear the owner and wake one waiter
        owner_ = null;
        notify();
    }
}
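To see the owner and hold-count bookkeeping in action, here is a minimal, self-contained sketch. SimpleReentrantLock is a hypothetical, simplified stand-in for the class discussed here (it omits the up-front interrupt check and the notify() on interruption), and the demo class is likewise only an illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified lock for illustration; not the library class.
class SimpleReentrantLock {
    private Thread owner = null;
    private long holds = 0;

    public synchronized void acquire() throws InterruptedException {
        Thread caller = Thread.currentThread();
        if (caller == owner) { holds++; return; }   // re-entry by the owner
        while (owner != null) wait();               // block until fully released
        owner = caller;
        holds = 1;
    }

    public synchronized void release() {
        if (Thread.currentThread() != owner)
            throw new IllegalStateException("not the owner");
        if (--holds == 0) { owner = null; notify(); }
    }
}

class SimpleReentrantLockDemo {
    // Returns true if a second thread stayed blocked while the first thread
    // still held the lock once, and got the lock after the final release.
    static boolean run() {
        SimpleReentrantLock lock = new SimpleReentrantLock();
        AtomicBoolean otherGotLock = new AtomicBoolean(false);
        try {
            lock.acquire();
            lock.acquire();                 // second entry, hold count is 2
            Thread other = new Thread(() -> {
                try {
                    lock.acquire();
                    otherGotLock.set(true);
                    lock.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            lock.release();                 // hold count 1: still owned
            other.join(200);                // the other thread cannot finish yet
            boolean blockedWhileHeld = !otherGotLock.get();
            lock.release();                 // hold count 0: owner cleared, waiter woken
            other.join();
            return blockedWhileHeld && otherGotLock.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("behaved as a re-entrant lock: " + run());
    }
}
```

The point of the demo is that one release() after two acquire() calls is not enough to let another thread in; ownership only changes when the count reaches 0.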
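Note that accesses to owner_ and holds_ are synchronized on this, which avoids race conditions on these two variables. The attempt() method is implemented much like the one in Mutex, with the owner check and the hold count added: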
public boolean attempt(long msecs) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    Thread caller = Thread.currentThread();
    synchronized(this) {
        if (caller == owner_) {
            ++holds_;
            return true;
        }
        else if (owner_ == null) {
            owner_ = caller;
            holds_ = 1;
            return true;
        }
        else if (msecs <= 0)
            return false;
        else {
            long waitTime = msecs;
            long start = System.currentTimeMillis();
            try {
                for (;;) {
                    wait(waitTime);
                    if (caller == owner_) {
                        ++holds_;
                        return true;
                    }
                    else if (owner_ == null) {
                        owner_ = caller;
                        holds_ = 1;
                        return true;
                    }
                    else {
                        waitTime = msecs - (System.currentTimeMillis() - start);
                        if (waitTime <= 0)
                            return false;
                    }
                }
            }
            catch (InterruptedException ex) {
                notify();
                throw ex;
            }
        }
    }
}
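The same timed-attempt idea exists in the JDK's own java.util.concurrent.locks.ReentrantLock as tryLock(long, TimeUnit). The sketch below uses that JDK class rather than the attempt() method shown here, so that it runs without the concurrent package on the classpath; the demo class name and the timeout values are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class built on the JDK lock, not on attempt() itself.
class TimedAttemptDemo {
    // Returns {owner's timed re-attempt, timed attempt while another thread holds the lock}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        // This thread takes the lock and keeps it until told to finish.
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                finished.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });

        try {
            // Case 1: the owner attempts again and succeeds without waiting.
            lock.lock();
            try {
                results[0] = lock.tryLock(10, TimeUnit.MILLISECONDS);
                if (results[0]) lock.unlock();
            } finally {
                lock.unlock();
            }
            // Case 2: another thread owns the lock, so the attempt times out.
            holder.start();
            held.await();
            results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS);
            if (results[1]) lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            finished.countDown();
        }
        try {
            holder.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("owner re-attempt: " + r[0] + ", contended attempt: " + r[1]);
    }
}
```

A successful timed attempt counts as one more hold, so it has to be paired with its own release, just like a plain acquire.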
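Because ReentrantLock keeps a hold count for its owner, it also provides two additional methods, holds() and release(long). holds() returns the number of times the current thread has entered (0 if the current thread does not own the lock), and release(long) releases several holds at once: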
public synchronized long holds() {
    if (Thread.currentThread() != owner_) return 0;
    return holds_;
}
public synchronized void release(long n) {
    if (Thread.currentThread() != owner_ || n > holds_)
        throw new Error("Illegal Lock usage");
    holds_ -= n;
    if (holds_ == 0) {
        owner_ = null;
        notify();
    }
}
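The hold count can be watched from the outside with a short sketch. It uses the JDK's java.util.concurrent.locks.ReentrantLock, whose getHoldCount() plays the role of holds(). The JDK class has no counterpart to release(long), so the static release helper below is a hypothetical stand-in written only for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; uses the JDK lock as an analogue of the class above.
class HoldCountDemo {
    // Hypothetical helper mirroring release(long n): undo n holds at once.
    static void release(ReentrantLock lock, int n) {
        if (n < 0 || !lock.isHeldByCurrentThread() || n > lock.getHoldCount())
            throw new IllegalMonitorStateException("Illegal lock usage");
        for (int i = 0; i < n; i++) lock.unlock();
    }

    // Returns the hold count observed at four points in time.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        int before = lock.getHoldCount();        // not the owner yet
        lock.lock();
        lock.lock();
        lock.lock();
        int nested = lock.getHoldCount();        // entered three times
        release(lock, 2);
        int afterPartial = lock.getHoldCount();  // one hold left, still the owner
        release(lock, 1);
        int afterAll = lock.getHoldCount();      // fully released
        return new int[] {before, nested, afterPartial, afterAll};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```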
Usage example
Doug Lea's concurrent package is now widely used. In JBoss, org.jboss.mx.loading.UnifiedLoaderRepository uses the package's ReentrantLock to control synchronization during class loading. Here is how org.jboss.mx.loading.UnifiedLoaderRepository uses ReentrantLock:
public class UnifiedLoaderRepository
    extends LoaderRepository
    implements NotificationBroadcaster, UnifiedLoaderRepositoryMBean
{
    private ReentrantLock reentrantLock = new ReentrantLock(); // create a re-entrant lock
    public Class loadClass(String name, boolean resolve, ClassLoader cl)
        throws ClassNotFoundException
    {
        try
        {
            try
            {
                // Only one thread at a time can load classes
                // Pass the classloader to release its lock when blocking the thread
                // We cannot use synchronized (this), as we MUST release the lock
                // on the classloader. Change this only after discussion on the
                // developer's list !
                synchronize(cl); // synchronize on the ClassLoader passed in
                // This synchronized block is necessary to synchronize with add/removeClassLoader
                // See comments in add/removeClassLoader; we iterate on the classloaders, must avoid
                // someone removes or adds a classloader in the meanwhile.
                synchronized (this)
                {
                    // Try the cache before anything else.
                    Class cls = loadClassFromCache(name, cl);
                    // Found in cache, we're done
                    if (cls != null) {return cls;}
                    // Not found in cache, ask the calling classloader
                    cls = loadClassFromClassLoader(name, resolve, cl);
                    // The calling classloader sees the class, we're done
                    if (cls != null) {return cls;}
                    // Not visible by the calling classloader, iterate on the other classloaders
                    cls = loadClassFromRepository(name, resolve, cl);
                    // Some other classloader sees the class, we're done
                    if (cls != null) {return cls;}
                    // This class is not visible
                    throw new ClassNotFoundException(name);
                }
            }
            finally
            {
                unsynchronize(cl); // release the re-entrant lock when done
            }
        }
        catch (ClassCircularityError x)
        {
            //........
        }
    }
    //.........
}
The synchronize() and unsynchronize() methods used in the code above are as follows:
private void synchronize(ClassLoader cl)
{
    // This method
    // 1- must allow only one thread at a time,
    // 2- must allow a re-entrant thread,
    // 3- must unlock the given classloader waiting on it,
    // 4- must not hold any other lock.
    // If these 4 are not done, deadlock will happen.
    // Point 3 is necessary to fix Jung's RFE#4670071
    // Beware also that is possible that a classloader arrives here already locked
    // (for example via loadClassInternal()) and here we cannot synchronize on 'this'
    // otherwise we deadlock in loadClass() where we first synchronize on 'this' and
    // then on the classloader (resource ordering problem).
    // We solve this by using a reentrant lock.
    // Save and clear the interrupted state of the incoming thread
    boolean threadWasInterrupted = Thread.currentThread().interrupted();
    try
    {
        // Only one thread can pass this barrier
        // Other will accumulate here and let passed one at a time to wait on the classloader,
        // like a dropping sink
        reentrantLock.acquire();
        while (!isThreadAllowed(Thread.currentThread()))
        {
            // This thread is not allowed to run (another one is already running)
            // so I release() to let another thread to enter (will come here again)
            // and they will wait on the classloader to release its lock.
            // It is important that the wait below is not wait(0) since it may be
            // possible that a notifyAll arrives before the wait.
            // It is also important that this release() is outside the sync block on
            // the classloader, to avoid deadlock with threads that triggered
            // loadClassInternal(), locking the classloader
            reentrantLock.release();
            synchronized (cl)
            {
                // Threads will wait here on the classloader object.
                // Waiting on the classloader is fundamental to workaround Jung's RFE#4670071
                // However, we cannot wait(0), since it is possible that 2 threads will try to load
                // classes with different classloaders, so one will enter, the other wait, but
                // since they're using different classloaders, nobody will wake up the waiting one.
                // So we wait for some time and then try again.
                try {cl.wait(137);}
                catch (InterruptedException ignored) {}
            }
            // A notifyAll() has been issued, all threads will accumulate here
            // and only one at a time will pass, exactly equal to the barrier
            // before the 'while' statement (dropping sink).
            // Must be outside the synchronized block on the classloader to avoid that
            // waiting on the reentrant lock will hold the lock on the classloader
            try
            {
                reentrantLock.acquire();
            }
            catch (InterruptedException ignored)
            {
            }
        }
    }
    catch(InterruptedException ignored)
    {
    }
    finally
    {
        // I must keep track of the threads that entered, also of the reentrant ones,
        // see unsynchronize()
        increaseThreadsCount();
        // I release the lock, allowing another thread to enter.
        // This new thread will not be allowed and will wait() on the classloader object,
        // releasing its lock.
        reentrantLock.release();
        // Restore the interrupted state of the thread
        if( threadWasInterrupted )
            Thread.currentThread().interrupt();
    }
}
private void unsynchronize(ClassLoader cl)
{
    // Save and clear the interrupted state of the incoming thread
    boolean threadWasInterrupted = Thread.currentThread().interrupted();
    try
    {
        reentrantLock.acquire();
        // Reset the current thread only if we're not reentrant
        if (decreaseThreadsCount() == 0)
        {
            setCurrentThread(null);
        }
    }
    catch (InterruptedException ignored)
    {
    }
    finally
    {
        reentrantLock.release();
        // Notify all threads waiting on this classloader
        // This notification must be after the reentrantLock's release() to avoid this scenario:
        // - Thread A is loading a class in the ULR
        // - Thread B triggers a loadClassInternal which locks the UCL
        // - Thread A calls unsynchronize, locks the reentrantLock
        //   and waits to acquire the lock on the UCL
        // - Thread B calls synchronize and waits to lock the reentrantLock
        synchronized (cl)
        {
            cl.notifyAll();
        }
        // Restore the interrupted state of the thread
        if( threadWasInterrupted )
            Thread.currentThread().interrupt();
    }
}
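Both methods above save and clear the thread's interrupted state before touching the lock and restore it at the end. A minimal sketch of that pattern, isolated from the class-loading logic, might look like the following. It uses the JDK's java.util.concurrent.locks.ReentrantLock instead of the concurrent package's class, and the class name, the counter field, and the -1 return value are all assumptions made for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class showing only the save / clear / restore pattern.
class InterruptPreservingSection {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter = 0;

    // Increments the counter under the lock, even if the caller arrives
    // with its interrupt flag set. Returns -1 if the lock was not obtained.
    int increment() {
        // Thread.interrupted() returns the flag and clears it, so the
        // interruptible acquire below does not fail immediately.
        boolean wasInterrupted = Thread.interrupted();
        try {
            lock.lockInterruptibly();
            try {
                return ++counter;
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for the lock: remember it as well.
            wasInterrupted = true;
            return -1;
        } finally {
            // Restore the interrupted state for the caller.
            if (wasInterrupted) Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        InterruptPreservingSection section = new InterruptPreservingSection();
        Thread.currentThread().interrupt();
        int value = section.increment();
        System.out.println("value=" + value
            + ", interrupt restored=" + Thread.currentThread().isInterrupted());
    }
}
```

Clearing the flag first matters because an interruptible acquire throws InterruptedException straight away when the flag is already set; restoring it afterwards keeps the interruption visible to code further up the call stack.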