温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java并发编程中Semaphore计数信号量的示例分析

发布时间:2021-08-11 10:48:28 来源:亿速云 阅读:172 作者:小新 栏目:编程语言

这篇文章主要为大家展示了“Java并发编程中Semaphore计数信号量的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Java并发编程中Semaphore计数信号量的示例分析”这篇文章吧。

Semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。

简单示例:

package me.socketthread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreLearn {    //信号量总数    private static final int SEM_MAX = 12;    public static void main(String[] args) {       Semaphore sem = new Semaphore(SEM_MAX);      //创建线程池      ExecutorService threadPool = Executors.newFixedThreadPool(3);      //在线程池中执行任务      threadPool.execute(new MyThread(sem, 7));      threadPool.execute(new MyThread(sem, 4));      threadPool.execute(new MyThread(sem, 2));      //关闭池      threadPool.shutdown();    }  }    class MyThread extends Thread {      private volatile Semaphore sem;  // 信号量      private int count;    // 申请信号量的大小             MyThread(Semaphore sem, int count) {        this.sem = sem;        this.count = count;      }      public void run() {        try {         // 从信号量中获取count个许可          sem.acquire(count);          Thread.sleep(2000);          System.out.println(Thread.currentThread().getName() + " acquire count="+count);        } catch (InterruptedException e) {          e.printStackTrace();        } finally {          // 释放给定数目的许可,将其返回到信号量。          sem.release(count);          System.out.println(Thread.currentThread().getName() + " release " + count + "");        }      }    }

执行结果:

pool-1-thread-2 acquire count=4 pool-1-thread-1 acquire count=7 pool-1-thread-1 release 7 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=2 pool-1-thread-3 release 2

线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。

源码分析:

1、构造函数

在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值

Semaphore sem = new Semaphore(12);//简单来说就是给锁标识位state赋值为12

2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞

Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state = state-n 此时state大于0表示可以获取信号量,如果小于0则将线程阻塞
public void acquire(int permits) throws InterruptedException {      if (permits < 0) throw new IllegalArgumentException();      //获取锁      sync.acquireSharedInterruptibly(permits);    }

acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state= state-permits,否则将线程阻塞

public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {      if (Thread.interrupted())        throw new InterruptedException();      if (tryAcquireShared(arg) < 0)//tryAcquireShared中尝试获取锁资源        doAcquireSharedInterruptibly(arg); //将线程阻塞    }

tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires ,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞

protected int tryAcquireShared(int acquires) {        for (;;) {          if (hasQueuedPredecessors())            return -1;        //获取state值          int available = getState();        //从state中获取信号量          int remaining = available - acquires;          if (remaining < 0 ||            compareAndSetState(available, remaining))          //如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值            return remaining;        }      }

doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。

/会将线程添加到FIFO队列中,并阻塞   private void doAcquireSharedInterruptibly(int arg)       throws InterruptedException {       //将线程添加到FIFO队列中       final Node node = addWaiter(Node.SHARED);       boolean failed = true;       try {         for (;;) {           final Node p = node.predecessor();           if (p == head) {             int r = tryAcquireShared(arg);             if (r >= 0) {               setHeadAndPropagate(node, r);               p.next = null; // help GC               failed = false;               return;             }           }           //parkAndCheckInterrupt完成线程的阻塞操作           if (shouldParkAfterFailedAcquire(p, node) &&             parkAndCheckInterrupt())             throw new InterruptedException();         }       } finally {         if (failed)           cancelAcquire(node);       }     }

3、Semaphore.release(int permits),这个函数的实现操作是将state = state+permits并唤起处于FIFO队列中的阻塞线程。

public void release(int permits) {      if (permits < 0) throw new IllegalArgumentException();    //state = state+permits,并将FIFO队列中的阻塞线程唤起      sync.releaseShared(permits);    }

releaseShared中的操作是将state = state+permits,并将FIFO队列中的阻塞线程唤起。

public final boolean releaseShared(int arg) {      //tryReleaseShared将state设置为state = state+arg      if (tryReleaseShared(arg)) {        //唤起FIFO队列中的阻塞线程        doReleaseShared();        return true;      }      return false;    }

tryReleaseShared将state设置为state = state+arg

protected final boolean tryReleaseShared(int releases) {        for (;;) {          int current = getState();          int next = current + releases;          if (next < current) // overflow            throw new Error("Maximum permit count exceeded");          //将state值设置为state=state+releases          if (compareAndSetState(current, next))            return true;        }      }

doReleaseShared()唤起FIFO队列中的阻塞线程

private void doReleaseShared() {          for (;;) {         Node h = head;         if (h != null && h != tail) {           int ws = h.waitStatus;           if (ws == Node.SIGNAL) {             if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))               continue;      // loop to recheck cases             //完成阻塞线程的唤起操作             unparkSuccessor(h);           }           else if (ws == 0 &&                !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))             continue;        // loop on failed CAS         }         if (h == head)          // loop if head changed           break;       }     }

总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。

Semaphore源码:

public class Semaphore implements java.io.Serializable {    private static final long serialVersionUID = -3222578661600680210L;    private final Sync sync;    abstract static class Sync extends AbstractQueuedSynchronizer {      private static final long serialVersionUID = 1192457210091910933L;      //设置锁标识位state的初始值      Sync(int permits) {        setState(permits);      }      //获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取      final int getPermits() {        return getState();      }      //获取state值减去acquires后的值,如果大于等于0则表示锁可以获取      final int nonfairTryAcquireShared(int acquires) {        for (;;) {          int available = getState();          int remaining = available - acquires;          if (remaining < 0 ||            compareAndSetState(available, remaining))            return remaining;        }      }      //释放锁      protected final boolean tryReleaseShared(int releases) {        for (;;) {          int current = getState();          //将state值加上release值          int next = current + releases;          if (next < current) // overflow            throw new Error("Maximum permit count exceeded");          if (compareAndSetState(current, next))            return true;        }      }      //将state的值减去reductions      final void reducePermits(int reductions) {        for (;;) {          int current = getState();          int next = current - reductions;          if (next > current) // underflow            throw new Error("Permit count underflow");          if (compareAndSetState(current, next))            return;        }      }      final int drainPermits() {        for (;;) {          int current = getState();          if (current == 0 || compareAndSetState(current, 0))            return current;        }      }    }    //非公平锁    static final class NonfairSync extends Sync {      private static final long serialVersionUID = -2694183684443567898L;      NonfairSync(int permits) {        super(permits);      }      protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);      }    }    //公平锁    static final class FairSync extends Sync {      private static final long serialVersionUID = 2014338818796000944L;      FairSync(int permits) {        super(permits);      }      protected int tryAcquireShared(int acquires) {        for (;;) {          if (hasQueuedPredecessors())            return -1;          int available = getState();          int remaining = available - acquires;          if (remaining < 0 ||            compareAndSetState(available, remaining))            return remaining;        }      }    }    //设置信号量    public Semaphore(int permits) {      sync = new NonfairSync(permits);    }    public Semaphore(int permits, boolean fair) {      sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }    //获取锁    public void acquire() throws InterruptedException {      sync.acquireSharedInterruptibly(1);    }    public void acquireUninterruptibly() {      sync.acquireShared(1);    }    public boolean tryAcquire() {      return sync.nonfairTryAcquireShared(1) >= 0;    }    public boolean tryAcquire(long timeout, TimeUnit unit)      throws InterruptedException {      return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));    }    public void release() {      sync.releaseShared(1);    }    //获取permits值锁    public void acquire(int permits) throws InterruptedException {      if (permits < 0) throw new IllegalArgumentException();      sync.acquireSharedInterruptibly(permits);    }    public void acquireUninterruptibly(int permits) {      if (permits < 0) throw new IllegalArgumentException();      sync.acquireShared(permits);    }    public boolean tryAcquire(int permits) {      if (permits < 0) throw new IllegalArgumentException();      return sync.nonfairTryAcquireShared(permits) >= 0;    }    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)      throws InterruptedException {      if (permits < 0) throw new IllegalArgumentException();      return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));    }    //释放    public void release(int permits) {      if (permits < 0) throw new IllegalArgumentException();      sync.releaseShared(permits);    }    public int availablePermits() {      return sync.getPermits();    }    public int drainPermits() {      return sync.drainPermits();    }    protected void reducePermits(int reduction) {      if (reduction < 0) throw new IllegalArgumentException();      sync.reducePermits(reduction);    }    public boolean isFair() {      return sync instanceof FairSync;    }    public final boolean hasQueuedThreads() {      return sync.hasQueuedThreads();    }    public final int getQueueLength() {      return sync.getQueueLength();    }    protected Collection<Thread> getQueuedThreads() {      return sync.getQueuedThreads();    }    public String toString() {      return super.toString() + "[Permits = " + sync.getPermits() + "]";    }  }

以上是“Java并发编程中Semaphore计数信号量的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI