批處理(lǐ)支持

​ 實際業務當中,我們除了會(huì)做(zuò)單條規則計(jì)算(suàn)外,還(hái)有(yǒu)可(kě)能需要運行(xíng)規則引擎來(lái)處理(lǐ)一大(dà)批數(shù)據,這些(xiē)數(shù)據可(kě)能有(yǒu)幾萬條,幾十萬條,甚至更多(duō)。在這種情況下,如果我們還(hái)是采用普通(tōng)的KnowledgeSession在一個(gè)線程裏處理(lǐ)大(dà)批量數(shù)據的話(huà),那(nà)麽引擎還(hái)是隻能在當前線程裏運行(xíng),這樣就會(huì)需要很(hěn)長的時(shí)間(jiān)才能可(kě)能将這幾十萬條甚至更多(duō)的數(shù)據處理(lǐ)完成,在這個(gè)時(shí)候,為(wèi)了充分利用服務器(qì)較強的CPU性能,我們可(kě)以使用BatchSession利用多(duō)線程并行(xíng)處理(lǐ)這些(xiē)數(shù)據。顧名思義BatchSession是用來(lái)做(zuò)批處理(lǐ)任務的會(huì)話(huà)對象,它是 URule Pro當中提供的多(duō)線程并行(xíng)處理(lǐ)大(dà)批量業務數(shù)據的規則會(huì)話(huà)對象。

​ 要得(de)到一個(gè)BatchSession對象,我們也需要提供一個(gè)或多(duō)個(gè)KnowledgePackage對象,獲取KnowledgePackage對象的方法與上(shàng)面介紹的方法相同,有(yǒu)了KnowledgePackage對象後,就可(kě)以利用KnowledgeSessionFactory類創建一個(gè)BatchSession對象。

​ 在com.bstek.urule.runtime.KnowledgeSessionFactory類中除上(shàng)面提到的兩個(gè)構建KnowledgeSession的靜态方法外,還(hái)提供了另外八個(gè)可(kě)用于構建BatchSession的靜态方法,其源碼如下:

/**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,這裏默認将開(kāi)啓10個(gè)普通(tōng)的線程池來(lái)運行(xíng)提交的批處理(lǐ)任務,默認将每100個(gè)任務放在一個(gè)線程裏處理(lǐ)
     * @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSession(KnowledgePackage knowledgePackage){
        return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,第二個(gè)參數(shù)來(lái)指定線程池中可(kě)用線程個(gè)數(shù),默認将每100個(gè)任務放在一個(gè)線程裏處理(lǐ)
     * @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
     * @param threadSize 線程池中可(kě)用的線程個(gè)數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSessionByThreadSize(KnowledgePackage knowledgePackage,int threadSize){
        return new BatchSessionImpl(knowledgePackage,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,這裏默認将開(kāi)啓10個(gè)普通(tōng)的線程池來(lái)運行(xíng)提交的批處理(lǐ)任務,第二個(gè)參數(shù)用來(lái)決定單個(gè)線程處理(lǐ)的任務數(shù)
     * @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
     * @param batchSize 單個(gè)線程處理(lǐ)的任務數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSessionByBatchSize(KnowledgePackage knowledgePackage,int batchSize){
        return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,第二個(gè)參數(shù)來(lái)指定線程池中可(kě)用線程個(gè)數(shù),第三個(gè)參數(shù)用來(lái)決定單個(gè)線程處理(lǐ)的任務數(shù)
     * @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
     * @param threadSize 線程池中可(kě)用的線程個(gè)數(shù)
     * @param batchSize 單個(gè)線程處理(lǐ)的任務數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSession(KnowledgePackage knowledgePackage,int threadSize,int batchSize){
        return new BatchSessionImpl(knowledgePackage,threadSize,batchSize);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,這裏默認将開(kāi)啓10個(gè)普通(tōng)的線程池來(lái)運行(xíng)提交的批處理(lǐ)任務,默認将每100個(gè)任務放在一個(gè)線程裏處理(lǐ)
     * @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage集合對象
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages){
        return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,第二個(gè)參數(shù)來(lái)指定線程池中可(kě)用線程個(gè)數(shù),默認将每100個(gè)任務放在一個(gè)線程裏處理(lǐ)
     * @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
     * @param threadSize 線程池中可(kě)用的線程個(gè)數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSessionByThreadSize(KnowledgePackage[] knowledgePackages,int threadSize){
        return new BatchSessionImpl(knowledgePackages,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,這裏默認将開(kāi)啓10個(gè)普通(tōng)的線程池來(lái)運行(xíng)提交的批處理(lǐ)任務,第二個(gè)參數(shù)用來(lái)決定單個(gè)線程處理(lǐ)的任務數(shù)
     * @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
     * @param batchSize 單個(gè)線程處理(lǐ)的任務數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSessionByBatchSize(KnowledgePackage[] knowledgePackages,int batchSize){
        return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
    }

    /**
     * 創建一個(gè)用于批處理(lǐ)的BatchSession對象,第二個(gè)參數(shù)來(lái)指定線程池中可(kě)用線程個(gè)數(shù),第三個(gè)參數(shù)用來(lái)決定單個(gè)線程處理(lǐ)的任務數(shù)
     * @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
     * @param threadSize 線程池中可(kě)用的線程個(gè)數(shù)
     * @param batchSize 單個(gè)線程處理(lǐ)的任務數(shù)
     * @return 返回一個(gè)新的BatchSession對象
     */
    public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages,int threadSize,int batchSize){
        return new BatchSessionImpl(knowledgePackages,threadSize,batchSize);
    }

