转载请注明出处:http://blog.csdn.net/luonanqin
元宵过后回公司上班,换了个部门,换了个领导,做的事也换了,不过Esper还是会继续搞,所以博客也会慢慢写的,大家别急。^_^
上一篇说到了EPL如何访问关系型数据库这种数据源,实际上别的数据源,比如:webservice、分布式缓存、非关系型数据库等等,Esper提供了统一的数据访问接口。然后今天会讲解如何创建另外一种事件类型——Schema。
1.Joining Method Invocation Results
和执行sql的语法类似,调用方法的一种触发方式也是通过join别的事件的属性来达到效果,且调用方法的句子为from子句。语法如下:
method:class_name.method_name(parameter_expressions)method是固定关键字,class_name为类全名,方法名为返回外部数据的方法名,parameter_expressions为方法的参数列表,对应join的事件属性,多个属性之间用逗号分隔,参数整体用圆括号括起来。例如:
select * from AssetMoveEvent, method:MyLookupLib.lookupAsset(assetId) // assetId为AssetMoveEvent的属性之一除了简单join,还可以为join加上where条件过滤一些返回结果。例如:
select assetId, assetDesc from AssetMoveEvent as asset, method:MyLookupLib.getAssetDescriptions() as desc //调用的方法无参 where asset.assetid = desc.assetidEsper不仅能缓存执行sql的查询结果,也能缓存执行方法的查询结果,并且缓存策略也是两种:LRU和Expire Time。具体可以参考上一篇缓存配置章节。若存在返回结果,且缓存生效后,Esper会自动为返回结果简历索引,加快查询速度。配置范例如下:
// LRU Cache <method-reference class-name="MyFromClauseWebServiceLib"> <lru-cache size="1000"/> </method-reference> // Expire Time Cache <method-reference class-name="com.mycompany.MyFromClauseLookupLib"> <expiry-time-cache max-age-seconds="10" purge-interval-seconds="10" ref-type="weak"/> </method-reference>class-name表示方法所在的类,可以是类全名,可以只有类名,前提是包已经import。其他参数的解释请参见上一篇缓存配置章节
2.Polling Method Invocation Results via Iterator
除了Join别的事件来触发查询方法,进而触发Listener,Esper还支持通过API直接执行方法。例如:
select * from method:MyLookupLib.getAssetDescriptions(category_var) // category_var为注册到引擎的变量。
package example;
/**
* 返回Java类型的外部数据
*
* Created by Luonanqin on 2/16/14.
*/
class JavaObject {
private String name;
private int size;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
public String toString() {
return "JavaObject{" + "name=‘" + name + ‘\‘‘ + ", size=" + size + ‘}‘;
}
}
public class InvocationMethodJava {
public static JavaObject[] getJavaObject(int times) {
JavaObject[] javaObjects = new JavaObject[2];
JavaObject javaObject1 = new JavaObject();
javaObject1.setName("javaObject1");
javaObject1.setSize(1 * times);
JavaObject javaObject2 = new JavaObject();
javaObject2.setName("javaObject2");
javaObject2.setSize(2 * times);
javaObjects[0] = javaObject1;
javaObjects[1] = javaObject2;
return javaObjects;
}
}
package example;
import java.util.HashMap;
import java.util.Map;
/**
* 返回Map类型的外部数据
*
* Created by Luonanqin on 2/16/14.
*/
public class InvocationMethodMap {
public static Map<String, Object> getMapObject() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("name", "mapObject1");
map.put("size", 1);
return map;
}
public static Map<String, Class> getMapObjectMetadata() {
Map<String, Class> map = new HashMap<String, Class>();
map.put("name", String.class);
map.put("size", int.class);
return map;
}
}
package example;
import java.util.Iterator;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
/**
* Created by Luonanqin on 2/16/14.
*/
class Times {
private int times;
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
class InvocationMethodListener implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (newEvents != null) {
System.out.println(newEvents[0].getUnderlying());
System.out.println(newEvents[1].getUnderlying());
}
}
}
public class InvocationMethodTest {
public static void main(String arg[]) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();
EPAdministrator admin = epService.getEPAdministrator();
/**
* 调用外部方法返回Java类型数据
*/
String timesName = Times.class.getName();
String ijName = InvocationMethodJava.class.getName();
String epl1 = "select ij.* from " + timesName + " as t, method:" + ijName + ".getJavaObject(times) as ij";
System.out.println(epl1+"\n");
EPStatement state1 = admin.createEPL(epl1);
state1.addListener(new InvocationMethodListener());
Times times = new Times();
times.setTimes(2);
runtime.sendEvent(times);
System.out.println("");
/**
* 调用外部方法返回Map类型数据
*/
String imName = InvocationMethodMap.class.getName();
String epl2 = "select * from method:" + imName + ".getMapObject()";
System.out.println(epl2+"\n");
EPStatement state2 = admin.createEPL(epl2);
Iterator<EventBean> iter = state2.iterator();
while (iter.hasNext()) {
EventBean event = iter.next();
System.out.println(event.getUnderlying());
}
}
}4.Declare an Event Type by Providing Names and Types
我曾经在《Esper学习之二:事件类型》里说过,事件类型的定义可以是POJO,数组,Map,或者XML。实际上还有另一种定义事件类型的方法,那就是schema。这个schema可不是数据库中的schema,而是用EPL定义的事件类型,所以说此类事件类型是针对Esper设计的,并不能拿出来通用。
我们先从语法开始说起。
create [map | objectarray] schema schema_name [as] (property_name property_type [,property_name property_type [,...]) [inherits inherited_event_type[, inherited_event_type] [,...]] [starttimestamp timestamp_property_name] [endtimestamp timestamp_property_name] [copyfrom copy_type_name [, copy_type_name] [,...]]解释如下:
a.map objectarray分别表示当前定义的schema是map结构还是数组结构。
b.schema_name表示schema的名字,全局唯一。as为可选内容。
c.property_name表示schema所包含的属性名称,property_type为属性的类型,多个属性用逗号分隔,所有属性用圆括号括起来可以使用的类型有:int、String等已经内置的Java类型;已经通过Configuration接口注册的事件类型,比如Map,数组,schema等;可以使用POJO类,如果没有import就要写全名,否则写类名即可。
d.inherits表示继承别的事件类型,后面跟着的是要继承的事件类型。如果使用了继承,则当前定义的schema包含了继承的事件类型的所有属性。并且可以继承多个,用逗号分隔。
e.starttimestamp和endtimestamp是两个特殊的关键字。单独使用starttimestamp时,表示为schema记一个时间戳。后面跟着已经声明的属性并且能够返回一个data-time值。如果联合endtimestamp使用,则表示这个schema只能在某段事件内使用。后面跟着的同样也是已经声明的属性并且能够返回一个data-time值。注意endtimestamp不能单独使用。
f.copyfrom表示复制别的事件的所有属性到当前定义的schema中,并且可以copy多个事件,用逗号分隔。
下面用几个例子来一一展示如何使用上面的语法:
// 声明SecurityEvent create schema SecurityEvent as (ipAddress string, userId String, numAttempts int) // 声明AuthorizationEvent,并且包含com.mycompany.HostNameInfo类型的hostinfo属性 create schema AuthorizationEvent(group String, roles String[], hostinfo com.mycompany.HostNameInfo) // 声明CompositeEvent,并且包含了SecurityEvent数组作为innerEvents属性的类型 create schema CompositeEvent(group String, innerEvents SecurityEvent[]) // 声明WebPageVisitEvent,自己定义了userId属性,并且继承了PageHitEvent的所有属性 create schema WebPageVisitEvent(userId String) inherits PageHitEvent // 声明RoboticArmMovement,并且开始于startts,结束于endts create schema RoboticArmMovement (robotId string, startts long, endts long) starttimestamp startts endtimestamp endts // 声明ExtendedSecurityEvent,并且复制SecurityEvent事件的所有属性到ExtendedSecurityEvent create schema ExtendedSecurityEvent (userName string) copyfrom SecurityEvent // 声明WebSecurityEvent,并且复制SecurityEvent和WebPageVisitEvent事件的所有属性到WebSecurityEvent create schema WebSecurityEvent (userName string) copyfrom SecurityEvent, WebPageVisitEvent这里要额外说一下,继承不仅仅继承了事件的属性,如果被继承的事件定义了starttimestamp或者endtimestamp,同样也会被继承下来。但是copyfrom是不会吧starttimestamp和endtimestamp复制的。这点一定要注意。
// 声明数组类型的schema create objectarray schema SchemaTest1 as (prop1 string); ... equals ... @EventRepresentation(array=true)create schema SchemaTest1 as (prop1 string); // 声明Map类型的schema create map schema SchemaTest2 as (prop1 string); ... equals ... @EventRepresentation(array=false)create schema SchemaTest2 as (prop1 string);
5.Declare Variant Stream
Variant Stream简单来说就是包含了各种不同的事件类型的事件类型。所以语法也很明了:
create variant schema schema_name [as] eventtype_name|* [, eventtype_name|*] [,...]variant为关键字,表明这是Variant Stream,eventtype_name为事件的定义名,多个事件定义用逗号分隔开。*表示接收任何的事件类型,不过一般来说没有需求会到这种程度。举例如下:
// 声明SecurityVariant,包含了LoginEvent和LogoutEvent create variant schema SecurityVariant as LoginEvent, LogoutEvent // 声明AnyEvent,包含任何类型的事件 create variant schema AnyEvent as *
以上就是调用外部方法以及schema的创建讲解,尤其是schema的创建,可能大家会用的更多一些,最好能记住。
原文:http://blog.csdn.net/luonanqin/article/details/19576633