Java多线程生产者消费者

huangapple 未分类评论47阅读模式
英文:

Java Multi-threading producer-consumer

问题

在此代码之后,如何实现在发送记录和获取记录之间的多线程操作。

public class Test_ProducerConsumer {

    // 主方法
    public static void main(String[] args) {

        // 创建发送记录线程
        Thread sendThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 10; i++) {
                    kafka_io.send_records(sendtopic, "test_key", "test_value" + String.valueOf(i));
                }
            }
        });

        // 创建获取记录线程
        Thread getThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    kafka_io.get_records(gettopic, 100000);
                } catch (JSONException e) {
                    e.printStackTrace();
                }
            }
        });

        // 启动线程
        sendThread.start();
        getThread.start();

        // 等待线程结束
        try {
            sendThread.join();
            getThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

请注意,为了实现发送和获取记录之间的多线程操作,我们创建了两个线程,一个用于发送记录,另一个用于获取记录。通过调用start()方法来启动这些线程,并通过join()方法等待它们执行完毕。这样可以在两个操作之间实现并行处理。

英文:

Following this code, how to implement multithread between send records and get records.

public class Test_ProducerConsumer {  
		
	// Main method
	public static void main(String[] args) {

		// Send records
		for(int i=0; i&lt;10; i++) {
			kafka_io.send_records(sendtopic, &quot;test_key&quot;, &quot;test_value&quot; + String.valueOf(i));			
		}
		
		// Get records
		try {
			kafka_io.get_records(gettopic, 100000);
		} catch (JSONException e) {
			e.printStackTrace();
		} 
	}
}

答案1

得分: 0

你可以使用 BlockingQueue

在接收项目的一侧,您可以阻塞线程,直到有新项目到达,同时使用其他任何线程可以添加项目并通知正在接收项目的线程。

// 在一个类内部
public static void main(String[] args) {
    Runnable r = Consumer();
    Thread t = Thread(r);
    t.start();

    // 发送记录
    for(int i=0; i&lt;10; i++) {
        r.queue.add(DataWrapper(...));         
    }
}

class Consumer implements Runnable {
    BlockingQueue&lt;DataWrapper&gt; queue = BlockingQueue();
    @Override
    public void run() {
        while(true) {
            DataWrapper data = queue.take();
            // 在这里使用数据
        }
    }
}

class DataWrapper {
    String dataString;   // 这些是您的变量,根据需要进行更改
    int dataInt;
}
英文:

You can make use of BlockingQueue.

On the side of receiving the items, you can block the thread till a new item has came, while with any other thread you can add the item and notify the thread who is receiving the items.

// Inside a class
public static void main(String[] args) {
    Runnable r = Consumer()
    Thread t = Thread(r)
    t.start()

    // Send records
    for(int i=0; i&lt;10; i++) {
        r.queue.add(DataWrapper(...));         
    }
}

class Consumer implements Runnable {
    BlockingQueue&lt;DataWrapper&gt; queue = BlockingQueue();
    @Override
    public void run() {
        while(true) {
            DataWrapper data = queue.take();
            // use data here
        }
    }
}

class DataWrapper {
    String dataString;   // these are your variables change as you want
    int dataInt;
}

huangapple
  • 本文由 发表于 2020年5月5日 17:38:41
  • 转载请务必保留本文链接:https://java.coder-hub.com/61610068.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定