缓解监视器锁定惩罚:快速的重要生产者与缓慢的温顺消费者

huangapple 未分类评论43阅读模式
英文:

Mitigating monitor lock penalty: Fast important producers with a slow meek consumer

问题

TL;DR:

  1. Is there a standard way to implement a solution in Java where an unbounded underlying Queue<T> is thread-safe accessible to multiple mutually independent fast producers without them having to wait for a single slow consumer?

Premise:

Struggling with a producer-consumer problem, where multiple producers (Producer extends Thread) access an underlying queue Queue<Long> q, and there's a single consumer (Consumer extends Thread) doing the same.

  1. The queue is unbounded.
  2. Producers are faster than the consumer.
  3. Producers shouldn't wait for access to the queue due to the consumer.

Single queue approach:

Tried using a single queue but had problems due to a slow consumer. Consumer would relinquish its lock to other waiting queues, but this caused producers to wait for the consumer's heavy task.

Two queue approach: using a buffer:

Implemented a solution with two queues q and qBuffer. Consumer locks on one queue, exhausts it, and checks the other queue if not empty. If buffer is not empty, it locks on the buffer and empties it. If buffer was empty, it awaits on the first queue. Producers use either queue if available.

Issues:

  1. Deciding which queue to use when both are busy.
  2. Needing a flag FLAG_BUFFER_NOT_EMPTY to prevent awaiting on q without exhausting the buffer.

Unsatisfied with this implementation, seeking standard Java solutions for this problem.

Code: (Provided code snippets)

英文:

TL;DR

  1. Is there a standard way to implement a solution in java where an unbounded underlying Queue&lt;T&gt; is thread-safe accessible to multiple mutually independent fast producers without them having to wait for a single slow consumer?

Premise:

I am struggling with a producer-consumer type problem. Consider multiple producers Producer extends Thread each of which has access to an underlying queue Queue&lt;Long&gt; q. Same goes for the single consumer Consumer extends Thread.

For this problem

  1. The queue is unbounded and its length doesn't concern us.

  2. The producers are faster than the consumer. This means that the atomic and heavy task performed by consumer takes a lot of more time than that compared to the producer's task.

  3. The producers are more important than the consumer: A producer shouldn't have to wait for access to queue because of consumer.

Single queue approach:

With a single queue, the best I could come up with was a meek consumer: every time the consumer would apply for a lock on q, on getting it, it would immediately relinquish it if there were other queues waiting. If there weren't, it pops at most one element, goes on lock.condition.await to be notified.

The problem with this approach was that if and when the consumer did find the queue to be free, it would start its atomic and heavy task. Producers who need access during this time then have to wait because of a consumer

Two queue approach: using a buffer (code below)

Here a second queue qBuffer of the same kind as q is also accessible to all. The consumer being single, only ever holds a lock(or is waiting for one) over one of the two queues. A producer, when finds a queue busy, uses the other queue.

The consumer only ever locks or waits on one of the 2 queues. As a result it need not be meek. It waits for a lock on q, exhausts it. Checks if buffer is not empty, waits for a lock on buffer, exhausts it. If the buffer was empty, goes into await on q.

The problems with this approach are:

  1. Deciding whihc queue to use: Both queues may be busy. For the producer to decide which queue doesn't have the consumer locking it, consumerThread has to be exposed via extending ReentranLock.getOwner(). The consumer may have been awaiting instead which is checked via ReentrantLock.hasQueuedThread(consumerThread). Both these methods are only approximate and can end being undecidable or causing the producer to wait behind a consumer on a thread.

  2. To prevent awaiting on q without exhausting buffer, a flag FLAG_BUFFER_NOT_EMPTY needs to be set.

All in all I am thoroughly dissatisfied with the key part of my implementation and would like to know if there is any standard way of tackling this in java.

Code:

overall (key parts below)

