歡迎您光臨本站 註冊首頁

spark透過kafka-appender指定日誌輸出到kafka引發的死鎖問題

←手機掃碼閱讀     f2h0b53ohn @ 2020-05-09 , reply:0

在採用 log4j 的kafka-appender收集 spark 任務執行日誌時,發現提交到 yarn 上的任務始終 ACCEPTED 狀態,無法進入 RUNNING 狀態,並且會重試兩次後超時。期初認為是yarn資源不足導致,但在確認yarn資源充裕的時候問題依舊,而且基本上能穩定復現。
起初是這麼配置spark日誌輸出到kafka的:
log4j.rootCategory=INFO, console, kafka log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n # Kafka appender log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender # Set Kafka topic and brokerList log4j.appender.kafka.topic=yarn_spark_log log4j.appender.kafka.brokerList=localhost:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=false log4j.appender.kafka.maxBlockMs=10 log4j.appender.kafka.layout=org.apache.log4j.PatternLayout log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m
這裡用 org.apache.kafka.log4jappender.KafkaLog4jAppender 預設將所有日誌都輸出到kafka,這個appender已經被kafka官方維護,穩定性應該是可以保障的。
問題定位
發現問題後,嘗試將輸出到kafka的規則去掉,問題解除!於是把問題定位到跟日誌輸出到kafka有關。透過其他測試,證實目標kafka其實是正常的,這就非常奇怪了。
檢視yarn的ResourceManager日誌,發現有如下超時
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final
state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV
ING on event = EXPIRE
表明,yarn本身是接收任務的,但是發現任務遲遲沒有啟動。在spark的場景下其實是指只有driver啟動了,但是沒有啟動executor。
而檢視driver日誌,發現日誌輸出到一個地方就卡住了,不往下繼續了。透過對比成功執行和卡住的情況發現,日誌卡在這條上:
2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A
卡住的情況下,只會打出 SecurityManager 這行,而無法打出 Metadata 這行。
猜想 Metadata 這行是 kafka-client 本身打出來的,因為整個上下文只有yarn, spark, kafka-client可能會打出這個日誌。
在kafka-client 2.2.0版本中找到這個日誌是輸出位置:
public synchronized void update(MetadataResponse metadataResponse, long now) { ... String newClusterId = cache.cluster().clusterResource().clusterId(); if (!Objects.equals(previousClusterId, newClusterId)) { log.info("Cluster ID: {}", newClusterId); } ... }
看到 synchronized ,高度懷疑死鎖。於是考慮用 jstack 分析:
在yarn上執行spark任務的時候,driver程序叫ApplicationMaster,executor程序叫CoarseGrainedExecutorBackend。這裡首先嚐試再復現過程中找到drvier最終在哪個節點上執行,然後快速使用jstack -F

列印堆疊
jstack果然不負眾望,報告了死鎖!這裡我把結果貼的全一點
[root@node1 ~]# jstack 20136 20136: Unable to open socket file: target process not responding or HotSpot VM not loaded The -F option can be used when the target process is not responding [root@node1 ~]# jstack -F 20136 Attaching to process ID 20136, please wait... Debugger attached successfully. Server compiler detected. JVM version is 25.231-b11 Deadlock Detection: Found one Java-level deadlock: ============================= "kafka-producer-network-thread | producer-1": waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender), which is held by "main" "main": waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata), which is held by "kafka-producer-network-thread | producer-1" Found a total of 1 deadlock. Thread 20157: (state = BLOCKED) - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame) - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame) - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame) - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame) - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame) - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame) - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame) - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame) - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame) Thread 20150: (state = BLOCKED) Thread 20149: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame) - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame) - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame) Thread 20148: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame) - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame) - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame) Thread 20137: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame) - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame) - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame) - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame) - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame) - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame) - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame) - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame) - org.apache.spark.SecurityManager.(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster.(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)
到這裡,已經確定是死鎖,導致driver一開始就執行停滯,那麼當然無法提交executor執行。
具體的死鎖稍後分析,先考慮如何解決。從感性認識看,似乎只要不讓kafka-client的日誌也輸出到kafka即可。實驗後,發現果然如此:如果只輸出org.apache.spark的日誌就可以正常執行。
根因分析
從stack的結果看,造成死鎖的是如下兩個執行緒:
kafka-client內部的網路執行緒spark
主入口執行緒
兩個執行緒其實都是卡在打日誌上了,觀察堆疊可以發現,兩個執行緒同時持有了同一個log物件。而這個log物件實際上是kafka-appender。而kafka-appender本質上持有kafka-client,及其內部的Metadata物件。log4j的doAppend為了保證執行緒安全也用 synchronized 修飾了:
public synchronized void doAppend(LoggingEvent event) { if(closed) { LogLog.error("Attempted to append to closed appender named ["+name+"]."); return; } if(!isAsSevereAsThreshold(event.level)) { return; } Filter f = this.headFilter; FILTER_LOOP: while(f != null) { switch(f.decide(event)) { case Filter.DENY: return; case Filter.ACCEPT: break FILTER_LOOP; case Filter.NEUTRAL: f = f.next; } } this.append(event); }
於是事情開始了:
main執行緒嘗試打日誌,首先進入了synchronized的doAppend,即獲取了 kafka-appender 的鎖
的鎖 kafka-appender 內部需要呼叫kafka-client傳送日誌到kafka,最終呼叫到 Thread 20137 展示的,執行到Metadata.awaitUpdate(也是個synchronized方法),內部的wait會嘗試獲取metadata的鎖。(詳見https://github.com/apache/kaf...)
內部需要呼叫kafka-client傳送日誌到kafka,最終呼叫到 展示的,執行到Metadata.awaitUpdate(也是個synchronized方法),內部的wait會嘗試獲取metadata的鎖。(詳見https://github.com/apache/kaf...) 但此時,kafka-producer-network-thread執行緒剛好進入了上文提到的打 Cluster ID 這個日誌的這個階段(update方法也是synchronized的),也就是說kafka-producer-network-thread執行緒獲得了metadata物件的鎖
這個日誌的這個階段(update方法也是synchronized的),也就是說kafka-producer-network-thread執行緒獲得了metadata物件的鎖 kafka-producer-network-thread執行緒要列印日誌同樣執行synchronized的doAppend,即獲取了 kafka-appender 的鎖
上圖main-thread持有了log物件鎖,要求獲取metadata物件鎖;kafka-producer-network-thread持有了metadata物件鎖,要求獲取log物件鎖於是造成了死鎖。


[f2h0b53ohn ] spark透過kafka-appender指定日誌輸出到kafka引發的死鎖問題已經有243次圍觀

http://coctec.com/docs/program/show-post-233426.html