# 生产者-消费者(基于 Monitor)

/**
 * Bounded producer/consumer buffer built on a monitor: one lock guarding the queue plus two
 * conditions, {@code notFull} (producers wait on it) and {@code notEmpty} (consumers wait on it).
 */
public class PCModel {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    Queue<String> queue = new LinkedList<>();

    /** Maximum number of queued elements; {@link #add(String)} blocks once it is reached. */
    int cap;

    /**
     * Creates an effectively unbounded buffer. Without an explicit capacity the field would stay
     * at 0 and every {@link #add(String)} would wait forever.
     */
    public PCModel() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a buffer holding at most {@code cap} elements.
     *
     * @param cap maximum number of queued elements, must be positive
     * @throws IllegalArgumentException if {@code cap} is not positive
     */
    public PCModel(int cap) {
        if (cap <= 0) {
            throw new IllegalArgumentException("cap must be positive: " + cap);
        }
        this.cap = cap;
    }

    /**
     * Appends {@code thing}, waiting while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the element is NOT added, the
     * thread's interrupt status is restored and the method returns.
     *
     * @param thing element to enqueue
     */
    public void add(String thing) {
        lock.lock();
        try {
            // Loop (not "if"): the condition must be re-checked after every wake-up.
            while (queue.size() >= cap) {
                notFull.await();
            }
            queue.offer(thing);
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     *
     * @return the dequeued element, or the empty string if the calling thread was interrupted
     *     while waiting (its interrupt status is restored in that case)
     */
    public String fetch() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            String thing = queue.poll();
            notFull.signal();
            return thing;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return "";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 生产者-消费者(基于 Semaphore)

/**
 * Producer/consumer buffer built on semaphores.
 *
 * <p>Basic idea: a semaphore has a single wait queue, so at least two are needed:
 * <ol>
 *   <li>{@code empty} plays the role of "notFull" (counts free slots);</li>
 *   <li>{@code full} plays the role of "notEmpty" (counts occupied slots).</li>
 * </ol>
 * In addition only one thread may touch the queue at a time, hence the binary {@code mutex}.
 */
public class PCModelBasedSem {
    final Semaphore mutex = new Semaphore(1);
    final Semaphore empty;
    final Semaphore full;

    Queue<String> queue = new LinkedList<>();

    /**
     * @param size capacity of the buffer, i.e. the initial number of free slots
     */
    public PCModelBasedSem(int size) {
        // empty: number of currently free slots
        // full:  number of currently occupied slots (none at start)
        empty = new Semaphore(size);
        full = new Semaphore(0);
    }

    /**
     * Enqueues one element, waiting for a free slot if necessary.
     *
     * @throws InterruptedException if interrupted while waiting for a slot or for the mutex; in
     *     that case the queue and the permit counts are left unchanged
     */
    public void add() throws InterruptedException {
        empty.acquire();
        boolean added = false;
        try {
            mutex.acquire();
            try {
                queue.add("sth");
                added = true;
            } finally {
                mutex.release();
            }
        } finally {
            // Publish an occupied slot only if an element was really added; otherwise hand the
            // reserved free slot back so the permit counts keep matching the queue contents.
            (added ? full : empty).release();
        }
    }

    /**
     * Dequeues one element, waiting for an occupied slot if necessary.
     *
     * @throws InterruptedException if interrupted while waiting for an element or for the mutex;
     *     in that case the queue and the permit counts are left unchanged
     */
    public void get() throws InterruptedException {
        full.acquire();
        boolean removed = false;
        try {
            mutex.acquire();
            try {
                queue.remove();
                removed = true;
            } finally {
                mutex.release();
            }
        } finally {
            // Free a slot only if an element was really removed; otherwise return the reserved
            // "occupied" permit.
            (removed ? empty : full).release();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# 限流器(基于 Semaphore)

/** Minimal stand-in for a pooled resource; sending just prints the message. */
class Connection {
    /**
     * Prints {@code msg} to standard output, prefixed with {@code "send: "}.
     *
     * @param msg message to send
     */
    public void send(String msg) {
        System.out.println("send: " + msg);
    }
}

/**
 * Fixed-size connection pool that uses a semaphore as a rate limiter: at most {@code size}
 * threads can hold a connection at the same time.
 */
class ObjectPool {
    final List<Connection> pool;
    final Semaphore sem;

    /**
     * @param size number of connections created up front, and number of semaphore permits
     */
    public ObjectPool(int size) {
        // Several threads may be inside the critical section at once (the semaphore admits up to
        // "size" of them), so the container itself has to be thread-safe.
        pool = new Vector<>();
        for (int i = 0; i < size; i++) {
            pool.add(new Connection());
        }
        sem = new Semaphore(size);
    }

    /**
     * Borrows a connection, sends {@code msg} through it and returns the connection to the pool.
     *
     * @param msg message to send
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public void sendMsg(String msg) throws InterruptedException {
        sem.acquire();
        try {
            Connection conn = pool.remove(0);
            try {
                conn.send(msg);
            } finally {
                // Only a connection that was actually borrowed goes back; if remove() failed
                // there is nothing to return (the original code put null into the pool here).
                pool.add(conn);
            }
        } finally {
            sem.release();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 高效的缓存(基于 ReadWriteLock)

/**
 * Read-through cache guarded by a read/write lock: lookups share the read lock, while loading a
 * missing entry and explicit {@link #put} calls take the exclusive write lock.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RWCache<K, V> {
    final Map<K, V> cache = new HashMap<>();
    final ReadWriteLock rwl = new ReentrantReadWriteLock();
    // read lock
    final Lock r = rwl.readLock();
    // write lock
    final Lock w = rwl.writeLock();

    /**
     * Returns the value for {@code key}, loading it via {@link #load} on a cache miss.
     *
     * @param key key to look up
     * @return the cached or freshly loaded value, or {@code null} if neither exists
     */
    V get(K key) {
        V val;
        r.lock();
        try {
            val = cache.get(key);
        } finally {
            r.unlock();
        }
        // Cache hit.
        if (val != null) {
            return val;
        }
        // Miss: fetch from the backing store under the write lock. The read lock was released
        // above because a read lock cannot be upgraded to the write lock.
        w.lock();
        try {
            // Check again: another thread may have filled the entry in the meantime.
            val = cache.get(key);
            if (val == null) {
                val = load(key);
                // Do not cache misses: a key -> null mapping can never produce a hit (the check
                // above treats null as "absent") and would only make the map grow.
                if (val != null) {
                    cache.put(key, val);
                }
            }
        } finally {
            w.unlock();
        }
        return val;
    }

    /**
     * Loads the value for {@code key} from the backing store (database or similar). Called with
     * the write lock held. The default implementation has no backing store and returns
     * {@code null}; subclasses override it.
     *
     * @param key key that missed the cache
     * @return the loaded value, or {@code null} if there is none
     */
    protected V load(K key) {
        return null;
    }

    /**
     * Stores {@code val} under {@code key}, replacing any previous value.
     *
     * @param key key to store under
     * @param val value to store
     */
    void put(K key, V val) {
        w.lock();
        try {
            cache.put(key, val);
        } finally {
            w.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 高效的缓存(基于 StampedLock)

/**
 * Map guarded by a {@link StampedLock}: writes take the write lock, reads first try an optimistic
 * read and fall back to the (pessimistic) read lock when validation fails.
 */
public class StampTest {
    private final Map<String, Integer> map = new HashMap<>();
    private final StampedLock sl = new StampedLock();

    /**
     * Stores {@code val} under {@code k} while holding the write lock.
     *
     * @param k key
     * @param val value
     */
    public void put(String k, int val) {
        long stamp = sl.writeLock();
        try {
            map.put(k, val);
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /**
     * Looks up the empty-string key; kept for callers of the original no-argument form.
     * Equivalent to {@code get("")} with the result discarded.
     */
    public void get() {
        get("");
    }

    /**
     * Reads the value for {@code k}: optimistic read first, upgraded to the read lock if a write
     * happened in between.
     *
     * @param k key to look up
     * @return the mapped value, or {@code null} if the key is absent
     */
    public Integer get(String k) {
        // Optimistic read: no lock is taken, only a stamp to validate afterwards.
        long stamp = sl.tryOptimisticRead();
        // Boxed on purpose: unboxing into an int would throw NullPointerException for a
        // missing key.
        // NOTE(review): HashMap is not designed to be read while a writer mutates it; the value
        // read here is only trusted after validate() succeeds.
        Integer val = map.get(k);
        if (!sl.validate(stamp)) {
            // A write intervened: upgrade to the pessimistic read lock and read again.
            stamp = sl.readLock();
            try {
                val = map.get(k);
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return val;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# CountDownLatch 等待

/** Demonstrates waiting for several worker threads with a {@link CountDownLatch}. */
public class CountDown {
    /**
     * Computes {@code fib(5)} and {@code fib(6)} on two threads, waits for both and prints the
     * sum of the two results.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        // Initialised to 2: the main thread waits for two events.
        CountDownLatch count = new CountDownLatch(2);
        List<Integer> list = new Vector<>();
        for (int n : new int[] {5, 6}) {
            new Thread(() -> {
                try {
                    list.add(fib(n));
                } finally {
                    // Always count down, otherwise a failing worker would leave await() hanging.
                    count.countDown();
                }
            }).start();
        }
        count.await();
        System.out.println(list.get(0) + list.get(1));
    }

    /**
     * Returns the n-th number of the sequence 1, 1, 2, 3, 5, 8, ... (so {@code fib(0) == 1} and
     * {@code fib(1) == 1}). Computed iteratively; the result wraps around on {@code int}
     * overflow for large {@code n}.
     *
     * @param n zero-based index, must not be negative
     * @return the n-th sequence value
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public static int fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        int prev = 1;
        int curr = 1;
        for (int i = 2; i <= n; i++) {
            int next = prev + curr;
            prev = curr;
            curr = next;
        }
        return curr;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# CyclicBarrier 步调协调

/**
 * Demonstrates keeping two producer threads in lock-step with a {@link CyclicBarrier}: each round
 * both producers append one value, meet at the barrier, and the barrier action schedules
 * {@link #sum()} for that pair.
 */
public class CyclicBarrierTest {
    Vector<Integer> n1 = new Vector<>();
    Vector<Integer> n2 = new Vector<>();

    // A single worker thread so that sum() tasks run one after another, in submission order;
    // with two workers, two sum() calls could interleave and pair values from different rounds.
    final Executor executor = Executors.newSingleThreadExecutor();

    // When the barrier count reaches 0 the action runs and the barrier resets itself. The action
    // only hands the work off to the executor so the producers are released quickly.
    final CyclicBarrier barrier = new CyclicBarrier(2, () -> executor.execute(this::sum));

    /** Removes the oldest value from each list and prints their sum. */
    public void sum() {
        System.out.println(n1.remove(0) + n2.remove(0));
    }

    /** Starts the two producer threads, one feeding {@code n1} and one feeding {@code n2}. */
    public void start() {
        new Thread(() -> produce(n1)).start();
        new Thread(() -> produce(n2)).start();
    }

    /**
     * Producer loop: appends 0, 1, 2, ... to {@code sink}, sleeping one second and then waiting
     * at the barrier after each value. Returns when the thread is interrupted or the barrier
     * fails, instead of looping forever on a barrier that can no longer be passed.
     */
    private void produce(Vector<Integer> sink) {
        int i = 0;
        while (true) {
            sink.add(i++);
            try {
                Thread.sleep(1000);
                // Decrement the barrier count and wait until it reaches 0.
                barrier.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Barrier broken (or the barrier action failed): report once and stop.
                e.printStackTrace();
                return;
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrierTest test = new CyclicBarrierTest();
        test.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Last Updated: 9/18/2020, 6:40:58 AM