package all.code.classes;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MonitorLockPenaltyMitigation_LocksAndBuffer3 implements Runnable {

    private static int pid = 0, cid = 0;
    private static final Queue&lt;Long&gt; q = new LinkedList&lt;&gt;();
    private static final Queue&lt;Long&gt; qBuffer = new LinkedList&lt;&gt;();
    private static final Random rand = new Random();
    private final ExposedReentrantLock lock = new ExposedReentrantLock();
    private final ExposedReentrantLock bufferLock = new ExposedReentrantLock();
    private Thread consumerThread;
    private static int cCount = 0, pCount = 0;
    private final int n;
    private boolean FLAG_BUFFER_NOT_EMPTY = false;

    public MonitorLockPenaltyMitigation_LocksAndBuffer3(int noOfThreads) {
        this.n = noOfThreads;
    }

    private static void heavyTask(long iterations) {
        for (long i = 0; i &lt; iterations; i++) {
            double a = Math.sin(rand.nextDouble()) / Math.sin(rand.nextDouble());
        }

    }

    private static void heavyTask() {
        heavyTask(1000000);
    }





    private static void sop(Object o) {
        System.out.println(String.format(&quot;%-5s : %s&quot;, Thread.currentThread().getName(), o.toString()));
    }

    private static void sopl(Object o) {
        System.out.print(String.format(&quot;%-5s : %s&quot;, Thread.currentThread().getName(), o.toString()));
    }


    @Override
    public void run() {


        consumerThread = new Consumer();
        long t0 = 25, dt = 25;
        consumerThread.start();
        for (int i = 0; i &lt; n; i++) {
            new Producer(1 + rand.nextInt(5000)).start();
        }


    }

    private class Producer extends Thread {

        public long duration;

        public Producer(long sleepDuration) {
            super(&quot;  &quot; + pid++);


            this.duration = sleepDuration;
        }


        private void produceAndUnlock(Queue&lt;Long&gt; q, ExposedReentrantLock lock) {

            q.add(duration);
//            heavyTask(100000);
            lock.condition.signalAll();
            lock.unlock();
            sop(&quot;done &quot; + pCount++);

        }

        @Override
        public void run() {

            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                sop(&quot;interrupted&quot;);
            }


            sop(String.format(&quot;%s %-5d&quot;, &quot;awake&quot;, duration));


            if (lock.tryLock()) {
                sop(&quot;got q&quot;);
                produceAndUnlock(q, lock);
            } else if (bufferLock.tryLock()) {
                sop(&quot;got buffer&quot;);
                raiseBufferNotEmptyFlag();
                produceAndUnlock(qBuffer, bufferLock);
            } else
                {

                //both queues are busy
                //but only one could be because of consumer
                //try to get a lock on the other one
                //because of fairness, when consumer would apply for lock on that thread
                //it will succeed, never precede


                sop(&quot;both busy q:&quot; + lock.getOwner().getName() + &quot; qBuffer:&quot; + bufferLock.getOwner().getName());


                if (consumerThread.equals(lock.getOwner())
                        || lock.hasQueuedThread(consumerThread)) {
                    sop(&quot;consumer is on q waiting or runnning&quot;);
                    sop(&quot;waiting for  bufferLock&quot;);
                    bufferLock.lock();
                    produceAndUnlock(qBuffer, bufferLock);
                } else if (consumerThread.equals(bufferLock.getOwner()) ||
                        bufferLock.hasQueuedThread(consumerThread))
                {
                    sop(&quot;consumer is on qBuffer waiting or runnning&quot;);
                    sop(&quot;waiting for  lock&quot;);
                    lock.lock();
                    produceAndUnlock(q, lock);
                }
                else

                    {
                        //TODO uncertain ground: the methods above are approximate
                        //so here on out don&#39;t know what to do
                        sop(&quot;consumer not on any!&quot;);
                        sop(&quot;consumer state &quot;+consumerThread.getState());
                        sop(&quot;applying to buffer&quot;);
                        bufferLock.lock();
                        produceAndUnlock(qBuffer,bufferLock);

//                    System.err.println(&quot;XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&quot;);
//                    System.exit(0);
                }


            }


        }


    }

    private class Consumer extends Thread {


        public Consumer() {
            super(&quot;C &quot; + cid++);

        }


        @Override
        public void run() {


            while (true) {



                sop(&quot;waiting for lock on q&quot;);

                lock.lock();
                sop(&quot;locked q&quot;);
                consumeQueue(q);
                sop(&quot;emptied q&quot;);
                if (FLAG_BUFFER_NOT_EMPTY) {
                    sop(&quot;buffer not empty&quot;);
                    lock.unlock();
                    sop(&quot;unlocked q&quot;);
                    sop(&quot;waiting for lock on buffer&quot;);
                    bufferLock.lock();
                    sop(&quot;locked buffer&quot;);
                    consumeQueue(qBuffer);
                    sop(&quot;emptied buffer&quot;);
                    lowerBufferNotEmptyFlag();
                    bufferLock.unlock();
                    sop(&quot;unlocked buffer&quot;);
                    continue;
                }

                while (q.isEmpty()) {
                    sop(&quot;empty q!&quot;);
                    try {
                        sop(&quot;unlocked q and going into wait on q&quot;);
                        lock.condition.await();
                        sop(&quot;awoke and locked on q&quot;);

                    } catch (InterruptedException e) {

                    }

                }
                consumeQueue(q);
                lock.unlock();
                sop(&quot;done unlocked q&quot;);


            }


        }

        private void consumeQueue(Queue&lt;Long&gt; q) {

            while (!q.isEmpty()) {
                sop(&quot;polling &quot; + ++cCount + &quot;/&quot; + n + &quot; &quot; + q.poll());
                heavyTask();
            }


        }




    }

    public void raiseBufferNotEmptyFlag() {
        FLAG_BUFFER_NOT_EMPTY = true;
    }

    public void lowerBufferNotEmptyFlag() {
        FLAG_BUFFER_NOT_EMPTY = false;
    }


    private class ExposedReentrantLock extends ReentrantLock {


        public Condition condition;

        public ExposedReentrantLock() {
            super();
            this.condition = newCondition();
        }

        public ExposedReentrantLock(boolean fair) {
            super(fair);
            this.condition = newCondition();
        }

        public Thread getOwner() {
            return super.getOwner();
        }


    }

    public static void main(String[] args) throws InterruptedException {

        int n = 250;
        sop(&quot;This code uses &quot; + n + &quot; threads as a test case\n&quot; +
                &quot;You may pass the no of threads as the one and only int argument, if you want\n&quot;);
        if (args.length == 1) {
            n = Integer.parseInt(args[0]);

        }
        sop(&quot;No of threads &quot; + n+&quot;\n&quot;);
        new Thread(new MonitorLockPenaltyMitigation_LocksAndBuffer3(n), MonitorLockPenaltyMitigation_LocksAndBuffer3.class.getSimpleName()).start();

    }

}








