資源簡介
kafka生產者和消費者實例,了解Kafka的一個簡單入門實例源碼下載

代碼片段和文件信息
import?java.util.HashMap;
import?java.util.List;
import?java.util.Map;
import?java.util.Properties;
import?kafka.consumer.Consumer;
import?kafka.consumer.ConsumerConfig;
import?kafka.consumer.ConsumerIterator;
import?kafka.consumer.KafkaStream;
import?kafka.javaapi.consumer.ConsumerConnector;
/**
?*?接收數據
?*?接收到:?message:?10
接收到:?message:?11
接收到:?message:?12
接收到:?message:?13
接收到:?message:?14
?*?@author?zm
?*
?*/
public?class?kafkaConsumer?extends?Thread{
private?String?topic;
public?kafkaConsumer(String?topic){
super();
this.topic?=?topic;
}
@Override
public?void?run()?{
ConsumerConnector?consumer?=?createConsumer();
Map?topicCountMap?=?new?HashMap();
topicCountMap.put(topic?1);?//?一次從主題中獲取一個數據
?Map>>??messageStreams?=?consumer.createMessageStreams(topicCountMap);
?KafkaStream?stream?=?messageStreams.get(topic).get(0);//?獲取每次接收到的這個數據
?ConsumerIterator?iterator?=??stream.iterator();
?while(iterator.hasNext()){
?String?message?=?new?String(iterator.next().message());
?System.out.println(“接收到:?“?+?message);
?}
}
private?ConsumerConnector?createConsumer()?{
Properties?properties?=?new?Properties();
properties.put(“zookeeper.connect“?“192.168.1.110:2181192.168.1.111:2181192.168.1.112:2181“);//聲明zk
properties.put(“group.id“?“group1“);
return?Consumer.createJavaConsumerConnector(new?ConsumerConfig(properties));
?}
public?static?void?main(String[]?args)?{
new?kafkaConsumer(“test“).start();//?使用kafka集群中創建好的主題?test?
}
?
}
?屬性????????????大小?????日期????時間???名稱
-----------?---------??----------?-----??----
?????文件????????512??2015-03-09?08:45??kafka\.classpath
?????文件????????559??2015-03-09?08:45??kafka\.project
?????文件????????274??2015-03-09?08:45??kafka\.settings\org.eclipse.jdt.core.prefs
?????文件????????249??2015-03-09?08:45??kafka\.settings\org.maven.ide.eclipse.prefs
?????文件????????882??2015-03-09?08:46??kafka\pom.xm
?????文件????????174??2015-03-09?08:45??kafka\src\main\java\bj\zm\kafka\App.java
?????文件???????1735??2015-03-09?09:23??kafka\src\main\java\kafkaConsumer.java
?????文件???????1607??2015-03-09?09:22??kafka\src\main\java\kafkaProducer.java
?????文件????????639??2015-03-09?08:45??kafka\src\test\java\bj\zm\kafka\AppTest.java
?????文件????????537??2015-03-09?08:46??kafka\target\classes\bj\zm\kafka\App.class
?????文件???????2914??2015-03-09?09:23??kafka\target\classes\kafkaConsumer.class
?????文件???????2193??2015-03-09?09:22??kafka\target\classes\kafkaProducer.class
?????文件????????609??2015-03-09?08:46??kafka\target\test-classes\bj\zm\kafka\AppTest.class
?????目錄??????????0??2015-03-09?08:45??kafka\src\main\java\bj\zm\kafka
?????目錄??????????0??2015-03-09?08:45??kafka\src\test\java\bj\zm\kafka
?????目錄??????????0??2015-03-09?08:45??kafka\src\main\java\bj\zm
?????目錄??????????0??2015-03-09?08:45??kafka\src\test\java\bj\zm
?????目錄??????????0??2015-03-09?08:46??kafka\target\classes\bj\zm\kafka
?????目錄??????????0??2015-03-09?08:46??kafka\target\test-classes\bj\zm\kafka
?????目錄??????????0??2015-03-09?08:45??kafka\src\main\java\bj
?????目錄??????????0??2015-03-09?08:45??kafka\src\test\java\bj
?????目錄??????????0??2015-03-09?08:46??kafka\target\classes\bj\zm
?????目錄??????????0??2015-03-09?08:46??kafka\target\test-classes\bj\zm
?????目錄??????????0??2015-03-09?09:07??kafka\src\main\java
?????目錄??????????0??2015-03-09?08:45??kafka\src\test\java
?????目錄??????????0??2015-03-09?08:46??kafka\target\classes\bj
?????目錄??????????0??2015-03-09?08:46??kafka\target\test-classes\bj
?????目錄??????????0??2015-03-09?08:45??kafka\src\main
?????目錄??????????0??2015-03-09?08:45??kafka\src\test
?????目錄??????????0??2015-03-09?09:07??kafka\target\classes
............此處省略8個文件信息
評論
共有 條評論