当前位置: 首页 > news >正文

乌鲁木齐网站建设优化绍兴seo

乌鲁木齐网站建设优化,绍兴seo,帮别人做网站哪里可以接单,厦门优化公司文章目录自定义KafkaAppender修改log4j.properties配置启动命令配置添加参数启动之后可以在Kafka中查询发送数据时区问题-自定义实现JSONLayout解决自定义JSONLayout.java一键应用可能遇到的异常ClassNotFoundException: xxx.KafkaLog4jAppenderUnexpected problem occured dur…

文章目录

      • 自定义KafkaAppender
      • 修改log4j.properties配置
      • 启动命令配置添加参数
      • 启动之后可以在Kafka中查询发送数据
      • 时区问题-自定义实现JSONLayout解决
        • 自定义JSONLayout.java
    • 一键应用
      • 可能遇到的异常
        • ClassNotFoundException: xxx.KafkaLog4jAppender
        • Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
      • 参考文章

自定义KafkaAppender

注意:如果使用官方自带的可以直接引用,版本为Kafka当前使用的版本

				<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-log4j-appender</artifactId><version>2.4.1</version></dependency>

配置依赖为

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.28</version><scope>compile</scope></dependency>

自定义KafkaLog4jAppender.java内容为

这里我们实现了包名过滤功能

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaLog4jAppender extends AppenderSkeleton {/** 包含规则条件 */private Set<String> includeSet = new HashSet<>();private Set<String> includeMatchSet = new HashSet<>();/** 不包含规则条件 */private Set<String> excludeSet = new HashSet<>();private Set<String> excludeMatchSet = new HashSet<>();private String brokerList;private String topic;private String compressionType;private String securityProtocol;private String sslTruststoreLocation;private String sslTruststorePassword;private String sslKeystoreType;private String sslKeystoreLocation;private String sslKeystorePassword;private String saslKerberosServiceName;private String saslMechanism;private String clientJaasConfPath;private String clientJaasConf;private String kerb5ConfPath;private Integer maxBlockMs;private int retries = 2147483647;private int requiredNumAcks = 1;private int deliveryTimeoutMs = 120000;private boolean ignoreExceptions = true;private boolean syncSend;private Producer<byte[], byte[]> producer;private String includes;private String excludes;public String getIncludes() {return includes;}public void setIncludes(String includes) {this.includes = includes;}public String getExcludes() {return excludes;}public void setExcludes(String excludes) {this.excludes = excludes;}public KafkaLog4jAppender() {}public Producer<byte[], byte[]> getProducer() {return this.producer;}public String getBrokerList() {return this.brokerList;}public void setBrokerList(String brokerList) {this.brokerList = brokerList;}public int getRequiredNumAcks() {return this.requiredNumAcks;}public void setRequiredNumAcks(int requiredNumAcks) {this.requiredNumAcks = requiredNumAcks;}public int getRetries() {return this.retries;}public void setRetries(int retries) {this.retries = retries;}public int getDeliveryTimeoutMs() {return this.deliveryTimeoutMs;}public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {this.deliveryTimeoutMs = deliveryTimeoutMs;}public String getCompressionType() {return this.compressionType;}public void setCompressionType(String compressionType) {this.compressionType = compressionType;}public String getTopic() {return this.topic;}public void setTopic(String topic) {this.topic = topic;}public boolean getIgnoreExceptions() {return this.ignoreExceptions;}public void setIgnoreExceptions(boolean ignoreExceptions) {this.ignoreExceptions = ignoreExceptions;}public boolean getSyncSend() {return this.syncSend;}public void setSyncSend(boolean syncSend) {this.syncSend = syncSend;}public String getSslTruststorePassword() {return this.sslTruststorePassword;}public String getSslTruststoreLocation() {return this.sslTruststoreLocation;}public String getSecurityProtocol() {return this.securityProtocol;}public void setSecurityProtocol(String securityProtocol) {this.securityProtocol = securityProtocol;}public void setSslTruststoreLocation(String sslTruststoreLocation) {this.sslTruststoreLocation = sslTruststoreLocation;}public void setSslTruststorePassword(String sslTruststorePassword) {this.sslTruststorePassword = sslTruststorePassword;}public void setSslKeystorePassword(String sslKeystorePassword) {this.sslKeystorePassword = sslKeystorePassword;}public void setSslKeystoreType(String sslKeystoreType) {this.sslKeystoreType = sslKeystoreType;}public void setSslKeystoreLocation(String sslKeystoreLocation) {this.sslKeystoreLocation = sslKeystoreLocation;}public void setSaslKerberosServiceName(String saslKerberosServiceName) {this.saslKerberosServiceName = saslKerberosServiceName;}public void setClientJaasConfPath(String clientJaasConfPath) {this.clientJaasConfPath = clientJaasConfPath;}public void setKerb5ConfPath(String kerb5ConfPath) {this.kerb5ConfPath = kerb5ConfPath;}public String getSslKeystoreLocation() {return this.sslKeystoreLocation;}public String getSslKeystoreType() {return this.sslKeystoreType;}public String getSslKeystorePassword() {return this.sslKeystorePassword;}public String getSaslKerberosServiceName() {return this.saslKerberosServiceName;}public String getClientJaasConfPath() {return this.clientJaasConfPath;}public void setSaslMechanism(String saslMechanism) {this.saslMechanism = saslMechanism;}public String getSaslMechanism() {return this.saslMechanism;}public void setClientJaasConf(String clientJaasConf) {this.clientJaasConf = clientJaasConf;}public String getClientJaasConf() {return this.clientJaasConf;}public String getKerb5ConfPath() {return this.kerb5ConfPath;}public int getMaxBlockMs() {return this.maxBlockMs;}public void setMaxBlockMs(int maxBlockMs) {this.maxBlockMs = maxBlockMs;}@Overridepublic void activateOptions() {// 加载过滤规则setFilterRules(includes, includeMatchSet, includeSet);setFilterRules(excludes, excludeMatchSet, excludeSet);Properties props = new Properties();if (this.brokerList != null) {props.put("bootstrap.servers", this.brokerList);}if (props.isEmpty()) {throw new ConfigException("The bootstrap servers property should be specified");} else if (this.topic == null) {throw new ConfigException("Topic must be specified by the Kafka log4j appender");} else {if (this.compressionType != null) {props.put("compression.type", this.compressionType);}props.put("acks", Integer.toString(this.requiredNumAcks));props.put("retries", this.retries);props.put("delivery.timeout.ms", this.deliveryTimeoutMs);if (this.securityProtocol != null) {props.put("security.protocol", this.securityProtocol);}if (this.securityProtocol != null&& this.securityProtocol.contains("SSL")&& this.sslTruststoreLocation != null&& this.sslTruststorePassword != null) {props.put("ssl.truststore.location", this.sslTruststoreLocation);props.put("ssl.truststore.password", this.sslTruststorePassword);if (this.sslKeystoreType != null&& this.sslKeystoreLocation != null&& this.sslKeystorePassword != null) {props.put("ssl.keystore.type", this.sslKeystoreType);props.put("ssl.keystore.location", this.sslKeystoreLocation);props.put("ssl.keystore.password", this.sslKeystorePassword);}}if (this.securityProtocol != null&& this.securityProtocol.contains("SASL")&& this.saslKerberosServiceName != null&& this.clientJaasConfPath != null) {props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);}if (this.kerb5ConfPath != null) {System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);}if (this.saslMechanism != null) {props.put("sasl.mechanism", this.saslMechanism);}if (this.clientJaasConf != null) {props.put("sasl.jaas.config", this.clientJaasConf);}if (this.maxBlockMs != null) {props.put("max.block.ms", this.maxBlockMs);}props.put("key.serializer", ByteArraySerializer.class.getName());props.put("value.serializer", ByteArraySerializer.class.getName());System.out.println("Properties:" + props);this.producer = this.getKafkaProducer(props);LogLog.debug("Kafka producer connected to " + this.brokerList);LogLog.debug("Logging for topic: " + this.topic);}}/*** 设置过滤规则** @name setFilterRules* @date 2023/3/2 下午1:57* @return void* @param excludes* @param excludeMatchSet* @param excludeSet* @author Jast*/private void setFilterRules(String excludes, Set<String> excludeMatchSet, Set<String> excludeSet) {if (excludes != null) {for (String exclude : excludes.split(",")) {if (exclude.length() > 0) {if (exclude.endsWith(".*")) {excludeMatchSet.add(exclude.replace(".*", ""));} else {excludeSet.add(exclude);}}}}}protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {return new KafkaProducer(props);}@Overrideprotected void append(LoggingEvent event) {if (filterPackageName(event)) {return;}String message = this.subAppend(event);LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);Future<RecordMetadata> response =this.producer.send(new ProducerRecord(this.topic, message.getBytes(StandardCharsets.UTF_8)));if (this.syncSend) {try {response.get();} catch (ExecutionException | InterruptedException var5) {if (!this.ignoreExceptions) {throw new RuntimeException(var5);}LogLog.debug("Exception while getting response", var5);}}}private String subAppend(LoggingEvent event) {return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);}@Overridepublic void close() {if (!this.closed) {this.closed = true;this.producer.close();}}@Overridepublic boolean requiresLayout() {return true;}/*** 过滤包名,如果为True则不发送到Kafka** @name filterPackageName* @date 2023/2/28 下午4:07* @return boolean* @param event* @author Jast*/private boolean filterPackageName(LoggingEvent event) {boolean flag = true;if (includeSet.size() == 0&& includeMatchSet.size() == 0&& excludeSet.size() == 0&& excludeMatchSet.size() == 0) {return false;}if (includeSet.size() == 0 && includeMatchSet.size() == 0) {flag = false;}/** 打印日志类/名称 */String loggerName = event.getLoggerName();for (String include : includeSet) {if (loggerName.equals(include)) {flag = false;}}for (String include : includeMatchSet) {if (loggerName.startsWith(include)) {flag = false;}}for (String exclude : excludeMatchSet) {if (loggerName.startsWith(exclude)) {flag = true;}}for (String exclude : excludeSet) {if (loggerName.equals(exclude)) {flag = true;}}return flag;}
}

修改log4j.properties配置

修改Spark自身的配置文件,配置文件位置:/opt/spark-client/conf

不同的集群可能配置文件所在目录不同

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO# 从这开始往下是添加的内容
# log4j.logger.com 代表包名com开头的类info日志都发送到Kafka,可以配置多个,注意包名要完整,比如com 不能写co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# 使用自定义的KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# 官方提供的KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 发送至Topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx

启动命令配置添加参数

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \

说明:

  • kafka-appender-1.0.0.jar 为我们刚刚自定义的KafkaLog4jAppender类打成的jar包
  • slf4j-api-1.8.0-beta2.jar与slf4j-log4j12-1.8.0-beta2.jar版本是为了解决日志版本引发的NullpointException异常,在下面异常处理章节有描述
  • slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar这两个jar包也要放在lib目录下通过–jars命令提交

启动之后可以在Kafka中查询发送数据

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}