Key part of Consumer extends Thread

 public void run() {


            while (true) {



                sop(&quot;waiting for lock on q&quot;);

                lock.lock();
                sop(&quot;locked q&quot;);
                consumeQueue(q);
                sop(&quot;emptied q&quot;);
                if (FLAG_BUFFER_NOT_EMPTY) {
                    sop(&quot;buffer not empty&quot;);
                    lock.unlock();
                    sop(&quot;unlocked q&quot;);
                    sop(&quot;waiting for lock on buffer&quot;);
                    bufferLock.lock();
                    sop(&quot;locked buffer&quot;);
                    consumeQueue(qBuffer);
                    sop(&quot;emptied buffer&quot;);
                    lowerBufferNotEmptyFlag();
                    bufferLock.unlock();
                    sop(&quot;unlocked buffer&quot;);
                    continue;
                }

                while (q.isEmpty()) {
                    sop(&quot;empty q!&quot;);
                    try {
                        sop(&quot;unlocked q and going into wait on q&quot;);
                        lock.condition.await();
                        sop(&quot;awoke and locked on q&quot;);

                    } catch (InterruptedException e) {

                    }

                }
                consumeQueue(q);
                lock.unlock();
                sop(&quot;done unlocked q&quot;);


            }


        }

 private void consumeQueue(Queue&lt;Long&gt; q) {

            while (!q.isEmpty()) {
                sop(&quot;polling &quot; + ++cCount + &quot;/&quot; + n + &quot; &quot; + q.poll());
                heavyTask();
            }


        }

Key part of Producer extends Thread


@Override
        public void run() {

            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                sop(&quot;interrupted&quot;);
            }


            sop(String.format(&quot;%s %-5d&quot;, &quot;awake&quot;, duration));


            if (lock.tryLock()) {
                sop(&quot;got q&quot;);
                produceAndUnlock(q, lock);
            } else if (bufferLock.tryLock()) {
                sop(&quot;got buffer&quot;);
                raiseBufferNotEmptyFlag();
                produceAndUnlock(qBuffer, bufferLock);
            } else
                {

                //both queues are busy
                //but only one could be because of consumer
                //try to get a lock on the other one
                //because of fairness, when consumer would apply for lock on that thread
                //it will succeed, never precede


                sop(&quot;both busy q:&quot; + lock.getOwner().getName() + &quot; qBuffer:&quot; + bufferLock.getOwner().getName());


                if (consumerThread.equals(lock.getOwner())
                        || lock.hasQueuedThread(consumerThread)) {
                    sop(&quot;consumer is on q waiting or runnning&quot;);
                    sop(&quot;waiting for  bufferLock&quot;);
                    bufferLock.lock();
                    produceAndUnlock(qBuffer, bufferLock);
                } else if (consumerThread.equals(bufferLock.getOwner()) ||
                        bufferLock.hasQueuedThread(consumerThread))
                {
                    sop(&quot;consumer is on qBuffer waiting or runnning&quot;);
                    sop(&quot;waiting for  lock&quot;);
                    lock.lock();
                    produceAndUnlock(q, lock);
                }
                else

                    {
                        //TODO uncertain ground: the methods above are approximate
                        //so here on out don&#39;t know what to do
                        sop(&quot;consumer not on any!&quot;);
                        sop(&quot;consumer state &quot;+consumerThread.getState());
                        sop(&quot;applying to buffer&quot;);
                        bufferLock.lock();
                        produceAndUnlock(qBuffer,bufferLock);

//                    System.err.println(&quot;XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&quot;);
//                    System.exit(0);
                }


            }


        }
        
        private void produceAndUnlock(Queue&lt;Long&gt; q, ExposedReentrantLock lock) {

            q.add(duration);
//            heavyTask(100000);
            lock.condition.signalAll();
            lock.unlock();
            sop(&quot;done &quot; + pCount++);

        }

