服務端集群節點之間緩存更新通知

如果您部署的服務端集群節點服務之間可以通過确定的URL地址訪問,那麽您可以通過3.8.集群管理進行集群節點緩存更新的管理。

如果集群節點服務之間無法通過确定的URL地址訪問,比如每次節點重啓IP都不确定,或者節點個數可能随時随地發生變化,那麽可以參考如下實現,通過消息中間件通知的機制實現每個節點的緩存更新。

img

這種方式無需在URule Pro的集群管理中配置每個節點URL地址,需要實現 ClusterPacketCacheAdapter 和 JarCacheAdapter 兩個URule更新緩存的接口的方法生産消息:

package com.bstek.urule.console.cache.packet;

import java.util.List;
import java.util.Map;
/**
 * Knowledge-package cache adapter for the server-side cluster nodes.
 */
public interface ClusterPacketCacheAdapter {
  /** Bean id for implementations of this adapter. */
  String BEAN_ID = "urule.clusterPacketCacheAdapter";

  void putPacket(long packetId, PacketData paramPacketData);

  void putPacket(String packetCode, PacketData paramPacketData);

  void remove(long packetId);

  void remove(String packetCode);

  /** Triggered when a knowledge-package cache entry is refreshed. */
  List<Map<String, Object>> refreshPacket(String groupId, long packetId);

  /** Triggered when the whole knowledge-package cache is reset. */
  List<Map<String, Object>> recacheAllPackets(String groupId);

  /** Triggered when a project is deleted. */
  List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list);
}
/**
 * Abstract adapter for jar cache updates; subclasses implement {@link #loadDynamicJars}.
 */
public abstract class JarCacheAdapter {
   // Bean id for the jar cache adapter.
   // NOTE(review): the field is not final, so it is not a compile-time constant and cannot be
   // used as an annotation attribute value (e.g. in @Component).
   public static String BEAN_ID = "urule.jarCacheAdapter";
   // Triggered when the hot-deployed jars are updated.
   public abstract List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception;
}

接收到消息的消費類中,通過 com.bstek.urule.console.cache.ServerCacheManager 裏的方法進行當前節點的緩存更新:

方法 說明
reloadPacket(String systemId, long packetId) 重新加載指定id知識包
syncPacketForRemoveProject(String systemId, long projectId) 删除指定項目的知識包緩存
recacheAllPackets(String systemId) 重新加載所有知識包緩存
reloadDynamicJars(String systemId) 重新加載所有Jar緩存

下面做一個參考實現,前面我們使用Redis實現了Session共享,為了方便我們可以繼續使用Redis的消息通知功能,你也可以使用更專業的消息中間件來實現,比如Kafka、RabbitMQ等。

1、定義消息通知的常量類

