Mule ESB 学习笔记(19)Mule和RSS的整合

定时扫描特定目录的rss文件:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:rss="http://www.mulesoft.org/schema/mule/rss"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      xsi:schemaLocation="
               http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/rss http://www.mulesoft.org/schema/mule/rss/current/mule-rss.xsd
               http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
               http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">

    <!-- File connector declared with streaming switched off. -->
    <file:connector name="fileConnector" streaming="false"/>

    <!--
        Picks up files matching *.rss from /e:/download/ (pollingFrequency="100";
        presumably milliseconds, confirm against the file transport documentation),
        runs them through rss:feed-splitter on the inbound endpoint, then hands the
        result to echo-component followed by a singleton EntryReceiver.
        NOTE(review): the "test" namespace is declared above but no test: element is used here.
    -->
    <flow name="feedSplitterConsumer">
        <file:inbound-endpoint  path="/e:/download/" pollingFrequency="100" >
            <file:filename-wildcard-filter pattern="*.rss"/>
            <rss:feed-splitter/>
            
        </file:inbound-endpoint>
         <echo-component/>
        <component>
            <singleton-object class="com.easyway.esb.mule.rss.EntryReceiver"/>
        </component>
    </flow>

</mule>

通过HTTP轮询指定地址的RSS源:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:rss="http://www.mulesoft.org/schema/mule/rss"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xsi:schemaLocation="
               http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
               http://www.mulesoft.org/schema/mule/rss http://www.mulesoft.org/schema/mule/rss/current/mule-rss.xsd">

    <!--
        HTTP polling connector: pollingFrequency="1000" (presumably milliseconds, confirm),
        and discardEmptyContent="false".
    -->
    <http:polling-connector name="PollingHttpConnector" pollingFrequency="1000" discardEmptyContent="false"/>

    <!--
        Reads the RSS feed at the configured address through the polling connector,
        then applies rss:feed-splitter and rss:entry-last-updated-filter.
        No component follows the filter in this flow.
    -->
    <flow name="feedConsumer">
        <http:inbound-endpoint address="http://topmanopensource.iteye.com/rss"
                               connector-ref="PollingHttpConnector" />
        <rss:feed-splitter/>
        <rss:entry-last-updated-filter/>
    </flow>
</mule>

向JMS队列发送RSS内容,并由Mule从队列中读取:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:atom="http://www.mulesoft.org/schema/mule/atom"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:rss="http://www.mulesoft.org/schema/mule/rss"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      xsi:schemaLocation="
               http://www.mulesoft.org/schema/mule/rss http://www.mulesoft.org/schema/mule/rss/current/mule-rss.xsd      
               http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
               http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd
               http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
               http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">

   <!--
       ActiveMQ connector shared by both flows below; maxRedelivery is set to -1.
       NOTE(review): the "atom" and "test" namespaces are declared above but not used in this file.
   -->
   <jms:activemq-connector name="jmsConnectorNoRedelivery" maxRedelivery="-1" />
   
   <!-- Spring beans referenced by the flows via spring-object. -->
   <spring:beans>
     <spring:bean  id="feedConsumer" class="com.easyway.esb.mule.rss.FeedReceiver" />
       <spring:bean  id="entryReceiver" class="com.easyway.esb.mule.rss.EntryReceiver" />
   </spring:beans>
    
    <!-- Messages from queue "feed.in" go straight to the feedConsumer bean (FeedReceiver); no splitter is applied. -->
    <flow name="feedConsumerFlow">
        <jms:inbound-endpoint queue="feed.in" connector-ref="jmsConnectorNoRedelivery"/>

        <component>
            <spring-object  bean="feedConsumer"/>
        </component>
    </flow>

    <!--
        Messages from queue "feed.split.in" (one-way) pass through rss:feed-splitter
        on the endpoint and are then delivered to the entryReceiver bean (EntryReceiver).
    -->
    <flow name="feedSplitterConsumerFlow">
        <jms:inbound-endpoint queue="feed.split.in" exchange-pattern="one-way" connector-ref="jmsConnectorNoRedelivery">
            <rss:feed-splitter/>
        </jms:inbound-endpoint>
        <component>
           <spring-object  bean="entryReceiver"/>
        </component>
    </flow>

</mule>

手动生成rss文件:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:rss="http://www.mulesoft.org/schema/mule/rss"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xsi:schemaLocation="
       http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
       http://www.mulesoft.org/schema/mule/rss http://www.mulesoft.org/schema/mule/rss/current/mule-rss.xsd
       http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

    <!-- Global filter named "feedFilter"; it is declared here but not referenced by the flow in this file. -->
    <rss:feed-last-updated-filter lastUpdate="2009-10-01 13:00:00" acceptWithoutUpdateDate="false" name="feedFilter"/>

    <!-- Global transformer named "ObjectToFeed"; the flow below uses an anonymous instance instead of this one. -->
    <rss:object-to-feed-transformer name="ObjectToFeed"/>

    <!--
        Messages sent to the VM path "feed.in" pass through, in order: rss:feed-splitter,
        rss:entry-last-updated-filter (lastUpdate="2009-10-01") and rss:object-to-feed-transformer,
        all configured on the inbound endpoint, before reaching echo-component.
    -->
    <flow name="flowTest">
        <vm:inbound-endpoint path="feed.in">
            <rss:feed-splitter/>
            <rss:entry-last-updated-filter lastUpdate="2009-10-01"/>
               <rss:object-to-feed-transformer/>
        </vm:inbound-endpoint>
        <echo-component/>
    </flow>
