使用的分布式消息队列,比如ActiveMQ、RabbitMQ等等,消息队列的是有可以使得程序之间解耦,提升程序响应效率。
如果我们把多线程环境比作是分布式的话,那么线程与线程之间也可以用这种消息队列的方式进行数据通信和解耦。
注册成功之后新增积分
假如我们模拟一个场景,就是用户注册的时候,在注册的时候成功以后发放积分。这个场景一般来说,会这么去实现
3.代码实现
public class User {
private String userName;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "User{" + "userName=‘" + userName + ‘\‘‘ + ‘}‘;
}
}
public class UserService<T> {
private final ExecutorService single =
Executors.newSingleThreadExecutor();
private volatile boolean isRunning = true;
BlockingQueue<T> queue = new LinkedBlockingQueue(3);
{
init();
}
public void init(){
single.execute(()->{
while (isRunning){
try {
//阻塞获取
User user = (User) queue.poll(2, TimeUnit.SECONDS);
if(user==null){
isRunning = false;
break;
}
sendPoints(user);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public void register(){
User user = new User();
user.setUserName("name");
addUser(user);
}
/**
* 添加用户
* @param user
*/
private void addUser(User user) {
System.out.println("添加用户成功"+user);
}
/**
* 添加积分成功
* @param user
*/
private void sendPoints(User user){
System.out.println("添加积分成功"+user);
}
}
public class Test001 {
public static void main(String[] args) {
UserService userService = new UserService();
userService.register();
}
}
阻塞队列,顾名思义,首先是一个队列,而一个阻塞队列在数据结构中起的作用大致如下图:
当阻塞队列是满时,从队列里添加元素的操作将会被阻塞。
同样
试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他线程从列中移除
一个或者多个元素或者完全清空队列使队列重新变得空闲起来并后续新增
在多线程领域:所谓阻塞,在某些情况下会被挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockQueue都给你一手包办了,
在concurrent包发布以前,在多环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,
而这些给我们的程序带来不小的复杂度。
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.Max_Value)阻塞队列
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
public boolean offer(E e) {
//1、如果元素为null,抛出空指针异常
if (e == null) throw new NullPointerException();
//2、如果当前的队列的满了,则丢弃要放入的元素,返回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
//3、构造新节点,获取putLock锁
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如果队列没有满,则进队列,并递增元素计较
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
生产者:
/**
* 生产者
*/
public class ProducerThread implements Runnable {
private BlockingQueue<String> queue;
private String data;
public ProducerThread(BlockingQueue<String> queue,String data) {
this.queue = queue;
this.data = data;
}
@Override
public void run() {
System.out.println("生产者启动======");
boolean offer = queue.offer(data);
if (offer) {
System.out.println(Thread.currentThread().getName()
+"生产队列成功"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()
+"生产队列成功"+data+"失败");
}
System.out.println("生产者关闭");
}
}
消费者
/**
* 消费者
*/
public class ConsumerThread implements Runnable {
private BlockingQueue<String> queue;
private boolean flag = true;
public ConsumerThread(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (flag){
System.out.println("消费者启动======");
String data = null;
try {
data = queue.poll();
if(data!=null){
System.out.println("消费者队列成功"+data);
}else{
System.out.println("消费队列失败");
flag = false;
break;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}
}
public class Test001 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); new Thread(new ProducerThread(blockingQueue,"A")).start(); new Thread(new ConsumerThread(blockingQueue)).start(); } }
原文:https://www.cnblogs.com/cxyyh/p/11380825.html