package com.bstek.urule.sample.mq.constant;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Constants used by the message-queue based cache notification sample.
 *
 * <p>The {@code QUEUE_*} values are the message-type names carried in the {@code messageType}
 * field of each notification; they are true constants and therefore {@code final}.
 *
 * <p>{@link #MQIP}, {@link #CLUSTER_TOPIC} and {@link #CLIENT_TOPIC} are static fields filled in
 * through {@code @Value} setter injection on this bean. They are {@code null} until Spring has
 * initialised the {@code MQConstant} bean, so code that reads them while other beans are being
 * created must make sure this bean is initialised first.
 */
@Component
public class MQConstant {
    // Message types for notifications between server cluster nodes.
    public static final String QUEUE_CLUSTER_PACKET_REFRESHALL = "urule.cluster.knowledge.refreshall";
    public static final String QUEUE_CLUSTER_PACKET_REFRESH = "urule.cluster.knowledge.refresh";
    public static final String QUEUE_CLUSTER_PROJECT_REMOVE = "urule.cluster.project.remove";
    public static final String QUEUE_CLUSTER_JAR_SYNC = "urule.cluster.jar.sync";

    // Message types for notifications sent to clients.
    public static final String QUEUE_CLIENT_PACKET_DISABLE = "urule.client.knowledge.disable";
    public static final String QUEUE_CLIENT_PACKET_ENABLE = "urule.client.knowledge.enable";
    public static final String QUEUE_CLIENT_PACKET_REFRESH = "urule.client.knowledge.refresh";
    public static final String QUEUE_CLIENT_JAR_SYNC = "urule.client.jar.sync";

    /** Value of {@code project.urule.mq.ip}; defaults to {@code 127.0.0.1:9092}. */
    public static String MQIP;

    /**
     * Receives the configured {@code project.urule.mq.ip} value and publishes it statically.
     *
     * @param mqIP the configured value, or the default when the property is absent
     */
    @Value("${project.urule.mq.ip:127.0.0.1:9092}")
    public void setMQIP(String mqIP) {
        MQIP = mqIP;
    }

    /** Value of {@code project.urule.mq.clusterTopic}; defaults to {@code urule-cluster-topic}. */
    public static String CLUSTER_TOPIC;

    /**
     * Receives the configured cluster topic name and publishes it statically.
     *
     * @param clusterTopic the configured value, or the default when the property is absent
     */
    @Value("${project.urule.mq.clusterTopic:urule-cluster-topic}")
    public void setClusterTopic(String clusterTopic) {
        CLUSTER_TOPIC = clusterTopic;
    }

    /** Value of {@code project.urule.mq.clientTopic}; defaults to {@code urule-client-topic}. */
    public static String CLIENT_TOPIC;

    /**
     * Receives the configured client topic name and publishes it statically.
     *
     * @param clientTopic the configured value, or the default when the property is absent
     */
    @Value("${project.urule.mq.clientTopic:urule-client-topic}")
    public void setClientTopic(String clientTopic) {
        CLIENT_TOPIC = clientTopic;
    }
}

2、yaml配置

# Topic names read by MQConstant through @Value. Both entries equal the defaults coded there
# (urule-cluster-topic / urule-client-topic), so they only need editing to rename a topic.
# MQConstant also reads an optional project.urule.mq.ip key (default 127.0.0.1:9092).
project:
  urule:
    mq: 
      clusterTopic: urule-cluster-topic
      clientTopic: urule-client-topic

3、配置RedisSessionConfig類

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.session.data.redis.config.ConfigureRedisAction;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

@Configuration
// Session expiry in seconds; the default is 1800 seconds
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 30 * 60)
public class RedisSessionConfig {

    @Autowired
    private RedisConnectionFactory factory;

    /**
     * Exposes the {@link ConfigureRedisAction#NO_OP} action as a bean.
     */
    @Bean
    public static ConfigureRedisAction configureRedisAction() {
        return ConfigureRedisAction.NO_OP;
    }

    /**
     * Builds a {@link RedisTemplate} whose keys, values, hash keys and hash values are all
     * handled by a {@link StringRedisSerializer}.
     *
     * @return the template bound to the injected connection factory
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        final StringRedisSerializer stringSerializer = new StringRedisSerializer();
        final RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(stringSerializer);
        return template;
    }
}

4、配置RedisConsumerConfig消息消費類

package com.bstek.urule.sample.mq.redis.config;

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.listener.RedisConsumerListener;

/**
 * Redis subscription configuration: registers {@link RedisConsumerListener} for the cluster
 * topic named by {@link MQConstant#CLUSTER_TOPIC}.
 */
@Component
public class RedisConsumerConfig {

    /**
     * Declares an explicit dependency on the {@link MQConstant} bean.
     *
     * <p>{@link MQConstant#CLUSTER_TOPIC} is a static field that is only populated when Spring
     * initialises the {@code MQConstant} bean. Without a declared dependency nothing guarantees
     * that this happens before the bean methods below read the field. Requesting the bean here
     * forces the ordering, because this configuration object cannot be created (and so none of
     * its bean methods can run) until {@code MQConstant} is fully initialised.
     *
     * @param mqConstant the initialised constants bean; not used beyond enforcing the ordering
     */
    public RedisConsumerConfig(MQConstant mqConstant) {
        // Intentionally empty: asking for the bean is what enforces the initialisation order.
    }

    /**
     * Wraps a new {@link RedisConsumerListener} in a {@link MessageListenerAdapter}.
     *
     * @return the adapter that delegates to the listener
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new RedisConsumerListener());
    }

    /**
     * Creates the listener container and subscribes the adapter to the cluster topic.
     *
     * @param connectionFactory      Redis connection factory used by the container
     * @param messageListenerAdapter adapter that receives the published messages
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(MQConstant.CLUSTER_TOPIC));

        return container;
    }

}

5、為了切換消息中間件方便,定義了一個生産消息的接口

package com.bstek.urule.sample.mq;

/**
 * Abstraction over the message producer so the underlying middleware can be swapped.
 */
public interface CustomProducerService {
    /** Bean id for implementations of this service. */
    String BEAN_ID = "urule.ext.CustomProducerService";

