17 Jul 2022

并发编程之管程与信号量

大家都知道,一般我们所遇到的并发问题的根源来自于数据可见性、编译器的代码执行优化这2点 数据可见性:多核cpu缓存导致出现的问题 编译器优化:优化代码的结构和执行顺序
常见的优化手段也是比较清楚的,加Volatile关键字,可以禁用cpu的缓存

而我们今天介绍的管程与信号量,也是解决并发问题的万能钥匙
Java所采用的管程技术,synchronized, wait,notify, notifyAll这三个方法都是管程的组成部分,管程与信号量是等价的, 但是管程更容易使用,所以Java选择了管程

信号量:Semaphore 整体来说,Semaphore模型是比较简单的,简单来说,一个计数器,一个等待队列,三个方法(init, down, up) init用于设置计数器的初始值,down计数器-1,如果当前计数器值<0,当前线程被阻塞,up计数器+1,如果此时计数器值<=0,唤醒 等待队列中得一个线程,并从等待队列中移除,这里的三个方法都是原子性的,比如

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//互斥
static void addOne() {

    s.acquire();
    try {
        count += 1; 
    } finally {
        s.release();
    }

}

信号量是如何保证互斥的,假设2个线程T1和T2同时访问addOne()方法,当他们同时调用acquire()的时候,由于acquire是一个原子操作,
所以只能有一个线程(T1)把信号量里的计数器减为0,另外一个T2则是把计数器值变为-1,从这边看到,其实信号量适合做一个限流器,限流器使用lock来实现同样需要自定义


class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

Java jdk并发包内容很丰富,最明显的还是Lock和Condition2个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决 同步问题,其实可以看到,Semaphore与管程实现思路是一致的,可以用Semaphore来实现管程,也可以用管程来实现Semaphore,只不过 Semaphore需要手动去实现类似Condition条件变量的功能,来实现类似生产者消费者的功能。

比如我们使用管程模拟一个生产者消费者模式
ReentrantLock lock = new ReentrantLock();
//条件变量,队列不满
Condition notFull = lock.newCondition();
//条件变量,队列不空
Condition notEmpty = lock.newCondition();

void produceData() {

    lock.lock();
    
    try {
    
        while(队列满了) {
            //生产者阻塞,不加数据
            notFull.await();
        }
        
        // 入队操作
        
        // 入队成功后,有数据,可以消费
        notEmpty.signal();
        
    } finally {
        lock.unlock();
    }

}

void cusData() {

    lock.lock();
    
    try {
    
        while(队列为空) {
            notEmpty.await();
        }
        
        // 消费数据
        
        
        // 消费完成后,可以再加新的数据
        notFull.singal();
    
    } finally {
        lock.unlock();
    }

}

值得注意的是,如果你使用过dubbo,清楚他的一个调用流程,其实是client阻塞等待获取server端返回的结果,其实调用线程是处理阻塞状态的,也就是异步转换为同步,这个时候dubbo采用的技术 原理也是通过管程实现的。


Tags:
0 comments