29 changed files with 1005 additions and 54 deletions
@ -0,0 +1,37 @@ |
|||
package cc.hiver.config; |
|||
|
|||
import org.springframework.amqp.core.Binding; |
|||
import org.springframework.amqp.core.BindingBuilder; |
|||
import org.springframework.amqp.core.DirectExchange; |
|||
import org.springframework.amqp.core.Queue; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* RabbitMQ 配置类 |
|||
*/ |
|||
@Configuration |
|||
public class RabbitMqConfig { |
|||
|
|||
public static final String EXCHANGE_NAME = "hiver.direct.exchange"; |
|||
public static final String QUEUE_NAME = "hiver.demo.queue"; |
|||
public static final String ROUTING_KEY = "hiver.demo.routing.key"; |
|||
|
|||
// 1. 定义Direct交换机
|
|||
@Bean |
|||
public DirectExchange directExchange() { |
|||
return new DirectExchange(EXCHANGE_NAME, true, false); |
|||
} |
|||
|
|||
// 2. 定义队列 (durable=true 持久化)
|
|||
@Bean |
|||
public Queue demoQueue() { |
|||
return new Queue(QUEUE_NAME, true); |
|||
} |
|||
|
|||
// 3. 将队列绑定到交换机
|
|||
@Bean |
|||
public Binding bindingDemoQueue() { |
|||
return BindingBuilder.bind(demoQueue()).to(directExchange()).with(ROUTING_KEY); |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
package cc.hiver.rabbitmq; |
|||
|
|||
import cc.hiver.config.RabbitMqConfig; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* RabbitMQ 消费者示例 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class RabbitMqConsumer { |
|||
|
|||
/** |
|||
* 监听指定的队列 |
|||
* @param message 消息内容 |
|||
*/ |
|||
@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) |
|||
public void receiveMessage(String message) { |
|||
log.info("【RabbitMQ消费消息】收到来自队列 {} 的消息: {}", RabbitMqConfig.QUEUE_NAME, message); |
|||
// 此处编写业务处理逻辑
|
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
package cc.hiver.rabbitmq; |
|||
|
|||
import cc.hiver.config.RabbitMqConfig; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* RabbitMQ 生产者示例 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class RabbitMqProducer { |
|||
|
|||
@Autowired |
|||
private RabbitTemplate rabbitTemplate; |
|||
|
|||
/** |
|||
* 发送消息 |
|||
* @param message 消息内容 |
|||
*/ |
|||
public void sendMessage(String message) { |
|||
log.info("【RabbitMQ生产消息】试图发送消息: {}", message); |
|||
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message); |
|||
log.info("【RabbitMQ生产消息】发送成功"); |
|||
} |
|||
} |
|||
@ -0,0 +1,157 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import cc.hiver.core.entity.User; |
|||
import cc.hiver.core.entity.Worker; |
|||
import cc.hiver.core.service.UserService; |
|||
import cc.hiver.core.service.WorkerService; |
|||
import cc.hiver.mall.dao.mapper.MallDeliveryOrderMapper; |
|||
import cc.hiver.mall.dao.mapper.MallOrderMapper; |
|||
import cc.hiver.mall.entity.MallDeliveryOrder; |
|||
import cc.hiver.mall.pojo.dto.CreateOrderDTO; |
|||
import cc.hiver.mall.serviceimpl.mybatis.MallOrderServiceImpl; |
|||
import cc.hiver.mall.utils.WechatSendMessageUtil; |
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONArray; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.text.SimpleDateFormat; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 处理非核心、响应时间不敏感的常规异步队列。 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class OrderAsyncConsumer { |
|||
|
|||
@Autowired |
|||
private MallOrderServiceImpl mallOrderService; |
|||
|
|||
@Autowired |
|||
private UserService userService; |
|||
|
|||
@Autowired |
|||
private MallDeliveryOrderMapper mallDeliveryOrderMapper; |
|||
|
|||
@Autowired |
|||
private MallOrderMapper mallOrderMapper; |
|||
|
|||
@Autowired |
|||
private WorkerService workerService; |
|||
|
|||
@Autowired |
|||
private WechatSendMessageUtil wechatSendMessageUtil; |
|||
|
|||
// 微信模板配置
|
|||
private static final String TEMPLATE_ID = "K15zpZSHBNivouTfpW5FK1XFz8GbYAK8b9dXXOP_Ka0"; |
|||
|
|||
@RabbitListener(queues = OrderQueueConfig.ASYNC_WORKER_QUEUE) |
|||
public void handleWechatNotify(String message) { |
|||
log.info("【异步队列处理】微信模版推送: {}", message); |
|||
try { |
|||
JSONObject msg = JSON.parseObject(message); |
|||
String orderId = msg.getString("orderId"); |
|||
|
|||
// 查询配送单
|
|||
LambdaQueryWrapper<MallDeliveryOrder> qw = new LambdaQueryWrapper<>(); |
|||
qw.eq(MallDeliveryOrder::getOrderId, orderId); |
|||
MallDeliveryOrder delivery = mallDeliveryOrderMapper.selectOne(qw); |
|||
if (delivery == null) { |
|||
return; |
|||
} |
|||
|
|||
List<String> targetOpenIds = new ArrayList<>(); |
|||
String orderTypeStr = "抢单大厅单"; |
|||
|
|||
if (StringUtils.isNotBlank(delivery.getWorkerId())) { |
|||
orderTypeStr = "指定接单"; |
|||
// 获取指定骑手的关联 userId 和 openid
|
|||
String openId = getOfficialAccountOpenidByWorkerId(delivery.getWorkerId()); |
|||
if (StringUtils.isNotBlank(openId)) { |
|||
targetOpenIds.add(openId); |
|||
} |
|||
} else { |
|||
// 如果没有指定人,发给所有人?业务需求:给所有配送员发微信公众号消息
|
|||
// 这可能需要查出所有 Worker 关联的 user.officialAccountOpenid (实际应用中可能需要加状态过滤)
|
|||
// 为简化并实现逻辑,这里只查有效的骑手 user
|
|||
targetOpenIds = getAllWorkerOpenIds(); |
|||
} |
|||
|
|||
if (targetOpenIds.isEmpty()) { |
|||
log.info("【异步队列处理】没有找到可推送微信模板的openid,目标跳过推送。orderId={}", orderId); |
|||
return; |
|||
} |
|||
|
|||
// 组装模板参数 (参照要求配置)
|
|||
List<WxMpTemplateData> data = new ArrayList<>(); |
|||
data.add(new WxMpTemplateData("thing3", orderTypeStr)); |
|||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|||
String timeFormat = delivery.getCreateTime() != null ? sdf.format(delivery.getCreateTime()) : sdf.format(new java.util.Date()); |
|||
data.add(new WxMpTemplateData("time4", timeFormat)); |
|||
|
|||
String address = StringUtils.defaultString(delivery.getShopName()) + " - " + StringUtils.defaultString(delivery.getReceiverAddress()); |
|||
// 模板参数有长度限制(通常20字),做简略截断以防发送失败
|
|||
if (address.length() > 20) { |
|||
address = address.substring(0, 17) + "..."; |
|||
} |
|||
data.add(new WxMpTemplateData("thing10", address)); |
|||
|
|||
// 查询或者组装此单据的物品数量,delivery可能没有存数量,这里可简化显示。
|
|||
// 由于MallDeliveryOrder中可能没存总量,可通过orderId反查Goods(篇幅及耗时原因,也可以在发送消息时传进来)
|
|||
// 查出 MallDeliveryOrder 自带或者去 MallOrderGood
|
|||
data.add(new WxMpTemplateData("character_string15", "1")); // 也可以去 mall_order_goods 查数量
|
|||
|
|||
data.add(new WxMpTemplateData("phrase8", "待接单")); |
|||
|
|||
wechatSendMessageUtil.sendWechatTempMessage(targetOpenIds, TEMPLATE_ID, data, "pages/index/index"); |
|||
log.info("【异步队列处理】微信模版推送成功,推送总人数:{}", targetOpenIds.size()); |
|||
} catch (Exception e) { |
|||
log.error("【异步队列处理】微信推送异常(不影响主链路): {}", e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
|
|||
private String getOfficialAccountOpenidByWorkerId(String workerId) { |
|||
Worker worker = workerService.findByWorkerId(workerId); |
|||
if (worker != null && StringUtils.isNotBlank(worker.getUserId())) { |
|||
User user = userService.findById(worker.getUserId()); |
|||
if (user != null) { |
|||
return user.getOfficialAccountOpenid(); |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
private List<String> getAllWorkerOpenIds() { |
|||
return mallOrderMapper.getWeChatId(); |
|||
} |
|||
|
|||
@RabbitListener(queues = OrderQueueConfig.ASYNC_CACHE_QUEUE) |
|||
public void handleCacheUpdate(String message) { |
|||
log.info("【异步队列处理】刷新订单附属缓存: {}", message); |
|||
try { |
|||
JSONObject body = JSON.parseObject(message); |
|||
String shopId = body.getString("shopId"); |
|||
String regionId = body.getString("regionId"); |
|||
Integer shopSaleCount = body.getInteger("shopSaleCount"); |
|||
JSONArray itemsArray = body.getJSONArray("items"); |
|||
// 将JSON结构重建后,委托回原有 Service 逻辑执行
|
|||
List<CreateOrderDTO.OrderItemDTO> items = new ArrayList<>(); |
|||
for (int i = 0; i < itemsArray.size(); i++) { |
|||
items.add(itemsArray.getJSONObject(i).toJavaObject(CreateOrderDTO.OrderItemDTO.class)); |
|||
} |
|||
// productMap 重建,为避免过度依赖对象序列化,可以直接在 redis 增量方法里面反查
|
|||
mallOrderService.updateSaleCacheIncrementalViaMq(shopId, regionId, items, shopSaleCount); |
|||
} catch (Exception e) { |
|||
log.error("【异步队列处理】缓存更新失败: {}", e.getMessage(), e); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.amqp.AmqpException; |
|||
import org.springframework.amqp.core.Message; |
|||
import org.springframework.amqp.core.MessagePostProcessor; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* 订单相关的MQ异步消息发送者 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class OrderAsyncProducer { |
|||
|
|||
@Autowired |
|||
private RabbitTemplate rabbitTemplate; |
|||
|
|||
/** |
|||
* 发送微信模板推送请求 (包含参数,接收人标识等) |
|||
*/ |
|||
public void sendWechatNotify(String orderId) { |
|||
Map<String, Object> msg = new HashMap<>(); |
|||
msg.put("orderId", orderId); |
|||
rabbitTemplate.convertAndSend(OrderQueueConfig.ORDER_DIRECT_EXCHANGE, OrderQueueConfig.ASYNC_WORKER_ROUTING, JSON.toJSONString(msg)); |
|||
log.info("【订单MQ】已发出微信通知异步处理委托, orderId={}", orderId); |
|||
} |
|||
|
|||
/** |
|||
* 发送异步缓存更新和非核心数据统计任务 |
|||
*/ |
|||
public void sendCacheUpdate(Map<String, Object> params) { |
|||
rabbitTemplate.convertAndSend(OrderQueueConfig.ORDER_DIRECT_EXCHANGE, OrderQueueConfig.ASYNC_CACHE_ROUTING, JSON.toJSONString(params)); |
|||
} |
|||
|
|||
/** |
|||
* 发送延迟(死信)超时消息 |
|||
* @param orderId 订单ID |
|||
* @param delayType 延迟监听类型(如:Worker_Timeout_5m, Worker_Timeout_10m, Shop_Cook_Timeout) |
|||
* @param delayMillis 需要延迟的毫秒数 |
|||
*/ |
|||
public void sendDelayMessage(String orderId, String delayType, long delayMillis) { |
|||
Map<String, Object> msgBody = new HashMap<>(); |
|||
msgBody.put("orderId", orderId); |
|||
msgBody.put("delayType", delayType); |
|||
msgBody.put("timestamp", System.currentTimeMillis()); |
|||
|
|||
rabbitTemplate.convertAndSend(OrderQueueConfig.DELAY_EXCHANGE, OrderQueueConfig.DELAY_ROUTING, JSON.toJSONString(msgBody), |
|||
new MessagePostProcessor() { |
|||
@Override |
|||
public Message postProcessMessage(Message message) throws AmqpException { |
|||
// 设置每条消息的 TTL (存活时间)
|
|||
message.getMessageProperties().setExpiration(String.valueOf(delayMillis)); |
|||
return message; |
|||
} |
|||
}); |
|||
|
|||
log.info("【订单MQ】已发送一条{}延时校验消息,设定在{}ms后触发,orderId={}", delayType, delayMillis, orderId); |
|||
} |
|||
} |
|||
@ -0,0 +1,135 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import cc.hiver.core.common.constant.SettingConstant; |
|||
import cc.hiver.core.common.sms.SmsUtil; |
|||
import cc.hiver.core.serviceimpl.JPushServiceImpl; |
|||
import cc.hiver.mall.dao.mapper.MallDeliveryOrderMapper; |
|||
import cc.hiver.mall.dao.mapper.MallOrderMapper; |
|||
import cc.hiver.mall.dao.mapper.ShopMapper; |
|||
import cc.hiver.mall.entity.MallDeliveryOrder; |
|||
import cc.hiver.mall.entity.MallOrder; |
|||
import cc.hiver.mall.entity.Shop; |
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 订单超时控制死信队列消费者 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class OrderDelayConsumer { |
|||
|
|||
@Autowired |
|||
private MallDeliveryOrderMapper mallDeliveryOrderMapper; |
|||
|
|||
@Autowired |
|||
private MallOrderMapper mallOrderMapper; |
|||
|
|||
@Autowired |
|||
private ShopMapper shopMapper; |
|||
|
|||
@Autowired |
|||
private SmsUtil smsUtil; |
|||
|
|||
@Autowired |
|||
private JPushServiceImpl jPushService; |
|||
|
|||
@RabbitListener(queues = OrderQueueConfig.DEAD_QUEUE) |
|||
public void processDelayMessage(String messageBody) { |
|||
log.info("【订单延时处理】接收到死信超时消息: {}", messageBody); |
|||
try { |
|||
JSONObject msg = JSON.parseObject(messageBody); |
|||
String orderId = msg.getString("orderId"); |
|||
String delayType = msg.getString("delayType"); |
|||
|
|||
switch (delayType) { |
|||
case "Worker_Timeout_5m": |
|||
handleWorkerTimeout5m(orderId); |
|||
break; |
|||
case "Worker_Timeout_10m": |
|||
handleWorkerTimeout10m(orderId); |
|||
break; |
|||
case "NoWorker_Timeout_10m": |
|||
handleNoWorkerTimeout10m(orderId); |
|||
break; |
|||
case "Shop_Cook_Timeout": |
|||
handleShopCookTimeout(orderId); |
|||
break; |
|||
default: |
|||
log.warn("【订单延时处理】未知的延迟类型: {}", delayType); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("【订单延时处理】处理超时消息异常: {}", e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
private void handleWorkerTimeout5m(String orderId) { |
|||
MallDeliveryOrder delivery = getDeliveryOrder(orderId); |
|||
if (delivery == null || delivery.getStatus() != 0 || StringUtils.isBlank(delivery.getWorkerId())) { |
|||
// 已被处理、状态不再是0待接单,或者已经不是指定单,安全撤销(忽略)
|
|||
return; |
|||
} |
|||
log.info("【订单延时处理】指定配送单 5分钟未接单,向配送员发送短信通知 orderId={}", orderId); |
|||
// 发送 SMS_TIMEOUT
|
|||
if (StringUtils.isNotBlank(delivery.getWorkerPhone())) { |
|||
smsUtil.sendCode(delivery.getWorkerPhone(), null,SettingConstant.SMS_TYPE.SMS_TIMEOUT.name()); |
|||
} |
|||
} |
|||
|
|||
private void handleWorkerTimeout10m(String orderId) { |
|||
MallDeliveryOrder delivery = getDeliveryOrder(orderId); |
|||
if (delivery == null || delivery.getStatus() != 0 || StringUtils.isBlank(delivery.getWorkerId())) { |
|||
return; |
|||
} |
|||
log.info("【订单延时处理】指定配送单 10分钟未接单,移除配送员并通知用户 orderId={}", orderId); |
|||
// 更新配送单(去掉workerId,退回大厅)
|
|||
LambdaUpdateWrapper<MallDeliveryOrder> uw = new LambdaUpdateWrapper<>(); |
|||
uw.eq(MallDeliveryOrder::getId, delivery.getId()) |
|||
.set(MallDeliveryOrder::getWorkerId, (String)null).set(MallDeliveryOrder::getWorkerName, (String)null) |
|||
.set(MallDeliveryOrder::getWorkerPhone, (String)null); |
|||
mallDeliveryOrderMapper.update(null, uw); |
|||
|
|||
// 给用户通知
|
|||
if (StringUtils.isNotBlank(delivery.getReceiverPhone())) { |
|||
smsUtil.sendCode(delivery.getReceiverPhone(), null,SettingConstant.SMS_TYPE.SMS_NO_ORDER.name()); |
|||
} |
|||
} |
|||
|
|||
private void handleNoWorkerTimeout10m(String orderId) { |
|||
MallDeliveryOrder delivery = getDeliveryOrder(orderId); |
|||
if (delivery == null || delivery.getStatus() != 0 || StringUtils.isNotBlank(delivery.getWorkerId())) { |
|||
return; |
|||
} |
|||
log.info("【订单延时处理】大厅抢单 10分钟无人接单,通知用户 orderId={}", orderId); |
|||
if (StringUtils.isNotBlank(delivery.getReceiverPhone())) { |
|||
smsUtil.sendCode(delivery.getReceiverPhone(),null, SettingConstant.SMS_TYPE.SMS_NO_ORDER.name()); |
|||
} |
|||
} |
|||
|
|||
private void handleShopCookTimeout(String orderId) { |
|||
MallOrder order = mallOrderMapper.selectById(orderId); |
|||
if (order == null || order.getShopMakeTime() != null) { |
|||
// 商家已经点击了出餐 (shopMakeTime != null)
|
|||
return; |
|||
} |
|||
log.info("【订单延时处理】商家超时未点击出餐,向商家发送极光推送 orderId={}", orderId); |
|||
Shop shop = shopMapper.selectById(order.getShopId()); |
|||
if (shop != null && StringUtils.isNotBlank(shop.getClientId())) { |
|||
// 发送给商家端的 APP Push
|
|||
jPushService.sendPushNotification(shop.getClientId(), "您的订单出餐已超时,请尽快处理!", orderId); |
|||
} |
|||
} |
|||
|
|||
private MallDeliveryOrder getDeliveryOrder(String orderId) { |
|||
LambdaQueryWrapper<MallDeliveryOrder> qw = new LambdaQueryWrapper<>(); |
|||
qw.eq(MallDeliveryOrder::getOrderId, orderId); |
|||
return mallDeliveryOrderMapper.selectOne(qw); |
|||
} |
|||
} |
|||
@ -0,0 +1,104 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import org.springframework.amqp.core.Binding; |
|||
import org.springframework.amqp.core.BindingBuilder; |
|||
import org.springframework.amqp.core.DirectExchange; |
|||
import org.springframework.amqp.core.Queue; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* 订单流程 MQ 配置,包括普通的异步任务和死信超时延迟任务。 |
|||
*/ |
|||
@Configuration |
|||
public class OrderQueueConfig { |
|||
|
|||
public static final String ORDER_DIRECT_EXCHANGE = "order.direct.exchange"; |
|||
|
|||
// 异步业务(如微信通知,缓存更新等)
|
|||
public static final String ASYNC_WORKER_QUEUE = "order.async.worker.queue"; |
|||
public static final String ASYNC_WORKER_ROUTING = "order.async.worker.routing.key"; |
|||
|
|||
public static final String ASYNC_CACHE_QUEUE = "order.async.cache.queue"; |
|||
public static final String ASYNC_CACHE_ROUTING = "order.async.cache.routing.key"; |
|||
|
|||
// 延时消息 DLX 配置
|
|||
// 原理:将消息发至 delay 队列(没有消费者),等过期后,由 rabbitmq 自动抛入 dead 交换机,最终进入 dead 队列供消费。
|
|||
public static final String DELAY_EXCHANGE = "order.delay.exchange"; |
|||
public static final String DELAY_QUEUE = "order.delay.queue"; |
|||
public static final String DELAY_ROUTING = "order.delay.routing.key"; |
|||
|
|||
public static final String DEAD_EXCHANGE = "order.dead.exchange"; |
|||
public static final String DEAD_QUEUE = "order.dead.queue"; |
|||
public static final String DEAD_ROUTING = "order.dead.routing.key"; |
|||
|
|||
// ----------------- 常规业务配置 -----------------
|
|||
|
|||
@Bean |
|||
public DirectExchange orderDirectExchange() { |
|||
return new DirectExchange(ORDER_DIRECT_EXCHANGE, true, false); |
|||
} |
|||
|
|||
@Bean |
|||
public Queue asyncWorkerQueue() { |
|||
return new Queue(ASYNC_WORKER_QUEUE, true); |
|||
} |
|||
|
|||
@Bean |
|||
public Binding bindingAsyncWorkerQueue() { |
|||
return BindingBuilder.bind(asyncWorkerQueue()).to(orderDirectExchange()).with(ASYNC_WORKER_ROUTING); |
|||
} |
|||
|
|||
@Bean |
|||
public Queue asyncCacheQueue() { |
|||
return new Queue(ASYNC_CACHE_QUEUE, true); |
|||
} |
|||
|
|||
@Bean |
|||
public Binding bindingAsyncCacheQueue() { |
|||
return BindingBuilder.bind(asyncCacheQueue()).to(orderDirectExchange()).with(ASYNC_CACHE_ROUTING); |
|||
} |
|||
|
|||
// ----------------- 死信(延时)架构配置 -----------------
|
|||
|
|||
@Bean |
|||
public DirectExchange delayExchange() { |
|||
return new DirectExchange(DELAY_EXCHANGE, true, false); |
|||
} |
|||
|
|||
@Bean |
|||
public DirectExchange deadExchange() { |
|||
return new DirectExchange(DEAD_EXCHANGE, true, false); |
|||
} |
|||
|
|||
@Bean |
|||
public Queue delayQueue() { |
|||
// 设置该队列的死信去向
|
|||
Map<String, Object> args = new HashMap<>(2); |
|||
// x-dead-letter-exchange: 出现 dead letter 的时候将 letter 发送到这个 exchange
|
|||
args.put("x-dead-letter-exchange", DEAD_EXCHANGE); |
|||
// x-dead-letter-routing-key: 出现 dead letter 的时候将 letter 路由给该 routing-key
|
|||
args.put("x-dead-letter-routing-key", DEAD_ROUTING); |
|||
// 这是一个没有消费者的等待队列,消息在这里过期后会进入死信交换机
|
|||
return new Queue(DELAY_QUEUE, true, false, false, args); |
|||
} |
|||
|
|||
@Bean |
|||
public Binding bindingDelayQueue() { |
|||
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING); |
|||
} |
|||
|
|||
@Bean |
|||
public Queue deadQueue() { |
|||
// 这是真正消费超时的死信队列
|
|||
return new Queue(DEAD_QUEUE, true); |
|||
} |
|||
|
|||
@Bean |
|||
public Binding bindingDeadQueue() { |
|||
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTING); |
|||
} |
|||
} |
|||
@ -0,0 +1,63 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import cc.hiver.mall.controller.AdminSettlementController.ConfirmShopReq; |
|||
import cc.hiver.mall.entity.MallSettlementRecord; |
|||
import cc.hiver.mall.service.mybatis.MallSettlementRecordService; |
|||
import com.alibaba.fastjson.JSON; |
|||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
/** |
|||
* 商家结算MQ消费者 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class SettlementConfirmConsumer { |
|||
|
|||
@Autowired |
|||
private MallSettlementRecordService mallSettlementRecordService; |
|||
|
|||
@RabbitListener(queues = SettlementMqConfig.SETTLEMENT_CONFIRM_QUEUE) |
|||
public void handleSettlementConfirm(String message) { |
|||
log.info("【MQ结算消费者】接收到按商家确认结算消息: {}", message); |
|||
try { |
|||
ConfirmShopReq req = JSON.parseObject(message, ConfirmShopReq.class); |
|||
if (req == null || req.getShopIds() == null || req.getShopIds().isEmpty()) { |
|||
log.warn("【MQ结算消费者】消息参数异常(商家ID列表为空), message={}", message); |
|||
return; |
|||
} |
|||
|
|||
LambdaQueryWrapper<MallSettlementRecord> qw = new LambdaQueryWrapper<>(); |
|||
if (StringUtils.isNotBlank(req.getRegionId())) { |
|||
qw.eq(MallSettlementRecord::getRegionId, req.getRegionId()); |
|||
} |
|||
if (StringUtils.isNotBlank(req.getSettlementDate())) { |
|||
qw.apply("DATE(create_time) = {0}", req.getSettlementDate()); |
|||
} |
|||
qw.in(MallSettlementRecord::getShopId, req.getShopIds()); |
|||
qw.eq(MallSettlementRecord::getStatus, 0); |
|||
|
|||
List<MallSettlementRecord> list = mallSettlementRecordService.list(qw); |
|||
if (list == null || list.isEmpty()) { |
|||
log.info("【MQ结算消费者】没有需要确认的结算记录,终止处理"); |
|||
return; |
|||
} |
|||
|
|||
List<String> recordIds = list.stream().map(MallSettlementRecord::getId).collect(Collectors.toList()); |
|||
mallSettlementRecordService.confirmSettlements(recordIds); |
|||
log.info("【MQ结算消费者】结算确认处理成功, 共处理了 {} 条记录", recordIds.size()); |
|||
|
|||
} catch (Exception e) { |
|||
log.error("【MQ结算消费者】处理结算消息异常: {}", e.getMessage(), e); |
|||
// 此处由于是简单的配置,如果出现异常会根据 application.yml 中的重试机制进行重试(max-attempts: 3)
|
|||
throw new RuntimeException(e); // 抛出异常以触发重试或放入死信队列
|
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
package cc.hiver.mall.mq; |
|||
|
|||
import org.springframework.amqp.core.Binding; |
|||
import org.springframework.amqp.core.BindingBuilder; |
|||
import org.springframework.amqp.core.DirectExchange; |
|||
import org.springframework.amqp.core.Queue; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* 商家结算MQ生产环境配置 |
|||
*/ |
|||
@Configuration |
|||
public class SettlementMqConfig { |
|||
|
|||
public static final String SETTLEMENT_EXCHANGE = "mall.settlement.exchange"; |
|||
public static final String SETTLEMENT_CONFIRM_QUEUE = "mall.settlement.confirm.queue"; |
|||
public static final String SETTLEMENT_CONFIRM_ROUTING_KEY = "mall.settlement.confirm.routing.key"; |
|||
|
|||
@Bean |
|||
public DirectExchange settlementExchange() { |
|||
return new DirectExchange(SETTLEMENT_EXCHANGE, true, false); |
|||
} |
|||
|
|||
@Bean |
|||
public Queue settlementConfirmQueue() { |
|||
// durable=true 持久化队列
|
|||
return new Queue(SETTLEMENT_CONFIRM_QUEUE, true); |
|||
} |
|||
|
|||
@Bean |
|||
public Binding bindingSettlementConfirmQueue() { |
|||
return BindingBuilder.bind(settlementConfirmQueue()).to(settlementExchange()).with(SETTLEMENT_CONFIRM_ROUTING_KEY); |
|||
} |
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
package cc.hiver.mall.utils; |
|||
|
|||
import me.chanjar.weixin.mp.api.WxMpService; |
|||
import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; |
|||
import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 微信推送模板消息工具类 |
|||
* |
|||
* @author Yazhi Li |
|||
*/ |
|||
@Component |
|||
public class WechatSendMessageUtil { |
|||
@Autowired |
|||
private WxMpService wxMpService; |
|||
|
|||
@Value("${hiver.social.wechat.appId}") |
|||
private String appId; |
|||
|
|||
private static final Logger log = LoggerFactory.getLogger(WechatSendMessageUtil.class); |
|||
public void sendWechatTempMessage(List<String> officialAccountOpenids,String templateId, List<WxMpTemplateData> data,String path) throws Exception { |
|||
for (String openId : officialAccountOpenids) { |
|||
|
|||
final WxMpTemplateMessage templateMessage = WxMpTemplateMessage.builder() |
|||
// 要推送的用户openid
|
|||
.toUser(openId) |
|||
// 消息详情跳转地址:https://blog.csdn.net/qq_34272760/article/details/120152903
|
|||
// 若需要跳转小程序,url则设置为:http://mp.weixin.qq.com,然后设置相关的MiniProgram参数【跳转的小程序必须是公众号关联的小程序!!!】
|
|||
.url("http://mp.weixin.qq.com") |
|||
// 微信模板ID
|
|||
.templateId(templateId) |
|||
.build(); |
|||
// data的字段及内容是自定义的,不必按我这儿的来,具体怎么和已有的系统消息结合,实现key和color可配置化需自行考虑
|
|||
//final JsonObject jsonObject = JsonParser.parseString(weChatServerMsgVo.getContent()).getAsJsonObject();
|
|||
/*final List<WxMpTemplateData> data = new ArrayList<>(); |
|||
for (String key : jsonObject.keySet()) { |
|||
final Object value = jsonObject.get(key); |
|||
String afterValue = String.valueOf(value).replace("\"", ""); |
|||
log.info("Key: " + key + ", Value: " + afterValue); |
|||
data.add(new WxMpTemplateData(key, afterValue)); |
|||
|
|||
}*/ |
|||
templateMessage.setData(data); |
|||
|
|||
// 跳转小程序相关配置【跳转的小程序必须是公众号关联的小程序!!!】
|
|||
final WxMpTemplateMessage.MiniProgram miniProgram = new WxMpTemplateMessage.MiniProgram(); |
|||
log.info("appId============" + appId); |
|||
// 小程序的appId
|
|||
miniProgram.setAppid(appId); |
|||
// 小程序的pagePath 注意,这里是支持传参的!!!
|
|||
miniProgram.setPagePath(path); |
|||
// 需要跳转首页时,需要设置 usePath = true (默认是 false,只跳转非首页)
|
|||
miniProgram.setUsePath(true); |
|||
templateMessage.setMiniProgram(miniProgram); |
|||
|
|||
try { |
|||
// 发送模板消息
|
|||
wxMpService.getTemplateMsgService().sendTemplateMsg(templateMessage); |
|||
} catch (Exception e) { |
|||
log.error("微信公众号模板消息推送失败,接收userId: " + openId, e); |
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue