如何在Java中正确实现生产者消费者模式。

huangapple 未分类评论63阅读模式
英文:

how to properly implement producer - consumer in Java

问题

我是您的中文翻译,以下是翻译好的内容:

我是这样实现生产者消费者的。

但是它抛出了错误。

我尝试使用了使用锁的这种方法。链接

class Testclass {
    Boolean isFresh = false;
    int count = 0;
    public synchronized void GET(String threadName){
        while(!isFresh){
            try {
                isFresh.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("GET方法被调用:" + count + " " + threadName);
        isFresh = false;
        isFresh.notify();

    }

    public synchronized void PUT(String threadName){
        while(isFresh){
            try{
                isFresh.wait();
            }catch( InterruptedException e){
                e.printStackTrace();
            }
        }
        count++;
        System.out.println("PUT方法被调用:" + count + " " + threadName);
        isFresh = true;
        isFresh.notify();
    }

}
class Producer implements Runnable{
    Testclass q;
    String name;
    Producer(Testclass q, String name){
        this.q = q;
        this.name = name;
    }
    public void run(){
        while(true){
            int time = (int)(Math.random() * 10000);

            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            q.PUT(this.name);
        }

    }
}

class Consumer implements Runnable{
    Testclass q;
    String name ;
    Consumer(Testclass q,String name){
        this.q = q;
        this.name = name;
    }
    public void run(){
        while(true){
            int time = (int)(Math.random() * 10000);

            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            q.GET(this.name);
        }

    }
}

public class Main {
    public static void main(String[] args){
        Testclass t = new Testclass();
        Thread t1 = new Thread(new Consumer(t, "消费者 1"));
        Thread t2 = new Thread(new Consumer(t, "消费者 2"));
        Thread t3 = new Thread(new Producer(t, "生产者 1"));
        Thread t4 = new Thread(new Producer(t, "生产者 2"));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        try{
            t1.join();
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }
}

这个实现会抛出以下错误。

请解释为什么所有线程都抛出了 Illegal MonitorStateException 错误?

PUT方法被调用1 生产者 1
Exception in thread "Thread-2" java.lang.IllegalMonitorStateException: 当前线程不是所有者
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.PUT(Main.java:34)
	at Producer.run(Main.java:54)
	at java.base/java.lang.Thread.run(Thread.java:832)
GET方法被调用1 消费者 1
Exception in thread "Thread-0" java.lang.IllegalMonitorStateException: 当前线程不是所有者
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.GET(Main.java:19)
	at Consumer.run(Main.java:76)
	at java.base/java.lang.Thread.run(Thread.java:832)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException: 当前线程不是所有者
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Object.wait(Object.java:321)
	at Testclass.GET(Main.java:12)
	at Consumer.run(Main.java:76)
	at java.base/java.lang.Thread.run(Thread.java:832)
PUT方法被调用2 生产者 2
Exception in thread "Thread-3" java.lang.IllegalMonitorStateException: 当前线程不是所有者
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.PUT(Main.java:34)
	at Producer.run(Main.java:54)
	at java.base/java.lang.Thread.run(Thread.java:832)

Process finished with exit code 0

我想知道为什么输出会是这样的?正确的实现方法是什么?

英文:

I implemented producer consumer like this.

But it is throwing error.

I tried to use this method of using lock. Link

class Testclass {
    Boolean isFresh = false;
    int count = 0;
    public synchronized void GET(String threadName){
        while(!isFresh){
            try {
                isFresh.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("GET method was called : " + count + " " + threadName);
        isFresh = false;
        isFresh.notify();

    }

    public synchronized void PUT(String threadName){
        while(isFresh){
            try{
                isFresh.wait();
            }catch( InterruptedException e){
                e.printStackTrace();
            }
        }
        count++;
        System.out.println("PUT method was called : " + count + " " + threadName);
        isFresh = true;
        isFresh.notify();
    }

}
class Producer implements Runnable{
    Testclass q;
    String name;
    Producer(Testclass q, String name){
        this.q = q;
        this.name = name;
    }
    public void run(){
        while(true){
            int time = (int)(Math.random() * 10000);

            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            q.PUT(this.name);
        }

    }
}

class Consumer implements Runnable{
    Testclass q;
    String name ;
    Consumer(Testclass q,String name){
        this.q = q;
        this.name = name;
    }
    public void run(){
        while(true){
            int time = (int)(Math.random() * 10000);

            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            q.GET(this.name);
        }

    }
}


public class Main {
    public static void main(String[] args){
        Testclass t = new Testclass();
        Thread t1 = new Thread(new Consumer(t, "consumer 1"));
        Thread t2 = new Thread(new Consumer(t, "consumer 2"));
        Thread t3 = new Thread(new Producer(t, "producer 1"));
        Thread t4 = new Thread(new Producer(t, "producer 2"));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        try{
            t1.join();
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }



}

this implementation throws following error.

Please explain
why all the threads are throwing Illegal MonitorStateException?

PUT method was called : 1 producer 1
Exception in thread "Thread-2" java.lang.IllegalMonitorStateException: current thread is not owner
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.PUT(Main.java:34)
	at Producer.run(Main.java:54)
	at java.base/java.lang.Thread.run(Thread.java:832)
GET method was called : 1 consumer 1
Exception in thread "Thread-0" java.lang.IllegalMonitorStateException: current thread is not owner
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.GET(Main.java:19)
	at Consumer.run(Main.java:76)
	at java.base/java.lang.Thread.run(Thread.java:832)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException: current thread is not owner
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Object.wait(Object.java:321)
	at Testclass.GET(Main.java:12)
	at Consumer.run(Main.java:76)
	at java.base/java.lang.Thread.run(Thread.java:832)
PUT method was called : 2 producer 2
Exception in thread "Thread-3" java.lang.IllegalMonitorStateException: current thread is not owner
	at java.base/java.lang.Object.notify(Native Method)
	at Testclass.PUT(Main.java:34)
	at Producer.run(Main.java:54)
	at java.base/java.lang.Thread.run(Thread.java:832)

Process finished with exit code 0

I want to know why is the output like that?
and what is the correct way of implementing it?

答案1

得分: 0

你应该使用 BlockingQueue。这样你就不需要使用 wait/notify,甚至不需要使用 synchronized。

class Testclass {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
    AtomicInteger count = new AtomicInteger(0);
    public void GET(String threadName) throws InterruptedException{
        Integer i = queue.take();
        count.getAndIncrement();
        System.out.println("GET method was called : " + count + " " + threadName);
    }

    public void PUT(String threadName) throws InterruptedException{

        int c = count.get();
        queue.put(c);
        count.getAndIncrement();
        System.out.println("PUT method was called : " + count + " " + threadName);
    }

}

我不确定你想如何处理 count 变量。通过更改队列大小,可以轻松地进行扩展。

英文:

You should use a BlockingQueue. Then you don't have to use wait/notify or even synchronzied.

class Testclass {
    BlockingQueue&lt;Integer&gt; queue = new ArrayBlockingQueue(1);
    AtomicInteger count = new AtomicInteger(0);
    public void GET(String threadName) throws InterruptedException{
        Integer i = queue.take();
        count.getAndIncrement();
        System.out.println(&quot;GET method was called : &quot; + count + &quot; &quot; + threadName);
    }

    public void PUT(String threadName) throws InterruptedException{

        int c = count.get();
        queue.put(c);
        count.getAndIncrement();
        System.out.println(&quot;PUT method was called : &quot; + count + &quot; &quot; + threadName);
    }

}

I'm not sure what you want to do with count. This is readily extensible by changing the size of your Queue.

huangapple
  • 本文由 发表于 2020年7月27日 17:17:31
  • 转载请务必保留本文链接:https://java.coder-hub.com/63112235.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定