huangzhiguo vor 1 Jahr
Ursprung
Commit
002710c03f

+ 14 - 0
caimei365-cloud-gateway/pom.xml

@@ -21,6 +21,20 @@
             <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-openfeign</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.2.0</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 9 - 1
caimei365-cloud-gateway/src/main/java/com/caimei365/cloud/config/WebConfiguration.java

@@ -1,5 +1,6 @@
 package com.caimei365.cloud.config;
 
+import com.caimei365.cloud.service.TouristService;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -43,7 +44,14 @@ public class WebConfiguration implements WebFluxConfigurer {
             }
             ServerHttpRequest mutatedServerHttpRequest = request.mutate().header("X-CLIENT-IP", clientIp).build();
             ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(mutatedServerHttpRequest).build();
-            return chain.filter(mutatedServerWebExchange);
+            chain.filter(mutatedServerWebExchange);
+            // 游客编号
+            TouristService touristService = new TouristService();
+            String touristId = touristService.touristInfo();
+            return exchange.getSession().flatMap(session -> {
+                session.getAttributes().put("touristId","touristId");
+                return chain.filter(exchange);
+            });
         }
     }
 

+ 121 - 0
caimei365-cloud-gateway/src/main/java/com/caimei365/cloud/service/TouristService.java

@@ -0,0 +1,121 @@
+package com.caimei365.cloud.service;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.Random;
+
+/**
+ * Description
+ *
+ * @author : Charles
+ * @date : 2023/11/1
+ */
+@Slf4j
+@Component
+public class TouristService {
+
+    @Value("${rocketmq.producer.send-message-timeout}")
+    private Long timeout;
+
+    private RocketMQTemplate rocketMQTemplate;
+
+
+    public String touristInfo() {
+        String touristId = "";
+        Random random = new Random();
+        Integer tourist = random.nextInt(900000) + 100000;
+        touristId = tourist.toString();
+        log.info("touristId==="+touristId);
+        sendCommonMessage("Tourist", touristId, null, 1, 1, null, null);
+        return touristId;
+    }
+
+    public void sendCommonMessage(String topic, String content, String tag, Integer sort, Integer async, Integer oneway, Integer delay) {
+        String destination = topic;
+        if (StringUtils.isNotEmpty(tag)) {
+            destination += ":" + tag;
+        }
+        Message<String> message = MessageBuilder.withPayload(content).build();
+        boolean isAsync = (async != null && async == 1);
+        boolean isSort = (sort != null && sort == 1);
+        boolean isOneway = (oneway != null && oneway == 1);
+        boolean isDelay = (delay != null && delay > 0);
+        log.info("destination----"+destination+"****message****"+message);
+        if (!isAsync) {
+            // 同步消息
+            if (isDelay) {
+                // 延时
+                rocketMQTemplate.syncSend(destination, message, timeout, delay);
+            } else {
+                if (isSort && isOneway) {
+                    // 单向有序
+                    rocketMQTemplate.sendOneWayOrderly(destination, message, "hashkey");
+                } else if (isSort) {
+                    // 有序
+                    rocketMQTemplate.syncSendOrderly(destination, message, "hashkey");
+                } else if (isOneway) {
+                    // 单向
+                    rocketMQTemplate.sendOneWay(destination, message);
+                } else {
+                    rocketMQTemplate.syncSend(destination, message);
+                }
+            }
+        } else {
+            // 异步消息
+            if (isDelay) {
+
+                // 异步延时
+                rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                        //可以处理相应的业务
+                    }
+                    @Override
+                    public void onException(Throwable throwable) {
+                        //可以处理相应的业务
+                        log.info("异步消息发送失败:{}", JSON.toJSONString(throwable));
+                    }
+                }, timeout, delay);
+            } else {
+                if (isSort) {
+                    // 异步有序
+                    rocketMQTemplate.asyncSendOrderly(destination, message, "hashkey", new SendCallback() {
+                        @Override
+                        public void onSuccess(SendResult sendResult) {
+                            log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                            //可以处理相应的业务
+                        }
+                        @Override
+                        public void onException(Throwable throwable) {
+                            //可以处理相应的业务
+                            log.info("异步消息发送失败:{}", JSON.toJSONString(throwable));
+                        }
+                    });
+                } else {
+                    rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
+                        @Override
+                        public void onSuccess(SendResult sendResult) {
+                            log.info("异步消息发送成功:{}", JSON.toJSONString(sendResult));
+                            //可以处理相应的业务
+                        }
+                        @Override
+                        public void onException(Throwable throwable) {
+                            //可以处理相应的业务
+                            log.info("异步消息发送失败:{}", JSON.toJSONString(throwable));
+                        }
+                    });
+                }
+            }
+        }
+    }
+}

+ 7 - 0
caimei365-cloud-gateway/src/main/resources/application.yml

@@ -48,3 +48,10 @@ logging:
     path: /mnt/newdatadrive/data/runtime/cloud-instance/cloud-gateway/logs
   level:
     root: info
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+  name-server: 47.107.48.218:9876  # RocketMQ Namesrv
+  producer:
+    group: caimei_beta_group        # 生产者分组
+    send-message-timeout: 3000     # 发送消息超时时间,单位:毫秒。默认为 3000 。