这里有个问题net.logstash.log4j.JSONEventLayoutV1实现的方法,时区是错误的,我们需要修改时区,下面我们介绍自定义实现Layout

时区问题-自定义实现JSONLayout解决

JSONLayout比较影响性能,建议数据量大的情况下不要使用,自行记录相关信息,然后打印日志

自定义JSONLayout.java

在我们刚刚自定义KafkaAppender项目中创建JSONLayout.java类,内容如下

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;public class JSONLayout extends Layout {private boolean locationInfo = false;private String customUserFields;private boolean ignoreThrowable = false;private boolean activeIgnoreThrowable = ignoreThrowable;private String hostname = InetAddress.getLocalHost().getHostName();private String threadName;private long timestamp;private String ndc;private Map mdc;private LocationInfo info;private HashMap<String, Object> exceptionInformation;private static Integer version = 1;private JSONObject logstashEvent;public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";public static String dateFormat(long timestamp) {return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);}/*** For backwards compatibility, the default is to generate location information in the log* messages.*/public JSONLayout() throws UnknownHostException {this(true);}/*** Creates a layout that optionally inserts location information into log messages.** @param locationInfo whether or not to include location information in the log messages.*/public JSONLayout(boolean locationInfo) throws UnknownHostException {this.locationInfo = locationInfo;}@Overridepublic String format(LoggingEvent loggingEvent) {threadName = loggingEvent.getThreadName();timestamp = loggingEvent.getTimeStamp();exceptionInformation = new HashMap<String, Object>();mdc = loggingEvent.getProperties();ndc = loggingEvent.getNDC();logstashEvent = new JSONObject();String whoami = this.getClass().getSimpleName();/*** All v1 of the event format requires is "@timestamp" and "@version" Every other field is* arbitrary*/logstashEvent.put("@version", version);logstashEvent.put("@timestamp", dateFormat(timestamp));/** Extract and add fields from log4j config, if defined */if (getUserFields() != null) {String userFlds = getUserFields();LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);addUserFields(userFlds);}/*** Extract fields from system properties, if defined Note that CLI props will override* conflicts with log4j config*/if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {if (getUserFields() != null) {LogLog.warn("["+ whoami+ "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");}String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);LogLog.debug("[" + whoami + "] Got user data from system property: " + userFieldsProperty);addUserFields(userFieldsProperty);}/** Now we start injecting our own stuff. */logstashEvent.put("source_host", hostname);logstashEvent.put("message", loggingEvent.getRenderedMessage());if (loggingEvent.getThrowableInformation() != null) {final ThrowableInformation throwableInformation =loggingEvent.getThrowableInformation();if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {exceptionInformation.put("exception_class",throwableInformation.getThrowable().getClass().getCanonicalName());}if (throwableInformation.getThrowable().getMessage() != null) {exceptionInformation.put("exception_message", throwableInformation.getThrowable().getMessage());}if (throwableInformation.getThrowableStrRep() != null) {String stackTrace =StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");exceptionInformation.put("stacktrace", stackTrace);}addEventData("exception", exceptionInformation);}if (locationInfo) {info = loggingEvent.getLocationInformation();addEventData("file", info.getFileName());addEventData("line_number", info.getLineNumber());addEventData("class", info.getClassName());addEventData("method", info.getMethodName());}addEventData("logger_name", loggingEvent.getLoggerName());addEventData("mdc", mdc);addEventData("ndc", ndc);addEventData("level", loggingEvent.getLevel().toString());addEventData("thread_name", threadName);return logstashEvent.toString() + "\n";}@Overridepublic boolean ignoresThrowable() {return ignoreThrowable;}/*** Query whether log messages include location information.** @return true if location information is included in log messages, false otherwise.*/public boolean getLocationInfo() {return locationInfo;}/*** Set whether log messages should include location information.** @param locationInfo true if location information should be included, false otherwise.*/public void setLocationInfo(boolean locationInfo) {this.locationInfo = locationInfo;}public String getUserFields() {return customUserFields;}public void setUserFields(String userFields) {this.customUserFields = userFields;}@Overridepublic void activateOptions() {activeIgnoreThrowable = ignoreThrowable;}private void addUserFields(String data) {if (null != data) {String[] pairs = data.split(",");for (String pair : pairs) {String[] userField = pair.split(":", 2);if (userField[0] != null) {String key = userField[0];String val = userField[1];addEventData(key, val);}}}}private void addEventData(String keyname, Object keyval) {if (null != keyval) {logstashEvent.put(keyname, keyval);}}
}

