Skip to content

Commit d455b0e

Browse files
authored
Merge pull request cubefs#205 from aib628/main
fix: adapt hadoop json/xml response content format and add value_variable properties support
2 parents 836a3af + f065d11 commit d455b0e

File tree

4 files changed

+181
-8
lines changed

4 files changed

+181
-8
lines changed

task-flink/src/main/java/com/oppo/cloud/flink/domain/Properties.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,78 @@
1616

1717
package com.oppo.cloud.flink.domain;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.regex.Matcher;
22+
import java.util.regex.Pattern;
1923
import lombok.Data;
2024

2125
@Data
2226
public class Properties {
2327

2428
private String key;
2529
private String value;
30+
31+
public String getFinalValue(List<Properties> properties) {
32+
String variableName = findVariableName(value);
33+
if (variableName == null) {
34+
return value;
35+
}
36+
37+
for (Properties property : properties) {
38+
if (!property.key.equals(this.key)) {
39+
continue;
40+
}
41+
42+
String variableValue = property.getFinalValue(properties);
43+
return value.replace("${" + variableName + "}", variableValue);
44+
}
45+
46+
return value;
47+
}
48+
49+
private String findVariableName(String originalValue) {
50+
Pattern VAR_PATTERN = Pattern.compile("\\$\\{(\\w+)}"); //变量格式${var_name}
51+
Matcher matcher = VAR_PATTERN.matcher(originalValue);
52+
if (matcher.find()) {
53+
return matcher.group(1);
54+
}
55+
56+
return null;
57+
}
58+
59+
public static void main(String[] args) {
60+
List<Properties> properties = new ArrayList<>();
61+
Properties prop_1 = new Properties();
62+
prop_1.key = "yarn.timeline-service.webapp.address";
63+
prop_1.value = "${yarn.timeline-service.hostname}:8188";
64+
65+
Properties prop_2 = new Properties();
66+
prop_2.key = "dfs.webhdfs.user.provider.user.pattern";
67+
prop_2.value = "^[A-Za-z_][A-Za-z0-9._-]*[$]?$";
68+
69+
Properties prop_3 = new Properties();
70+
prop_3.key = "yarn.scheduler.configuration.fs.path";
71+
prop_3.value = "file://${hadoop.tmp.dir}/yarn/system/schedconf";
72+
73+
Properties prop_4 = new Properties();
74+
prop_4.key = "yarn.timeline-service.reader.webapp.address";
75+
prop_4.value = "${yarn.timeline-service.webapp.address}";
76+
77+
Properties prop_5 = new Properties();
78+
prop_5.key = "yarn.timeline-service.hostname";
79+
prop_5.value = "127.0.0.1";
80+
81+
properties.add(prop_1);
82+
properties.add(prop_2);
83+
properties.add(prop_3);
84+
properties.add(prop_4);
85+
properties.add(prop_5);
86+
87+
assert "127.0.0.1:8188".equals(prop_1.getFinalValue(properties));
88+
assert "127.0.0.1:8188".equals(prop_4.getFinalValue(properties));
89+
assert "file://${hadoop.tmp.dir}/yarn/system/schedconf".equals(prop_3.getFinalValue(properties));
90+
assert "^[A-Za-z_][A-Za-z0-9._-]*[$]?$".equals(prop_2.getFinalValue(properties));
91+
assert "^127.0.0.1".equals(prop_5.getFinalValue(properties));
92+
}
2693
}

task-flink/src/main/java/com/oppo/cloud/flink/service/impl/ClusterConfigServiceImpl.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.alibaba.fastjson2.JSON;
2020
import com.oppo.cloud.common.constant.Constant;
2121
import com.oppo.cloud.common.domain.cluster.hadoop.YarnConf;
22-
import com.oppo.cloud.common.domain.cluster.yarn.ClusterInfo;
2322
import com.oppo.cloud.common.service.RedisService;
2423
import com.oppo.cloud.common.util.YarnUtil;
2524
import com.oppo.cloud.flink.config.HadoopConfig;
@@ -29,11 +28,19 @@
2928
import com.oppo.cloud.flink.service.IClusterConfigService;
3029
import lombok.extern.slf4j.Slf4j;
3130
import org.apache.commons.lang3.StringUtils;
31+
import org.springframework.http.HttpHeaders;
3232
import org.springframework.http.ResponseEntity;
3333
import org.springframework.stereotype.Service;
3434
import org.springframework.web.client.RestTemplate;
35+
import org.w3c.dom.Document;
36+
import org.w3c.dom.Element;
37+
import org.w3c.dom.NodeList;
3538