    /**
     * Sends one message to the given topic.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    void sendMessage(String topic, String message);
}

6、定義生産消息的實現類

package com.bstek.urule.sample.mq.redis.service;

import javax.annotation.Resource;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import com.bstek.urule.sample.mq.CustomProducerService;
/**
 * Reids消息發送類
 */
@Service
public class RedisProducerServiceImpl implements CustomProducerService{
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void sendMessage(String topic, String message){
        redisTemplate.convertAndSend(topic,message);
    }
}

7、定義服務端某一個集群節點知識包緩存發生變化時,發送消息的通知類

package com.bstek.urule.sample.mq.adapter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties.Tomcat.Threads;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.packet.ClientPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.ClusterPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.PacketCache;
import com.bstek.urule.console.cache.packet.PacketConfig;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.console.database.manager.packet.PacketManager;
import com.bstek.urule.console.database.model.Packet;
import com.bstek.urule.exception.RuleException;
import com.bstek.urule.runtime.KnowledgePackage;
import com.fasterxml.jackson.databind.node.ObjectNode;

import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Publishes knowledge-package cache change notifications to the server cluster.
 *
 * <p>Every notification is a JSON object containing {@code groupId}, {@code systemId}
 * (taken from {@code Utils.SystemId}), optional payload fields and a {@code messageType},
 * sent to {@link MQConstant#CLUSTER_TOPIC} through the injected {@link CustomProducerService}.
 * All notifying methods return an empty list.
 */
@Slf4j
@Component(ClusterPacketCacheAdapter.BEAN_ID)
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter{

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Notifies the cluster that all knowledge-package caches must be rebuilt.
     *
     * @param groupId group id copied into the notification
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> recacheAllPackets(String groupId) {
        log.info("recacheAllPackets(String groupId):{}", groupId);
        publishToCluster(newMessage(groupId), MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
        return new ArrayList<>();
    }

    /**
     * Notifies the cluster that one knowledge package must be reloaded.
     *
     * @param groupId  group id copied into the notification
     * @param packetId id of the package; its code is loaded through {@code PacketManager}
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
        // Alternative source for the code: PacketCache.ins.getPacket(packetId).getPacket().getCode()
        Packet packet = PacketManager.ins.load(packetId);
        String packetCode = packet.getCode();

        log.info("refreshPacket(String groupId, long packetId):{}:{}:{}",groupId,packetId,packetCode);
        ObjectNode clusterMsg = newMessage(groupId);
        clusterMsg.put("packetId", String.valueOf(packetId));
        clusterMsg.put("packetCode", packetCode);
        publishToCluster(clusterMsg, MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);

        /* If client caches are also updated through the message middleware, add the following code
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("packetId", String.valueOf(packetId));
        clientmsg.put("packetCode", packetCode);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return new ArrayList<>();
    }

    /**
     * Notifies the cluster that a project was removed.
     *
     * @param groupId   group id copied into the notification
     * @param projectId id of the removed project
     * @param list      packet configurations of the project; only used by the optional client
     *                  notification shown in the comment below
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
        log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):{}", projectId);
        /* If client caches are also updated through the message middleware, add the following code
        for(PacketConfig pc:list) {
                    disableClientsPacket(groupId,pc.getId(),pc.getCode());
            } */
        ObjectNode msg = newMessage(groupId);
        msg.put("projectId", String.valueOf(projectId));
        publishToCluster(msg, MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
        return new ArrayList<>();
    }

    /** No-op in this sample. */
    @Override
    public void putPacket(long packetId, PacketData paramPacketData) {
    }

    /** No-op in this sample. */
    @Override
    public void putPacket(String packetCode, PacketData paramPacketData) {
    }

    /** No-op in this sample. */
    @Override
    public void remove(long packetId) {
    }

    /** No-op in this sample. */
    @Override
    public void remove(String packetCode) {
    }

    /** Creates a JSON message pre-filled with {@code groupId} and {@code systemId}. */
    private ObjectNode newMessage(String groupId) {
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        return msg;
    }

    /** Adds {@code messageType} as the last field and sends the message to the cluster topic. */
    private void publishToCluster(ObjectNode msg, String messageType) {
        msg.put("messageType", messageType);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
    }

    /* If client caches are also updated through the message middleware, add the following code
    public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId,String packetCode) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}" ,groupId,packetId,packetCode);
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("packetId", String.valueOf(packetId));
        msg.put("packetCode", packetCode);
        msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
        return result;
    }
    */

}

8、定義服務端某一個集群節點Jar包緩存發生變化時,發送消息的通知類

package com.bstek.urule.sample.mq.adapter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.database.model.UrlType;
import com.bstek.urule.console.editor.jar.JarCacheAdapter;
import com.fasterxml.jackson.databind.node.ObjectNode;

import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Publishes a jar-cache synchronisation notification to the server cluster.
 */
@Slf4j
@Component("urule.jarCacheAdapter")
public class MsgJarCacheAdapter extends JarCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Sends a {@code QUEUE_CLUSTER_JAR_SYNC} message to the cluster topic.
     *
     * @param groupId group id copied into the notification
     * @param urlType not used by this implementation
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception {
        log.info("loadDynamicJars(String groupId, UrlType urlType)"+groupId);

        final ObjectNode notification = JsonUtils.getObjectJsonMapper().createObjectNode();
        notification.put("groupId", groupId);
        notification.put("systemId", Utils.SystemId);
        notification.put("messageType", MQConstant.QUEUE_CLUSTER_JAR_SYNC);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, notification.toString());

        /* If client caches are also updated through the message middleware, add the following code
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
            clientmsg.put("groupId", groupId);
            clientmsg.put("systemId", Utils.SystemId);
            clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_JAR_SYNC);
            customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return new ArrayList<Map<String,Object>>();
    }

}

9、定義接收到消息的監聽類

package com.bstek.urule.sample.mq.redis.listener;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.ServerCacheManager;
import com.bstek.urule.runtime.cache.ClientCacheManager;
import com.fasterxml.jackson.core.JsonProcessingException;

import com.bstek.urule.sample.mq.constant.MQConstant;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import javax.annotation.Resource;
/**
 * Redis消息監聽(tīng)處理(lǐ)類
 */
@Slf4j
public class RedisConsumerListener implements MessageListener {


    private static Map<String, Consumer<String>> RULE = new HashMap<>();

    {
        RULE.put(MQConstant.CLUSTER_TOPIC, this::consumerClusterMessage);
//        RULE.put(MQConstant.CLIENT_TOPIC, this::consumerClusterMessage);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] b_channel = message.getChannel();
        byte[] b_body = message.getBody();
        try {
            String channel = new String(b_channel);
            String body = new String(b_body);
            log.info("channel is:" + channel + " , body is: " + body);
            RULE.get(channel).accept(body);
        } catch (Exception e) {
        }
    }

    public void consumerClusterMessage(String message) {
        ServerCacheManager serverCacheManager = (ServerCacheManager) Utils.getApplicationContext().getBean("urule.serverCacheManager");
        log.info("consumerClusterMessage exec params is :" + message);

        try {
            HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
            String messageType = mapMessage.get("messageType");
            String systemId = mapMessage.get("systemId");
            String groupId = mapMessage.get("groupId");

            String packetId = mapMessage.get("packetId"); 
            String projectId = mapMessage.get("projectId");

            if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESH.equals(messageType)) {
                serverCacheManager.reloadPacket(systemId, Long.valueOf(packetId));

            }else if(MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE.equals(messageType)) {
                serverCacheManager.syncPacketForRemoveProject(systemId, Long.valueOf(projectId));

            }else if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL.equals(messageType)) {
                serverCacheManager.recacheAllPackets(systemId);

            }else if(MQConstant.QUEUE_CLUSTER_JAR_SYNC.equals(messageType)) {
                serverCacheManager.reloadDynamicJars(systemId);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /*如果客戶端的消息消費監聽(tīng)類,可(kě)以添加如下代碼
    public void consumerClinetMessage(String message) {
        ClientCacheManager clientCacheManager = (ClientCacheManager) Utils.getApplicationContext().getBean("urule.clientCacheManager");
        log.info("consumerClinetMessage exec params is :" + message);
        try {
            HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
            String messageType = mapMessage.get("messageType");
            String systemId = mapMessage.get("systemId");
            String groupId = mapMessage.get("groupId");

            String packetId = mapMessage.get("packetId"); 
            String projectId = mapMessage.get("projectId");

            if(MQConstant.QUEUE_CLIENT_PACKET_DISABLE.equals(messageType)) {
                clientCacheManager.disableKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_PACKET_ENABLE.equals(messageType)) {
                clientCacheManager.enableKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_PACKET_REFRESH.equals(messageType)) {
                clientCacheManager.reloadKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_JAR_SYNC.equals(messageType)) {
                clientCacheManager.reloadDynamicJars();
                log.info("=====jar重新加載完成=====");
            }

        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}
*/

至此,通過Redis實現的集群節點之間的緩存通知功能就完成了。

results matching ""

    No results matching ""