相关依赖

 <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.21</version><scope>provided</scope></dependency>

打包上传服务器准备运行

启动命令中将kafka-appender-1.0.0.jar以及相关依赖添加

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \

启动后查看数据,发现@timestamp时间正常了

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}

一键应用

查看本节之前,请将之前讲解的步骤都看一遍,否则可能不了解。

为了方便应用,我将实现的类打包传到了中央仓库,可以直接通过Maven引用,直接使用,具体步骤如下

  1. maven中引用依赖
 				<dependency><groupId>com.gitee.jastee</groupId><artifactId>kafka-log4j-appender</artifactId><version>1.0.5</version></dependency>
  1. 在代码中使用Log打印日志
  2. 修改Spark配置文件log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO# 从这开始往下是添加的内容
# log4j.logger.com 代表包名com开头的类info日志都发送到Kafka,可以配置多个,注意包名要完整,比如com 不能写co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# 使用自定义的KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# 官方提供的KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# 发送至Topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
  1. 启动命令添加

使用--conf 指定加载的jar包太多了?可以将三个包合成一个包去指定,使用jar xf解压,jar -cvfM在压缩,合成一个包即可,这里因为不同环境可能使用的slf4j版本不同,我就没统一合并,实际开发中根据自己需求合并就行。

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
  1. 启动程序

