英文:
Mitigating monitor lock penalty: Fast important producers with a slow meek consumer
问题
TL;DR:
- Is there a standard way to implement a solution in Java where an unbounded underlying
Queue<T>
is thread-safe accessible to multiple mutually independent fast producers without them having to wait for a single slow consumer?
Premise:
Struggling with a producer-consumer problem, where multiple producers (Producer extends Thread
) access an underlying queue Queue<Long> q
, and there's a single consumer (Consumer extends Thread
) doing the same.
- The queue is unbounded.
- Producers are faster than the consumer.
- Producers shouldn't wait for access to the queue due to the consumer.
Single queue approach:
Tried using a single queue but had problems due to a slow consumer. Consumer would relinquish its lock to other waiting queues, but this caused producers to wait for the consumer's heavy task.
Two queue approach: using a buffer:
Implemented a solution with two queues q
and qBuffer
. Consumer locks on one queue, exhausts it, and checks the other queue if not empty. If buffer is not empty, it locks on the buffer and empties it. If buffer was empty, it awaits on the first queue. Producers use either queue if available.
Issues:
- Deciding which queue to use when both are busy.
- Needing a flag
FLAG_BUFFER_NOT_EMPTY
to prevent awaiting onq
without exhausting the buffer.
Unsatisfied with this implementation, seeking standard Java solutions for this problem.
Code: (Provided code snippets)
英文:
TL;DR
- Is there a standard way to implement a solution in java where an unbounded underlying
Queue<T>
is thread-safe accessible to multiple mutually independent fast producers without them having to wait for a single slow consumer?
Premise:
I am struggling with a producer-consumer type problem. Consider multiple producers Producer extends Thread
each of which has access to an underlying queue Queue<Long> q
. Same goes for the single consumer Consumer extends Thread
.
For this problem
-
The queue is unbounded and its length doesn't concern us.
-
The producers are faster than the consumer. This means that the atomic and heavy task performed by consumer takes a lot of more time than that compared to the producer's task.
-
The producers are more important than the consumer: A producer shouldn't have to wait for access to queue because of consumer.
Single queue approach:
With a single queue, the best I could come up with was a meek consumer: every time the consumer would apply for a lock
on q
, on getting it, it would immediately relinquish it if there were other queues waiting. If there weren't, it pops at most one element, goes on lock.condition.await
to be notified.
The problem with this approach was that if and when the consumer did find the queue to be free, it would start its atomic and heavy task. Producers who need access during this time then have to wait because of a consumer
Two queue approach: using a buffer (code below)
Here a second queue qBuffer
of the same kind as q
is also accessible to all. The consumer being single, only ever holds a lock
(or is waiting for one) over one of the two queues. A producer, when finds a queue busy, uses the other queue.
The consumer only ever locks or waits on one of the 2 queues. As a result it need not be meek. It waits for a lock on q
, exhausts it. Checks if buffer is not empty, waits for a lock on buffer, exhausts it. If the buffer was empty, goes into await
on q
.
The problems with this approach are:
-
Deciding whihc queue to use: Both queues may be busy. For the producer to decide which queue doesn't have the consumer locking it,
consumerThread
has to be exposed via extendingReentranLock.getOwner()
. The consumer may have beenawait
ing instead which is checked viaReentrantLock.hasQueuedThread(consumerThread)
. Both these methods are only approximate and can end being undecidable or causing the producer to wait behind a consumer on a thread. -
To prevent
await
ing onq
without exhausting buffer, a flagFLAG_BUFFER_NOT_EMPTY
needs to be set.
All in all I am thoroughly dissatisfied with the key part of my implementation and would like to know if there is any standard way of tackling this in java.
Code:
overall (key parts below)
package all.code.classes;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MonitorLockPenaltyMitigation_LocksAndBuffer3 implements Runnable {
private static int pid = 0, cid = 0;
private static final Queue<Long> q = new LinkedList<>();
private static final Queue<Long> qBuffer = new LinkedList<>();
private static final Random rand = new Random();
private final ExposedReentrantLock lock = new ExposedReentrantLock();
private final ExposedReentrantLock bufferLock = new ExposedReentrantLock();
private Thread consumerThread;
private static int cCount = 0, pCount = 0;
private final int n;
private boolean FLAG_BUFFER_NOT_EMPTY = false;
public MonitorLockPenaltyMitigation_LocksAndBuffer3(int noOfThreads) {
this.n = noOfThreads;
}
private static void heavyTask(long iterations) {
for (long i = 0; i < iterations; i++) {
double a = Math.sin(rand.nextDouble()) / Math.sin(rand.nextDouble());
}
}
private static void heavyTask() {
heavyTask(1000000);
}
private static void sop(Object o) {
System.out.println(String.format("%-5s : %s", Thread.currentThread().getName(), o.toString()));
}
private static void sopl(Object o) {
System.out.print(String.format("%-5s : %s", Thread.currentThread().getName(), o.toString()));
}
@Override
public void run() {
consumerThread = new Consumer();
long t0 = 25, dt = 25;
consumerThread.start();
for (int i = 0; i < n; i++) {
new Producer(1 + rand.nextInt(5000)).start();
}
}
private class Producer extends Thread {
public long duration;
public Producer(long sleepDuration) {
super(" " + pid++);
this.duration = sleepDuration;
}
private void produceAndUnlock(Queue<Long> q, ExposedReentrantLock lock) {
q.add(duration);
// heavyTask(100000);
lock.condition.signalAll();
lock.unlock();
sop("done " + pCount++);
}
@Override
public void run() {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
sop("interrupted");
}
sop(String.format("%s %-5d", "awake", duration));
if (lock.tryLock()) {
sop("got q");
produceAndUnlock(q, lock);
} else if (bufferLock.tryLock()) {
sop("got buffer");
raiseBufferNotEmptyFlag();
produceAndUnlock(qBuffer, bufferLock);
} else
{
//both queues are busy
//but only one could be because of consumer
//try to get a lock on the other one
//because of fairness, when consumer would apply for lock on that thread
//it will succeed, never precede
sop("both busy q:" + lock.getOwner().getName() + " qBuffer:" + bufferLock.getOwner().getName());
if (consumerThread.equals(lock.getOwner())
|| lock.hasQueuedThread(consumerThread)) {
sop("consumer is on q waiting or runnning");
sop("waiting for bufferLock");
bufferLock.lock();
produceAndUnlock(qBuffer, bufferLock);
} else if (consumerThread.equals(bufferLock.getOwner()) ||
bufferLock.hasQueuedThread(consumerThread))
{
sop("consumer is on qBuffer waiting or runnning");
sop("waiting for lock");
lock.lock();
produceAndUnlock(q, lock);
}
else
{
//TODO uncertain ground: the methods above are approximate
//so here on out don't know what to do
sop("consumer not on any!");
sop("consumer state "+consumerThread.getState());
sop("applying to buffer");
bufferLock.lock();
produceAndUnlock(qBuffer,bufferLock);
// System.err.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
// System.exit(0);
}
}
}
}
private class Consumer extends Thread {
public Consumer() {
super("C " + cid++);
}
@Override
public void run() {
while (true) {
sop("waiting for lock on q");
lock.lock();
sop("locked q");
consumeQueue(q);
sop("emptied q");
if (FLAG_BUFFER_NOT_EMPTY) {
sop("buffer not empty");
lock.unlock();
sop("unlocked q");
sop("waiting for lock on buffer");
bufferLock.lock();
sop("locked buffer");
consumeQueue(qBuffer);
sop("emptied buffer");
lowerBufferNotEmptyFlag();
bufferLock.unlock();
sop("unlocked buffer");
continue;
}
while (q.isEmpty()) {
sop("empty q!");
try {
sop("unlocked q and going into wait on q");
lock.condition.await();
sop("awoke and locked on q");
} catch (InterruptedException e) {
}
}
consumeQueue(q);
lock.unlock();
sop("done unlocked q");
}
}
private void consumeQueue(Queue<Long> q) {
while (!q.isEmpty()) {
sop("polling " + ++cCount + "/" + n + " " + q.poll());
heavyTask();
}
}
}
public void raiseBufferNotEmptyFlag() {
FLAG_BUFFER_NOT_EMPTY = true;
}
public void lowerBufferNotEmptyFlag() {
FLAG_BUFFER_NOT_EMPTY = false;
}
private class ExposedReentrantLock extends ReentrantLock {
public Condition condition;
public ExposedReentrantLock() {
super();
this.condition = newCondition();
}
public ExposedReentrantLock(boolean fair) {
super(fair);
this.condition = newCondition();
}
public Thread getOwner() {
return super.getOwner();
}
}
public static void main(String[] args) throws InterruptedException {
int n = 250;
sop("This code uses " + n + " threads as a test case\n" +
"You may pass the no of threads as the one and only int argument, if you want\n");
if (args.length == 1) {
n = Integer.parseInt(args[0]);
}
sop("No of threads " + n+"\n");
new Thread(new MonitorLockPenaltyMitigation_LocksAndBuffer3(n), MonitorLockPenaltyMitigation_LocksAndBuffer3.class.getSimpleName()).start();
}
}
Key part of Consumer extends Thread
public void run() {
while (true) {
sop("waiting for lock on q");
lock.lock();
sop("locked q");
consumeQueue(q);
sop("emptied q");
if (FLAG_BUFFER_NOT_EMPTY) {
sop("buffer not empty");
lock.unlock();
sop("unlocked q");
sop("waiting for lock on buffer");
bufferLock.lock();
sop("locked buffer");
consumeQueue(qBuffer);
sop("emptied buffer");
lowerBufferNotEmptyFlag();
bufferLock.unlock();
sop("unlocked buffer");
continue;
}
while (q.isEmpty()) {
sop("empty q!");
try {
sop("unlocked q and going into wait on q");
lock.condition.await();
sop("awoke and locked on q");
} catch (InterruptedException e) {
}
}
consumeQueue(q);
lock.unlock();
sop("done unlocked q");
}
}
private void consumeQueue(Queue<Long> q) {
while (!q.isEmpty()) {
sop("polling " + ++cCount + "/" + n + " " + q.poll());
heavyTask();
}
}
Key part of Producer extends Thread
@Override
public void run() {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
sop("interrupted");
}
sop(String.format("%s %-5d", "awake", duration));
if (lock.tryLock()) {
sop("got q");
produceAndUnlock(q, lock);
} else if (bufferLock.tryLock()) {
sop("got buffer");
raiseBufferNotEmptyFlag();
produceAndUnlock(qBuffer, bufferLock);
} else
{
//both queues are busy
//but only one could be because of consumer
//try to get a lock on the other one
//because of fairness, when consumer would apply for lock on that thread
//it will succeed, never precede
sop("both busy q:" + lock.getOwner().getName() + " qBuffer:" + bufferLock.getOwner().getName());
if (consumerThread.equals(lock.getOwner())
|| lock.hasQueuedThread(consumerThread)) {
sop("consumer is on q waiting or runnning");
sop("waiting for bufferLock");
bufferLock.lock();
produceAndUnlock(qBuffer, bufferLock);
} else if (consumerThread.equals(bufferLock.getOwner()) ||
bufferLock.hasQueuedThread(consumerThread))
{
sop("consumer is on qBuffer waiting or runnning");
sop("waiting for lock");
lock.lock();
produceAndUnlock(q, lock);
}
else
{
//TODO uncertain ground: the methods above are approximate
//so here on out don't know what to do
sop("consumer not on any!");
sop("consumer state "+consumerThread.getState());
sop("applying to buffer");
bufferLock.lock();
produceAndUnlock(qBuffer,bufferLock);
// System.err.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
// System.exit(0);
}
}
}
private void produceAndUnlock(Queue<Long> q, ExposedReentrantLock lock) {
q.add(duration);
// heavyTask(100000);
lock.condition.signalAll();
lock.unlock();
sop("done " + pCount++);
}
Consumer class(included in overall)
private class Consumer extends Thread {
public Consumer() {
super("C " + cid++);
}
@Override
public void run() {
while (true) {
sop("waiting for lock on q");
lock.lock();
sop("locked q");
consumeQueue(q);
sop("emptied q");
if (FLAG_BUFFER_NOT_EMPTY) {
sop("buffer not empty");
lock.unlock();
sop("unlocked q");
sop("waiting for lock on buffer");
bufferLock.lock();
sop("locked buffer");
consumeQueue(qBuffer);
sop("emptied buffer");
lowerBufferNotEmptyFlag();
bufferLock.unlock();
sop("unlocked buffer");
continue;
}
while (q.isEmpty()) {
sop("empty q!");
try {
sop("unlocked q and going into wait on q");
lock.condition.await();
sop("awoke and locked on q");
} catch (InterruptedException e) {
}
}
consumeQueue(q);
lock.unlock();
sop("done unlocked q");
}
}
private void consumeQueue(Queue<Long> q) {
while (!q.isEmpty()) {
sop("polling " + ++cCount + "/" + n + " " + q.poll());
heavyTask();
}
}
}
Producer class(included in overall)
private class Producer extends Thread {
public long duration;
public Producer(long sleepDuration) {
super(" " + pid++);
this.duration = sleepDuration;
}
private void produceAndUnlock(Queue<Long> q, ExposedReentrantLock lock) {
q.add(duration);
// heavyTask(100000);
lock.condition.signalAll();
lock.unlock();
sop("done " + pCount++);
}
@Override
public void run() {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
sop("interrupted");
}
sop(String.format("%s %-5d", "awake", duration));
if (lock.tryLock()) {
sop("got q");
produceAndUnlock(q, lock);
} else if (bufferLock.tryLock()) {
sop("got buffer");
raiseBufferNotEmptyFlag();
produceAndUnlock(qBuffer, bufferLock);
} else
{
//both queues are busy
//but only one could be because of consumer
//try to get a lock on the other one
//because of fairness, when consumer would apply for lock on that thread
//it will succeed, never precede
sop("both busy q:" + lock.getOwner().getName() + " qBuffer:" + bufferLock.getOwner().getName());
if (consumerThread.equals(lock.getOwner())
|| lock.hasQueuedThread(consumerThread)) {
sop("consumer is on q waiting or runnning");
sop("waiting for bufferLock");
bufferLock.lock();
produceAndUnlock(qBuffer, bufferLock);
} else if (consumerThread.equals(bufferLock.getOwner()) ||
bufferLock.hasQueuedThread(consumerThread))
{
sop("consumer is on qBuffer waiting or runnning");
sop("waiting for lock");
lock.lock();
produceAndUnlock(q, lock);
}
else
{
//TODO uncertain ground: the methods above are approximate
//so here on out don't know what to do
sop("consumer not on any!");
sop("consumer state "+consumerThread.getState());
sop("applying to buffer");
bufferLock.lock();
produceAndUnlock(qBuffer,bufferLock);
// System.err.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
// System.exit(0);
}
}
}
}
专注分享java语言的经验与见解,让所有开发者获益!
评论