avatar

目录
生产者-消费者问题的Java语言实现

生产者-消费者问题是操作系统线程通信中的一个典型问题,解决它的方法主要是锁定资源。
在大二学操作系统的时候曾经使用C++语言实现,实现方式为循环链表(作为缓冲区)+信号量,并在NachOS虚拟机下成功运行。同样,接下来会给出Java语言的多种实现方式。

可以先回忆一下问题:

题文

由N个生产者生产N件商品,每个生产者生产完后将商品放入仓库,由N个消费者从仓库取走N件商品,这个过程同时进行。仓库只能容纳一个人,最多容纳N/2个商品。假设N为64。

解题思路1:信号量

信号量本质上是一个数值,有两种操作:wait()signal(),wait操作给使这个数值自减,signal操作使这个数值自增。
同步信号量用于同步,标识过程是否满足条件继续进行,它不为0时表示可以往下进行,为0时表示需要等待。
互斥信号量用于竞争,标识某个资源是否可用。它不为0时,代表这个资源未被占用,为0时标识这个资源已被占用。
当wait()操作在自减前发现信号量为0时,会等待别的线程或进程发来的一次signal()。
值得注意的是,在占用互斥信号量前,要先占用同步信号量;在释放同步信号量之前,要先释放互斥信号量。

本题使用3个信号量解决,其中一个同步信号量用于标识仓库剩余容量,提供给生产者;一个同步信号量用于标识仓库内的商品数,提供给给消费者;一个二元互斥信号量,用于标识仓库是否可以进入。

题解1:信号量实现

以下是一年前写的C++实现,运行在NachOS实验系统虚拟机上

cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
int ring[32];
int front, rear;//循环队列实现
int proId = 1;
Semaphore p((char*)"Producer", 32);//仓库内剩余容量
Semaphore c((char*)"Consumer", 0);//仓库内剩余商品数
Semaphore in((char*)"mutex", 1);//仓库是否可进入

static void printBuff() {//打印仓库内商品编号的函数
for(int i = 0; i < 32; i++)
printf("%d ", ring[i]);
printf("\n");
}

static void Producer() {//生产者线程
p.P();//等待仓库内空位
in.P();//等待进入仓库
ring[rear] = proId;
printf("I am producer, I have put No. %d product in %d position.\n", proId, rear);
rear = (rear + 1) % 32;
proId++;//放商品
printBuff();
in.V();//释放仓库
p.V();//告知消费者们仓库内多了一件商品
}

static void Consumer() {
c.P();//等待仓库内有商品
in.P();//等待进入仓库
printf("I am consumer, I have get No. %d product in %d position.\n", ring[front], front);
ring[front] = 0;
front = (front + 1) % 32;//消费一件商品
printBuff();
in.V();//释放仓库
p.V();//告诉生产者们仓库内多了一个空位
}

void
Thread::SelfTest()
{

for(int count = 0; count < 64; count++) {
Thread* prd = new Thread("Producer");
Thread* con = new Thread("Consumer");
prd->Fork((VoidFunctionPtr)Producer, NULL);
con->Fork((VoidFunctionPtr)Consumer, NULL);
}

}

同样,JDK的concurrent包中也有信号量这个API,它使用acquire()和release()两个方法来代替wait()和signal()。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.*;
import java.util.concurrent.*;

public class Main {
public final static int MAX_CAPACITY = 1 << 5;
public Semaphore producer;
public Semaphore consumer;
public Semaphore in;
public int productId;
public Queue<Integer> buffer;

public Main() {
producer = new Semaphore(Main.MAX_CAPACITY);
consumer = new Semaphore(0);
in = new Semaphore(1);
productId = 0;
buffer = new LinkedList();
}

public void run() {
for(int i = 0; i < (MAX_CAPACITY << 1); i++) {
Producer producerThread = new Producer(i);
new Thread(producerThread).start();
Consumer consumerThread = new Consumer(i);
new Thread(consumerThread).start();
}
}

public static void main(String[] args) {
Main main = new Main();
main.run();
}

class Producer implements Runnable {
public int id;
public Producer(int id) {
this.id = id;
}
@Override
public void run() {
try {
producer.acquire();
in.acquire();
buffer.add(productId);
System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size()-1) + ".");
in.release();
consumer.release();
} catch (InterruptedException ie) {
System.out.println(ie.toString());
}
}
}

class Consumer implements Runnable {
public int id;
public Consumer(int id) {
this.id = id;
}
@Override
public void run() {
try {
consumer.acquire();
in.acquire();
int production = buffer.poll();
System.out.println("I am consumer " + id + ", I have get No." + production + " production in position "+ buffer.size() + ".");
in.release();
producer.release();
} catch (InterruptedException ie) {
System.out.println(ie.toString());
}
}
}
}

解题思路2:锁+轮询

在信号量的解法中,其实我们可以看到两个同步信号量判断了队列是否为空或者非空,所以可以直接把它作为一个条件循环判断,当发现不能继续执行下去时,让出CPU时间给其它线程,暂时释放这个对象的锁,并“订阅”这个buffer对象,当接到这个对象“可用”的通知时,返回到下一条语句。当然,在获取buffer这个临界区资源时,我们依然要给它加上一个锁。这里的锁采用synchronized关键字实现。

