Showing posts with label Complex Event Processor. Show all posts
Showing posts with label Complex Event Processor. Show all posts

Thursday, February 28, 2013

Sample to demonstrate WSO2 CEP (Complex Event Processor) Registry Based Output topics and Text Mapping


In my previous blog post, I have given an introduction to WSO2 CEP. If you still do not have an idea, you can refer,
For More information, read on : http://wso2.com/products/complex-event-processor )

Processing and triggering new events based on received events is the basic idea of CEP. For processing events there should be some criteria specified to the CEP Engine, and this is provided as query to the CEP Bucket. To perform the expected task user may need one or more queries and some of this queries may be configured to trigger output events.
Query consists of the following elements.
  • Query Name - Name to identify the query
  • Expression - Actual query text specifying how the events should be processed
  • Output (optional) -Output should only be defined if you want to emit the output events triggered by the query,

Output consists following elements

  • Topic - Topic to which CEP will publish the output events.
  • Broker Name - Name of the broker which the output events are sent through
  • Output Mapping - Define the format of the output events.
Following are the available Output Mappings.
Reference : http://docs.wso2.org/wiki/pages/viewpage.action?pageId=21692679



Sample I am going to demonstrates is a way to realize Output Topics and text mapping using the registry. This sample can be helpful to the WSO2 Quality Assurance team to test the Registry Based Output and text mapping as well. I will not be doing something new in here. But to reuse the existing sa.mples provided in the documentation and organize them as you can execute them and view this feature in WSO2 CEP


To read more : http://docs.wso2.org/wiki/display/CEP210/Output+Topics


To demonstrates I will be using an existing sample from the documentation, which is Build Analyzer. http://docs.wso2.org/wiki/display/CEP210/Build+Analyzer



The best thing to do first is, run the sample given as it is to understand the behavior.


Steps to change for Registry Based Mapping
     1. Follow the steps given in the prerequisites. If you have already run the sample as it is you will not have to do those steps back again. Then you can move to the next step, Configuring Topics for E-Mail Broker.

  1.  When it comes to registry based mapping, the only step that will be differentiate from the existing sample will be the step which you have to configure Topics for E-Mail Broker.
    Follow the steps given in the following : http://docs.wso2.org/wiki/display/CEP210/Output+Topics

  2. Create a plain text content in the registry with the name "emailTopic" in the "config" directory of registry. (Go to the Registry → Browse → _system/config)



  3. Click on Add Resource which is under Entries and select Create Text Content from the Method drop down and provide the mapping details with your email address to see the incoming mail.



  4. Before Providing the file path for the output topic field to the bucket, since the bucket for the given example is already in our CEP-sample scenarios, we can first deploy and then edit the output mapping of it.

  5. Therefore deploy the configurations.
    In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples

  6. From there, type ant deploy-build.

  7. Then when you list down the buckets, you will be able to see the, BuildFailAnalysisBucket .

  8. If you view it, you will be able to see the output is just mapped inline. Therefore we should map it as it supports our registry mapping. Therefore click on the Edit link to edit the bucket's output mapping.

  9. Then click on the BuildFailFilterQuery to edit the Output mapping.

  10. There you get the Query page. Edit the Output section as follows by providing config-registry:/emailTopic as the topic.


  11. Now we are almost done which is related to this sample. Now start the WSO2 Complex Event Processor. Refer to the Running the Product for instructions.

  12. Then follow the steps provided in the following location (Build Analyzer Sample) from the Defining Stream in CEP engine section onwards.
    http://docs.wso2.org/wiki/display/CEP210/Build+Analyzer

  13. If you have successfully followed the steps, you will be able to view the same observation which you have received in inline mapping.

  14. This sample can be used to demonstrates Registry-Based Output Text mapping as well. Only step you have to follow is define the text content of the email using the registry. Except above 4th and 11th steps you have to follow the steps given in the following location.
    http://docs.wso2.org/wiki/display/CEP210/Text+Output+Mapping

Saturday, February 23, 2013

Sample demonstrates the Unique Window and the First Unique Window feature of WSO2 Complex Event Processor (CEP)


WSO2 CEP

WSO2 Complex Event Processor (CEP) is a lightweight, easy-to-use, open source Complex Event Processing Server (CEP) available under Apache Software License v2.0. WSO2 CEP identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real-time. It's built to be extremely high performing and massively scalable. (Reference : http://wso2.com/products/complex-event-processor )



WSO2 CEP Windows Concept

