WSO2 CEP
WSO2 Complex Event Processor (CEP) is a lightweight, easy-to-use, open source complex event processing server available under the Apache Software License v2.0. WSO2 CEP identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real time. It is built to be extremely high performing and massively scalable. (Reference: http://wso2.com/products/complex-event-processor)
For more information, see:
http://www.slideshare.net/wso2.org/introducing-the-wso2-complex-event-processor and http://siddhi.sourceforge.net/
WSO2 CEP Windows Concept
A window is a limited subset of events from an event stream. Users can define a window and then use the events in the window for calculations. A window has two types of output: current events and expired events. A window emits current events when a new event arrives, and emits expired events whenever an existing event expires from the window. (Reference: http://docs.wso2.org/wiki/display/CEP210/Windows)
To read more on CEP and the concepts
behind this sample, read the documentation provided.
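The current/expired distinction described above can be sketched in a few lines of Python. This is only an illustration of the concept, assuming a simple length-based window; the `LengthWindow` class and its `add` method are hypothetical names and are not part of the WSO2 CEP or Siddhi API.

```python
from collections import deque


class LengthWindow:
    """Hypothetical model of a window that keeps the last `length` events."""

    def __init__(self, length):
        self.length = length
        self.events = deque()

    def add(self, event):
        """Add an event and return (current_events, expired_events)."""
        self.events.append(event)
        expired = []
        # Anything beyond the window length falls out and is reported as expired.
        while len(self.events) > self.length:
            expired.append(self.events.popleft())
        return [event], expired


window = LengthWindow(2)
outputs = [window.add(name) for name in ["e1", "e2", "e3"]]
```

Each arrival produces one current event, and the third arrival also pushes the oldest event out of the window as an expired event.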
Note: I assume the reader is familiar with WSO2 CEP and the concepts behind it before trying out this sample.
This sample will also be helpful to the WSO2 Quality Assurance team to verify the Unique Window and First Unique Window functionality of the WSO2 CEP product.
Unique Window
A window that keeps only the latest event for each distinct value of the given attribute.
For more information (reference):
http://docs.wso2.org/wiki/display/CEP210/Windows#Windows-UniqueWindow
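A rough way to picture the unique window is a map from the attribute value to the most recent event carrying that value. The sketch below is an assumption-based model in Python, not the Siddhi implementation; `UniqueWindow`, `add`, and `contents` are hypothetical names, and the sample events are made up for illustration.

```python
class UniqueWindow:
    """Illustrative model (not the Siddhi implementation) of a unique window."""

    def __init__(self, attribute):
        self.attribute = attribute
        self.latest = {}  # attribute value -> most recent event with that value

    def add(self, event):
        """Store the event and return the older event it replaces, or None."""
        key = event[self.attribute]
        expired = self.latest.get(key)
        self.latest[key] = event
        return expired

    def contents(self):
        return list(self.latest.values())


# Made-up feed: two events share the company "IBM".
feed = [
    {"company": "IBM", "wordCount": 5},
    {"company": "WSO2", "wordCount": 7},
    {"company": "IBM", "wordCount": 9},
]
unique_window = UniqueWindow("company")
replaced = [unique_window.add(event) for event in feed]
```

After the three events, the window holds the second IBM event (word count 9) and the WSO2 event; the first IBM event was replaced.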
First Unique Window
A window that keeps only the first event for each distinct value of the given unique attribute.
For more information (reference):
http://docs.wso2.org/wiki/display/CEP210/Windows#Windows-FirstUniqueWindow
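The first unique window can be pictured the opposite way: once a value of the attribute has been seen, later events with the same value are ignored. The following Python sketch is only a model under that assumption; `FirstUniqueWindow` and its methods are hypothetical names, not Siddhi API, and the events are made up.

```python
class FirstUniqueWindow:
    """Illustrative model (not the Siddhi implementation) of a first unique window."""

    def __init__(self, attribute):
        self.attribute = attribute
        self.first = {}  # attribute value -> first event seen with that value

    def add(self, event):
        """Keep the event only if its attribute value is new; return True if kept."""
        key = event[self.attribute]
        if key in self.first:
            return False
        self.first[key] = event
        return True

    def contents(self):
        return list(self.first.values())


# Made-up feed: two events share the company "IBM".
feed = [
    {"company": "IBM", "wordCount": 5},
    {"company": "WSO2", "wordCount": 7},
    {"company": "IBM", "wordCount": 9},
]
first_unique_window = FirstUniqueWindow("company")
kept = [first_unique_window.add(event) for event in feed]
```

Here the second IBM event is dropped, so the window still holds the first IBM event (word count 5) together with the WSO2 event.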
Twitter and StockQuote Analyzer
For More Information (Reference): http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer
In the sample at the above link, CEP receives events from a stock quotes stream and a Twitter feeds stream, and triggers an event if the last traded amount in the stock quotes stream varies by 2 percent from the average traded price of a symbol within the past 2 minutes, and the word count in the Twitter feeds stream related to that symbol is greater than 10.
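The trigger condition of the original sample can be sketched as a small Python check. This is only an interpretation of the sentence above, not the sample's actual query: it assumes "varies by 2 percent" means a deviation of at least 2 percent in either direction, and `should_trigger` and its parameters are hypothetical names.

```python
def should_trigger(last_price, recent_prices, word_count,
                   threshold=0.02, min_word_count=10):
    """Return True when the price deviates enough and the word count is high enough.

    recent_prices stands for the prices of one symbol seen in the past 2 minutes.
    """
    if not recent_prices:
        return False
    average = sum(recent_prices) / len(recent_prices)
    if average == 0:
        return False
    # Assumption: a deviation of at least 2 percent, up or down, counts as "varies".
    deviation = abs(last_price - average) / average
    return deviation >= threshold and word_count > min_word_count


fires = should_trigger(103.0, [99.0, 100.0, 101.0], word_count=11)
small_move = should_trigger(101.0, [99.0, 100.0, 101.0], word_count=11)
few_words = should_trigger(103.0, [99.0, 100.0, 101.0], word_count=10)
```

Only the first call satisfies both parts of the condition; the second moves by just 1 percent and the third has a word count that is not greater than 10.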
However, to demonstrate and verify the Unique Window and the First Unique Window, the bucket configuration of the above sample should be changed as given below.
1. A - Unique Window : Steps to Change
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
        A window that keeps only the latest events that are unique according to the given attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="StocksPredictor">
        <expression>
            from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.unique("company")
            insert into StockQuote
            twitterFeed.wordCount as wc, twitterFeed.company as companyname, allStockQuotes.symbol as sym
        </expression>
        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
                    <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>
1. B - First Unique Window : Steps to Change
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
        A window that keeps the first events that are unique according to the given unique attribute.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEvnetType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="StocksPredictor">
        <expression>
            from allStockQuotes#window.length(1) unidirectional join twitterFeed#window.firstUnique("company")
            insert into StockQuote
            twitterFeed.wordCount as wc, twitterFeed.company as companyname, allStockQuotes.symbol as sym
        </expression>
        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:WordCount>{wc}</quotedata:WordCount>
                    <quotedata:Company>{companyname}</quotedata:Company>
                    <quotedata:StockSymbol>{sym}</quotedata:StockSymbol>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>
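To get a feel for what the subscriber should receive, the output mapping in the bucket configurations above can be imitated in Python. This sketch assumes that each `{wc}`, `{companyname}` and `{sym}` placeholder in the xmlMapping is simply replaced by the matching attribute of the query output; `render_output` is a hypothetical helper, the template is shortened to the one namespace it needs, and the event values are made up.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape

# Shortened copy of the xmlMapping template from the bucket configuration.
TEMPLATE = (
    '<quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/">'
    "<quotedata:WordCount>{wc}</quotedata:WordCount>"
    "<quotedata:Company>{companyname}</quotedata:Company>"
    "<quotedata:StockSymbol>{sym}</quotedata:StockSymbol>"
    "</quotedata:StockQuoteDataEvent>"
)


def render_output(event):
    """Fill the placeholders with XML-escaped attribute values (assumed behavior)."""
    values = {name: escape(str(value)) for name, value in event.items()}
    return TEMPLATE.format(**values)


# Made-up output event with the three attributes named in the query.
xml_text = render_output({"wc": 12, "companyname": "WSO2", "sym": "IBM"})
root = ET.fromstring(xml_text)
namespaces = {"q": "http://ws.cdyne.com/"}
company = root.find("q:Company", namespaces).text
```

The parsed result gives back the same attribute values, which is a quick way to check the events printed in the JMS subscriber console against what the query selected.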
- Then follow the same steps given for the original sample at the following link (reference):
http://docs.wso2.org/wiki/display/CEP210/Twitter+and+StockQuote+Analyzer
- Apache Ant to build and deploy the sample and service, and to run the client. Refer to Installation Prerequisites for instructions on installing Apache Ant.
- ActiveMQ JMS Broker to publish and subscribe to events. Refer to Installation Prerequisites for instructions on installing ActiveMQ JMS Broker.
- Install the WSO2 Complex Event Processor, but do not start the server. Refer to Installation and Deployment for instructions.
- Copy activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to the <CEP_HOME>/samples/lib directory.
- Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to the <CEP_HOME>/repository/components/lib directory.
- In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
For example, in Linux: cd <CEP_HOME>/samples/cep-samples
- From there, type ant deploy-jms
This will copy broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.
- Start ActiveMQ JMS Broker. Refer to Installation Prerequisites for instructions on running ActiveMQ JMS Broker.
- Now start the WSO2 Complex Event Processor. Refer to Running the Product for instructions.
4. Starting JMS Subscriber
- In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
For example, in Linux: cd <CEP_HOME>/samples/cep-samples
- From there, type ant jmsSubscriber -Dtopic=PredictedStockQuotes
This will subscribe to the PredictedStockQuotes topic of the ActiveMQ Broker, which receives the output events of CEP.
5. Publishing Events
- In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
For example, in Linux: cd <CEP_HOME>/samples/cep-samples
- From there, type ant jmsAllStockQuotesPublisher
This will publish some JMS Map events to the AllStockQuotes topic.
- Then from there, type ant jmsTwitterFeedPublisher
This will publish some JMS Map events to the TwitterFeed topic.
6. Observations
The output changes according to the respective window behavior.
For the Unique Window:
--------------------------------------
You will be able to see the output events in the JMS subscriber console with the latest unique events.
E.g., the latest (last) event with a unique company name, word count and symbol.
For the First Unique Window:
-----------------------------------------------
You will be able to see the output events in the JMS subscriber console with the first unique events.
E.g., the very first event with a unique company name, word count and symbol.
Great article Ushani! With the clear description of the demo, it should be no problem to try this.
Thanks Peter
Sure I will be. You can refer to the following link for more information about the product and try out the samples as well.
http://wso2.com/products/complex-event-processor/