当前位置:实例文章 » JAVA Web实例» [文章]07-通过RocketMQ和Redis实现用户动态提醒

07-通过RocketMQ和Redis实现用户动态提醒

发布人:shili8 发布时间:2023-05-29 08:53 阅读次数:540

随着互联网的发展,用户的需求也越来越多样化,其中之一就是用户动态提醒。比如,当用户关注了某个话题或者某个用户时,如果有新的动态产生,就需要及时提醒用户。本文将介绍如何通过RocketMQ和Redis实现用户动态提醒。

一、RocketMQ简介

RocketMQ是阿里巴巴开源的分布式消息中间件,具有高可靠、高吞吐量、低延迟等特点。RocketMQ的消息模型分为生产者、消费者和消息队列三个部分,其中生产者负责发送消息,消费者负责接收消息,消息队列则是消息的存储和传输通道。

二、Redis简介

Redis是一个开源的内存数据结构存储系统,支持多种数据结构,如字符串、哈希、列表、集合、有序集合等。Redis的特点是快速、可靠、灵活,可以用于缓存、消息队列、分布式锁等场景。

三、实现用户动态提醒的流程

1. 用户关注某个话题或者某个用户时,将关注信息存储到Redis中。

2. 当有新的动态产生时,将动态信息存储到RocketMQ中。

3. 消费RocketMQ中的动态信息,并根据动态信息中的关注信息,从Redis中获取关注者的信息。

4. 将动态信息发送给关注者。

四、代码示例和注释

1. 将关注信息存储到Redis中

java
// Redis配置
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

// 存储关注信息到Redis中
@Service
public class FollowService {
    @Autowired
    private RedisTemplate redisTemplate;

    public void follow(String userId String targetId) {
        String key = follow: + userId;
        redisTemplate.opsForSet().add(key targetId);
    }
}


2. 将动态信息存储到RocketMQ中

java
// RocketMQ配置
@Configuration
public class RocketMQConfig {
    @Value(${rocketmq.producer.group})
    private String producerGroup;
    @Value(${rocketmq.namesrvAddr})
    private String namesrvAddr;

    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.start();
        return producer;
    }
}

// 发送动态信息到RocketMQ中
@Service
public class DynamicService {
    @Autowired
    private DefaultMQProducer defaultMQProducer;

    public void sendDynamic(Dynamic dynamic) throws Exception {
        Message message = new Message(dynamic tag JSON.toJSONString(dynamic).getBytes());
        SendResult result = defaultMQProducer.send(message);
        System.out.println(发送结果: + result);
    }
}


3. 消费RocketMQ中的动态信息,并根据动态信息中的关注信息,从Redis中获取关注者的信息

java
// RocketMQ配置
@Configuration
public class RocketMQConfig {
    @Value(${rocketmq.consumer.group})
    private String consumerGroup;
    @Value(${rocketmq.namesrvAddr})
    private String namesrvAddr;

    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe(dynamic *);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String dynamicJson = new String(msg.getBody());
                    Dynamic dynamic = JSON.parseObject(dynamicJson Dynamic.class);
                    // 根据动态信息中的关注信息,从Redis中获取关注者的信息
                    Set followers = redisTemplate.opsForSet().members(follow: + dynamic.getUserId());
                    for (Object follower : followers) {
                        // 将动态信息发送给关注者
                        sendMessage(dynamic follower.toString());
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }
}

// 发送动态信息给关注者
@Service
public class MessageService {
    @Autowired
    private RedisTemplate redisTemplate;

    public void sendMessage(Dynamic dynamic String userId) {
        // 将动态信息存储到Redis中
        String key = message: + userId;
        redisTemplate.opsForList().leftPush(key dynamic);
    }
}


以上就是通过RocketMQ和Redis实现用户动态提醒的流程和代码示例。通过这种方式,可以实现高效、可靠的用户动态提醒功能,提升用户体验。

其他信息

其他资源

Top