資源簡介
使用slf4j配置kafkaAppender寫入日志到kafka列隊(duì)
支持日志解析+過濾等擴(kuò)展接口

代碼片段和文件信息
package?cn.rmt.logback.kafka;
import?java.util.Iterator;
import?java.util.concurrent.ExecutorService;
import?java.util.concurrent.Executors;
import?ch.qos.logback.core.Appender;
import?ch.qos.logback.core.spi.AppenderAttachableImpl;
import?cn.rmt.logback.kafka.encoder.IKafkaEncoder;
import?cn.rmt.logback.kafka.producer.KafkaProducerFactory;
public?class?KafkaAppender?extends?KafkaAppenderConfig{
private?static?ExecutorService?exec?=?Executors.newFixedThreadPool(10);
private?final?AppenderAttachableImpl?aai?=?new?AppenderAttachableImpl();
protected?IKafkaEncoder?encoder?=?null;
//完整的一條logs數(shù)據(jù)??es的field名稱
public?static?String?MSG_FULL_DATA_KEY?=?“DATA“;?
public?KafkaAppender(){
}
public?void?setEncoder(IKafkaEncoder?layout)?{
???????this.encoder?=?layout;
}
@Override
public?void?start()?{
//加載初始化參數(shù)
if(?!checkPrerequisites()?){
addError(“kafka?appender?初始化參數(shù)加載失敗...“);
}
//初始化?produce
new?KafkaProducerFactory(producerConf).start();
super.start();
}
@Override
public?void?addAppender(Appender?newAppender)?{
aai.addAppender(newAppender);
}
@Override
public?Iterator>?iteratorForAppenders()?{
return?aai.iteratorForAppenders();
}
@Override
public?Appender?getAppender(String?name)?{
return?aai.getAppender(name);
}
@Override
public?boolean?isAttached(Appender?appender)?{
return?aai.isAttached(appender);
}
@Override
public?void?detachAndStopAllAppenders()?{
aai.detachAndStopAllAppenders();
}
@Override
public?boolean?detachAppender(Appender?appender)?{
return?aai.detachAppender(appender);
}
@Override
public?boolean?detachAppender(String?name)?{
return?aai.detachAppender(name);
}
@Override
protected?void?append(E?e)?{
final?String?payload?=?encoder.doEncode(e);
//發(fā)送消息到kafka
exec.execute(?new?Runnable()?{
@Override
public?void?run()?{
KafkaProducerFactory.getKafkaTemplate().sendDefault(payload);
}
});
}
}
?屬性????????????大小?????日期????時間???名稱
-----------?---------??----------?-----??----
?????目錄???????????0??2017-03-02?10:30??kafka\
?????目錄???????????0??2017-03-02?10:30??kafka\common\
?????文件????????1270??2017-02-22?11:57??kafka\common\AnalysisPattern.java
?????文件????????2562??2017-02-22?11:57??kafka\common\FieldAnalysis.java
?????目錄???????????0??2017-03-02?10:30??kafka\encoder\
?????文件?????????116??2017-02-22?11:57??kafka\encoder\IKafkaEncoder.java
?????文件????????1082??2017-02-22?11:57??kafka\encoder\KafkaEncoder.java
?????文件?????????526??2017-02-22?11:57??kafka\encoder\KafkaEncoderba
?????文件????????2127??2017-02-22?11:57??kafka\KafkaAppender.java
?????文件????????3739??2017-02-22?11:57??kafka\KafkaAppenderConfig.java
?????目錄???????????0??2017-03-02?10:30??kafka\layout\
?????文件?????????544??2017-02-22?11:57??kafka\layout\IpConvert.java
?????文件????????8209??2017-02-22?11:57??kafka\layout\PatternLayout.java
?????文件????????5271??2017-02-22?11:57??kafka\layout\PatternLayoutba
?????文件????????2169??2017-03-02?10:30??kafka\logback-default.xm
?????目錄???????????0??2017-03-02?10:30??kafka\producer\
?????文件????????1212??2017-02-22?11:57??kafka\producer\KafkaProducerFactory.java
?????文件????????1250??2017-02-22?11:57??kafka\producer\KafkaProducerListener.java
評論
共有 條評論