前面介紹規則流中的決策節點時(shí),了解到決策節點中支持百分比分流,這種百分比分流就要求必須是在使用BatchSession處理(lǐ)一批數(shù)據的時(shí)候,或者是一個(gè)用一個(gè)普通(tōng)的KnowledgeSession一次性處理(lǐ)多(duō)條數(shù)據才有(yǒu)效,否則規則流隻會(huì)走比例最高(gāo)的那(nà)個(gè)分支。

​ BatchSession接口比較簡單,它隻定義了兩個(gè)方法:

/**
     * 添加一個(gè)具體(tǐ)要執行(xíng)Business對象
     * @param business Business對象實例
     */
    void addBusiness(Business business);


    /**
     * 等待線程池中所有(yǒu)業務線程執行(xíng)完成,在進行(xíng)批處理(lǐ)操作(zuò)時(shí)一定要以此方法作(zuò)為(wèi)方法調用結尾
     */
    void waitForCompletion();

​ 可(kě)以看到,它可(kě)以接收若幹個(gè)名為(wèi)com.bstek.urule.runtime.Business接口實例,Business接口比較簡單,它隻有(yǒu)一個(gè)方法:

package com.bstek.urule.runtime;
/**
 * @author Jacky.gao
 * @since 2015年9月29日
 */
public interface Business {
    void execute(KnowledgeSession session);
}

​ 在Business實現類中,我們的業務寫在execute方法當中,在這個(gè)方法中,隻有(yǒu)一個(gè)KnowledgeSession對象,這個(gè)session對象就是我們與規則引擎操作(zuò)的對象,示例代碼如下:

//從Spring中獲取KnowledgeService接口實例
KnowledgeService service=(KnowledgeService)Utils.getApplicationContext().getBean(KnowledgeService.BEAN_ID);
//通(tōng)過KnowledgeService接口獲取指定的知識包ID"213"
KnowledgePackage knowledgePackage=service.getKnowledge("213");

//通(tōng)過取的KnowledgePackage對象創建BatchSession對象,在這個(gè)對象中,我們将開(kāi)啓5個(gè)線程,每個(gè)線程最多(duō)放置10個(gè)Bussiness接口實例運行(xíng)
BatchSession batchSession=KnowledgeSessionFactory.newBatchSession(knowledgePackage, 5, 10);

for(int i=0;i<100;i++){
    batchSession.addBusiness(new Business(){
        @Override
        public void execute(KnowledgeSession session) {
            Employee employee=new Employee();
            employee.setSalary(11080);
            //将業務數(shù)據對象Employee插入到KnowledgeSession中
            session.insert(employee);
            session.startProcess("demo");
        }
    });
}
//等待所有(yǒu)的線程執行(xíng)完成,對于BatchSession調用來(lái)說,此行(xíng)代碼必不可(kě)少(shǎo),否則将導緻錯誤
batchSession.waitForCompletion();

results matching ""

    No results matching ""