乌鲁木齐网站建设优化绍兴seo
文章目录
- 自定义KafkaAppender
- 修改log4j.properties配置
- 启动命令配置添加参数
- 启动之后可以在Kafka中查询发送数据
- 时区问题-自定义实现JSONLayout解决
- 自定义JSONLayout.java
- 一键应用
- 可能遇到的异常
- ClassNotFoundException: xxx.KafkaLog4jAppender
- Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
- 参考文章
自定义KafkaAppender
注意:如果使用官方自带的可以直接引用,版本为Kafka当前使用的版本
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-log4j-appender</artifactId><version>2.4.1</version></dependency>
配置依赖为
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.28</version><scope>compile</scope></dependency>
自定义KafkaLog4jAppender.java
内容为
这里我们实现了包名过滤功能
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaLog4jAppender extends AppenderSkeleton {/** 包含规则条件 */private Set<String> includeSet = new HashSet<>();private Set<String> includeMatchSet = new HashSet<>();/** 不包含规则条件 */private Set<String> excludeSet = new HashSet<>();private Set<String> excludeMatchSet = new HashSet<>();private String brokerList;private String topic;private String compressionType;private String securityProtocol;private String sslTruststoreLocation;private String sslTruststorePassword;private String sslKeystoreType;private String sslKeystoreLocation;private String sslKeystorePassword;private String saslKerberosServiceName;private String saslMechanism;private String clientJaasConfPath;private String clientJaasConf;private String kerb5ConfPath;private Integer maxBlockMs;private int retries = 2147483647;private int requiredNumAcks = 1;private int deliveryTimeoutMs = 120000;private boolean ignoreExceptions = true;private boolean syncSend;private Producer<byte[], byte[]> producer;private String includes;private String excludes;public String getIncludes() {return includes;}public void setIncludes(String includes) {this.includes = includes;}public String getExcludes() {return excludes;}public void setExcludes(String excludes) {this.excludes = excludes;}public KafkaLog4jAppender() {}public Producer<byte[], byte[]> getProducer() {return this.producer;}public String getBrokerList() {return this.brokerList;}public void setBrokerList(String brokerList) {this.brokerList = brokerList;}public int getRequiredNumAcks() {return this.requiredNumAcks;}public void setRequiredNumAcks(int requiredNumAcks) {this.requiredNumAcks = requiredNumAcks;}public int getRetries() {return this.retries;}public void setRetries(int retries) {this.retries = retries;}public int getDeliveryTimeoutMs() {return this.deliveryTimeoutMs;}public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {this.deliveryTimeoutMs = deliveryTimeoutMs;}public String getCompressionType() {return this.compressionType;}public void setCompressionType(String compressionType) {this.compressionType = compressionType;}public String getTopic() {return this.topic;}public void setTopic(String topic) {this.topic = topic;}public boolean getIgnoreExceptions() {return this.ignoreExceptions;}public void setIgnoreExceptions(boolean ignoreExceptions) {this.ignoreExceptions = ignoreExceptions;}public boolean getSyncSend() {return this.syncSend;}public void setSyncSend(boolean syncSend) {this.syncSend = syncSend;}public String getSslTruststorePassword() {return this.sslTruststorePassword;}public String getSslTruststoreLocation() {return this.sslTruststoreLocation;}public String getSecurityProtocol() {return this.securityProtocol;}public void setSecurityProtocol(String securityProtocol) {this.securityProtocol = securityProtocol;}public void setSslTruststoreLocation(String sslTruststoreLocation) {this.sslTruststoreLocation = sslTruststoreLocation;}public void setSslTruststorePassword(String sslTruststorePassword) {this.sslTruststorePassword = sslTruststorePassword;}public void setSslKeystorePassword(String sslKeystorePassword) {this.sslKeystorePassword = sslKeystorePassword;}public void setSslKeystoreType(String sslKeystoreType) {this.sslKeystoreType = sslKeystoreType;}public void setSslKeystoreLocation(String sslKeystoreLocation) {this.sslKeystoreLocation = sslKeystoreLocation;}public void setSaslKerberosServiceName(String saslKerberosServiceName) {this.saslKerberosServiceName = saslKerberosServiceName;}public void setClientJaasConfPath(String clientJaasConfPath) {this.clientJaasConfPath = clientJaasConfPath;}public void setKerb5ConfPath(String kerb5ConfPath) {this.kerb5ConfPath = kerb5ConfPath;}public String getSslKeystoreLocation() {return this.sslKeystoreLocation;}public String getSslKeystoreType() {return this.sslKeystoreType;}public String getSslKeystorePassword() {return this.sslKeystorePassword;}public String getSaslKerberosServiceName() {return this.saslKerberosServiceName;}public String getClientJaasConfPath() {return this.clientJaasConfPath;}public void setSaslMechanism(String saslMechanism) {this.saslMechanism = saslMechanism;}public String getSaslMechanism() {return this.saslMechanism;}public void setClientJaasConf(String clientJaasConf) {this.clientJaasConf = clientJaasConf;}public String getClientJaasConf() {return this.clientJaasConf;}public String getKerb5ConfPath() {return this.kerb5ConfPath;}public int getMaxBlockMs() {return this.maxBlockMs;}public void setMaxBlockMs(int maxBlockMs) {this.maxBlockMs = maxBlockMs;}@Overridepublic void activateOptions() {// 加载过滤规则setFilterRules(includes, includeMatchSet, includeSet);setFilterRules(excludes, excludeMatchSet, excludeSet);Properties props = new Properties();if (this.brokerList != null) {props.put("bootstrap.servers", this.brokerList);}if (props.isEmpty()) {throw new ConfigException("The bootstrap servers property should be specified");} else if (this.topic == null) {throw new ConfigException("Topic must be specified by the Kafka log4j appender");} else {if (this.compressionType != null) {props.put("compression.type", this.compressionType);}props.put("acks", Integer.toString(this.requiredNumAcks));props.put("retries", this.retries);props.put("delivery.timeout.ms", this.deliveryTimeoutMs);if (this.securityProtocol != null) {props.put("security.protocol", this.securityProtocol);}if (this.securityProtocol != null&& this.securityProtocol.contains("SSL")&& this.sslTruststoreLocation != null&& this.sslTruststorePassword != null) {props.put("ssl.truststore.location", this.sslTruststoreLocation);props.put("ssl.truststore.password", this.sslTruststorePassword);if (this.sslKeystoreType != null&& this.sslKeystoreLocation != null&& this.sslKeystorePassword != null) {props.put("ssl.keystore.type", this.sslKeystoreType);props.put("ssl.keystore.location", this.sslKeystoreLocation);props.put("ssl.keystore.password", this.sslKeystorePassword);}}if (this.securityProtocol != null&& this.securityProtocol.contains("SASL")&& this.saslKerberosServiceName != null&& this.clientJaasConfPath != null) {props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);}if (this.kerb5ConfPath != null) {System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);}if (this.saslMechanism != null) {props.put("sasl.mechanism", this.saslMechanism);}if (this.clientJaasConf != null) {props.put("sasl.jaas.config", this.clientJaasConf);}if (this.maxBlockMs != null) {props.put("max.block.ms", this.maxBlockMs);}props.put("key.serializer", ByteArraySerializer.class.getName());props.put("value.serializer", ByteArraySerializer.class.getName());System.out.println("Properties:" + props);this.producer = this.getKafkaProducer(props);LogLog.debug("Kafka producer connected to " + this.brokerList);LogLog.debug("Logging for topic: " + this.topic);}}/*** 设置过滤规则** @name setFilterRules* @date 2023/3/2 下午1:57* @return void* @param excludes* @param excludeMatchSet* @param excludeSet* @author Jast*/private void setFilterRules(String excludes, Set<String> excludeMatchSet, Set<String> excludeSet) {if (excludes != null) {for (String exclude : excludes.split(",")) {if (exclude.length() > 0) {if (exclude.endsWith(".*")) {excludeMatchSet.add(exclude.replace(".*", ""));} else {excludeSet.add(exclude);}}}}}protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {return new KafkaProducer(props);}@Overrideprotected void append(LoggingEvent event) {if (filterPackageName(event)) {return;}String message = this.subAppend(event);LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);Future<RecordMetadata> response =this.producer.send(new ProducerRecord(this.topic, message.getBytes(StandardCharsets.UTF_8)));if (this.syncSend) {try {response.get();} catch (ExecutionException | InterruptedException var5) {if (!this.ignoreExceptions) {throw new RuntimeException(var5);}LogLog.debug("Exception while getting response", var5);}}}private String subAppend(LoggingEvent event) {return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);}@Overridepublic void close() {if (!this.closed) {this.closed = true;this.producer.close();}}@Overridepublic boolean requiresLayout() {return true;}/*** 过滤包名,如果为True则不发送到Kafka** @name filterPackageName* @date 2023/2/28 下午4:07* @return boolean* @param event* @author Jast*/private boolean filterPackageName(LoggingEvent event) {boolean flag = true;if (includeSet.size() == 0&& includeMatchSet.size() == 0&& excludeSet.size() == 0&& excludeMatchSet.size() == 0) {return false;}if (includeSet.size() == 0 && includeMatchSet.size() == 0) {flag = false;}/** 打印日志类/名称 */String loggerName = event.getLoggerName();for (String include : includeSet) {if (loggerName.equals(include)) {flag = false;}}for (String include : includeMatchSet) {if (loggerName.startsWith(include)) {flag = false;}}for (String exclude : excludeMatchSet) {if (loggerName.startsWith(exclude)) {flag = true;}}for (String exclude : excludeSet) {if (loggerName.equals(exclude)) {flag = true;}}return flag;}
}
修改log4j.properties配置
修改Spark自身的配置文件,配置文件位置:/opt/spark-client/conf
不同的集群可能配置文件所在目录不同
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO# 从这开始往下是添加的内容
# log4j.logger.com 代表包名com开头的类info日志都发送到Kafka,可以配置多个,注意包名要完整,比如com 不能写co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# 使用自定义的KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# 官方提供的KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 发送至Topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx
启动命令配置添加参数
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
说明:
- kafka-appender-1.0.0.jar 为我们刚刚自定义的
KafkaLog4jAppender
类打成的jar包 - slf4j-api-1.8.0-beta2.jar与slf4j-log4j12-1.8.0-beta2.jar版本是为了解决日志版本引发的NullpointException异常,在下面异常处理章节有描述
- slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar这两个jar包也要放在lib目录下通过–jars命令提交
启动之后可以在Kafka中查询发送数据
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}
这里有个问题
net.logstash.log4j.JSONEventLayoutV1
实现的方法,时区是错误的,我们需要修改时区,下面我们介绍自定义实现Layout
时区问题-自定义实现JSONLayout解决
JSONLayout比较影响性能,建议数据量大的情况下不要使用,自行记录相关信息,然后打印日志
自定义JSONLayout.java
在我们刚刚自定义KafkaAppender项目中创建JSONLayout.java
类,内容如下
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;public class JSONLayout extends Layout {private boolean locationInfo = false;private String customUserFields;private boolean ignoreThrowable = false;private boolean activeIgnoreThrowable = ignoreThrowable;private String hostname = InetAddress.getLocalHost().getHostName();private String threadName;private long timestamp;private String ndc;private Map mdc;private LocationInfo info;private HashMap<String, Object> exceptionInformation;private static Integer version = 1;private JSONObject logstashEvent;public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";public static String dateFormat(long timestamp) {return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);}/*** For backwards compatibility, the default is to generate location information in the log* messages.*/public JSONLayout() throws UnknownHostException {this(true);}/*** Creates a layout that optionally inserts location information into log messages.** @param locationInfo whether or not to include location information in the log messages.*/public JSONLayout(boolean locationInfo) throws UnknownHostException {this.locationInfo = locationInfo;}@Overridepublic String format(LoggingEvent loggingEvent) {threadName = loggingEvent.getThreadName();timestamp = loggingEvent.getTimeStamp();exceptionInformation = new HashMap<String, Object>();mdc = loggingEvent.getProperties();ndc = loggingEvent.getNDC();logstashEvent = new JSONObject();String whoami = this.getClass().getSimpleName();/*** All v1 of the event format requires is "@timestamp" and "@version" Every other field is* arbitrary*/logstashEvent.put("@version", version);logstashEvent.put("@timestamp", dateFormat(timestamp));/** Extract and add fields from log4j config, if defined */if (getUserFields() != null) {String userFlds = getUserFields();LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);addUserFields(userFlds);}/*** Extract fields from system properties, if defined Note that CLI props will override* conflicts with log4j config*/if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {if (getUserFields() != null) {LogLog.warn("["+ whoami+ "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");}String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);LogLog.debug("[" + whoami + "] Got user data from system property: " + userFieldsProperty);addUserFields(userFieldsProperty);}/** Now we start injecting our own stuff. */logstashEvent.put("source_host", hostname);logstashEvent.put("message", loggingEvent.getRenderedMessage());if (loggingEvent.getThrowableInformation() != null) {final ThrowableInformation throwableInformation =loggingEvent.getThrowableInformation();if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {exceptionInformation.put("exception_class",throwableInformation.getThrowable().getClass().getCanonicalName());}if (throwableInformation.getThrowable().getMessage() != null) {exceptionInformation.put("exception_message", throwableInformation.getThrowable().getMessage());}if (throwableInformation.getThrowableStrRep() != null) {String stackTrace =StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");exceptionInformation.put("stacktrace", stackTrace);}addEventData("exception", exceptionInformation);}if (locationInfo) {info = loggingEvent.getLocationInformation();addEventData("file", info.getFileName());addEventData("line_number", info.getLineNumber());addEventData("class", info.getClassName());addEventData("method", info.getMethodName());}addEventData("logger_name", loggingEvent.getLoggerName());addEventData("mdc", mdc);addEventData("ndc", ndc);addEventData("level", loggingEvent.getLevel().toString());addEventData("thread_name", threadName);return logstashEvent.toString() + "\n";}@Overridepublic boolean ignoresThrowable() {return ignoreThrowable;}/*** Query whether log messages include location information.** @return true if location information is included in log messages, false otherwise.*/public boolean getLocationInfo() {return locationInfo;}/*** Set whether log messages should include location information.** @param locationInfo true if location information should be included, false otherwise.*/public void setLocationInfo(boolean locationInfo) {this.locationInfo = locationInfo;}public String getUserFields() {return customUserFields;}public void setUserFields(String userFields) {this.customUserFields = userFields;}@Overridepublic void activateOptions() {activeIgnoreThrowable = ignoreThrowable;}private void addUserFields(String data) {if (null != data) {String[] pairs = data.split(",");for (String pair : pairs) {String[] userField = pair.split(":", 2);if (userField[0] != null) {String key = userField[0];String val = userField[1];addEventData(key, val);}}}}private void addEventData(String keyname, Object keyval) {if (null != keyval) {logstashEvent.put(keyname, keyval);}}
}
相关依赖
<dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.21</version><scope>provided</scope></dependency>
打包上传服务器准备运行
启动命令中将kafka-appender-1.0.0.jar
以及相关依赖添加
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
启动后查看数据,发现@timestamp时间正常了
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}
一键应用
查看本节之前,请将之前讲解的步骤都看一遍,否则可能不了解。
为了方便应用,我将实现的类打包传到了中央仓库,可以直接通过Maven引用,直接使用,具体步骤如下
- maven中引用依赖
<dependency><groupId>com.gitee.jastee</groupId><artifactId>kafka-log4j-appender</artifactId><version>1.0.5</version></dependency>
- 在代码中使用Log打印日志
- 修改
Spark
配置文件log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO# 从这开始往下是添加的内容
# log4j.logger.com 代表包名com开头的类info日志都发送到Kafka,可以配置多个,注意包名要完整,比如com 不能写co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# 使用自定义的KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# 官方提供的KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 发送至Topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
- 启动命令添加
使用
--conf
指定加载的jar包太多了?可以将三个包合成一个包去指定,使用jar xf
解压,jar -cvfM
在压缩,合成一个包即可,这里因为不同环境可能使用的slf4j版本不同,我就没统一合并,实际开发中根据自己需求合并就行。
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
- 启动程序
可能遇到的异常
ClassNotFoundException: xxx.KafkaLog4jAppender
启动程序提示异常,明明在启动时候将jar包提交了却提示找不到类
log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender
原因:
因为Spark启动最初还未加载–jars的jar包,通过spark.executor.extraClassPath
与spark.driver.extraClassPath
将我们自定义kafka-appender-1.0.0.jar
(jar包中的类就是KafkaLog4jAppender.java
)提交上去即可,如果是使用官方的,就将官方的jar包提交上去即可
解决方法:
在启动脚本添加
--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \
Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerExceptionat org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerErrorat org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerExceptionat org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)... 27 more
原因:
使用日志版本问题,现在使用的版本为slf4j-log4j12-1.7.30.jar
解决方法:
使用slf4j的1.8.0-beta2
版本
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.8.0-beta2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.8.0-beta2</version></dependency>
通过spark.driver.extraClassPath
和spark.executor.extraClassPath
参数提交
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
参考文章
https://www.jianshu.com/p/cde2b4712859
https://blog.csdn.net/epitomizelu/article/details/123687998