引用包:jedis-3.0.1.jar、commons-pool2-2.6.0.jar
一、从Redis有序集合(Sorted Set)中实时获取数据:
连接Redis
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/** * 连接Redis * @param conferenceId * @return */ public String startRedis(String topicId) { textMessage = ""; String result = ""; try { JedisPoolConfig config = new JedisPoolConfig(); config.setTimeBetweenEvictionRunsMillis(30000); config.setMaxWaitMillis(10 * 1000); config.setMaxIdle(1000); config.setTestOnBorrow(true); JedisPool jedisPool = new JedisPool(config, redisIpYJ, Integer.parseInt(redisPortYJ),10000);// 连接redis服务端 result = "连接Redis成功"; lock.lock(); try { Thread thread = new Thread(new Runnable() { @Override public void run() { getRecordTextNew_Redis(jedisPool,topicId); } }); thread.start(); } finally { lock.unlock(); } } catch (Exception e) { result = "连接Redis失败:" + e.getMessage(); } return result; }
实时获取数据
/**
 * Stop flag for {@link #getRecordTextNew_Redis}: the polling loop keeps running while this
 * is {@code true}. Declared {@code volatile} so that an update made by another thread is
 * visible to the polling thread.
 */
public static volatile Boolean isSelectRedis = false;

/**
 * Repeatedly reads every member of the sorted set {@code "asr:text:" + topicId}, forwards
 * each non-empty record to the page through {@code ConfWebSocketService.sendMessage}, then
 * removes the members that were read and pauses 300 ms before the next pass.
 *
 * <p>The loop ends when {@link #isSelectRedis} becomes {@code false} or when the polling
 * thread is interrupted. Any other failure is printed and the loop retries after the same
 * 300 ms pause. A member that cannot be parsed as JSON is reported and skipped; it is still
 * removed with the rest of the batch so that it cannot block the queue.
 *
 * @param jedisPool pool from which one connection is borrowed per pass
 * @param topicId   suffix of the sorted-set key to poll
 */
public void getRecordTextNew_Redis(JedisPool jedisPool, String topicId) {
    final String key = "asr:text:" + topicId;
    while (isSelectRedis) {
        // try-with-resources returns the connection to the pool exactly once per pass.
        try (Jedis jedis = jedisPool.getResource()) {
            Set<String> results = jedis.zrange(key, 0, -1);
            for (String result : results) {
                pushRecordToPage(result);
            }
            if (!results.isEmpty()) {
                // Remove only the members that were just read; members added meanwhile stay.
                jedis.zrem(key, results.toArray(new String[0]));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Pause after failed passes too, so an unreachable server is not retried in a tight loop.
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}

/**
 * Parses one raw record, caches its HTML fragment in {@code textMap} under the record's
 * {@code uid} and sends the formatted text to the page.
 *
 * @param result raw JSON text of one sorted-set member; empty values are ignored
 * @throws Exception if the 400 ms pause before sending is interrupted or sending fails
 */
private void pushRecordToPage(String result) throws Exception {
    if (!StringUtils.isNotEmpty(result)) {
        return;
    }

    JSONObject resultMsg;
    try {
        resultMsg = JSONObject.parseObject(result);
    } catch (RuntimeException e) {
        // A malformed member will never become parsable; report it and move on.
        e.printStackTrace();
        return;
    }
    if (resultMsg == null) {
        return;
    }

    String text = resultMsg.getString("result");
    System.out.println("消息text:" + text);
    String pgs = "1";
    String micName = resultMsg.getString("roleName");
    String uId = resultMsg.getString("uid"); // paragraph id
    if (!StringUtils.isNotEmpty(text)) {
        return;
    }

    String dataText = "<b>" + micName + ":</b>" + text;
    String dataText2 = "<div id=\"" + uId + "\"><b>" + micName + ":</b>" + text + "</div>";
    textMap.put(uId, dataText2);
    System.out.println("消息dataText:" + dataText);

    JSONObject textObj = new JSONObject();
    textObj.put("dataText", dataText);
    textObj.put("dataPgs", pgs);
    textObj.put("dataUId", uId);

    // Pace the messages sent to the page.
    Thread.sleep(400);
    ConfWebSocketService.sendMessage(textObj.toJSONString(), "2");
}
二、通过Redis发布/订阅消息:
package net.nblh.utils.common; import java.util.Set; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * 建立订阅者,订阅者去订阅频道(mychannel) * @author lijd * */ public class GetSpeechRecognition_YJ_Sub extends Thread{ private final JedisPool jedisPool; private final GetSpeechRecognition_YJ_Msg msgListener = new GetSpeechRecognition_YJ_Msg(); private final String channel = "db0";//"mychannel"; public GetSpeechRecognition_YJ_Sub(JedisPool jedisPool) { super("GetSpeechRecognition_YJ_Sub"); this.jedisPool = jedisPool; } @Override public void run() { Jedis jedis = null; try { jedis = jedisPool.getResource(); //取出一个连接 Set<String> result = jedis.zrange("asr:text:1112",0,-1); //jedis.subscribe(msgListener, channel); //通过subscribe的api去订阅,参数是订阅者和频道名 //注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码 //这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行 System.out.println("继续执行后续代码。。。"); } catch (Exception e) { if (jedis != null) { jedis.close(); } e.printStackTrace(); }finally { if (jedis != null) { jedis.close(); } } } }
package net.nblh.utils.common; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * //建立发布者,通过频道(mychannel)发布消息 * @author lijd * */ public class GetSpeechRecognition_YJ_Pub extends Thread{ private final JedisPool jedisPool; public GetSpeechRecognition_YJ_Pub(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void run() { while (true) { Jedis jedis = null; try { Thread.sleep(1000); jedis = jedisPool.getResource();//连接池中取出一个连接 String line = "fabuxiaoxi:"; if (!"quit".equals(line)) { jedis.publish("mychannel", line);//从通过mychannel 频道发布消息 System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line)); }else { break; } if (jedis != null) { jedis.close(); } }catch (Exception e) { e.printStackTrace(); } } } }
package net.nblh.utils.common; import redis.clients.jedis.JedisPubSub; /** * 建立消息监听类,并重写了JedisPubSub的一些相关方法 * @author lijd * */ public class GetSpeechRecognition_YJ_Msg extends JedisPubSub{ public GetSpeechRecognition_YJ_Msg(){} @Override public void onMessage(String channel, String message) { //收到消息会调用 System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message)); //this.unsubscribe(); } @Override public void onSubscribe(String channel, int subscribedChannels) { //订阅频道会调用 System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅会调用 System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d", channel, subscribedChannels)); } }
原文:https://www.cnblogs.com/lijianda/p/10371377.html