# 生产者-消费者(基于 Monitor)
/**
 * Bounded producer-consumer buffer guarded by a single {@link ReentrantLock}
 * with two conditions: {@code notFull} (producers wait on it) and
 * {@code notEmpty} (consumers wait on it).
 */
public class PCModel {
    /** Capacity used by the no-argument constructor. */
    static final int DEFAULT_CAPACITY = 16;

    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    Queue<String> queue = new LinkedList<>();
    int cap;

    /** Creates a buffer holding at most {@link #DEFAULT_CAPACITY} items. */
    public PCModel() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a buffer holding at most {@code capacity} items.
     *
     * @param capacity maximum number of queued items; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public PCModel(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.cap = capacity;
    }

    /**
     * Appends {@code thing} to the buffer, blocking while the buffer is full.
     * If the calling thread is interrupted while waiting, the item is not
     * added and the thread's interrupt flag is restored.
     *
     * @param thing the item to enqueue
     */
    public void add(String thing) {
        lock.lock();
        try {
            // Loop (not "if") so the condition is re-checked after every wake-up.
            while (queue.size() >= cap) {
                notFull.await();
            }
            queue.offer(thing);
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest item, blocking while the buffer is empty.
     *
     * @return the dequeued item, or {@code ""} if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored)
     */
    public String fetch() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            String thing = queue.poll();
            notFull.signal();
            return thing;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return "";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 生产者-消费者(基于 Semaphore)
/**
 * Producer-consumer buffer built on semaphores.
 *
 * <p>Basic idea: one semaphore has a single wait queue, so at least two are needed:
 * <ol>
 *   <li>{@code empty} plays the role of "notFull" (counts free slots);</li>
 *   <li>{@code full} plays the role of "notEmpty" (counts occupied slots).</li>
 * </ol>
 * In addition only one thread may touch the queue at a time, which is what
 * {@code mutex} is for.
 */
public class PCModelBasedSem {
    final Semaphore mutex = new Semaphore(1);
    final Semaphore empty;
    final Semaphore full;
    Queue<String> queue = new LinkedList<>();

    /**
     * @param size number of slots in the buffer
     */
    public PCModelBasedSem(int size) {
        // empty: number of currently free slots
        // full:  number of currently occupied slots (none at start)
        empty = new Semaphore(size);
        full = new Semaphore(0);
    }

    /**
     * Enqueues one item, blocking while no slot is free.
     *
     * @throws InterruptedException if interrupted while waiting; in that case
     *         the buffer and both permit counts are left unchanged
     */
    public void add() throws InterruptedException {
        empty.acquire();
        boolean added = false;
        try {
            mutex.acquire();
            try {
                queue.add("sth");
                added = true;
            } finally {
                mutex.release();
            }
        } finally {
            // Publish an occupied slot only if an item really went in;
            // otherwise hand the reserved free slot back.
            if (added) {
                full.release();
            } else {
                empty.release();
            }
        }
    }

    /**
     * Dequeues one item, blocking while the buffer is empty.
     *
     * @throws InterruptedException if interrupted while waiting; in that case
     *         the buffer and both permit counts are left unchanged
     */
    public void get() throws InterruptedException {
        full.acquire();
        boolean removed = false;
        try {
            mutex.acquire();
            try {
                queue.remove();
                removed = true;
            } finally {
                mutex.release();
            }
        } finally {
            // Free a slot only if an item really came out;
            // otherwise hand the reserved occupied slot back.
            if (removed) {
                empty.release();
            } else {
                full.release();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 限流器(基于 Semaphore)
/** Stand-in for a pooled connection; "sending" just prints the message. */
class Connection {
    /**
     * Prints {@code msg} to standard output, prefixed with {@code "send: "}.
     *
     * @param msg the message to emit
     */
    public void send(String msg) {
        String line = "send: " + msg;
        System.out.println(line);
    }
}
/**
 * Fixed-size pool of {@link Connection} objects whose concurrent use is
 * throttled by a semaphore holding one permit per pooled connection.
 */
class ObjectPool {
    final List<Connection> pool;
    final Semaphore sem;

    /**
     * @param size number of connections created up front and number of permits
     */
    public ObjectPool(int size) {
        // Several threads may be inside the guarded section at the same time,
        // so the backing container has to be thread-safe.
        pool = new Vector<>();
        int remaining = size;
        while (remaining-- > 0) {
            pool.add(new Connection());
        }
        sem = new Semaphore(size);
    }

    /**
     * Takes a connection from the pool, sends {@code msg} through it, and puts
     * the connection back.
     *
     * @param msg the message to send
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public void sendMsg(String msg) throws InterruptedException {
        sem.acquire();
        Connection borrowed = null;
        try {
            borrowed = pool.remove(0);
            borrowed.send(msg);
        } finally {
            // Return the connection before the permit so the next holder finds one.
            pool.add(borrowed);
            sem.release();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 高效的缓存(基于 ReadWriteLock)
/**
 * A cache guarded by a read-write lock: lookups share the read lock, while
 * loading a missing entry and explicit updates take the write lock.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RWCache<K, V> {
    final Map<K, V> cache = new HashMap<>();
    final ReadWriteLock rwl = new ReentrantReadWriteLock();
    // read lock
    final Lock r = rwl.readLock();
    // write lock
    final Lock w = rwl.writeLock();

    /**
     * Returns the value cached for {@code key}, loading it via {@link #load}
     * on a miss.
     *
     * @param key the key to look up
     * @return the cached or freshly loaded value, or {@code null} if neither exists
     */
    V get(K key) {
        V val;
        r.lock();
        try {
            val = cache.get(key);
        } finally {
            r.unlock();
        }
        // Cache hit.
        if (val != null) {
            return val;
        }
        // Miss: fetch from the backing source under the write lock. The read
        // lock was released first because it cannot be upgraded in place.
        w.lock();
        try {
            // Re-check: another thread may have filled the entry in the meantime.
            val = cache.get(key);
            if (val == null) {
                val = load(key);
                // Never store null: it is indistinguishable from a miss and
                // would only grow the map.
                if (val != null) {
                    cache.put(key, val);
                }
            }
        } finally {
            w.unlock();
        }
        return val;
    }

    /**
     * Loads the value for {@code key} from the backing source (database or
     * similar). Called with the write lock held. The default implementation
     * has no backing source and returns {@code null}; subclasses override it.
     *
     * @param key the key that missed the cache
     * @return the loaded value, or {@code null} if none exists
     */
    protected V load(K key) {
        return null;
    }

    /**
     * Stores {@code val} under {@code key}.
     *
     * @param key the key to write
     * @param val the value to associate with {@code key}
     */
    void put(K key, V val) {
        w.lock();
        try {
            cache.put(key, val);
        } finally {
            w.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 高效的缓存(基于 StampedLock)
/**
 * A small map-backed cache guarded by a {@link StampedLock}: writes take the
 * write lock, reads first try an optimistic read and fall back to the
 * (pessimistic) read lock when validation fails.
 */
public class StampTest {
    private final Map<String, Integer> map = new HashMap<>();
    private final StampedLock sl = new StampedLock();

    /**
     * Stores {@code val} under {@code k} while holding the write lock.
     *
     * @param k   the key to write
     * @param val the value to store
     */
    public void put(String k, int val) {
        long stamp = sl.writeLock();
        try {
            map.put(k, val);
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /**
     * Performs a read of the empty-string key and discards the result.
     * Kept for existing callers; see {@link #get(String)}.
     */
    public void get() {
        get("");
    }

    /**
     * Reads the value stored under {@code k}: optimistic read first, upgraded
     * to the read lock if a write happened in between.
     *
     * @param k the key to read
     * @return the stored value, or {@code null} if the key is absent
     */
    public Integer get(String k) {
        // Optimistic read: no lock is held, only a stamp is taken.
        long stamp = sl.tryOptimisticRead();
        // Boxed on purpose: unboxing an absent key's null into an int would throw.
        // NOTE(review): HashMap is read here without a lock; whether that is
        // safe against a concurrent put should be confirmed.
        Integer val = map.get(k);
        if (!sl.validate(stamp)) {
            // A write intervened: fall back to the pessimistic read lock and re-read.
            stamp = sl.readLock();
            try {
                val = map.get(k);
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return val;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# CountDownLatch 等待
/**
 * Demonstrates {@link CountDownLatch}: the main thread waits for two worker
 * threads to finish before combining their results.
 */
public class CountDown {
    /**
     * Computes {@code fib(5)} and {@code fib(6)} on two threads, waits for
     * both, and prints the sum of the two results.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        // Count of 2: wait for two workers to report completion.
        CountDownLatch count = new CountDownLatch(2);
        List<Integer> list = new Vector<>();
        new Thread(() -> {
            list.add(fib(5));
            count.countDown();
        }).start();
        new Thread(() -> {
            list.add(fib(6));
            count.countDown();
        }).start();
        count.await();
        // Addition is commutative, so the order in which the workers finished
        // does not matter.
        System.out.println(list.get(0) + list.get(1));
    }

    /**
     * Returns the n-th term of the sequence 1, 1, 2, 3, 5, 8, ... where
     * {@code fib(0) == fib(1) == 1}. Computed iteratively in linear time.
     *
     * @param n zero-based index of the term; must be non-negative
     * @return the n-th term
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public static int fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        int prev = 1;
        int curr = 1;
        for (int i = 2; i <= n; i++) {
            int next = prev + curr;
            prev = curr;
            curr = next;
        }
        return curr;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# CyclicBarrier 步调协调
/**
 * Demonstrates {@link CyclicBarrier}: two producer threads advance in
 * lock-step, and every time both reach the barrier one pair of values is
 * summed on a separate executor.
 */
public class CyclicBarrierTest {
    Vector<Integer> n1 = new Vector<>();
    Vector<Integer> n2 = new Vector<>();
    // Single worker thread on purpose: sum() removes the heads of n1 and n2 in
    // two separate steps, so two sum() tasks running concurrently could pair
    // values that belong to different rounds.
    final Executor executor = Executors.newSingleThreadExecutor();
    // When the barrier count reaches 0 the callback runs and the count resets.
    final CyclicBarrier barrier = new CyclicBarrier(2, () -> executor.execute(this::sum));

    /** Removes the oldest value from each queue and prints their sum. */
    public void sum() {
        System.out.println(n1.remove(0) + n2.remove(0));
    }

    /** Starts the two producer threads, one per queue. */
    public void start() {
        new Thread(() -> produce(n1)).start();
        new Thread(() -> produce(n2)).start();
    }

    /**
     * Producer loop: appends an increasing counter to {@code sink}, sleeps one
     * second, then waits at the barrier for the other producer. The loop ends
     * when the thread is interrupted or the barrier is broken, because no
     * further round can complete after that.
     */
    private void produce(Vector<Integer> sink) {
        int i = 0;
        while (true) {
            sink.add(i++);
            try {
                Thread.sleep(1000);
                // Decrement the barrier count and wait until it reaches 0.
                barrier.await();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interruption for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
                return;
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrierTest test = new CyclicBarrierTest();
        test.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49