在我之前发表的文章中,我提到我最近热衷于Complex Event Processing (CEP) (复杂事件处理)。简单来说,CEP把数据流作为输入,根据一系列预定义的规则,把数据(或部分数据)重定向给监听者们;又或者是当发现数据中的隐含的模式(Pattern)时,触发事件。在大量数据被产生出来并需要进行实时地分析的场景下,CEP特别有用。
import java.util.Date;

/**
 * A single price observation for one stock symbol; the tutorial registers it with the engine
 * under the event-type name "StockTick". The bean-style getters expose symbol, price and
 * timeStamp as event properties.
 */
public static class Tick {
    String symbol;
    Double price;
    Date timeStamp;

    /**
     * @param s symbol, e.g. "AAPL"
     * @param p price
     * @param t event time in milliseconds since the epoch
     */
    public Tick(String s, double p, long t) {
        this.timeStamp = new Date(t);
        this.price = Double.valueOf(p);
        this.symbol = s;
    }

    public String getSymbol() {
        return this.symbol;
    }

    public double getPrice() {
        return this.price.doubleValue();
    }

    public Date getTimeStamp() {
        return this.timeStamp;
    }

    @Override
    public String toString() {
        return new StringBuilder("Price: ")
                .append(price.toString())
                .append(" time: ")
                .append(timeStamp.toString())
                .toString();
    }
}
import com.espertech.esper.client.*; public class main { public static void main(String [] args){ //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); // We register Ticks as objects the engine will have to handle cepConfig.addEventType("StockTick",Tick.class.getName()); // We setup the engine EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); } }
import java.util.Random; import com.espertech.esper.client.*; public class exampleMain { private static Random generator=new Random(); public static void GenerateRandomTick(EPRuntime cepRT){ double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick= new Tick(symbol,price,timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep=EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); } }
public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); // We register an EPL statement EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); }
// Attach a CEPListener to the statement so its update() callback receives the statement's output.
cepStatement.addListener(new CEPListener());
/** Update listener that prints every event produced by the statement it is attached to. */
public static class CEPListener implements UpdateListener {
    /**
     * Receives the statement's output.
     *
     * @param newData events entering the output (insert stream); may be null or empty
     * @param oldData events leaving the output (remove stream); ignored here
     */
    public void update(EventBean[] newData, EventBean[] oldData) {
        // Reading only newData[0] threw on a null/empty array and silently dropped every event
        // after the first when several arrive together (e.g. with win:time_batch windows).
        if (newData == null) {
            return;
        }
        for (EventBean event : newData) {
            System.out.println("Event received: " + event.getUnderlying());
        }
    }
}
for (int remaining = 5; remaining > 0; remaining--) {
    // Push one random tick into the runtime per pass: five ticks in total.
    GenerateRandomTick(cepRT);
}
import com.espertech.esper.client.*; import java.util.Random; import java.util.Date; public class exampleMain { public static class Tick { String symbol; Double price; Date timeStamp; public Tick(String s, double p, long t) { symbol = s; price = p; timeStamp = new Date(t); } public double getPrice() {return price;} public String getSymbol() {return symbol;} public Date getTimeStamp() {return timeStamp;} @Override public String toString() { return "Price: " + price.toString() + " time: " + timeStamp.toString(); } } private static Random generator = new Random(); public static void GenerateRandomTick(EPRuntime cepRT) { double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick = new Tick(symbol, price, timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("Event received: " + newData[0].getUnderlying()); } } public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick", Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig); EPRuntime cepRT = cep.getEPRuntime(); EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); cepStatement.addListener(new CEPListener()); // We generate a few ticks... for (int i = 0; i < 5; i++) { GenerateRandomTick(cepRT); } } }
log4j:WARN No appenders could be found for logger (com.espertech.esper.epl.metric.MetricReportingPath). log4j:WARN Please initialize the log4j system properly. Sending tick:Price: 6.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 0.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 7.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 4.0 time: Tue Jul 21 01:11:15 CEST 2009 Sending tick:Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009 Event received: Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009
// Programmatic log4j setup: gives the root logger a console appender, which addresses the
// "No appenders could be found" warning shown in the sample output.
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
//and this in the main function before the rest of your code:
public static void main(String [] args){
    // NOTE(review): `layout` is never used in the visible code; the appender below gets a second
    // new SimpleLayout(). Presumably `layout` was meant to be passed to ConsoleAppender. Confirm.
    SimpleLayout layout = new SimpleLayout();
    // Send log events to the console, formatted with a SimpleLayout.
    ConsoleAppender appender = new ConsoleAppender(new SimpleLayout());
    Logger.getRootLogger().addAppender(appender);
    // Set the root logger's level to WARN. The (Level) cast looks redundant if Level.WARN is
    // already declared as a Level; verify against your log4j version.
    Logger.getRootLogger().setLevel((Level) Level.WARN);
    // The rest of main is elided in this snippet.
    (...)
下一篇文章中,我将更深入一些来探索EPL语句,提供一些代码来连接两个引擎实现所谓的“事件精化”(event refinement)(译者:好像之后作者再也没有更新过了,所以,不要指望后续了:))
win:time(3 sec) 定义了一个 3 秒的时间窗口,avg(price) 统计最近 3 秒内 OrderEvent 对象 price 的平均值:select avg(price) from test.OrderEvent.win:time(3 sec)
win:length(100) 定义了一个长度为 100 个事件的窗口,avg(price) 统计最近 100 个 OrderEvent 对象 price 的平均值:select avg(price) from test.OrderEvent.win:length(100)
where 过滤:select avg(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
似曾相识吧?执行方式也基本和 SQL 里的 where 和 having 差不多。having 过滤:select avg(price) from test.OrderEvent.win:time_batch(3 sec) having price>10
select count(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
group by(先看不分组时的求和,下一条再按 itemName 分组):select sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10
select itemName,sum(price) from test.OrderEvent.win:time_batch(3 sec) where price>10 group by itemName
select Math.round(sum(price)) from test.OrderEvent.win:time_batch(3 sec) where price>10
/**
 * Helper whose static method is exposed to EPL as the single-row function {@code percent}
 * through a {@code <plugin-singlerow-function>} configuration entry.
 */
public class Util {
    /**
     * Returns {@code amount} as a percentage of {@code total}.
     *
     * @param amount the part
     * @param total  the whole
     * @return {@code amount / total * 100}. Plain double division is used, so a zero
     *     {@code total} yields Infinity (or NaN when {@code amount} is also zero) instead of
     *     throwing.
     */
    public static double computePercent(double amount, double total) {
        return amount / total * 100;
    }
}
OK了,可以用了<plugin-singlerow-function name="percent"
    function-class="Util" function-method="computePercent" />
<!-- function-class must name the class that declares the static computePercent method (the Util
     class shown above); use its fully-qualified name if it lives in a package. -->
select percent(price,total) from OrderEvent
log4j 日志性能优化
日志记录是大多数应用的共同需求,但它并非没有代价:我们需要付出一定的性能损失。日志记录是 I/O 操作,而且 appender 向单个文件追加写入时使用的是同步(synchronized)代码。
1. 基本规则——大家通常都知道,但很多时候会忽略:
如果你在记录日志时要做类似 mylogString.append("blah blah").append("few more blah blah").append("..") 这样的字符串拼接,请先用 if (logger.isDebugEnabled()) 做个判断;这样当应用程序的日志级别配置为 INFO 或更高时,就能省掉这些字符串操作。
3. 使用日志缓冲——缓冲可以避免把每个日志事件都立即写入日志文件,从而减少 IO 操作。
在下面的示例配置中,设置了 BufferedIO=true 以及 BufferSize。不过,缓冲区设置得较大时,日志不会立即出现在日志文件中(用 tail -f 查看日志文件时尤其明显)。其次,如果服务器崩溃,仍留在内存缓冲区中、尚未刷新到日志文件的最后一部分(往往也是最重要的)日志信息可能会丢失。
<appender name="mainAppender" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="../log/server.log" />
    <param name="Append" value="true" />
    <param name="MaxFileSize" value="10MB"/>
    <param name="MaxBackupIndex" value="20"/>
    <!-- Flush when the buffer fills rather than after every event. -->
    <param name="ImmediateFlush" value="false" />
    <param name="BufferedIO" value="true" />
    <!-- BufferSize is a character count (it sizes the java.io.BufferedWriter), not KB; the
         default is 8192. 16384 gives a 16 KB buffer, whereas "16" would mean 16 characters. -->
    <param name="BufferSize" value="16384" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d %X{user}[%t] %-5p - %m%n" />
    </layout>
</appender>
4. 考虑使用异步(Async)日志——异步记录能带来巨大的性能提升,因为日志的写入在后台线程中完成。
<appender name="asyncAppender" class="org.apache.log4j.AsyncAppender">
    <!-- Buffer capacity counted in log events, not bytes. -->
    <param name="BufferSize" value="128" />
    <!-- true: a logging call waits while the buffer is full; false: events are discarded instead. -->
    <param name="Blocking" value="true" />
    <!-- The wrapped appender that actually writes the events. -->
    <appender-ref ref="mainAppender" />
</appender>
Blocking(阻塞):表示"缓冲区已满、仍在等待刷新时该怎么办"。如果设为 false,appender 会丢弃日志事件,直到缓冲区重新可用,这样性能更好;默认值为 true。这同样是一种权衡。另外,使用异步日志时,可以不再为 main appender 配置缓冲。
Appender-Ref:引用前面定义的 mainAppender 来实际写日志。在异步日志场景下,可以不再为 main appender 开启缓冲。请注意,AsyncAppender 的缓冲按"日志事件数量"计算,而 main appender 的缓冲按日志数据的大小计算。
此外,缓冲越多,日志刷新的延迟就越大(服务器崩溃时可能丢失部分日志)。
5. 映射诊断上下文(MDC)——这个功能虽然与性能没有直接关系,但能让你的日志代码更干净。MDC 使用 ThreadLocal 存储与上下文相关的信息,可以用来在日志中记录已登录的用户名、事务 ID 或其他任意标识符。
In your code -> MDC.put("user", session.getUser()); (import org.apache.log4j.MDC)
In Log4j.xml (or log4j.properties), set pattern layout with %X:
<param name="ConversionPattern" value="%d %X{user}[%t] %-5p - %m%n" />
2012-11-02 12:12:04,586 [user: 0x3c5c84b3][pool-7-thread-182] DEBUG .......
2012-11-02 12:12:04,586 [user: 0x4c678c66][pool-7-thread-182] DEBUG .......
6. 多个 logger 的支持——如果应用程序中使用了多个 logger 并且出现了重复的日志条目,请考虑加上 additivity="false" 属性。
<!-- additivity="false": events logged here are not also passed on to ancestor loggers' appenders,
     which avoids duplicate log entries. -->
<logger name="me.prettyprint" additivity="false">
    <level value="error" />
    <appender-ref ref="cassandraAppender" />
</logger>
<!-- TimeBasedRollingPolicy (org.apache.log4j.rolling) comes from the log4j "extras" companion and is
     nested inside org.apache.log4j.rolling.RollingFileAppender. Rolls monthly; the ".gz" suffix
     makes the rolled-over files gzip-compressed. -->
<rollingPolicy class="org.apache.log4j.rolling.TimeBasedRollingPolicy">
    <param name="FileNamePattern" value="server.log.%d{yyyy-MM}.gz"/>
</rollingPolicy>
// NOTE(review): this snippet appears truncated. The closing braces of both if-blocks and of the
// method are missing, and nothing applies `level` to `logger` (presumably logger.setLevel(level)).
// As written, the statements after the first throw would sit inside that if-block.
// NOTE(review): a null newLevel fails with NullPointerException (at the latest at
// newLevel.equalsIgnoreCase) rather than IllegalArgumentException.
/**
 * Changes the level of the named log4j logger.
 *
 * @param loggerName name passed to Logger.getLogger
 * @param newLevel   level name such as "DEBUG"; compared case-insensitively with the parsed Level
 * @throws IllegalArgumentException "Invalid log level" if newLevel does not round-trip through
 *         Level.toLevel, or "Logger does not exist." if the logger's getLevel() is null
 */
public void setLogLevel(String loggerName, String newLevel) throws IllegalArgumentException {
final Level level = Level.toLevel(newLevel);
// Presumably Level.toLevel falls back to a default level for unrecognised names instead of
// throwing, which is why the name is compared back against the result to detect invalid input.
if (!newLevel.equalsIgnoreCase(level.toString())) {
throw new IllegalArgumentException("Invalid log level");
final Logger logger = Logger.getLogger(loggerName);
// NOTE(review): in log4j 1.x getLogger creates missing loggers rather than returning null, and
// getLevel() is null for loggers that only inherit a level. This check therefore effectively
// rejects loggers without an explicitly assigned level; confirm that is the intended meaning.
if ((logger == null) || (logger.getLevel() == null)) {
throw new IllegalArgumentException("Logger does not exist.");
9. 可以把日志切换为异步/同步、缓冲/非缓冲模式。详情请参阅 log4j 的 API。下面给出一个演示片段:
// Demonstration fragment: find an AsyncAppender among the root logger's appenders and reach the
// appenders it wraps. It is incomplete as shown; see the review notes below.
// Get All Appenders
// NOTE(review): log4j 1.x declares getAllAppenders() with a raw Enumeration, so this assignment
// presumably compiles only with an unchecked warning; confirm for your log4j version.
Enumeration<Appender> appenders = LogManager.getRootLogger().getAllAppenders();
// Get your Appender from Enumeration,
// NOTE(review): `appender` is not declared in this fragment; presumably it is each element taken
// from `appenders` (e.g. inside a while (appenders.hasMoreElements()) loop).
if (appender instanceof AsyncAppender) {
// Get all internal Appenders used by the Async Appender
// NOTE(review): `asyncAppender` is not declared either; presumably
// `AsyncAppender asyncAppender = (AsyncAppender) appender;` belongs here.
Enumeration<Appender> mainAppenders = asyncAppender.getAllAppenders();
// Get Main Appender, and do your operations
// NOTE(review): the closing brace of the if-block is missing from this fragment.
[ lucene扩展 ] spellChecker原理分析 - MR-fox - 博客园
lucene 的扩展包中包含了spellchecker,利用它我们可以方便的实现拼写检查的功能,但是检查的效果(推荐的准确程度)需要开发者进行调整、优化。
麻辣烫 中文测试 麻辣酱 麻辣火锅 中国人 中华人民共和国 |
/** * 根据字典文件创建spellchecker所使用的索引。 * * @param spellIndexPath * spellchecker索引文件路径 * @param idcFilePath * 原始字典文件路径 * @throws IOException */ public void createSpellIndex(String spellIndexPath, String idcFilePath) throws IOException { Directory spellIndexDir = FSDirectory.open( new File(spellIndexPath)); SpellChecker spellChecker = new SpellChecker(spellIndexDir); IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_35, null ); spellChecker.indexDictionary( new PlainTextDictionary( new File( idcFilePath)), config, false ); // close spellIndexDir.close(); spellChecker.close(); } |
除了PlainTextDictionary(1 word per line),我们还可以使用:
- FileDictionary(1 string per line, optionally with a tab-separated integer value | 每行一个字符串,其后可以用 tab 分隔附加一个整数值)
- LuceneDictionary(Lucene Dictionary: terms taken from the given field of a Lucene index | 用现有的index的term建立索引)
- HighFrequencyDictionary(HighFrequencyDictionary: terms taken from the given field of a Lucene index, which appear in a number of documents above a given threshold. | 在 LuceneDictionary 的基础上加了限定:只有包含该 term 的文档数超过给定阈值时,该 term 才会被 spellchecker 采用)
/** * 根据指定索引中的字典创建spellchecker所使用的索引。 * * @param oriIndexPath * 指定原始索引 * @param fieldName * 索引字段(某个字段的字典) * @param spellIndexPath * 原始字典文件路径 * @throws IOException */ public void createSpellIndex(String oriIndexPath, String fieldName, String spellIndexPath) throws IOException { IndexReader oriIndex = IndexReader.open(FSDirectory.open( new File( oriIndexPath))); LuceneDictionary dict = new LuceneDictionary(oriIndex, fieldName); Directory spellIndexDir = FSDirectory.open( new File(spellIndexPath)); SpellChecker spellChecker = new SpellChecker(spellIndexDir); IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_35, null ); spellChecker.indexDictionary(dict, config, true ); } |
Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:麻辣烫>> Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:中文测试>> Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:麻辣酱>> Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:麻辣火锅>> Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:中国人>> Document<stored,indexed,omitNorms,indexOptions=DOCS_ONLY<word:中华人民共和国>> end1:人 end1:烫 end1:试 end1:酱 end1:锅 end2:国人 end2:测试 end2:火锅 end2:辣烫 end2:辣酱 end3:共和国 end4:民共和国 gram1:中 gram1:人 gram1:国 gram1:文 gram1:测 gram1:火 gram1:烫 gram1:试 gram1:辣 gram1:酱 gram1:锅 gram1:麻 gram1: gram2:中国 gram2:中文 gram2:国人 gram2:文测 gram2:测试 gram2:火锅 gram2:辣火 gram2:辣烫 gram2:辣酱 gram2:麻辣 gram2:麻 gram3:中华人 gram3:人民共 gram3:共和国 gram3:华人民 gram3:民共和 gram4:中华人民 gram4:人民共和 gram4:华人民共 gram4:民共和国 start1:中 start1:麻 start1: start2:中国 start2:中文 start2:麻辣 start2:麻 start3:中华人 start4:中华人民 word:中华人民共和国 word:中国人 word:中文测试 word:麻辣火锅 word:麻辣酱 word:麻辣烫 |
package com.fox.lab;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.spell.LuceneDictionary;
import org.apache.lucene.search.spell.SpellChecker;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

/**
 * "Did you mean" lookup backed by a Lucene {@link SpellChecker} index, with the terms of one
 * field of an existing index available as a {@link LuceneDictionary}.
 *
 * @author huangfox
 * @createDate 2012-2-16
 * @eMail [email protected]
 */
public class DidYouMeanSearcher implements Closeable {
    SpellChecker spellChecker = null;
    LuceneDictionary dict = null;
    /** Reader over the original index that backs {@link #dict}; kept so it can be closed. */
    private IndexReader oriIndex = null;
    private boolean closed = false;

    /**
     * Opens the spell-checker index and the original index.
     *
     * @param spellCheckIndexPath location of the spellChecker index
     * @param oriIndexPath        location of the original index whose terms form {@link #dict}
     * @param fieldName           field of the original index used as the dictionary
     * @throws IllegalStateException if either index cannot be opened. Failing here replaces the old
     *     behaviour of printing the error and leaving {@code spellChecker} null, which made the
     *     first later call throw NullPointerException.
     */
    public DidYouMeanSearcher(String spellCheckIndexPath, String oriIndexPath, String fieldName) {
        try {
            Directory directory = FSDirectory.open(new File(spellCheckIndexPath));
            spellChecker = new SpellChecker(directory);
            oriIndex = IndexReader.open(FSDirectory.open(new File(oriIndexPath)));
            dict = new LuceneDictionary(oriIndex, fieldName);
        } catch (IOException e) {
            // Do not leak a spell checker that was opened before the failure.
            if (spellChecker != null) {
                try {
                    spellChecker.close();
                } catch (IOException ignored) {
                    // Report the original failure below; it is the more informative one.
                }
            }
            throw new IllegalStateException("Cannot open spell-check index '" + spellCheckIndexPath
                    + "' or original index '" + oriIndexPath + "'", e);
        }
    }

    /**
     * Sets the spell checker's accuracy threshold (default 0.5).
     *
     * @param v accuracy threshold
     */
    public void setAccuracy(float v) {
        spellChecker.setAccuracy(v);
    }

    /**
     * Runs a spell check for the query string.
     *
     * @param queryString       the (possibly misspelled) query
     * @param suggestionsNumber maximum number of suggestions to return
     * @return the suggestions, or {@code null} if the spell-check index could not be read
     */
    public String[] search(String queryString, int suggestionsNumber) {
        String[] suggestions = null;
        try {
            // To skip suggestions for words already present in the original index, return null
            // here when exist(queryString) is true.
            suggestions = spellChecker.suggestSimilar(queryString, suggestionsNumber);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return suggestions;
    }

    /**
     * Reports whether {@code queryString} is one of the terms in {@link #dict}. This is a linear
     * scan over the whole dictionary and is currently unused (see {@link #search}).
     */
    private boolean exist(String queryString) {
        Iterator<String> ite = dict.getWordsIterator();
        while (ite.hasNext()) {
            if (ite.next().equals(queryString)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Releases the spell checker and the reader over the original index. Further calls have no
     * effect.
     *
     * @throws IOException if either resource fails to close
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            if (spellChecker != null) {
                spellChecker.close();
            }
        } finally {
            if (oriIndex != null) {
                oriIndex.close();
            }
        }
    }
}
package com.fox.lab;

import java.io.IOException;

/** Demo entry point for the "did you mean" example. */
public class DidYouMeanMainApp {

    /**
     * Prints spell-check suggestions for a misspelled query. The spell-check index must already
     * exist; see the commented-out block for how to build it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Index creation helper (used by the commented-out block below).
        DidYouMeanIndexer indexer = new DidYouMeanIndexer();
        String spellIndexPath = "D:\\spellchecker";
        String idcFilePath = "D:\\dic.txt";
        String oriIndexPath = "D:\\solrHome\\example\\solr\\data\\index";
        String fieldName = "ab";
        DidYouMeanSearcher searcher = new DidYouMeanSearcher(spellIndexPath, oriIndexPath, fieldName);
        // "0 .5f" (with a space) is not a valid float literal.
        searcher.setAccuracy(0.5f);
        int suggestionsNumber = 15;
        String queryString = "麻辣将";
        // Uncomment to (re)build the spell-check index from the dictionary file and/or the
        // original index before searching:
        // try {
        //     indexer.createSpellIndex(spellIndexPath, idcFilePath);
        //     indexer.createSpellIndex(oriIndexPath, fieldName, spellIndexPath);
        // } catch (IOException e) {
        //     e.printStackTrace();
        // }
        String[] result = searcher.search(queryString, suggestionsNumber);
        if (result == null || result.length == 0) {
            System.out.println("我不知道你要什么,或许你就是对的!");
        } else {
            System.out.println("你是不是想找:");
            for (String suggestion : result) {
                System.out.println(suggestion);
            }
        }
    }
}
你是不是想找: 麻辣酱 麻辣火锅 麻辣烫 |
你是不是想找: 中文测试 |
我不知道你要什么,或许你就是对的! |