直接上代码
package com.guo.server;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
 * Minimal ZeroMQ PUB-side sample: binds a PUB socket, publishes a fixed number of
 * "WORK task&lt;i&gt;" messages at a fixed interval, then publishes "END" and shuts down.
 */
public class Publisher {

    /** Address the PUB socket binds to. */
    private static final String ENDPOINT = "tcp://192.168.124.130:6666";

    /** Wait before the first message; presumably gives subscribers time to connect. */
    private static final long STARTUP_DELAY_MS = 10000L;

    /** Pause after each task message. */
    private static final long TASK_INTERVAL_MS = 1000L;

    /** Number of "WORK task" messages published before "END". */
    private static final int TASK_COUNT = 10;

    /** Final message published once the task loop is finished. */
    private static final String END_MESSAGE = "END";

    /**
     * Runs the whole publish sequence on the calling thread and blocks until it is done.
     *
     * <p>If the thread is interrupted while waiting, the remaining task messages are
     * skipped, "END" is still published, the socket and context are released, and the
     * thread's interrupt flag is set again before returning so the caller can observe
     * the interruption.
     */
    public void start() {
        System.out.println("===========publisher start=============");
        boolean interrupted = false;
        Context context = ZMQ.context(1);
        try {
            Socket socket = context.socket(ZMQ.PUB);
            try {
                // Linger period (ms) applied to unsent messages when the socket is closed.
                socket.setLinger(5000);
                // Per the ZeroMQ option docs, a send high-water mark of 0 means "no limit".
                socket.setSndHWM(0);
                socket.bind(ENDPOINT);

                interrupted = !pause(STARTUP_DELAY_MS);
                for (int i = 0; i < TASK_COUNT && !interrupted; i++) {
                    socket.send("WORK task" + i, 0);
                    interrupted = !pause(TASK_INTERVAL_MS);
                }

                // Always publish the end marker, even after an interrupt.
                socket.send(END_MESSAGE, 0);
                System.out.println("===========publisher end=============");
            } finally {
                // Release the socket even if bind/send failed.
                socket.close();
            }
        } finally {
            try {
                context.term();
            } finally {
                // Restore the flag only after cleanup so shutdown itself is not disturbed.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the full sleep completed, {@code false} if the thread was
     *         interrupted while sleeping
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }
}
package com.guo.client;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
 * Minimal ZeroMQ SUB-side sample: connects to the publisher, subscribes to every
 * message, prints each one, and stops when "END" (case-insensitive) arrives.
 */
public class Subscriber {

    /** Address of the publisher to connect to. */
    private static final String ENDPOINT = "tcp://192.168.124.130:6666";

    /** Message that ends the receive loop (compared case-insensitively). */
    private static final String END_MESSAGE = "END";

    /**
     * Runs the receive loop on the calling thread and blocks until "END" is received
     * or a receive fails. The socket and context are released in all cases, including
     * when an exception is thrown.
     */
    public void start() {
        System.out.println("===========subscriber start=============");
        Context context = ZMQ.context(1);
        try {
            Socket socket = context.socket(ZMQ.SUB);
            try {
                socket.connect(ENDPOINT);
                // An empty prefix subscribes to all messages.
                socket.subscribe("".getBytes());
                while (true) {
                    byte[] res = socket.recv(0);
                    if (res == null) {
                        // The binding documents a null return on error; stop instead of
                        // failing with a NullPointerException while decoding.
                        System.err.println("subscriber: recv returned no message, stopping");
                        break;
                    }
                    // Explicit charset so decoding does not depend on the platform default.
                    // The sample's payloads are plain ASCII, so the output is unchanged.
                    String resStr = new String(res, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("substring is =" + resStr);
                    if (END_MESSAGE.equalsIgnoreCase(resStr)) {
                        break;
                    }
                }
                System.out.println("===========subscriber end=============");
            } finally {
                // Release the socket even if connect/recv failed.
                socket.close();
            }
        } finally {
            context.term();
        }
    }
}
说明:
请先启动Subscriber程序,再启动Publisher程序:PUB套接字不会为尚未连接的订阅者保留消息,后启动的Subscriber会错过此前已发布的消息。在这种模式下,Publisher发布的消息可以同时被多个Subscriber程序接收并处理,从而实现广播。
本文出自 “自由畅想” 博客,请务必保留此出处http://guo09.blog.51cto.com/2486806/1396143
java调用zeromq PUB-SUB模式,布布扣,bubuko.com
原文:http://guo09.blog.51cto.com/2486806/1396143