Saturday, February 23, 2013

A Sample Demonstrating the Unique Window and First Unique Window Features of WSO2 Complex Event Processor (CEP)


WSO2 CEP

WSO2 Complex Event Processor (CEP) is a lightweight, easy-to-use, open source Complex Event Processing server available under the Apache Software License v2.0. WSO2 CEP identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real time. It is built to be extremely high performing and massively scalable. (Reference: http://wso2.com/products/complex-event-processor )



WSO2 CEP Windows Concept

A window is a limited subset of events from an event stream. Users can define a window and then use the events in the window for calculations. A window has two types of output: current events and expired events. A window emits current events when a new event arrives. Expired events are emitted whenever an existing event expires from the window. (Reference: http://docs.wso2.org/wiki/display/CEP210/Windows )
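To make the current/expired distinction concrete, here is a minimal Python sketch of a length window. It is only a toy model for illustration, not Siddhi's implementation; the class name LengthWindow and the event labels are hypothetical.

```python
from collections import deque


class LengthWindow:
    """Toy length window that holds at most `size` events (illustration only)."""

    def __init__(self, size):
        self.size = size
        self.events = deque()

    def add(self, event):
        """Add an event and return (current_events, expired_events)."""
        expired = []
        self.events.append(event)
        # When the window overflows, the oldest event expires.
        if len(self.events) > self.size:
            expired.append(self.events.popleft())
        # The newly arrived event is always emitted as a current event.
        return [event], expired


window = LengthWindow(size=2)
outputs = [window.add(e) for e in ["e1", "e2", "e3"]]
for current, expired in outputs:
    print("current:", current, "expired:", expired)
```

In this sketch the third arrival pushes the oldest event out of the window, so it is the only arrival that produces an expired event.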

To learn more about CEP and the concepts behind this sample, refer to the documentation linked above.

Note: This post assumes the reader is already familiar with WSO2 CEP and its concepts before trying out this sample.
This sample will also be helpful to the WSO2 Quality Assurance team for verifying the Unique Window and First Unique Window functionality of the WSO2 CEP product.


Unique Window
A window that keeps only the latest events that are unique according to the given attribute.


First Unique Window
A window that keeps the first events that are unique according to the given unique attribute.
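The difference between the two windows can be sketched in a few lines of Python. This is only a rough model of which events each window retains, under the definitions above; it does not model how Siddhi emits current and expired events, and the function names and sample feed are hypothetical.

```python
def unique_window(events, key):
    """Keep only the latest event for each distinct value of `key`."""
    kept = {}
    for event in events:
        kept[event[key]] = event  # a newer event replaces the older one
    return list(kept.values())


def first_unique_window(events, key):
    """Keep only the first event for each distinct value of `key`."""
    kept = {}
    for event in events:
        kept.setdefault(event[key], event)  # later duplicates are ignored
    return list(kept.values())


# Hypothetical Twitter feed events, in arrival order.
feed = [
    {"company": "IBM", "wordCount": 5},
    {"company": "WSO2", "wordCount": 8},
    {"company": "IBM", "wordCount": 12},
]

latest = unique_window(feed, "company")
first = first_unique_window(feed, "company")
print("unique:", latest)
print("firstUnique:", first)
```

With this feed, the unique window model retains the IBM event with word count 12, while the first unique window model retains the IBM event with word count 5.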



Twitter and StockQuote Analyzer

This sample demonstrates how the Siddhi engine can be used with a JMS broker to receive, process and publish JMS Map & XML messages.
For more information, see: http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer



In the sample provided at the above link, CEP receives events from a stock quotes stream and a Twitter feeds stream, and triggers an event if the last traded amount in the stock quotes stream varies by 2 percent from the average traded price of a symbol within the past 2 minutes and the word count in the Twitter feeds stream related to that symbol is greater than 10.
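The trigger condition of the original sample can be expressed roughly as the Python check below. This is a sketch under assumptions: "varies by 2 percent" is read here as deviating from the average by more than 2 percent in either direction, the list of recent prices stands in for the 2-minute window, and the function and parameter names are hypothetical rather than taken from the sample.

```python
def should_trigger(last_price, recent_prices, word_count,
                   threshold=0.02, min_word_count=10):
    """Return True when both conditions of the original sample hold.

    recent_prices: prices of the symbol seen in the past 2 minutes (assumed).
    """
    if not recent_prices:
        return False  # no average can be computed yet
    average = sum(recent_prices) / len(recent_prices)
    # Assumed reading: more than `threshold` away from the average, up or down.
    deviates = abs(last_price - average) > threshold * average
    return deviates and word_count > min_word_count


print(should_trigger(103.0, [100.0, 100.0], 11))
print(should_trigger(101.0, [100.0, 100.0], 11))
```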



To demonstrate and verify the Unique Window and the First Unique Window, however, the above sample should be changed as described below.



    1. A - Unique Window: Steps to Change
Update the bucket configuration (jms-twitter-stockquote-analyser.xml) for this sample as follows so that it uses this feature. The file is located at:
<CEP_HOME>/samples/cep-samples/conf/buckets/jms-twitter-stockquote-analyser.xml



<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
A window that keeps only the latest
events that are unique according to the given attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="StocksPredictor">
        <expression>
from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.unique("company")
insert into StockQuote twitterFeed.wordCount as wc, twitterFeed.company as companyname, allStockQuotes.symbol as sym
        </expression>

        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
                    <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket> 
 
 

   

    1. B - First Unique Window: Steps to Change
Update the bucket configuration (jms-twitter-stockquote-analyser.xml) for this sample as follows so that it uses this feature. The file is located at:
<CEP_HOME>/samples/cep-samples/conf/buckets/jms-twitter-stockquote-analyser.xml




<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
A window that keeps the first events
that are unique according to the given unique attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="StocksPredictor">
        <expression>
from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.firstUnique("company")
insert into StockQuote twitterFeed.wordCount as wc, twitterFeed.company as companyname, allStockQuotes.symbol as sym
        </expression>

        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
                    <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>



    Then follow the steps given for the original sample, which also apply to this modified sample. (Reference:
    http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer )

    2. Prerequisites
  • Apache Ant, to build and deploy the sample and service and to run the client. Refer to Installation Prerequisites for instructions on installing Apache Ant.
  • ActiveMQ JMS Broker, to publish and subscribe to events. Refer to Installation Prerequisites for instructions on installing ActiveMQ JMS Broker.

    3. Deploying the configurations
The steps are as follows :
  1. Install the WSO2 Complex Event Processor, but do not start the server. Refer to Installation and Deployment for instructions.
  2. Copy activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to the <CEP_HOME>/samples/lib directory.
  3. Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to the <CEP_HOME>/repository/components/lib directory.
  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-jms
    This will copy broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.
  6. Start the ActiveMQ JMS Broker. Refer to Installation Prerequisites for instructions on running the ActiveMQ JMS Broker.
  7. Now start the WSO2 Complex Event Processor. Refer to Running the Product for instructions.

    4. Starting JMS Subscriber
The steps are as follows :
  1. In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsSubscriber -Dtopic=PredictedStockQuotes
    This will subscribe to the PredictedStockQuotes topic of the ActiveMQ Broker and receive the output events of CEP.



    5. Publishing Events
The steps are as follows :
  1. In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsAllStockQuotesPublisher
    This will publish some JMS Map events to AllStockQuotes topic.
  3. Then from there, type ant jmsTwitterFeedPublisher
    This will publish some JMS Map events to TwitterFeed topic.


    6. Observations
These observations will not match the screenshots provided in http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer
The output changes according to the behavior of the window being used.
    For the Unique Window :
    --------------------------------------
You will be able to see the output events in the JMS subscriber console with the latest unique events.
E.g.,
The latest (last) event for each unique company name, with its word count and symbol.

    For the First Unique Window :
    -----------------------------------------------
You will be able to see the output events in the JMS subscriber console with the first unique events.
E.g.,
The very first event for each unique company name, with its word count and symbol.