Window is a limited subset of events from an event stream. Users can define a window and then use the events on the window for calculations. A window has two types of output: current events and expired events. A window emits current events when a new event arrives. Expired events are emitted whenever an existing event has expired from a window. (Reference : http://docs.wso2.org/wiki/display/CEP210/Windows )

To read more on CEP and the concepts behind this sample, read the documentation provided.

Note : I hope the reader has an idea about WSO2 CEP and the concepts behind it before try out this sample.
Also this sample will be helpful to the WSO2 Quality Assurance team to verify the Unique window and the First Unique Window Functionality of WSO2 CEP product.


Unique Window
A window that keeps only the latest events that are unique according to the given attribute.


First Unique Window
A window that keeps the first events that are unique according to the given unique attribute



Twitter and StockQuote Analyzer

This sample demonstrates how Siddhi engine can be used with JMS broker to receive, process and publish JMS Map & XML messages.
For More Information (Reference): http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer



In the above link, in the provided sample, CEP will receive events from stock quotes stream and Twitter feeds stream, and trigger an event if the last traded amount of the stock quotes stream vary by 2 percent with regards to the average traded price of a symbol within past 2 minutes and the word count of the Twitter feeds stream with related to that symbol is greater than 10.



But to demonstrate and verify the Unique window and the first unique the above sample should be changed as given below.



    1. A - Unique Window : Steps to Change
Bucket (jms-twitter-stockquote-analyser.xml) for this sample should be updated as following as it supports this feature./<CEP_HOME>/samples/cep-samples/conf/buckets/jms-twitter-stockquote-analyser.xml



<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
A window that keeps only the latest
events that are unique according to the given attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
      <query name="StocksPredictor">

        <expression>
from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.unique("company") 
insert  into StockQuote twitterFeed.wordCount as wc,twitterFeed.company as companyname, allStockQuotes.symbol as sym
       </expression>

        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
     <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                    </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket> 
 
 

   

    1. B - First Unique Window : Steps to Change
Bucket (jms-twitter-stockquote-analyser.xml) for this sample should be updated as following as it supports this feature./<CEP_HOME>/samples/cep-samples/conf/buckets/jms-twitter-stockquote-analyser.xml




<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
A window that keeps the first events
that are unique according to the given unique attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
      <query name="StocksPredictor">

         <expression>
from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.firstUnique("company") 
insert  into StockQuote twitterFeed.wordCount as wc,twitterFeed.company as companyname, allStockQuotes.symbol as sym
       </expression> 

        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
     <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                    </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>



    Then follow the steps given in the sample in the following link even for this sample : (Reference: )
    http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer

    2. Prerequisites
  • Apache Ant to build & deploy the Sample & Service, and to run the client. Refer Installation Prerequisites for instructions to install Apache Ant.
  • ActiveMQ JMS Broker to publish and subscribe events. Refer Installation Prerequisites for instructions to install ActiveMQ JMS Broker.

    3. Deploying the configurations
The steps are as follows :
  1. Install the WSO2 Complex Event Processor, but do not start the server, Refer to the Installation and Deployment for instructions.
  2. Copy paste activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to <CEP_HOME>/samples/lib directory.
  3. Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to <CEP_HOME>/repository/components/lib directory.
  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-jms,
    This will copy the broker-manager-config.xml to <CEP_HOME>/repository/conf directory and the bucket configuration to <CEP_HOME>/repository/deployment/server/cepbuckets directory.
  6. Start ActiveMQ JMS Broker. Refer Installation Prerequisites for instructions to run ActiveMQ JMS Broker.
  7. Now start the WSO2 Complex Event Processor. Refer to the Running the Product for instructions.

    4. Starting JMS Subscriber
The steps are as follows :
  1. In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsSubscriber -Dtopic=PredictedStockQuotes, this will subscribe to the PredictedStockQuotes topic of the ActiveMQ Broker receiving the output events of CEP.



    5. Publishing Events
The steps are as follows :
  1. In a command prompt, switch to the CEP samples directory:<CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsAllStockQuotesPublisher
    This will publish some JMS Map events to AllStockQuotes topic.
  3. Then from there, type ant jmsTwitterFeedPublisher
    This will publish some JMS Map events to TwitterFeed topic.


    6. Observations
These observation will not be the same as provided in the screenshots in http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer
It wil be changed according to the respected behavior.
    For the Unique Window :
    --------------------------------------
You will be able to see the output events in the JMS subscriber console with the latest unique events.
E.g.,
Latest/Last event with unique company name, word count and symbol.

    For the First Unique Window :
    -----------------------------------------------
You will be able to see the output events in the JMS subscriber console with the first unique events.
E.g.,
Very First event with unique company name, word count and symbol.