SpringBoot+Kafka+ELK 完成海量日志收集(超详细)
yuyutoo 2024-10-11 21:39 8 浏览 0 评论
整体流程大概如下:
服务器准备
在这先列出各服务器节点,方便同学们在下文中对照节点查看相应内容
SpringBoot项目准备
引入log4j2替换SpringBoot默认log,demo项目结构如下:
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- 排除spring-boot-starter-logging -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
<Properties>
<Property name="LOG_HOME">logs</Property>
<property name="FILE_NAME">collector</property>
<property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
</Properties>
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="${patternLayout}"/>
</Console>
<RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Filters>
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<!-- 业务相关 异步logger -->
<AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
<AppenderRef ref="appAppender"/>
</AsyncLogger>
<AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
<AppenderRef ref="errorAppender"/>
</AsyncLogger>
<Root level="info">
<Appender-Ref ref="CONSOLE"/>
<Appender-Ref ref="appAppender"/>
<AppenderRef ref="errorAppender"/>
</Root>
</Loggers>
</Configuration>
IndexController
测试Controller,用以打印日志进行调试
@Slf4j
@RestController
public class IndexController {
@RequestMapping(value = "/index")
public String index() {
InputMDC.putMDC();
log.info("我是一条info日志");
log.warn("我是一条warn日志");
log.error("我是一条error日志");
return "idx";
}
@RequestMapping(value = "/err")
public String err() {
InputMDC.putMDC();
try {
int a = 1/0;
} catch (Exception e) {
log.error("算术异常", e);
}
return "err";
}
}
InputMDC
用以获取log中的[%X{hostName}]、[%X{ip}]、[%X{applicationName}]三个字段值
@Component
public class InputMDC implements EnvironmentAware {
private static Environment environment;
@Override
public void setEnvironment(Environment environment) {
InputMDC.environment = environment;
}
public static void putMDC() {
MDC.put("hostName", NetUtil.getLocalHostName());
MDC.put("ip", NetUtil.getLocalIp());
MDC.put("applicationName", environment.getProperty("spring.application.name"));
}
}
NetUtil
public class NetUtil {
public static String normalizeAddress(String address){
String[] blocks = address.split("[:]");
if(blocks.length > 2){
throw new IllegalArgumentException(address + " is invalid");
}
String host = blocks[0];
int port = 80;
if(blocks.length > 1){
port = Integer.valueOf(blocks[1]);
} else {
address += ":"+port; //use default 80
}
String serverAddr = String.format("%s:%d", host, port);
return serverAddr;
}
public static String getLocalAddress(String address){
String[] blocks = address.split("[:]");
if(blocks.length != 2){
throw new IllegalArgumentException(address + " is invalid address");
}
String host = blocks[0];
int port = Integer.valueOf(blocks[1]);
if("0.0.0.0".equals(host)){
return String.format("%s:%d",NetUtil.getLocalIp(), port);
}
return address;
}
private static int matchedIndex(String ip, String[] prefix){
for(int i=0; i<prefix.length; i++){
String p = prefix[i];
if("*".equals(p)){ //*, assumed to be IP
if(ip.startsWith("127.") ||
ip.startsWith("10.") ||
ip.startsWith("172.") ||
ip.startsWith("192.")){
continue;
}
return i;
} else {
if(ip.startsWith(p)){
return i;
}
}
}
return -1;
}
public static String getLocalIp(String ipPreference) {
if(ipPreference == null){
ipPreference = "*>10>172>192>127";
}
String[] prefix = ipPreference.split("[> ]+");
try {
Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
String matchedIp = null;
int matchedIdx = -1;
while (interfaces.hasMoreElements()) {
NetworkInterface ni = interfaces.nextElement();
Enumeration<InetAddress> en = ni.getInetAddresses();
while (en.hasMoreElements()) {
InetAddress addr = en.nextElement();
String ip = addr.getHostAddress();
Matcher matcher = pattern.matcher(ip);
if (matcher.matches()) {
int idx = matchedIndex(ip, prefix);
if(idx == -1) continue;
if(matchedIdx == -1){
matchedIdx = idx;
matchedIp = ip;
} else {
if(matchedIdx>idx){
matchedIdx = idx;
matchedIp = ip;
}
}
}
}
}
if(matchedIp != null) return matchedIp;
return "127.0.0.1";
} catch (Exception e) {
return "127.0.0.1";
}
}
public static String getLocalIp() {
return getLocalIp("*>10>172>192>127");
}
public static String remoteAddress(SocketChannel channel){
SocketAddress addr = channel.socket().getRemoteSocketAddress();
String res = String.format("%s", addr);
return res;
}
public static String localAddress(SocketChannel channel){
SocketAddress addr = channel.socket().getLocalSocketAddress();
String res = String.format("%s", addr);
return addr==null? res: res.substring(1);
}
public static String getPid(){
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName();
int index = name.indexOf("@");
if (index != -1) {
return name.substring(0, index);
}
return null;
}
public static String getLocalHostName() {
try {
return (InetAddress.getLocalHost()).getHostName();
} catch (UnknownHostException uhe) {
String host = uhe.getMessage();
if (host != null) {
int colon = host.indexOf(':');
if (colon > 0) {
return host.substring(0, colon);
}
}
return "UnknownHost";
}
}
}
启动项目,访问/index和/ero接口,可以看到项目中生成了app-collector.log和error-collector.log两个日志文件
我们将Springboot服务部署在192.168.11.31这台机器上。
Kafka安装和启用
kafka下载地址:
http://kafka.apache.org/downloads.html
kafka安装步骤:首先kafka安装需要依赖与zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。
## 解压命令:
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
## 改名命令:
mv kafka_2.12-2.1.0/ kafka_2.12
## 进入解压后的目录,修改server.properties文件:
vim /usr/local/kafka_2.12/config/server.properties
## 修改配置:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
## 建立日志文件夹:
mkdir /usr/local/kafka_2.12/kafka-logs
##启动kafka:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
创建两个topic
## 创建topic
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
我们可以查看一下topic情况
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe
可以看到已经成功启用了app-log-collector和error-log-collector两个topic
filebeat安装和启用
filebeat下载
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
配置filebeat,可以参考下方yml配置文件
vim /usr/local/filebeat-5.6.2/filebeat.yml
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
- input_type: log
paths:
## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
- /usr/local/logs/app-collector.log
#定义写入 ES 时的 _type 值
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: app-log-collector ## 按服务划分用作kafka topic
evn: dev
- input_type: log
paths:
- /usr/local/logs/error-collector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: error-log-collector ## 按服务划分用作kafka topic
evn: dev
output.kafka:
enabled: true
hosts: ["192.168.11.51:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
filebeat启动:
检查配置是否正确
cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest
## Config OK
启动filebeat
/usr/local/filebeat-6.6.0/filebeat &
检查是否启动成功
ps -ef | grep filebeat
可以看到filebeat已经启动成功
然后我们访问192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已经生成了app-log-collector-0和error-log-collector-0文件,说明filebeat已经帮我们把数据收集好放到了kafka上。
logstash安装
我们在logstash的安装目录下新建一个文件夹
mkdir scrpit
然后cd进该文件,创建一个logstash-script.conf文件
cd scrpit
vim logstash-script.conf
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
kafka {
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads => 1 ## 增加consumer的并行消费线程数
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-group"
}
}
filter {
## 时区转换
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
## 表达式,这里对应的是Springboot输出的日志格式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
## 测试输出到控制台:
output {
stdout { codec => rubydebug }
}
## elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
# es服务地址
hosts => ["192.168.11.35:9200"]
# 用户名密码
user => "elastic"
password => "123456"
## 索引名,+ 号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
# logstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["192.168.11.35:9200"]
user => "elastic"
password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
启动logstash
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
等待启动成功,我们再次访问192.168.11.31:8001/err
可以看到控制台开始打印日志
ElasticSearch与Kibana
ES和Kibana的搭建之前没写过博客,网上资料也比较多,大家可以自行搜索。
搭建完成后,访问Kibana的管理页面192.168.11.35:5601,选择Management -> Kinaba - Index Patterns
然后Create index pattern
- index pattern 输入 app-log-*
- Time Filter field name 选择 currentDateTime
这样我们就成功创建了索引。
我们再次访问192.168.11.31:8001/err,这个时候就可以看到我们已经命中了一条log信息
里面展示了日志的全量信息
到这里,我们完整的日志收集及可视化就搭建完成了!
相关推荐
- VBA中利用Instr函数(vba int函数)
-
【分享成果,随喜正能量】每一个在你的生命里出现的人,都有原因,喜欢你的人给了你温暖和勇气,你喜欢的人让你学会了爱和自持,你不喜欢的人教会你宽容与尊重,不喜欢你的人让你自省与成长。。...
- Insta360 Link体验:支持4K画质,一款使用场景丰富的AI云台摄像头
-
记者|王公逸伴随直播、线上会议需求的兴起,网络直播的需求愈发增大,8月2日,影石Insta360正式推出全新产品:Insta360Link,这是一款AI智能云台摄像头。从产品形态来说,Insta3...
- VBA技术资料MF299:利用Instr进行文本查找
-
我给VBA的定义:VBA是个人小型自动化处理的有效工具。利用好了,可以大大提高自己的工作效率,而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套,分为初级、中级、高级三大部分,教程是对VB...
- Fabric.js 拖放元素进画布 - 掘金
-
本文简介点赞+关注+收藏=学会了学习Fabric.js,我的建议是看文档不如看demo。本文实现的功能:将元素拖进到画布中并生成对应的图形或图片。效果如下图所示:...
- Vue3为什么推荐使用ref而不是reactive
-
为什么推荐使用ref而不是reactivereactive本身具有很大局限性导致使用过程需要额外注意,如果忽视这些问题将对开发造成不小的麻烦;ref更像是vue2时代optionapi的data的替...
- Fabric.js 样式不更新怎么办?(js更改样式)
-
本文简介带尬猴,我嗨德育处主任不知道你有没有遇到过在使用Fabric.js时无意中一些骚操作修改了元素的样式,但刷新画布却没更新元素样式?如果你也遇到同样的问题的话,可以尝试使用本文的方法。...
- Fabric.js 修改画布交互方式到底有什么用?
-
本文简介点赞+关注+收藏=学会了fabric.js为我们提供了很多厉害的方法。今天要搞明白的一个东西是canvas.interactive。官方文档对canvas.interact...
- Rust Web编程:第五章 在浏览器上显示内容
-
我们现在正处于可以构建一个Web应用程序的阶段,该应用程序可以使用不同的方法和数据管理一系列HTTP请求。这很有用,特别是当我们为微服务构建服务器时。然而,我们也希望非程序员能够与我们的应...
- Fabric.js 自由绘制椭圆 - 掘金(canvas画椭圆)
-
本文简介点赞+关注+收藏=学会了本文讲解在Fabric.js中如何自由绘制椭圆形,如果你还不了解Fabric.js,可以查阅《Fabric.js从入门到精通》。效果如下图所示...
- 手把手教你实现JS手搓"防抖"优化代码——专业的事用专业的方法!
-
前言在我们前端编程中,假如我们要给后端发送请求,万一手抖多点了几次,多发送了几遍怎么办?解决方案:防抖!这种事就要交给我们专业的“防抖”先生来处理!今天,我们就来教大家手搓“防抖”...
- 详解虚拟DOM与Diff算法(虚拟dom一定比实际dom快吗)
-
vue的虚拟DOM,Diff算法,其中一些关键的地方从别处搬运了一些图进行说明(感谢制图的大佬),也包含比较详细的源码解读。...
- 走进 React Fiber 的世界(我走进你的世界手势舞视频)
-
文/阿里淘系F(x)Team-冷卉Fiber设计思想Fiber是对React核心算法的重构,facebook团队使用两年多的时间去重构React的核心算法,在React16以上...
- 前端新一代框架 Svelte 火了!十个场景带你简单认识它!
-
近几年听到的主流框架都是Vue、React、Angular,但其实有一个框架在国外非常火,用起来也是很方便,那就是...
- 借助DeepSeek实现了一个PDF阅读器
-
1、简介使用pdf.js库加载和显示PDF文件。实现了翻页、缩放功能。提供了基本的错误处理。功能特点:支持选择本地PDF文件。可以逐页查看PDF内容。支持放大缩小功能。界面简洁,易于使...
- DeepSeek代码之旅1:卫星地图标记方法之——html语言的实现
-
最近遇到一个任务,具体功能如下:1、调用高德地图API,图层为卫星图层,根据需要标记兴趣点;2、标记完成后可以保存兴趣点,便于下次加载历史兴趣点。...
你 发表评论:
欢迎- 一周热门
-
-
前端面试:iframe 的优缺点? iframe有那些缺点
-
带斜线的表头制作好了,如何填充内容?这几种方法你更喜欢哪个?
-
漫学笔记之PHP.ini常用的配置信息
-
其实模版网站在开发工作中很重要,推荐几个参考站给大家
-
推荐7个模板代码和其他游戏源码下载的网址
-
[干货] JAVA - JVM - 2 内存两分 [干货]+java+-+jvm+-+2+内存两分吗
-
正在学习使用python搭建自动化测试框架?这个系统包你可能会用到
-
织梦(Dedecms)建站教程 织梦建站详细步骤
-
【开源分享】2024PHP在线客服系统源码(搭建教程+终身使用)
-
2024PHP在线客服系统源码+完全开源 带详细搭建教程
-
- 最近发表
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)