</mule>
package com.easyway.esb.mule.rss;

import org.mule.api.annotations.param.Payload;

import com.sun.syndication.feed.synd.SyndEntry;

import java.util.concurrent.atomic.AtomicInteger;
/**
 * Component that counts how many RSS entries have been handed to it.
 *
 * @author longgangbai
 */
public class EntryReceiver
{
    /** Running total of {@link #readEntry(SyndEntry)} invocations. */
    private final AtomicInteger received = new AtomicInteger();

    /**
     * Registers the arrival of a single entry. The entry itself is not inspected.
     *
     * @param entry the message payload, bound through {@code @Payload}
     * @throws Exception never thrown by this implementation
     */
    public void readEntry(@Payload SyndEntry entry) throws Exception
    {
        received.incrementAndGet();
    }

    /**
     * @return the number of entries received so far
     */
    public int getCount()
    {
        return received.intValue();
    }
}
package com.easyway.esb.mule.rss;

import org.mule.api.annotations.param.Payload;

import com.sun.syndication.feed.synd.SyndFeed;

import java.util.concurrent.atomic.AtomicInteger;
/**
 * Component that remembers the number of entries in the most recently received feed.
 *
 * @author longgangbai
 */
public class FeedReceiver
{
    /** Entry count of the last feed passed to {@link #readFeed(SyndFeed)}; 0 until one arrives. */
    private final AtomicInteger entryTotal = new AtomicInteger();

    /**
     * Stores the size of the feed's entry list, replacing any previously stored value.
     *
     * @param feed the message payload, bound through {@code @Payload}
     * @throws Exception declared by the signature; this body throws nothing explicitly
     */
    public void readFeed(@Payload SyndFeed feed) throws Exception
    {
        final int size = feed.getEntries().size();
        entryTotal.set(size);
    }

    /**
     * @return the entry count recorded by the latest {@link #readFeed(SyndFeed)} call
     */
    public int getCount()
    {
        return entryTotal.intValue();
    }
}

 测试代码:

/*
 * Copyright (c) EASYWAY 2011 All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.easyway.esb.mule.rss;
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.mule.api.MuleContext;
import org.mule.api.client.MuleClient;
import org.mule.api.context.MuleContextFactory;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextFactory;
/**
 * Demo entry point: starts Mule with {@code jms-rss-consume.xml}, dispatches a sample RSS
 * feed to the {@code feed.in} and {@code feed.split.in} JMS queues, and prints the counters
 * of the components that consume them.
 *
 * Created 2013-8-26<br>
 * @author  $Author$<br>
 * @version $Revision$ $Date$
 * @since   3.0.0
 */
public class MuleRssMain {

    /**
     * Runs the demo. Prints the entry count seen by the {@code feedConsumer} bean after the
     * whole feed is dispatched, then the entry count seen by the {@code entryReceiver} bean
     * after the feed is dispatched to the splitting queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        MuleContext muleContext = null;
        try {
            String configFile = "jms-rss-consume.xml";
            System.setProperty("mule.verbose.exceptions", "true");
            String[] configFileArr = new String[] {configFile};
            MuleContextFactory muleContextFactory = new DefaultMuleContextFactory();
            muleContext = muleContextFactory
                    .createMuleContext(new SpringXmlConfigurationBuilder(configFileArr));
            muleContext.start();

            MuleClient client = muleContext.getClient();
            FeedReceiver feedReceiver =
                    (FeedReceiver) muleContext.getRegistry().get("feedConsumer");
            // The "feed.split.in" queue is consumed by the entryReceiver bean, so its
            // counter (not FeedReceiver's) reflects the result of the split.
            EntryReceiver entryReceiver =
                    (EntryReceiver) muleContext.getRegistry().get("entryReceiver");

            java.net.URL resource =
                    MuleRssMain.class.getClassLoader().getResource("./sample-feed.rss");
            if (resource == null) {
                throw new IllegalStateException("sample-feed.rss not found on the classpath");
            }
            // URL.getFile() keeps URL escapes such as %20; going through toURI() yields a
            // usable file path. Encoding is assumed to be UTF-8 - adjust if the sample
            // file declares something else.
            String feed = FileUtils.readFileToString(new File(resource.toURI()), "UTF-8");

            client.dispatch("jms://feed.in", feed, null);
            Thread.sleep(2000); // dispatch is asynchronous; give the flow time to consume
            System.out.println(feedReceiver.getCount());

            client.dispatch("jms://feed.split.in", feed, null);
            Thread.sleep(5000);
            System.out.println(entryReceiver.getCount());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release connectors and threads so the JVM can exit.
            if (muleContext != null) {
                muleContext.dispose();
            }
        }
    }

}

相关推荐