题解2:锁+轮询实现

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import java.util.*;

public class Main {
public final static int MAX_CAPACITY = 1 << 5;
public int productId;
public Queue<Integer> buffer;

public Main() {
productId = 0;
buffer = new LinkedList();
}

public void run() {
for (int i = 0; i < (MAX_CAPACITY << 1); i++) {
Producer producerThread = new Producer(i);
new Thread(producerThread).start();
Consumer consumerThread = new Consumer(i);
new Thread(consumerThread).start();
}
}

public static void main(String[] args) {
Main main = new Main();
main.run();
}

class Producer implements Runnable {
public int id;

public Producer(int id) {
this.id = id;
}

@Override
public void run() {
synchronized (buffer) { //锁定临界区资源
while (buffer.size() == MAX_CAPACITY)
// 看看仓库是否已满,如果满了,“订阅”仓库,进入阻塞状态
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
buffer.add(productId);
System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size() - 1) + ".");
buffer.notifyAll();
// 放了一个商品之后,通知所有“订阅”了仓库的生产者和消费者,让它们重新进入就绪状态,重新看看对于自己来说仓库是否“可用”,即是否可以放或取商品
}

}
}

class Consumer implements Runnable {
public int id;

public Consumer(int id) {
this.id = id;
}

@Override
public void run() {
synchronized (buffer) {
while (buffer.size() == 0)
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
int production = buffer.poll();
System.out.println("I am consumer " + id + ", I have get No." + production + " production in position " + buffer.size() + ".");
buffer.notifyAll();
}
}
}
}

新的问题:OOM

现在我们来修改一下题目,我们把生产者和消费者个数改为10W,临界缓冲区大小改为20,然后把输出注释掉。
然后出现了这样的错误:

Code
1
2
3
4
5
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at Main.run(Main.java:18)
at Main.main(Main.java:27)

因为去掉了速度较低的IO,所以线程创建的速度变快了,一时间堆内存中存在大量的存活线程对象,导致了Out Of Memory。
这时候我们就需要用一个东西来限制最大的线程。
在Java concurrent包中,还有一个东西叫线程池,首先它可以减少线程对象频繁创建销毁的性能损耗,其次提供了更为友好的线程管理接口,而且它的一个实现类可以帮助解决我们线程过多的OOM问题。
我们要使用的是定长线程池,来避免OOM,只要把Main类的run方法改一下就可以解决问题。
它的实现方式是使用一个阻塞队列,当系统资源不够运行那么多线程时,会把需要运行的线程放置到队列中,当可用资源高于限制时才会取出来运行。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.*;
import java.util.concurrent.*;

public class Main {
public final static int MAX_CAPACITY = 20;
public int productId;
public Queue<Integer> buffer;

public Main() {
productId = 0;
buffer = new LinkedList();
}

public void run() {
ExecutorService threadPool = Executors.newFixedThreadPool(512);
// 创建一个线程执行服务器,限制线程池大小为512
for (int i = 0; i < 100000; i++) {
Producer producerThread = new Producer(i);
threadPool.execute(producerThread);
// 提交一个生产者线程
Consumer consumerThread = new Consumer(i);
// 提交一个消费者线程
threadPool.execute(consumerThread);
}
// 停止线程池里面的线程
threadPool.shutdown();
}

public static void main(String[] args) {
Main main = new Main();
main.run();
}

class Producer implements Runnable {
public int id;

public Producer(int id) {
this.id = id;
}

@Override
public void run() {
synchronized (buffer) {
while (buffer.size() == MAX_CAPACITY)
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
buffer.add(productId);
// System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size() - 1) + ".");
buffer.notifyAll();
}

}
}

class Consumer implements Runnable {
public int id;

public Consumer(int id) {
this.id = id;
}

@Override
public void run() {
synchronized (buffer) {
while (buffer.size() == 0)
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
int production = buffer.poll();
// System.out.println("I am consumer " + id + ", I have get No." + production + " production in position " + buffer.size() + ".");
buffer.notifyAll();
}
}
}
}

其实如果这个定长线程池的实现不够好的话是会导致死锁的。
考虑一种极端情况:当线程池内的线程全部为消费者线程,而此时缓冲区为空,如果此时无法执行生产者线程,那就会出现死锁的情况。
在使用这个线程池时我考虑到了这个问题,但是当我试着把线程上限和缓冲区大小都调为1时,发现并没有出现这样的问题,证明它的实现要比我想象中的好。
为此我去查询了定长线程池工作的具体规则:我传入的参数512指的是最大的工作线程数!而调用了wait方法进入阻塞状态的线程并不属于工作线程!它们被阻塞时应该会被放入阻塞队列中,阻塞队列理论上是可以无限长的,而Thread对象占用的内存要比它run起来时占用的内存要低很多,所以这个阻塞队列起到了缓解内存占用过高的问题。

文章作者: LightingX
文章链接: http://lightingx.top/2019/03/22/%E7%94%9F%E4%BA%A7%E8%80%85-%E6%B6%88%E8%B4%B9%E8%80%85%E9%97%AE%E9%A2%98%E7%9A%84Java%E8%AF%AD%E8%A8%80%E5%AE%9E%E7%8E%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 LightingX

评论