Skip to content

Commit fc39363

Browse files
authored
refactor: log monitoring modification and content parsing optimization (apache#561)
* refactor(log): fefactoring log parsing logic * refactor: reconstruct file monitoring logic and optimize log parsing function * refactor: Optimize code comments for EsDataServiceImpl class * refactor: optimize code comments for EsDataServiceImpl class
1 parent e4f7cd9 commit fc39363

File tree

27 files changed

+477
-250
lines changed

27 files changed

+477
-250
lines changed

ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.ozhera.log.agent.channel;
2020

21+
import cn.hutool.core.io.FileUtil;
2122
import cn.hutool.core.lang.Pair;
2223
import com.google.common.base.Preconditions;
2324
import com.google.common.collect.Lists;
@@ -26,6 +27,14 @@
2627
import com.xiaomi.data.push.rpc.RpcClient;
2728
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
2829
import com.xiaomi.mone.file.ILogFile;
30+
import com.xiaomi.youpin.docean.Ioc;
31+
import com.xiaomi.youpin.docean.anno.Lookup;
32+
import com.xiaomi.youpin.docean.anno.Service;
33+
import com.xiaomi.youpin.docean.plugin.config.Config;
34+
import lombok.Getter;
35+
import lombok.extern.slf4j.Slf4j;
36+
import org.apache.commons.collections.CollectionUtils;
37+
import org.apache.commons.lang3.StringUtils;
2938
import org.apache.ozhera.log.agent.channel.comparator.*;
3039
import org.apache.ozhera.log.agent.channel.listener.DefaultFileMonitorListener;
3140
import org.apache.ozhera.log.agent.channel.listener.FileMonitorListener;
@@ -45,14 +54,6 @@
4554
import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
4655
import org.apache.ozhera.log.common.Constant;
4756
import org.apache.ozhera.log.utils.NetUtil;
48-
import com.xiaomi.youpin.docean.Ioc;
49-
import com.xiaomi.youpin.docean.anno.Lookup;
50-
import com.xiaomi.youpin.docean.anno.Service;
51-
import com.xiaomi.youpin.docean.plugin.config.Config;
52-
import lombok.Getter;
53-
import lombok.extern.slf4j.Slf4j;
54-
import org.apache.commons.collections.CollectionUtils;
55-
import org.apache.commons.lang3.StringUtils;
5657

5758
import java.text.NumberFormat;
5859
import java.util.*;
@@ -63,6 +64,7 @@
6364
import java.util.stream.Collectors;
6465

6566
import static org.apache.ozhera.log.common.Constant.GSON;
67+
import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
6668
import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
6769
import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
6870

@@ -214,7 +216,7 @@ private List<Long> channelStart(List<ChannelService> channelServiceList) {
214216
log.info("realChannelService,id:{}", channelId);
215217
try {
216218
channelService.start();
217-
fileMonitorListener.addChannelService(channelService);
219+
fileMonitorListener.addChannelService(abstractChannelService);
218220
successChannelIds.add(channelId);
219221
} catch (RejectedExecutionException e) {
220222
log.error("The thread pool is full.id:{}", channelId, e);
@@ -266,21 +268,32 @@ private ChannelService channelServiceTrans(ChannelDefine channelDefine) {
266268
if (null == agentMemoryService) {
267269
agentMemoryService = new AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path", AgentMemoryService.DEFAULT_BASE_PATH));
268270
}
269-
ChannelService channelService;
270-
Input input = channelDefine.getInput();
271-
boolean matchWildcard = Arrays.stream(input.getLogPattern().split(",")).anyMatch(data -> StringUtils.substringAfterLast(data, SEPARATOR).contains(PATH_WILDCARD));
272-
if (matchWildcard) {
273-
channelService = new WildcardChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain, memoryBasePath);
274-
} else {
275-
channelService = new ChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain);
276-
}
277-
return channelService;
271+
return createChannelService(channelDefine, exporter, filterChain);
278272
} catch (Throwable e) {
279-
log.error("channelServiceTrans exception, channelDefine:{}, exception:{}", gson.toJson(channelDefine), e);
273+
log.error("channelServiceTrans exception, channelDefine:{}", gson.toJson(channelDefine), e);
280274
}
281275
return null;
282276
}
283277

