1、创建context语法
create context context_name
partition [by] event_property [and event_property [and ...]]
from stream_def [, event_property [...] from stream_def] [, ...]
context_name为context的名字,并且唯一。如果重复,会说明已存在。
event_property为事件的属性名,多个属性名之间用and连接,也可以用逗号连接。
stream_def就是view。
例如:
//一个流:id和name是User的属性 create context NewUser partition by id and name from User //多个流:sid是Student的属性,tid是Teacher的属性 create context Person partition by sid from Student, tid from Teacher
多个流一定要注意,每个流的中用于context的属性的数量要一样,数据类型也要一致。比如下面这几个就是错误的:
// 错误:sid是int,tname是String,数据类型不一致 create context Person partition by sid from Student, tname from Teacher // 错误:Student有一个属性,Teacher有两个属性,属性数量不一致 create context Person partition by sid from Student, tid,tname from Teacher // 错误:sid对应tname,sname对应tid,并且sname和tname是String,sid和tid是int,属性数量一样,但是对应的数据类型不一致 create context Person partition by sid,sname from Student, tname,tid from Teacher
可以对进入context的事件增加过滤条件,不符合条件的就被过滤掉,就像下面这样:
// age大于20的Student事件才能建立或者进入context create context Person partition by sid from Student(age > 20)
例子:
import cn.hutool.core.lang.Console; import com.espertech.esper.client.*; import com.espertech.esper.client.context.ContextPartitionSelectorSegmented; import lombok.*; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * @author yaoyuan2 * @date 2019/3/26 */ public class SimpleContext { static EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); static EPAdministrator admin = epService.getEPAdministrator(); static String view = User.class.getName(); public static void main(String[] args) { String epl1 = "create context esbtest partition by id from "+view;//必须首先创建,否则报错:没有declare context String epl2 = "context esbtest select avg(amount) as avgAmount,id from " + view; Console.log(epl1); Console.log(epl2); EPStatement context = admin.createEPL(epl1); EPStatement state = admin.createEPL(epl2); state.addListener(new UpdateListener() { @Override public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { Double avgAmount = (Double) (newEvents[0].get("avgAmount")); Integer id = (Integer) (newEvents[0].get("id")); Console.log("id={},avgAmount={}",id,avgAmount); } } }); EPRuntime runtime = epService.getEPRuntime(); //查看id为1的平均金额 ContextPartitionSelectorSegmented selectCtx = new ContextPartitionSelectorSegmented() { // 该方法的实现方式与context定义的properties有关,如果有两个property id和time,则Object数组长度为2,obj[0]为id值, // obj[1]为time值,然后再添加到list中并返回 @Override public List<Object[]> getPartitionKeys() { Object[] o = new Object[1]; o[0] = 1; List<Object[]> list = new ArrayList<Object[]>(); list.add(o); return list; } }; User u1 = new User(1,20); Console.log("sendEvent: id={},amount={}",u1.getId(),u1.getAmount()); runtime.sendEvent(u1); User u2 = new User(2,30); Console.log("sendEvent: id={},amount={}",u2.getId(),u2.getAmount()); runtime.sendEvent(u2); User u3 = new User(1,30); Console.log("sendEvent: id={},amount={}",u3.getId(),u3.getAmount()); runtime.sendEvent(u3); Iterator<EventBean> it = state.iterator(selectCtx); EventBean event = it.hasNext() ? it.next() : null; Console.log("Iterator context:id=1,avgAmount={}",event.get("avgAmount")); User u4 = new User(2,40); Console.log("sendEvent: id={},amount={}",u4.getId(),u4.getAmount()); runtime.sendEvent(u4); } } @Setter @Getter @ToString @NoArgsConstructor @AllArgsConstructor class User implements Serializable { private int id; private int amount; }
输出
create context esbtest partition by id from com.ebc.User context esbtest select avg(amount) as avgAmount,id from com.ebc.User sendEvent: id=1,amount=20 id=1,avgAmount=20.0 sendEvent: id=2,amount=30 id=2,avgAmount=30.0 sendEvent: id=1,amount=30 id=1,avgAmount=25.0 Iterator context:id=1,avgAmount=25.0 sendEvent: id=2,amount=40 id=2,avgAmount=35.0
通过例子发现,partition by id会根据id分组,每组进入各自的context。上例中,会建2个context,一个id=1的,另外1个id=2的。
如果partition by后边有同一个流的多个属性值,如:partition by id,name。A事件id=1,amount=20,B事件id=1,amount=21,C事件id=1,amount=20,那么A和C在一个context,B在另外1个context。
原文:https://www.cnblogs.com/yaoyuan2/p/10601240.html