資源簡介
kafka
代碼片段和文件信息
package com.lin.demo.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import com.lin.demo.producer.KafkaProducer;
??
public?class?KafkaConsumer?{??
??
????private?final?ConsumerConnector?consumer;??
??
????private?KafkaConsumer()?{??
????????Properties?props?=?new?Properties();??
????????//?zookeeper?配置??
????????props.put(“zookeeper.connect“?“127.0.0.1:2181“);??
??
????????//?group?代表一個消費組??
????????props.put(“group.id“?“lingroup“);??
??
????????//?zk連接超時??
????????props.put(“zookeeper.session.timeout.ms“?“4000“);??
????????props.put(“zookeeper.sync.time.ms“?“200“);??
????????props.put(“rebalance.max.retries“?“5“);??
????????props.put(“rebalance.backoff.ms“?“1200“);??
??????????
??????
????????props.put(“auto.commit.interval.ms“?“1000“);??
????????props.put(“auto.offset.reset“?“smallest“);??
????????//?序列化類??
????????props.put(“serializer.class“?“kafka.serializer.StringEncoder“);??
??
????????ConsumerConfig?config?=?new?ConsumerConfig(props);??
??
????????consumer?=?kafka.consumer.Consumer.createJavaConsumerConnector(config);??
????}??
??
????void?consume()?{??
????????Map?topicCountMap?=?new?HashMap();??
????????topicCountMap.put(KafkaProducer.TOPIC?new?Integer(1));??
??
????????StringDecoder?keyDecoder?=?new?StringDecoder(new?VerifiableProperties());??
????????StringDecoder?valueDecoder?=?new?StringDecoder(new?VerifiableProperties());??
??
????????Map>>?consumerMap?=?consumer.createMessageStreams(topicCountMap?keyDecoder?valueDecoder);??
????????KafkaStream?stream?=?consumerMap.get(KafkaProducer.TOPIC).get(0);??
????????ConsumerIterator?it?=?stream.iterator();??
????????while?(it.hasNext())?{
???????? System.out.println(“<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<“?+?it.next().message()?+?“<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<“);??
????????/* KafkaProducer?kafkaProducer?=?new?KafkaProducer();
???????? kafkaProducer.produce();*/
????????}
????}??
??
????public?static?void?main(String[]?args)?{??
????????new?KafkaConsumer().consume();??
????}??
}??
 屬性            大小     日期    時間   名稱
----------- ---------  ---------- -----  ----
     目錄           0  2017-06-19 14:15  Kafka-Demo\
     文件         596  2017-06-19 14:15  Kafka-Demo\.classpath
     目錄           0  2017-06-19 14:15  Kafka-Demo\.myeclipse\
     文件         304  2017-06-19 14:15  Kafka-Demo\.myme
     文件        1537  2017-06-19 14:15  Kafka-Demo\.project
     目錄           0  2017-06-19 14:15  Kafka-Demo\.settings\
     文件         500  2017-06-19 14:15  Kafka-Demo\.settings\.jsdtscope
     文件         364  2017-06-19 14:15  Kafka-Demo\.settings\org.eclipse.jdt.core.prefs
     文件         568  2017-06-19 14:15  Kafka-Demo\.settings\org.eclipse.wst.common.component
     文件         252  2017-06-19 14:15  Kafka-Demo\.settings\org.eclipse.wst.common.project.facet.core.xm
     文件          49  2017-06-19 14:15  Kafka-Demo\.settings\org.eclipse.wst.jsdt.ui.superType.container
     文件           6  2017-06-19 14:15  Kafka-Demo\.settings\org.eclipse.wst.jsdt.ui.superType.name
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\com\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\demo\
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\demo\consumer\
     文件        2474  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\demo\consumer\KafkaConsumer.java
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\demo\producer\
     文件        1521  2017-06-19 14:15  Kafka-Demo\src\main\java\com\lin\demo\producer\KafkaProducer.java
     目錄           0  2017-06-19 14:15  Kafka-Demo\src\main\resources\
     文件         292  2017-06-19 14:15  Kafka-Demo\src\main\resources\log4j.properties
     目錄           0  2017-06-19 14:15  Kafka-Demo\WebRoot\
     文件         834  2017-06-19 14:15  Kafka-Demo\WebRoot\index.jsp
     目錄           0  2017-06-19 14:15  Kafka-Demo\WebRoot\me
     文件          36  2017-06-19 14:15  Kafka-Demo\WebRoot\me
     目錄           0  2017-06-19 14:15  Kafka-Demo\WebRoot\WEB-INF\
     目錄           0  2017-06-19 14:15  Kafka-Demo\WebRoot\WEB-INF\classes\
     目錄           0  2017-06-19 14:15  Kafka-Demo\WebRoot\WEB-INF\classes\com\
............此處省略70個文件信息
評論
共有 條評論