英文:
Java Multi-threading producer-consumer
问题
在此代码之后,如何实现在发送记录和获取记录之间的多线程操作。
public class Test_ProducerConsumer {
// 主方法
public static void main(String[] args) {
// 创建发送记录线程
Thread sendThread = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10; i++) {
kafka_io.send_records(sendtopic, "test_key", "test_value" + String.valueOf(i));
}
}
});
// 创建获取记录线程
Thread getThread = new Thread(new Runnable() {
@Override
public void run() {
try {
kafka_io.get_records(gettopic, 100000);
} catch (JSONException e) {
e.printStackTrace();
}
}
});
// 启动线程
sendThread.start();
getThread.start();
// 等待线程结束
try {
sendThread.join();
getThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
请注意,为了实现发送和获取记录之间的多线程操作,我们创建了两个线程,一个用于发送记录,另一个用于获取记录。通过调用start()
方法来启动这些线程,并通过join()
方法等待它们执行完毕。这样可以在两个操作之间实现并行处理。
英文:
Following this code, how to implement multithread between send records and get records.
public class Test_ProducerConsumer {
// Main method
public static void main(String[] args) {
// Send records
for(int i=0; i<10; i++) {
kafka_io.send_records(sendtopic, "test_key", "test_value" + String.valueOf(i));
}
// Get records
try {
kafka_io.get_records(gettopic, 100000);
} catch (JSONException e) {
e.printStackTrace();
}
}
}
答案1
得分: 0
你可以使用 BlockingQueue。
在接收项目的一侧,您可以阻塞线程,直到有新项目到达,同时使用其他任何线程可以添加项目并通知正在接收项目的线程。
// 在一个类内部
public static void main(String[] args) {
Runnable r = Consumer();
Thread t = Thread(r);
t.start();
// 发送记录
for(int i=0; i<10; i++) {
r.queue.add(DataWrapper(...));
}
}
class Consumer implements Runnable {
BlockingQueue<DataWrapper> queue = BlockingQueue();
@Override
public void run() {
while(true) {
DataWrapper data = queue.take();
// 在这里使用数据
}
}
}
class DataWrapper {
String dataString; // 这些是您的变量,根据需要进行更改
int dataInt;
}
英文:
You can make use of BlockingQueue.
On the side of receiving the items, you can block the thread till a new item has came, while with any other thread you can add the item and notify the thread who is receiving the items.
// Inside a class
public static void main(String[] args) {
Runnable r = Consumer()
Thread t = Thread(r)
t.start()
// Send records
for(int i=0; i<10; i++) {
r.queue.add(DataWrapper(...));
}
}
class Consumer implements Runnable {
BlockingQueue<DataWrapper> queue = BlockingQueue();
@Override
public void run() {
while(true) {
DataWrapper data = queue.take();
// use data here
}
}
}
class DataWrapper {
String dataString; // these are your variables change as you want
int dataInt;
}
专注分享java语言的经验与见解,让所有开发者获益!
评论