278+
private ChannelService createChannelService(ChannelDefine channelDefine, MsgExporter exporter, FilterChain filterChain) {
279+
Input input = channelDefine.getInput();
280+
String logType = channelDefine.getInput().getType();
281+
boolean containsWildcard = isWildcardAllowedForLogType(input.getLogPattern(), logType);
282+
if (containsWildcard || FileUtil.exist(input.getLogPattern())) {
283+
return new ChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain);
284+
} else {
285+
return new WildcardChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain, memoryBasePath);
286+
}
287+
}
288+
289+
private boolean isWildcardAllowedForLogType(String logPattern, String logType) {
290+
if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType)) {
291+
return true;
292+
}
293+
return Arrays.stream(logPattern.split(SYMBOL_COMMA))
294+
.noneMatch(data -> StringUtils.substringAfterLast(data, SEPARATOR).contains(PATH_WILDCARD));
295+
}
296+
284297
private void preCheckChannelDefine(ChannelDefine channelDefine) {
285298
Preconditions.checkArgument(null != channelDefine, "channelDefine can not be null");
286299
Preconditions.checkArgument(null != channelDefine.getInput(), "channelDefine.input can not be null");
@@ -434,14 +447,15 @@ private void delTailFileColl(List<ChannelDefine> channelDefines, boolean directD
434447
try {
435448
if (directDel || CollectionUtils.isNotEmpty(channelDels)) {
436449
log.info("[delete config]data:{}", gson.toJson(channelDels));
437-
List<Long> channelIdDels = channelDels.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
450+
List<Long> channelIdDels = channelDels.stream().map(ChannelDefine::getChannelId).toList();
438451
List<ChannelService> tempChannelServiceList = Lists.newArrayList();
439452
channelServiceList.forEach(channelService -> {
440-
Long channelId = ((AbstractChannelService) channelService).getChannelDefine().getChannelId();
453+
AbstractChannelService abstractChannelService = (AbstractChannelService) channelService;
454+
Long channelId = abstractChannelService.getChannelDefine().getChannelId();
441455
if (channelIdDels.contains(channelId)) {
442456
log.info("[delete config]channelService:{}", channelId);
443457
channelService.close();
444-
fileMonitorListener.removeChannelService(channelService);
458+
fileMonitorListener.removeChannelService(abstractChannelService);
445459
tempChannelServiceList.add(channelService);
446460
this.channelDefineList.removeIf(channelDefine -> {
447461
if (channelDefine.getChannelId().equals(channelId)) {

ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.ozhera.log.api.enums.LogTypeEnum;
4343
import org.apache.ozhera.log.api.model.meta.FilterConf;
4444
import org.apache.ozhera.log.api.model.msg.LineMessage;
45-
import org.apache.ozhera.log.common.Config;
4645
import org.apache.ozhera.log.common.Constant;
4746
import org.apache.ozhera.log.common.PathUtils;
4847
import org.apache.ozhera.log.utils.NetUtil;
@@ -66,10 +65,11 @@
6665
@Slf4j
6766
public class ChannelServiceImpl extends AbstractChannelService {
6867

69-
private AgentMemoryService memoryService;
68+
private final AgentMemoryService memoryService;
7069

7170
private MsgExporter msgExporter;
7271

72+
@Getter
7373
private ChannelDefine channelDefine;
7474

7575
private ChannelMemory channelMemory;
@@ -80,7 +80,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
8080
@Getter
8181
private final ConcurrentHashMap<String, Future> futureMap = new ConcurrentHashMap<>();
8282

83-
private Set<String> delFileCollList = new CopyOnWriteArraySet<>();
83+
private final Set<String> delFileCollList = new CopyOnWriteArraySet<>();
8484

8585
private final Map<String, Long> reOpenMap = new HashMap<>();
8686
private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
@@ -89,13 +89,13 @@ public class ChannelServiceImpl extends AbstractChannelService {
8989

9090
private ScheduledFuture<?> lastFileLineScheduledFuture;
9191

92-
private Gson gson = Constant.GSON;
92+
private final Gson gson = Constant.GSON;
9393

94-
private List<LineMessage> lineMessageList = new ArrayList<>();
94+
private final List<LineMessage> lineMessageList = new ArrayList<>();
9595

96-
private ReentrantLock fileColLock = new ReentrantLock();
96+
private final ReentrantLock fileColLock = new ReentrantLock();
9797

98-
private ReentrantLock fileReopenLock = new ReentrantLock();
98+
private final ReentrantLock fileReopenLock = new ReentrantLock();
9999

100100
private volatile long lastSendTime = System.currentTimeMillis();
101101

@@ -108,19 +108,15 @@ public class ChannelServiceImpl extends AbstractChannelService {
108108
*/
109109
private boolean collectOnce;
110110

111-
private FilterChain chain;
111+
private final FilterChain chain;
112112

113113
/**
114114
* The file path to monitor
115115
*/
116-
private List<MonitorFile> monitorFileList;
116+
private final List<MonitorFile> monitorFileList;
117117

118118
private LogTypeEnum logTypeEnum;
119119

120-
private String logPattern;
121-
122-
private String logSplitExpress;
123-
124120
private String linePrefix;
125121

126122
public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService memoryService, ChannelDefine channelDefine, FilterChain chain) {
@@ -171,8 +167,8 @@ public void start() {
171167
Long channelId = channelDefine.getChannelId();
172168
Input input = channelDefine.getInput();
173169

174-
this.logPattern = input.getLogPattern();
175-
this.logSplitExpress = input.getLogSplitExpress();
170+
String logPattern = input.getLogPattern();
171+
String logSplitExpress = input.getLogSplitExpress();
176172
this.linePrefix = input.getLinePrefix();
177173

178174
String logType = channelDefine.getInput().getType();
@@ -312,9 +308,7 @@ private ReadListener initFileReadListener(MLog mLog, String patternCode, String
312308
return;
313309
}
314310
long ct = System.currentTimeMillis();
315-
readResult.get().getLines().stream()
316-
.filter(l -> !shouldFilterLogs(channelDefine.getFilterLogLevelList(), l))
317-
.forEach(l -> {
311+
readResult.get().getLines().stream().forEach(l -> {
318312
String logType = channelDefine.getInput().getType();
319313
LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
320314
// Multi-line application log type and opentelemetry type are used to determine the exception stack
@@ -354,7 +348,7 @@ private void lastLineRemainSendSchedule(String patternCode) {
354348
try {
355349
String remainMsg = mLog.takeRemainMsg2();
356350
if (null != remainMsg) {
357-
log.info("start send last line,pattern:{},patternCode:{},data:{}", pattern, patternCode, remainMsg);
351+
log.info("start send last line,pattern:{},patternCode:{},ip:{},data:{}", pattern, patternCode, getTailPodIp(pattern), remainMsg);
358352
wrapDataToSend(remainMsg, referenceEntry.getValue().getValue(), pattern, patternCode, getTailPodIp(pattern), appendTime);
359353
}
360354
} finally {
@@ -441,7 +435,7 @@ private void printMapToJson(Map<String, ChannelMemory.FileProgress> map, boolean
441435
}
442436

443437
if (!collectOnce && !snapshot.isEmpty()) {
444-
String jsonMap = gson.toJson(snapshot);
438+
String jsonMap = gson.toJson(snapshot.keySet());
445439
log.info("fileProgressMap: {}", jsonMap);
446440
}
447441
}
@@ -505,7 +499,7 @@ private void doExport(List<LineMessage> subList) {
505499

506500
@Override
507501
public void close() {
508-
log.info("Delete the current collection task,channelId:{}", getChannelId());
502+
log.info("Delete the current collection task,channelId:{},logPattern:{}", getChannelId(), getChannelDefine().getInput().getLogPattern());
509503
//1.Stop log capture
510504
for (Map.Entry<String, ILogFile> fileEntry : logFileMap.entrySet()) {
511505
fileEntry.getValue().setStop(true);
@@ -525,7 +519,7 @@ public void close() {
525519
for (Future future : futureMap.values()) {
526520
future.cancel(false);
527521
}
528-
log.info("stop file monitor,fileName:", logFileMap.keySet().stream().collect(Collectors.joining(SYMBOL_COMMA)));
522+
log.info("stop file monitor,fileName:{}", String.join(SYMBOL_COMMA, logFileMap.keySet()));
529523
lineMessageList.clear();
530524
reOpenMap.clear();
531525
fileReadMap.clear();
@@ -615,11 +609,6 @@ public List<MonitorFile> getMonitorPathList() {
615609
return monitorFileList;
616610
}
617611

618-
@Override
619-
public ChannelDefine getChannelDefine() {
620-
return channelDefine;
621-
}
622-
623612
public ChannelMemory getChannelMemory() {
624613
return channelMemory;
625614
}

ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ private String buildFileExpression(String logPattern) {
222222
.distinct()
223223
.toList();
224224
return expressions.size() == 1 ?
225-
expressions.get(0) :
225+
expressions.getFirst() :
226226
expressions.stream().collect(Collectors.joining("|", MULTI_FILE_PREFIX, MULTI_FILE_SUFFIX));
227227
}
228228

ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void watch(String filePattern, List<FileAlterationMonitor> monitorList, C
4242
FileAlterationMonitor monitor = new FileAlterationMonitor(10000);
4343
log.info("agent monitor files:{}", GSON.toJson(watchList));
4444
for (String watch : watchList) {
45-
FileAlterationObserver observer = new LogFileAlterationObserver(new File(watch));
45+
FileAlterationObserver observer = new LogFileAlterationObserver(new File(watch), File::exists);
4646
observer.addListener(new FileListener(consumer));
4747
log.info("## agent monitor file:{}, filePattern:{}", watch, filePattern);
4848
monitor.addObserver(observer);

ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package org.apache.ozhera.log.agent.channel.file;
2020

2121
import com.google.common.collect.Lists;
22-
import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
23-
import org.apache.ozhera.log.agent.common.ChannelUtil;
2422
import lombok.extern.slf4j.Slf4j;
2523
import org.apache.commons.io.comparator.DefaultFileComparator;
2624
import org.apache.commons.io.comparator.NameFileComparator;
25+
import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
26+
import org.apache.ozhera.log.agent.common.ChannelUtil;
2727

2828
import java.io.File;
2929
import java.util.*;
@@ -49,33 +49,41 @@ public class InodeFileComparator extends DefaultFileComparator {
4949

5050
@Override
5151
public int compare(File file1, File file2) {
52+
// log.info("InodeFileComparator compare file1:{},file2:{},filePaths:{}", file1, file2, GSON.toJson(filePaths));
5253
int sort = fileComparator.compare(file1, file2);
53-
if (file1.isDirectory() || file2.isDirectory()) {
54-
return sort;
55-
}
56-
if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
57-
//The file name is the same
58-
Long oldInode;
59-
if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
60-
oldInode = INODE_MAP.get(file1.getAbsolutePath());
61-
} else {
62-
oldInode = ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
63-
INODE_MAP.put(file1.getAbsolutePath(), oldInode);
54+
try {
55+
if (file1.isDirectory() || file2.isDirectory()) {
56+
return sort;
6457
}
65-
ChannelMemory.UnixFileNode unixFileNode2 = ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
66-
if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
67-
INODE_MAP.put(file2.getAbsolutePath(), unixFileNode2.getSt_ino());
68-
return 1;
58+
if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
59+
//The file name is the same
60+
// log.info("INODE_MAP:{}", GSON.toJson(INODE_MAP));
61+
Long oldInode;
62+
if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
63+
oldInode = INODE_MAP.get(file1.getAbsolutePath());
64+
} else {
65+
oldInode = ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
66+
INODE_MAP.put(file1.getAbsolutePath(), oldInode);
67+
}
68+
ChannelMemory.UnixFileNode unixFileNode2 = ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
69+
if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
70+
INODE_MAP.put(file2.getAbsolutePath(), unixFileNode2.getSt_ino());
71+
return 1;
72+
}
6973
}
74+
} catch (Exception e) {
75+
log.error("InodeFileComparator compare error,file1:{},file2:{}", file1, file2, e);
7076
}
7177
return sort;
7278
}
7379

7480
public static void addFile(String filePath) {
81+
log.info("InodeFileComparator add file : {}", filePath);
7582
filePaths.add(filePath);
7683
}
7784

7885
public static void removeFile(String filePath) {
86+
log.info("InodeFileComparator remove file : {}", filePath);
7987
filePaths.remove(filePath);
8088
}
8189
}

0 commit comments

Comments
 (0)