How to properly implement producer-consumer in Java

Question

I implemented a producer-consumer like this, but it throws an error.

I tried to follow this approach of using a lock (link).

    class Testclass {
        Boolean isFresh = false;
        int count = 0;

        public synchronized void GET(String threadName) {
            while (!isFresh) {
                try {
                    isFresh.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("GET method was called : " + count + " " + threadName);
            isFresh = false;
            isFresh.notify();
        }

        public synchronized void PUT(String threadName) {
            while (isFresh) {
                try {
                    isFresh.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count++;
            System.out.println("PUT method was called : " + count + " " + threadName);
            isFresh = true;
            isFresh.notify();
        }
    }

    class Producer implements Runnable {
        Testclass q;
        String name;

        Producer(Testclass q, String name) {
            this.q = q;
            this.name = name;
        }

        public void run() {
            while (true) {
                int time = (int) (Math.random() * 10000);
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                q.PUT(this.name);
            }
        }
    }

    class Consumer implements Runnable {
        Testclass q;
        String name;

        Consumer(Testclass q, String name) {
            this.q = q;
            this.name = name;
        }

        public void run() {
            while (true) {
                int time = (int) (Math.random() * 10000);
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                q.GET(this.name);
            }
        }
    }

    public class Main {
        public static void main(String[] args) {
            Testclass t = new Testclass();
            Thread t1 = new Thread(new Consumer(t, "consumer 1"));
            Thread t2 = new Thread(new Consumer(t, "consumer 2"));
            Thread t3 = new Thread(new Producer(t, "producer 1"));
            Thread t4 = new Thread(new Producer(t, "producer 2"));
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            try {
                t1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

This implementation throws the following error.

Please explain why all the threads are throwing IllegalMonitorStateException.

    PUT method was called : 1 producer 1
    Exception in thread "Thread-2" java.lang.IllegalMonitorStateException: current thread is not owner
        at java.base/java.lang.Object.notify(Native Method)
        at Testclass.PUT(Main.java:34)
        at Producer.run(Main.java:54)
        at java.base/java.lang.Thread.run(Thread.java:832)
    GET method was called : 1 consumer 1
    Exception in thread "Thread-0" java.lang.IllegalMonitorStateException: current thread is not owner
        at java.base/java.lang.Object.notify(Native Method)
        at Testclass.GET(Main.java:19)
        at Consumer.run(Main.java:76)
        at java.base/java.lang.Thread.run(Thread.java:832)
    Exception in thread "Thread-1" java.lang.IllegalMonitorStateException: current thread is not owner
        at java.base/java.lang.Object.wait(Native Method)
        at java.base/java.lang.Object.wait(Object.java:321)
        at Testclass.GET(Main.java:12)
        at Consumer.run(Main.java:76)
        at java.base/java.lang.Thread.run(Thread.java:832)
    PUT method was called : 2 producer 2
    Exception in thread "Thread-3" java.lang.IllegalMonitorStateException: current thread is not owner
        at java.base/java.lang.Object.notify(Native Method)
        at Testclass.PUT(Main.java:34)
        at Producer.run(Main.java:54)
        at java.base/java.lang.Thread.run(Thread.java:832)
    Process finished with exit code 0

Why is the output like that, and what is the correct way to implement it?

Answer 1

Score: 0

You should use a BlockingQueue. Then you don't have to use wait/notify or even synchronized.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    class Testclass {
        // Capacity 1: put() blocks while an item is pending, take() blocks while the queue is empty.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        AtomicInteger count = new AtomicInteger(0);

        public void GET(String threadName) throws InterruptedException {
            Integer i = queue.take(); // waits until a producer has put an item
            System.out.println("GET method was called : " + i + " " + threadName);
        }

        public void PUT(String threadName) throws InterruptedException {
            int c = count.incrementAndGet(); // atomically claim the next item number
            queue.put(c); // waits until the single slot is free
            System.out.println("PUT method was called : " + c + " " + threadName);
        }
    }

I'm not sure what you want to do with count. This is readily extensible: a larger queue capacity lets producers run further ahead of the consumers.
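As for why the exception appears in the first place: the `synchronized` methods lock the `Testclass` instance (`this`), but `wait()` and `notify()` are called on `isFresh`, which is a different object (a `Boolean`). `Object.wait()` and `Object.notify()` throw `IllegalMonitorStateException` whenever the calling thread does not own the monitor of the object they are invoked on. Assigning `isFresh = false` or `isFresh = true` also swaps which `Boolean` object the field refers to, so it would not be a stable lock even if it were synchronized on. If you want to keep wait/notify, a minimal sketch might look like the following. The names `HandoffDemo`, `Handoff` and `runDemo` are made up for illustration, and the loops are bounded so the program terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HandoffDemo {

    /** Single-slot handoff guarded by this object's own monitor (hypothetical class). */
    static class Handoff {
        private boolean isFresh = false; // plain flag, never used as a lock
        private int count = 0;

        public synchronized int get() throws InterruptedException {
            while (!isFresh) {
                wait(); // this.wait(): legal because synchronized holds this monitor
            }
            isFresh = false;
            notifyAll(); // wake every waiter so a producer can re-check the flag
            return count;
        }

        public synchronized int put() throws InterruptedException {
            while (isFresh) {
                wait();
            }
            count++;
            isFresh = true;
            notifyAll(); // wake every waiter so a consumer can re-check the flag
            return count;
        }
    }

    /** Runs 2 producers and 2 consumers, each handling itemsPerThread items. */
    static List<Integer> runDemo(int itemsPerThread) throws InterruptedException {
        Handoff h = new Handoff();
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < 2; t++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerThread; i++) h.put();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                }
            }));
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerThread; i++) consumed.add(h.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread th : threads) th.start();
        for (Thread th : threads) th.join();
        return consumed; // each produced number appears once; order may vary
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(3));
    }
}
```

`notifyAll()` is used rather than `notify()` because producers and consumers wait on the same monitor: a single `notify()` could wake another thread of the same kind, which would go straight back to waiting and could leave every thread blocked.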

huangapple
  • Published on July 27, 2020 at 17:17:31
  • When reposting, please keep the link to this article: https://java.coder-hub.com/63112235.html