Browse Source

消息中间件API

chao 4 years ago
parent
commit
2f2ca83189

+ 7 - 1
pom.xml

@@ -98,7 +98,13 @@
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
         </dependency>
-
+        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
+        <!-- RocketMQ starter -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.2.0</version>
+        </dependency>
 
         <!-- knife4j:swagger增强-->
         <!-- https://doc.xiaominfo.com/knife4j/documentation/get_start.html -->

+ 60 - 0
src/main/java/com/caimei365/tools/controller/RocketMqApi.java

@@ -0,0 +1,60 @@
+package com.caimei365.tools.controller;
+
+import com.caimei365.tools.model.ResponseJson;
+import com.caimei365.tools.model.dto.MessageDto;
+import com.caimei365.tools.service.RocketMqService;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Description
+ *
+ * @author : Charles
+ * @date : 2021/6/17
+ */
+@Api(tags="MQ消息中间件API")
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("/tools")
+public class RocketMqApi {
+
+    private final RocketMqService rocketMqService;
+
+    /**
+     * 发送消息
+     *
+     * @param messageDto {
+     *                    topic    消息主题
+     *                    content  消息内容
+     *                    tag      消息标签(可选)
+     *                    sort     有序消息(可选):1是,0否
+     *                    async    异步消息(可选):1是,0否
+     *                    oneway   单向消息(可选):1是,0否
+     *                    delay    延时消息等级(可选):1-18,0否,对应时间依次:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+     *                  }
+     Posteturn SendResult
+     */
+    @ApiOperation("发送消息")
+    @PostMapping("/mq/send")
+    public ResponseJson<SendResult> sendCommonMessage(MessageDto messageDto) {
+        if (StringUtils.isEmpty(messageDto.getTopic())) {
+            return ResponseJson.error("消息主题不能为空!", null);
+        }
+        if (StringUtils.isEmpty(messageDto.getContent())) {
+            return ResponseJson.error("消息内容不能为空!", null);
+        }
+        SendResult sendResult = rocketMqService.sendCommonMessage(messageDto);
+        return ResponseJson.success(sendResult);
+    }
+
+
+}

+ 28 - 0
src/main/java/com/caimei365/tools/listener/TestMqListener.java

@@ -0,0 +1,28 @@
+package com.caimei365.tools.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * 测试消息监听
+ *
+ * @author : Charles
+ * @date : 2021/6/17
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(
+        topic = "test_topic",
+        // messageModel = MessageModel.BROADCASTING,//指定为广播消费
+        // consumeMode = ConsumeMode.ORDERLY, // 指定消费模式为顺序消费,消费的顺序也和发送顺序一致
+        // selectorType = SelectorType.TAG,// 如果我们的生产者指定了Tag,但是消费者的selectorExpression没有设置,即用默认的“*”,那么这个消费者也会消费到
+        // selectorExpression = "tag",     // 指定了tag后,发送的消息如果不带tag,将会消费不到
+        consumerGroup = "test_group")
+public class TestMqListener implements RocketMQListener<String> {
+    @Override
+    public void onMessage(String message) {
+        log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
+    }
+}

+ 58 - 0
src/main/java/com/caimei365/tools/model/dto/MessageDto.java

@@ -0,0 +1,58 @@
+package com.caimei365.tools.model.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+import java.io.Serializable;
+
+/**
+ * Description
+ *
+ * @author : Charles
+ * @date : 2021/6/17
+ */
+@ApiModel("发送消息接收参数")
+@Data
+public class MessageDto implements Serializable {
+    private static final long serialVersionUID = 1L;
+    /**
+     * 消息主题
+     */
+    @NotNull
+    @ApiModelProperty("消息主题")
+    private String topic;
+    /**
+     * 消息内容
+     */
+    @NotNull
+    @ApiModelProperty("消息内容")
+    private String content;
+    /**
+     * 消息标签(可选)
+     */
+    @ApiModelProperty("消息标签(可选)")
+    private String tag;
+    /**
+     * 有序消息(可选)
+     */
+    @ApiModelProperty("有序消息(可选):1是,0否")
+    private Integer sort;
+    /**
+     * 异步消息(可选)
+     */
+    @ApiModelProperty("异步消息(可选):1是,0否")
+    private Integer async;
+    /**
+     * 单向消息(可选)
+     */
+    @ApiModelProperty("单向消息(可选):1是,0否")
+    private Integer oneway;
+    /**
+     * 延时消息(可选)
+     */
+    @ApiModelProperty("延时消息(可选),延时等级:1-18,0否,对应时间依次:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h")
+    private Integer delay;
+
+}

+ 2 - 0
src/main/java/com/caimei365/tools/model/po/LogisticsInfoPo.java