3639
import javax.annotation.Resource;
40+
import javax.xml.parsers.DocumentBuilder;
41+
import javax.xml.parsers.DocumentBuilderFactory;
42+
import java.io.ByteArrayInputStream;
43+
import java.util.ArrayList;
3744
import java.util.HashMap;
3845
import java.util.List;
3946
import java.util.Map;
@@ -154,10 +161,23 @@ public YarnPathInfo getYarnPathInfo(String ip) {
154161
log.error("getHDFSPathErr:{}", url);
155162
return null;
156163
}
164+
List<String> contentType = responseEntity.getHeaders().get(HttpHeaders.CONTENT_TYPE);
165+
if (contentType == null) {
166+
log.error("getContentTypeErr:{}", url);
167+
return null;
168+
}
169+
170+
YarnConfProperties yarnConfProperties = new YarnConfProperties();
157171

158-
YarnConfProperties yarnConfProperties = null;
159172
try {
160-
yarnConfProperties = JSON.parseObject(responseEntity.getBody(), YarnConfProperties.class);
173+
if (contentType.toString().contains("json")) {
174+
yarnConfProperties = JSON.parseObject(responseEntity.getBody(), YarnConfProperties.class);
175+
} else if (contentType.toString().contains("xml")) {
176+
parseXML(responseEntity.getBody(), yarnConfProperties);
177+
} else {
178+
log.error("unsupported type: {},{}", url, contentType);
179+
return null;
180+
}
161181
} catch (Exception e) {
162182
log.error("Exception:", e);
163183
return null;
@@ -172,7 +192,7 @@ public YarnPathInfo getYarnPathInfo(String ip) {
172192
if (yarnConfProperties != null && yarnConfProperties.getProperties() != null) {
173193
for (Properties properties : yarnConfProperties.getProperties()) {
174194
String key = properties.getKey();
175-
String value = properties.getValue();
195+
String value = properties.getFinalValue(yarnConfProperties.getProperties());
176196
if (YARN_REMOTE_APP_LOG_DIR.equals(key)) {
177197
log.info("yarnConfProperties key: {}, value: {}", YARN_REMOTE_APP_LOG_DIR, value);
178198
remoteDir = value;
@@ -189,7 +209,7 @@ public YarnPathInfo getYarnPathInfo(String ip) {
189209
log.info("yarnConfProperties key: {}, value: {}", MARREDUCE_INTERMEDIATE_DONE_DIR, value);
190210
mapreduceIntermediateDoneDir = value;
191211
}
192-
if(YARN_MAPREDUCE_STAGING_DIR.equals(key)){
212+
if (YARN_MAPREDUCE_STAGING_DIR.equals(key)) {
193213
log.info("yarnConfProperties key: {}, value: {}", YARN_MAPREDUCE_STAGING_DIR, value);
194214
mapreduceStagingDir = value;
195215
}
@@ -230,5 +250,26 @@ public YarnPathInfo getYarnPathInfo(String ip) {
230250
return yarnPathInfo;
231251
}
232252

253+
/**
254+
* Parse xml format
255+
*/
256+
public void parseXML(String xml, YarnConfProperties yarnConfProperties) throws Exception {
257+
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
258+
DocumentBuilder builder = factory.newDocumentBuilder();
259+
Document document = builder.parse(new ByteArrayInputStream(xml.getBytes()));
260+
Element root = document.getDocumentElement();
261+
NodeList nodeList = root.getElementsByTagName("property");
262+
List<Properties> properties = new ArrayList<>();
263+
for (int i = 0; i < nodeList.getLength(); i++) {
264+
Element element = (Element) nodeList.item(i);
265+
String name = element.getElementsByTagName("name").item(0).getTextContent();
266+
String value = element.getElementsByTagName("value").item(0).getTextContent();
267+
Properties property = new Properties();
268+
property.setKey(name);
269+
property.setValue(value);
270+
properties.add(property);
271+
}
272+
yarnConfProperties.setProperties(properties);
273+
}
233274

234275
}

task-metadata/src/main/java/com/oppo/cloud/meta/domain/Properties.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,78 @@
1616

1717
package com.oppo.cloud.meta.domain;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.regex.Matcher;
22+
import java.util.regex.Pattern;
1923
import lombok.Data;
2024

2125
@Data
2226
public class Properties {
2327

2428
private String key;
2529
private String value;
30+
31+
public String getFinalValue(List<Properties> properties) {
32+
String variableName = findVariableName(value);
33+
if (variableName == null) {
34+
return value;
35+
}
36+
37+
for (Properties property : properties) {
38+
if (!property.key.equals(this.key)) {
39+
continue;
40+
}
41+
42+
String variableValue = property.getFinalValue(properties);
43+
return value.replace("${" + variableName + "}", variableValue);
44+
}
45+
46+
return value;
47+
}
48+
49+
private String findVariableName(String originalValue) {
50+
Pattern VAR_PATTERN = Pattern.compile("\\$\\{(\\w+)}"); //变量格式${var_name}
51+
Matcher matcher = VAR_PATTERN.matcher(originalValue);
52+
if (matcher.find()) {
53+
return matcher.group(1);
54+
}
55+
56+
return null;
57+
}
58+
59+
public static void main(String[] args) {
60+
List<Properties> properties = new ArrayList<>();
61+
Properties prop_1 = new Properties();
62+
prop_1.key = "yarn.timeline-service.webapp.address";
63+
prop_1.value = "${yarn.timeline-service.hostname}:8188";
64+
65+
Properties prop_2 = new Properties();
66+
prop_2.key = "dfs.webhdfs.user.provider.user.pattern";
67+
prop_2.value = "^[A-Za-z_][A-Za-z0-9._-]*[$]?$";
68+
69+
Properties prop_3 = new Properties();
70+
prop_3.key = "yarn.scheduler.configuration.fs.path";
71+
prop_3.value = "file://${hadoop.tmp.dir}/yarn/system/schedconf";
72+
73+
Properties prop_4 = new Properties();
74+
prop_4.key = "yarn.timeline-service.reader.webapp.address";
75+
prop_4.value = "${yarn.timeline-service.webapp.address}";
76+
77+
Properties prop_5 = new Properties();
78+
prop_5.key = "yarn.timeline-service.hostname";
79+
prop_5.value = "127.0.0.1";
80+
81+
properties.add(prop_1);
82+
properties.add(prop_2);
83+
properties.add(prop_3);
84+
properties.add(prop_4);
85+
properties.add(prop_5);
86+
87+
assert "127.0.0.1:8188".equals(prop_1.getFinalValue(properties));
88+
assert "127.0.0.1:8188".equals(prop_4.getFinalValue(properties));
89+
assert "file://${hadoop.tmp.dir}/yarn/system/schedconf".equals(prop_3.getFinalValue(properties));
90+
assert "^[A-Za-z_][A-Za-z0-9._-]*[$]?$".equals(prop_2.getFinalValue(properties));
91+
assert "^127.0.0.1".equals(prop_5.getFinalValue(properties));
92+
}
2693
}

task-metadata/src/main/java/com/oppo/cloud/meta/service/impl/ClusterConfigServiceImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import lombok.extern.slf4j.Slf4j;
3131
import org.apache.commons.lang3.StringUtils;
3232
import org.springframework.http.HttpHeaders;
33-
import org.springframework.http.MediaType;
3433
import org.springframework.http.ResponseEntity;
3534
import org.springframework.stereotype.Service;
3635
import org.springframework.web.client.RestTemplate;
@@ -41,7 +40,6 @@
4140
import javax.annotation.Resource;
4241
import javax.xml.parsers.DocumentBuilder;
4342
import javax.xml.parsers.DocumentBuilderFactory;
44-
import javax.xml.parsers.ParserConfigurationException;
4543
import java.io.ByteArrayInputStream;
4644
import java.util.ArrayList;
4745
import java.util.HashMap;
@@ -200,7 +198,7 @@ public YarnPathInfo getYarnPathInfo(String ip) {
200198
if (yarnConfProperties != null && yarnConfProperties.getProperties() != null) {
201199
for (Properties properties : yarnConfProperties.getProperties()) {
202200
String key = properties.getKey();
203-
String value = properties.getValue();
201+
String value = properties.getFinalValue(yarnConfProperties.getProperties());
204202
if (YARN_REMOTE_APP_LOG_DIR.equals(key)) {
205203
log.info("yarnConfProperties key: {}, value: {}", YARN_REMOTE_APP_LOG_DIR, value);
206204
remoteDir = value;

0 commit comments

Comments
 (0)