Skip to content

Commit 9a30cc5

Browse files
committed
add rocketmq demo
1 parent a279778 commit 9a30cc5

File tree

9 files changed

+101
-23
lines changed

9 files changed

+101
-23
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
使用代码方式,实现camel的操作。

09mq/rocket/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
</properties>
2020

2121
<dependencies>
22-
<dependency>
23-
<groupId>org.springframework.boot</groupId>
24-
<artifactId>spring-boot-starter-web</artifactId>
25-
</dependency>
22+
<!-- <dependency>-->
23+
<!-- <groupId>org.springframework.boot</groupId>-->
24+
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
25+
<!-- </dependency>-->
2626

2727
<dependency>
2828
<groupId>org.apache.rocketmq</groupId>

09mq/rocket/src/main/java/io/kimmking/mq/rocket/DemoController.java

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.kimmking.mq.rocket;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
7+
@Data
8+
@NoArgsConstructor
9+
@AllArgsConstructor
10+
public class Order {
11+
12+
private long id;
13+
private String symbol;
14+
private double price;
15+
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.kimmking.mq.rocket;
2+
3+
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
4+
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
5+
import org.springframework.stereotype.Component;
6+
7+
@Component
8+
@RocketMQMessageListener(consumerGroup = "test2", topic = "test-k2")
9+
public class OrderConsumerDemo implements RocketMQReplyListener<Order,String> {
10+
11+
@Override
12+
public String onMessage(Order order) {
13+
System.out.println(this.getClass().getName() + " -> " + order);
14+
return "Process&Return [" + order + "].";
15+
}
16+
}
Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,56 @@
11
package io.kimmking.mq.rocket;
22

3+
import org.apache.rocketmq.client.producer.SendCallback;
4+
import org.apache.rocketmq.client.producer.SendResult;
5+
import org.apache.rocketmq.spring.core.RocketMQTemplate;
6+
import org.springframework.boot.CommandLineRunner;
37
import org.springframework.boot.SpringApplication;
48
import org.springframework.boot.autoconfigure.SpringBootApplication;
9+
import org.springframework.messaging.MessageHeaders;
10+
import org.springframework.messaging.support.MessageBuilder;
11+
import org.springframework.util.MimeTypeUtils;
12+
13+
import javax.annotation.Resource;
14+
import java.math.BigDecimal;
15+
import java.util.List;
516

617
@SpringBootApplication
7-
public class RocketApplication {
18+
public class RocketApplication implements CommandLineRunner {
819

920
public static void main(String[] args) {
1021
SpringApplication.run(RocketApplication.class, args);
1122
}
1223

24+
@Resource
25+
private RocketMQTemplate rocketMQTemplate;
26+
27+
@Override
28+
public void run(String... args) throws Exception {
29+
30+
String topic = "test-k1";
31+
SendResult sendResult = rocketMQTemplate.syncSend(topic, "Hello, World!" + System.currentTimeMillis());
32+
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
33+
34+
sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(
35+
new Order(System.currentTimeMillis(),"CNY2USD", 0.1501d))
36+
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
37+
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
38+
39+
String topic1 = "test-k2";
40+
rocketMQTemplate.asyncSend(topic1, new Order(System.currentTimeMillis(),"CNY2USD", 0.1502d), new SendCallback() {
41+
@Override
42+
public void onSuccess(SendResult result) {
43+
System.out.printf("async onSucess SendResult=%s %n", result);
44+
}
45+
46+
@Override
47+
public void onException(Throwable throwable) {
48+
System.out.printf("async onException Throwable=%s %n", throwable);
49+
}
50+
51+
});
52+
53+
54+
}
55+
1356
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.kimmking.mq.rocket;
2+
3+
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
4+
import org.apache.rocketmq.spring.core.RocketMQListener;
5+
import org.springframework.stereotype.Component;
6+
7+
@Component
8+
@RocketMQMessageListener(consumerGroup = "test1", topic = "test-k1")
9+
public class StringConsumerDemo implements RocketMQListener<String> {
10+
11+
@Override
12+
public void onMessage(String message) {
13+
System.out.println(this.getClass().getName() + " -> " + message);
14+
}
15+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
rocketmq.name-server=localhost:9876
3+
rocketmq.producer.group=my-group1
4+
rocketmq.producer.sendMessageTimeout=300000
5+

09mq/rocket/src/main/resources/application.yaml

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)