Consumer class(included in overall)

 private class Consumer extends Thread {


        public Consumer() {
            super(&quot;C &quot; + cid++);

        }


        @Override
        public void run() {


            while (true) {



                sop(&quot;waiting for lock on q&quot;);

                lock.lock();
                sop(&quot;locked q&quot;);
                consumeQueue(q);
                sop(&quot;emptied q&quot;);
                if (FLAG_BUFFER_NOT_EMPTY) {
                    sop(&quot;buffer not empty&quot;);
                    lock.unlock();
                    sop(&quot;unlocked q&quot;);
                    sop(&quot;waiting for lock on buffer&quot;);
                    bufferLock.lock();
                    sop(&quot;locked buffer&quot;);
                    consumeQueue(qBuffer);
                    sop(&quot;emptied buffer&quot;);
                    lowerBufferNotEmptyFlag();
                    bufferLock.unlock();
                    sop(&quot;unlocked buffer&quot;);
                    continue;
                }

                while (q.isEmpty()) {
                    sop(&quot;empty q!&quot;);
                    try {
                        sop(&quot;unlocked q and going into wait on q&quot;);
                        lock.condition.await();
                        sop(&quot;awoke and locked on q&quot;);

                    } catch (InterruptedException e) {

                    }

                }
                consumeQueue(q);
                lock.unlock();
                sop(&quot;done unlocked q&quot;);


            }


        }

        private void consumeQueue(Queue&lt;Long&gt; q) {

            while (!q.isEmpty()) {
                sop(&quot;polling &quot; + ++cCount + &quot;/&quot; + n + &quot; &quot; + q.poll());
                heavyTask();
            }


        }




    }

Producer class(included in overall)

 private class Producer extends Thread {

        public long duration;

        public Producer(long sleepDuration) {
            super(&quot;  &quot; + pid++);


            this.duration = sleepDuration;
        }


        private void produceAndUnlock(Queue&lt;Long&gt; q, ExposedReentrantLock lock) {

            q.add(duration);
//            heavyTask(100000);
            lock.condition.signalAll();
            lock.unlock();
            sop(&quot;done &quot; + pCount++);

        }

        @Override
        public void run() {

            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                sop(&quot;interrupted&quot;);
            }


            sop(String.format(&quot;%s %-5d&quot;, &quot;awake&quot;, duration));


            if (lock.tryLock()) {
                sop(&quot;got q&quot;);
                produceAndUnlock(q, lock);
            } else if (bufferLock.tryLock()) {
                sop(&quot;got buffer&quot;);
                raiseBufferNotEmptyFlag();
                produceAndUnlock(qBuffer, bufferLock);
            } else
                {

                //both queues are busy
                //but only one could be because of consumer
                //try to get a lock on the other one
                //because of fairness, when consumer would apply for lock on that thread
                //it will succeed, never precede


                sop(&quot;both busy q:&quot; + lock.getOwner().getName() + &quot; qBuffer:&quot; + bufferLock.getOwner().getName());


                if (consumerThread.equals(lock.getOwner())
                        || lock.hasQueuedThread(consumerThread)) {
                    sop(&quot;consumer is on q waiting or runnning&quot;);
                    sop(&quot;waiting for  bufferLock&quot;);
                    bufferLock.lock();
                    produceAndUnlock(qBuffer, bufferLock);
                } else if (consumerThread.equals(bufferLock.getOwner()) ||
                        bufferLock.hasQueuedThread(consumerThread))
                {
                    sop(&quot;consumer is on qBuffer waiting or runnning&quot;);
                    sop(&quot;waiting for  lock&quot;);
                    lock.lock();
                    produceAndUnlock(q, lock);
                }
                else

                    {
                        //TODO uncertain ground: the methods above are approximate
                        //so here on out don&#39;t know what to do
                        sop(&quot;consumer not on any!&quot;);
                        sop(&quot;consumer state &quot;+consumerThread.getState());
                        sop(&quot;applying to buffer&quot;);
                        bufferLock.lock();
                        produceAndUnlock(qBuffer,bufferLock);

//                    System.err.println(&quot;XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&quot;);
//                    System.exit(0);
                }


            }


        }


    }

huangapple
  • 本文由 发表于 2020年7月24日 23:23:25
  • 转载请务必保留本文链接:https://java.coder-hub.com/63076627.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定