publicclassMain{ publicfinalstaticint MAX_CAPACITY = 1 << 5; public Semaphore producer; public Semaphore consumer; public Semaphore in; publicint productId; public Queue<Integer> buffer;
publicMain(){ producer = new Semaphore(Main.MAX_CAPACITY); consumer = new Semaphore(0); in = new Semaphore(1); productId = 0; buffer = new LinkedList(); }
publicvoidrun(){ for(int i = 0; i < (MAX_CAPACITY << 1); i++) { Producer producerThread = new Producer(i); new Thread(producerThread).start(); Consumer consumerThread = new Consumer(i); new Thread(consumerThread).start(); } }
publicstaticvoidmain(String[] args){ Main main = new Main(); main.run(); }
classProducerimplementsRunnable{ publicint id; publicProducer(int id){ this.id = id; } @Override publicvoidrun(){ try { producer.acquire(); in.acquire(); buffer.add(productId); System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size()-1) + "."); in.release(); consumer.release(); } catch (InterruptedException ie) { System.out.println(ie.toString()); } } }
classConsumerimplementsRunnable{ publicint id; publicConsumer(int id){ this.id = id; } @Override publicvoidrun(){ try { consumer.acquire(); in.acquire(); int production = buffer.poll(); System.out.println("I am consumer " + id + ", I have get No." + production + " production in position "+ buffer.size() + "."); in.release(); producer.release(); } catch (InterruptedException ie) { System.out.println(ie.toString()); } } } }
publicMain(){ productId = 0; buffer = new LinkedList(); }
publicvoidrun(){ for (int i = 0; i < (MAX_CAPACITY << 1); i++) { Producer producerThread = new Producer(i); new Thread(producerThread).start(); Consumer consumerThread = new Consumer(i); new Thread(consumerThread).start(); } }
publicstaticvoidmain(String[] args){ Main main = new Main(); main.run(); }
classProducerimplementsRunnable{ publicint id;
publicProducer(int id){ this.id = id; }
@Override publicvoidrun(){ synchronized (buffer) { //锁定临界区资源 while (buffer.size() == MAX_CAPACITY) // 看看仓库是否已满,如果满了,“订阅”仓库,进入阻塞状态 try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } buffer.add(productId); System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size() - 1) + "."); buffer.notifyAll(); // 放了一个商品之后,通知所有“订阅”了仓库的生产者和消费者,让它们重新进入就绪状态,重新看看对于自己来说仓库是否“可用”,即是否可以放或取商品 }
} }
classConsumerimplementsRunnable{ publicint id;
publicConsumer(int id){ this.id = id; }
@Override publicvoidrun(){ synchronized (buffer) { while (buffer.size() == 0) try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } int production = buffer.poll(); System.out.println("I am consumer " + id + ", I have get No." + production + " production in position " + buffer.size() + "."); buffer.notifyAll(); } } } }
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at Main.run(Main.java:18) at Main.main(Main.java:27)
因为去掉了速度较低的IO,所以线程创建的速度变快了,一时间堆内存中存在大量的存活线程对象,导致了Out Of Memory。 这时候我们就需要用一个东西来限制最大的线程。 在Java concurrent包中,还有一个东西叫线程池,首先它可以减少线程对象频繁创建销毁的性能损耗,其次提供了更为友好的线程管理接口,而且它的一个实现类可以帮助解决我们线程过多的OOM问题。 我们要使用的是定长线程池,来避免OOM,只要把Main类的run方法改一下就可以解决问题。 它的实现方式是使用一个阻塞队列,当系统资源不够运行那么多线程时,会把需要运行的线程放置到队列中,当可用资源高于限制时才会取出来运行。
publicclassMain{ publicfinalstaticint MAX_CAPACITY = 20; publicint productId; public Queue<Integer> buffer;
publicMain(){ productId = 0; buffer = new LinkedList(); }
publicvoidrun(){ ExecutorService threadPool = Executors.newFixedThreadPool(512); // 创建一个线程执行服务器,限制线程池大小为512 for (int i = 0; i < 100000; i++) { Producer producerThread = new Producer(i); threadPool.execute(producerThread); // 提交一个生产者线程 Consumer consumerThread = new Consumer(i); // 提交一个消费者线程 threadPool.execute(consumerThread); } // 停止线程池里面的线程 threadPool.shutdown(); }
publicstaticvoidmain(String[] args){ Main main = new Main(); main.run(); }
classProducerimplementsRunnable{ publicint id;
publicProducer(int id){ this.id = id; }
@Override publicvoidrun(){ synchronized (buffer) { while (buffer.size() == MAX_CAPACITY) try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } buffer.add(productId); // System.out.println("I am producer " + id + ", I have put No." + productId++ + " production in position " + (buffer.size() - 1) + "."); buffer.notifyAll(); }
} }
classConsumerimplementsRunnable{ publicint id;
publicConsumer(int id){ this.id = id; }
@Override publicvoidrun(){ synchronized (buffer) { while (buffer.size() == 0) try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } int production = buffer.poll(); // System.out.println("I am consumer " + id + ", I have get No." + production + " production in position " + buffer.size() + "."); buffer.notifyAll(); } } } }