@@ -13,6 +13,8 @@ import java.util.Date;
  */
  @Data
 public class LogisticsInfoPo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
     private Long id;
     /** 订单ID */
     private Long orderId;

+ 28 - 0
src/main/java/com/caimei365/tools/service/RocketMqService.java

@@ -0,0 +1,28 @@
+package com.caimei365.tools.service;
+
+import com.caimei365.tools.model.dto.MessageDto;
+import org.apache.rocketmq.client.producer.SendResult;
+
+/**
+ * Description
+ *
+ * @author : Charles
+ * @date : 2021/6/17
+ */
+public interface RocketMqService {
+    /**
+     * 发送消息
+     *
+     * @param messageDto {
+     *                    topic    消息主题
+     *                    content  消息内容
+     *                    tag      消息标签(可选)
+     *                    sort     有序消息(可选):1是,0否
+     *                    async    异步消息(可选):1是,0否
+     *                    oneway   单向消息(可选):1是,0否
+     *                    delay    延时消息等级(可选):1-18,0否,对应时间依次:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+     *                  }
+     * @return SendResult
+     */
+    SendResult sendCommonMessage(MessageDto messageDto);
+}

+ 124 - 0
src/main/java/com/caimei365/tools/service/impl/RocketMqServiceImpl.java

@@ -0,0 +1,124 @@
+package com.caimei365.tools.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.caimei365.tools.model.dto.MessageDto;
+import com.caimei365.tools.service.RocketMqService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQ消息中间件实现类
+ *
+ * @author : Charles
+ * @date : 2021/6/17
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class RocketMqServiceImpl implements RocketMqService {
+
+    @Value("${rocketmq.producer.send-message-timeout}")
+    private Long timeout;
+
+    private final RocketMQTemplate rocketMQTemplate;
+
+    /**
+     * 发送消息
+     *
+     * @param messageDto {
+     *                   topic    消息主题
+     *                   content  消息内容
+     *                   tag      消息标签(可选)
+     *                   sort     有序消息(可选):1是,0否
+     *                   async    异步消息(可选):1是,0否
+     *                   oneway   单向消息(可选):1是,0否
+     *                   delay    延时消息等级(可选):1-18,0否,对应时间依次:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+     *                   }
+     * @return SendResult
+     */
+    @Override
+    public SendResult sendCommonMessage(MessageDto messageDto) {
+        String destination = messageDto.getTopic();
+        if (StringUtils.isNotEmpty(messageDto.getTag())) {
+            destination += ":" + messageDto.getTag();
+        }
+        Message<String> message = MessageBuilder.withPayload(messageDto.getContent()).build();
+        SendResult returnResult = null;
+        boolean isAsync = (messageDto.getAsync() != null && messageDto.getAsync() == 1);
+        boolean isSort = (messageDto.getSort() != null && messageDto.getSort() == 1);
+        boolean isOneway = (messageDto.getOneway() != null && messageDto.getOneway() == 1);
+        boolean isDelay = (messageDto.getDelay() != null && messageDto.getDelay() > 0);
+        if (!isAsync) {
+            // 同步消息
+            if (isDelay) {
+                // 延时
+                returnResult = rocketMQTemplate.syncSend(destination, message, timeout, messageDto.getDelay());
+            } else {
+                if (isSort && isOneway) {
+                    // 单向有序
+                    rocketMQTemplate.sendOneWayOrderly(destination, message, "hashkey");
+                } else if (isSort) {
+                    // 有序
+                    returnResult = rocketMQTemplate.syncSendOrderly(destination, message, "hashkey");
+                } else if (isOneway) {
+                    // 单向
+                    rocketMQTemplate.sendOneWay(destination, message);
+                } else {
+                    returnResult = rocketMQTemplate.syncSend(destination, message);
+                }
+            }
+        } else {
+            // 异步消息
+            if (isDelay) {
+                // 异步延时
+                rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                        //可以处理相应的业务
+                    }
+                    @Override
+                    public void onException(Throwable throwable) {
+                        //可以处理相应的业务
+                    }
+                }, timeout, messageDto.getDelay());
+            } else {
+                if (isSort) {
+                    // 异步有序
+                    rocketMQTemplate.asyncSendOrderly(destination, message, "hashkey", new SendCallback() {
+                        @Override
+                        public void onSuccess(SendResult sendResult) {
+                            log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                            //可以处理相应的业务
+                        }
+                        @Override
+                        public void onException(Throwable throwable) {
+                            //可以处理相应的业务
+                        }
+                    });
+                } else {
+                    rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
+                        @Override
+                        public void onSuccess(SendResult sendResult) {
+                            log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                            //可以处理相应的业务
+                        }
+                        @Override
+                        public void onException(Throwable throwable) {
+                            //可以处理相应的业务
+                        }
+                    });
+                }
+            }
+        }
+        return returnResult;
+    }
+}