
SpringBoot + Logback + ELK + Kafka + Sleuth: Log Collection with Distributed Tracing


ELK Setup

https://blog.csdn.net/qq_37598011/article/details/103260806

Configuration Changes

First, go to the Logstash config directory:

cd /application/logstash/config/

Create a new logstash-test.conf configuration:

vim logstash-test.conf
input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "192.168.78.133:9092"
    topics => ["logger-platform","logger-user","logger-gateway"]
    # codec => plain
    codec => json_lines
    auto_offset_reset => "latest"
  }
  # stdin { }
}
#filter {
#  grok {
#    patterns_dir => ["./patterns"]
#    match => { "message" => "%{WORD:module} \| %{LOGBACKTIME:timestamp} \| %{LOGLEVEL:level} \| %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
#  }
#}
output {
  elasticsearch {
    # action => "index"
    hosts => "127.0.0.1:9200"
    index => "logger-%{+YYYY-MM}"
  }
  # stdout { codec => rubydebug }
}
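
Before wiring the application in, the Kafka → Logstash → Elasticsearch path can be sanity-checked by publishing a single JSON message by hand; it should then show up in the logger-YYYY-MM index. A minimal sketch, assuming the broker and topic from the config above (the class name and payload are made up):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PipelineSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.78.133:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A made-up JSON event matching what the input expects (codec => json_lines)
            String payload = "{\"level\":\"INFO\",\"message\":\"pipeline smoke test\"}";
            producer.send(new ProducerRecord<>("logger-platform", payload));
            producer.flush();
        }
    }
}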

Restart Logstash:

nohup ../bin/logstash -f ./logstash-test.conf >/dev/null 2>&1 &

Spring Boot Changes

Add the pom dependencies:

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
<!-- logback-kafka-appender -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC1</version>
    <scope>runtime</scope>
</dependency>
<!-- JSON encoding for ELK -->
<!-- logback -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.1</version>
</dependency>
<!-- Distributed tracing -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
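
Note that spring-cloud-starter-sleuth is declared without a version, so the Spring Cloud BOM must be imported in dependencyManagement unless a parent POM already manages it. A sketch, assuming the Greenwich release train (match it to your Spring Boot version; Greenwich pairs with the Boot 2.1 line that spring-kafka 2.2.x targets):

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>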

Edit the logback configuration:

<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!-- Reads values from the Spring Environment; here we read them from application.yml -->
<springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers" defaultValue="localhost:9092"/>
<springProperty scope="context" name="server-name" source="spring.application.name"/>
<springProperty scope="context" name="ip" source="spring.cloud.client.ipAddress"/>
<!-- Kafka appender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <providers>
            <timestamp>
                <timeZone>UTC</timeZone>
            </timestamp>
            <pattern>
                <pattern>
                    {
                    "timestamp": "%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ}",
                    "traceId": "%X{X-B3-TraceId:-}",
                    "parentSpanId": "%X{X-B3-ParentSpanId:-}",
                    "spanId": "%X{X-B3-SpanId:-}",
                    "exportable": "%X{X-Span-Export:-}",
                    "pid": "${PID:-}",
                    "logger": "%logger",
                    "level": "%p",
                    "message": "%msg",
                    "host": "%property",
                    "serverName": "${server-name}",
                    "stackTrace": "%ex"
                    }
                </pattern>
            </pattern>
        </providers>
    </encoder>
    <topic>logger-platform</topic>
    <!-- we don't care how the log messages will be partitioned -->
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
    <!-- use async delivery. the application threads are not blocked by logging -->
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
    <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
    <!-- bootstrap.servers is the only mandatory producerConfig -->
    <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig>
    <!-- don't wait for a broker to ack the reception of a batch. -->
    <producerConfig>acks=0</producerConfig>
    <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
    <producerConfig>linger.ms=1000</producerConfig>
    <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
    <producerConfig>max.block.ms=0</producerConfig>
    <!-- define a client-id that you use to identify yourself against the kafka broker -->
    <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
</appender>
<!-- Root log level -->
<root level="INFO">
    <appender-ref ref="kafkaAppender"/>
</root>

Configure application.yml:

spring:
  kafka:
    bootstrap-servers: 192.168.78.133:9092
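
The logback configuration above also reads spring.application.name (for the serverName field), so that property should be set as well. A fuller sketch, assuming the service is named logger-platform:

spring:
  application:
    name: logger-platform   # becomes the serverName field in each log event
  kafka:
    bootstrap-servers: 192.168.78.133:9092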

The Slf4jUtils class


import org.slf4j.LoggerFactory;
import org.slf4j.spi.LocationAwareLogger;
import org.springframework.stereotype.Component;
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;

/**
 * @Classname Slf4jUtils
 * @Description Logging helper that resolves the calling class, method, and line number
 * @Date 2019/11/29 5:58
 * @Created zzf
 */
@Component
public class Slf4jUtils {

    /** Empty argument array */
    private static final Object[] EMPTY_ARRAY = new Object[]{};

    /** Fully qualified class name */
    private static final String FQCN = Slf4jUtils.class.getName();

    /**
     * Resolve the calling class, method, and line number from the stack.
     * Note: sun.misc.SharedSecrets is a JDK-internal API, available on Java 8 only.
     *
     * @param stackDepth depth of the calling frame on the stack
     */
    private LocationAwareLogger getLocationAwareLogger(final int stackDepth) {
        // Use the stack trace to find the class and method that called the log method
        JavaLangAccess access = SharedSecrets.getJavaLangAccess();
        Throwable throwable = new Throwable();
        StackTraceElement frame = access.getStackTraceElement(throwable, stackDepth);
        return (LocationAwareLogger) LoggerFactory.getLogger(frame.getClassName() + "-" +
                frame.getMethodName().split("\\$")[0] + "-" +
                frame.getLineNumber());
    }

    /**
     * Debug-level logging. Arguments are applied with java.text.MessageFormat,
     * so placeholders are {0}, {1}, ...
     */
    public void debug(String msg, Object... arguments) {
        if (arguments != null && arguments.length > 0) {
            msg = new MessageFormat(msg).format(arguments);
        }
        getLocationAwareLogger(2).log(null, FQCN, LocationAwareLogger.DEBUG_INT, msg, EMPTY_ARRAY, null);
    }

    /** Info-level logging */
    public void info(String msg, Object... arguments) {
        if (arguments != null && arguments.length > 0) {
            msg = new MessageFormat(msg).format(arguments);
        }
        getLocationAwareLogger(2).log(null, FQCN, LocationAwareLogger.INFO_INT, msg, EMPTY_ARRAY, null);
    }

    /** Warn-level logging */
    public void warn(String msg, Object... arguments) {
        if (arguments != null && arguments.length > 0) {
            msg = new MessageFormat(msg).format(arguments);
        }
        getLocationAwareLogger(2).log(null, FQCN, LocationAwareLogger.WARN_INT, msg, EMPTY_ARRAY, null);
    }

    /** Error-level logging */
    public void error(String msg, Object... arguments) {
        if (arguments != null && arguments.length > 0) {
            msg = new MessageFormat(msg).format(arguments);
        }
        getLocationAwareLogger(2).log(null, FQCN, LocationAwareLogger.ERROR_INT, msg, EMPTY_ARRAY, null);
    }

    /** Convert an exception stack trace to a string */
    public String exceptionToString(Exception e) {
        if (e == null) {
            return "no exception details available";
        }
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        try {
            e.printStackTrace(pw);
            return sw.toString();
        } catch (Exception ex) {
            this.error("failed to convert exception stack trace to string", ex);
            return "";
        } finally {
            pw.flush();
            pw.close();
        }
    }
}

Usage

It is used the same way as an ordinary logger; see the sketch below.
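
A minimal sketch, assuming a hypothetical controller in a package covered by component scanning (the endpoint and message are made up). Note that Slf4jUtils formats arguments with java.text.MessageFormat, so the placeholders are {0}, {1}, ... rather than SLF4J's {}:

import com.ils.intelab.common.slf4j.Slf4jUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

// Hypothetical controller showing Slf4jUtils in use
@RestController
public class DemoController {

    @Resource
    private Slf4jUtils log;

    @GetMapping("/hello")
    public String hello() {
        // MessageFormat-style placeholders: {0}, {1}, ...
        log.info("hello endpoint called by user {0}", "alice");
        return "ok";
    }
}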

Start the Spring Boot application

Configure Kibana

In Kibana, create an index pattern for the logger-* indices.

Distributed tracing!

OK! You can see the logs are now being written to Kafka, complete with the distributed-tracing fields!

Log Collection via an AOP Aspect

Example (collecting logs through an AOP aspect):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ils.intelab.common.slf4j.Slf4jUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.List;

/**
 * @Classname LogAspect
 * @Description Collects request and response details for all controller methods
 * @Date 2019/12/2 15:19
 * @Created zzf
 */
@Aspect
@Component
public class LogAspect {

    @Resource
    Slf4jUtils log;

    /** Intercept all controller methods */
    @Pointcut("execution(public * com.CC.CCC.CCCCC.controller..*.*(..))")
    public void webLog() {
    }

    @Around("webLog()")
    public Object aroundMethod(ProceedingJoinPoint joinPoint) throws Exception {
        // result holds the join point's return value
        Object result = null;
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();

        // Record the request details.
        // Form parameters arrive as plain key-value pairs; for application/json
        // bodies request.getParameter cannot read them, so the join point
        // arguments are serialized instead.
        String ip = request.getRemoteAddr();
        String method = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
        JSONObject jsonObject = new JSONObject();
        List<Object> argList = new ArrayList<>();
        if ("application/json".equals(request.getHeader("Content-Type"))) {
            for (Object arg : joinPoint.getArgs()) {
                // request/response objects cannot be serialized with toJSON
                if (arg instanceof HttpServletRequest) {
                    argList.add("request");
                } else if (arg instanceof HttpServletResponse) {
                    argList.add("response");
                } else {
                    argList.add(JSONObject.toJSON(arg));
                }
            }
        } else {
            // Record the request's key-value pairs
            for (String key : request.getParameterMap().keySet()) {
                JSONObject js = new JSONObject();
                js.put(key, request.getParameter(key));
                argList.add(js);
            }
        }
        // Put the request context into the MDC so the logback pattern can read it via %X{...}
        MDC.put("url", request.getRequestURL().toString());
        MDC.put("ip", ip);
        MDC.put("method", method);
        MDC.put("arg", JSON.toJSON(argList).toString());
        try {
            // Invoke the target method
            result = joinPoint.proceed();
        } catch (Throwable e) {
            // Throwing advice: record the first 15 stack frames
            MDC.put("throwable", e.getMessage());
            StackTraceElement[] sta = e.getStackTrace();
            StringBuilder str = new StringBuilder();
            for (int i = 0; i < 15 && i < sta.length; i++) {
                str.append(sta[i]).append("\n");
            }
            log.error(str.toString());
            // Rethrow to the caller
            throw (Exception) e.fillInStackTrace();
        }
        // After advice: log the return value
        jsonObject.put("message", result);
        log.info(jsonObject.toJSONString());
        return result;
    }
}
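
One caveat: the aspect puts values into the MDC but never removes them, so on a pooled servlet thread the url/ip/method/arg/throwable of one request can leak into log lines from the next. A defensive sketch (the filter name is hypothetical) that removes those keys once the request completes:

import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import javax.servlet.*;
import java.io.IOException;

// Hypothetical safety net: remove the keys LogAspect put into the MDC once
// the request has been fully handled, so they cannot leak onto the next
// request served by the same pooled thread.
@Component
public class MdcCleanupFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        try {
            chain.doFilter(request, response);
        } finally {
            MDC.remove("url");
            MDC.remove("ip");
            MDC.remove("method");
            MDC.remove("arg");
            MDC.remove("throwable");
        }
    }
}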

Here I extend the logback output through the MDC, so the following fields need to be added to the JSON pattern in logback.xml:

               "ip": "%X{ip}","url": "%X{url}","method": "%X{method}","arg": "%X{arg}","throwable": "%X{throwable}"

%X{field-name} reads the value stored under that key in the MDC.
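
With those fields added, each event indexed into Elasticsearch then has roughly this shape (all values are illustrative, combining the pattern above with the MDC fields set by the aspect):

{
  "timestamp": "2019-12-02T15:19:00.123+0000",
  "traceId": "5c8ba4a0e1f2d3c4",
  "parentSpanId": "",
  "spanId": "5c8ba4a0e1f2d3c4",
  "exportable": "false",
  "pid": "12345",
  "logger": "com.example.DemoController-hello-21",
  "level": "INFO",
  "message": "{\"message\":\"ok\"}",
  "serverName": "logger-platform",
  "ip": "192.168.78.1",
  "url": "http://localhost:8080/hello",
  "method": "com.example.DemoController.hello",
  "arg": "[]",
  "throwable": ""
}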