可能遇到的异常

ClassNotFoundException: xxx.KafkaLog4jAppender

启动程序提示异常,明明在启动时候将jar包提交了却提示找不到类

log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender

原因:

因为Spark启动最初还未加载–jars的jar包,通过spark.executor.extraClassPathspark.driver.extraClassPath将我们自定义kafka-appender-1.0.0.jar(jar包中的类就是KafkaLog4jAppender.java)提交上去即可,如果是使用官方的,就将官方的jar包提交上去即可

解决方法:

在启动脚本添加

--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \

Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException

Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerExceptionat org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerErrorat org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerExceptionat org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)... 27 more

原因:

使用日志版本问题,现在使用的版本为slf4j-log4j12-1.7.30.jar

解决方法:

使用slf4j的1.8.0-beta2版本

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.8.0-beta2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.8.0-beta2</version></dependency>

通过spark.driver.extraClassPathspark.executor.extraClassPath参数提交

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \

参考文章

https://www.jianshu.com/p/cde2b4712859

https://blog.csdn.net/epitomizelu/article/details/123687998

http://www.ritt.cn/news/2804.html

相关文章:

  • 女装网站建设规划书什么是网站推广优化
  • 免费建站哪家性价比高广告联盟官网入口
  • 网站建设的公司如何招销售搜客通
  • 东营做网站哪里好济南优化seo公司
  • 网站建设厃金手指花总十三软文营销的写作技巧有哪些
  • 彩票网站建设平台网站优化 推广
  • 站长工具短链接生成北京网站seo公司
  • 2019网站建设有限公司哈尔滨企业网站模板建站
  • 网络营销的特点有成本低效率高效果好收益好站内seo优化
  • 杭州做微信网站软件公司新闻热点大事件
  • 网站开发人员配置网络推广工作好吗
  • 在淘宝做印刷网站怎么办培训心得体会范文大全2000字
  • 怎么建设收费网站正规网站优化公司
  • 武汉 网站开发如何推广app更高效
  • 维护网站一年多少钱seo网站优化是什么
  • dede网站地图怎么做全网整合营销推广
  • 怎么做网站生意自助建站申请
  • 学什么可以先做网站企业网站模板设计
  • 通州顺德网站建设外包客服平台
  • 杭州建设网站公司哪家好指数函数运算法则
  • 教育培训类网站建设百度投诉电话
  • 网站的上一页怎么做的网站seo如何做好优化
  • 公司网站的功能深圳seo优化公司哪家好
  • 三合一网站制作价格网球排名即时最新排名
  • 网站开发语音占比免费引流在线推广
  • 好的版式设计网站收录网站
  • 外发加工网费用大概多少搜索引擎关键词排名优化
  • 网站建设卖给别人可以吗深圳seo优化培训
  • 河池网站推广接单平台app
  • 怎么做自己的优惠券网站专业提升关键词排名工具