系统架构图:
完成要求:
生产者Producer具备Feign客户端,调用dbService微服务;
增加微服务dbService,通过Spring Data Jpa 访问数据库获取对象user;
生产者Producer将获取对象user通过RabbitMQ,发送给消费组groupA和groupB;
创建springboot项目,导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
配置文件
server.port=8761
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false
启动类加入注解@EnableEurekaServer
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
用于定义User共有类
需要使用的依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<version>2.2.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId>
<version>5.4.31.Final</version>
<scope>compile</scope>
</dependency>
User类,加入@Entity/@Table等,供服务(dbSerivce)进行数据库表映射
/**
* @Author: cailong
* @Date: 2021/6/4 18:59
* @Description: User实体类
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "tb_user")
public class User implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@GenericGenerator(name = "idGenerator", strategy = "uuid")
@GeneratedValue(generator = "idGenerator")
String uid;
@Column(name = "username",length = 20)
String userName;
@Column(name = "password",length = 20)
String passWord;
}
pox.xml依赖导入
<!-- spring data jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 引入eureka client,服务发现和注册 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 引入自主创建的公共模块 -->
<dependency>
<groupId>org.hcl</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
配置文件
server.port=8085
spring.application.name=dbService
eureka.instance.hostname=localhost
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
spring.datasource.url=jdbc:mysql://localhost:3306/spring-cloud?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.jpa.database=mysql
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=update
创建接口UserRepository,继承JpaRepository来实现对User表的操作
/**
* @Author: cailong
* @Date: 2021/6/4 19:13
* @Description:
*/
@Repository
public interface UserRepository extends JpaRepository<User,String> {
}
控制器创建
package com.example.dbservice.controller;
import com.example.dbservice.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.hcl.common.User;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Optional;
/**
* @Author: cailong
* @Date: 2021/6/4 19:18
* @Description:
*/
@RestController
@Slf4j
public class UserController {
@Resource
private UserRepository userRepository;
/**
* 根据uid,查询对应的User
* @param uid
* @return
*/
@RequestMapping("/send/{uid}")
public User send(@PathVariable("uid") String uid){
log.info("finding uid: "+uid);
Optional<User> opUser = userRepository.findById(uid);
if (!opUser.isPresent()) //没有找到对应uid,返回null
return null;
log.info(String.valueOf(opUser.get()));
return opUser.get();
}
/**
* 保存新用户
* @param username
* @param password
* @return
*/
@RequestMapping(value = "/save")
public User save(String username,String password){
User user = new User();
user.setUserName(username);
user.setPassWord(password);
user = userRepository.save(user);
log.info("save user:"+user);
return user;
}
}
启动类配置
package com.example.dbservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient//开启服务发现与注册
@EntityScan(basePackageClasses = org.hcl.common.User.class)//扫描指定实体类
public class DbServiceApplication {
public static void main(String[] args) {
SpringApplication.run(DbServiceApplication.class, args);
}
}
pox.xml,依赖
<!-- ribbitMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- feign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hcl</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
配置文件
server.port=8082
spring.application.name=producer
eureka.instance.hostname=localhost
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
创建FeignClient,用于调用dbService服务
package com.hcl.producer;
import org.hcl.common.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* @Author: cailong
* @Date: 2021/6/4 22:46
* @Description: FeignClient,用于调用dbService服务
*/
@FeignClient(value = "dbService")
public interface DBServiceClient {
@RequestMapping("/send/{uid}")
User getUser(@PathVariable("uid") String uid);
}
创建接口SendService,用于发送消息到对应的RibbitMQ队列
package com.hcl.producer;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @Author: cailong
* @Date: 2021/6/4 20:34
* @Description:
*/
public interface SendService {
//建立myUserChannel队列
@Output("myUserChannel")
MessageChannel myUser();
}
启动类,加入@EnableEurekaClient,@EnableFeignClients,@EnableBinding
package com.hcl.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients //feign客户端
@EnableBinding(SendService.class) //绑定消息队列
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
控制器类
package com.hcl.producer;
import lombok.extern.slf4j.Slf4j;
import org.hcl.common.User;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: cailong
* @Date: 2021/6/4 22:44
* @Description:
*/
@RestController
@Slf4j
public class ProducerController {
@Resource
private DBServiceClient dbServiceClient;
@Resource
SendService sendService;
/**
* 查找uid用户,如果找到就发送到消息队列中
* @param uid
* @return
*/
@RequestMapping(value = "/sendUser")
public String sendUser(String uid){
User user = dbServiceClient.getUser(uid);
log.info("user:"+user);
if (user == null) {
log.warn("fail ! case by uid=" + uid + " from user is null!");
return "fail ! case by uid=" + uid + " from user is null!";
}
//创建消息
Message msg = MessageBuilder.withPayload(user).build();
boolean isSend = sendService.myUser().send(msg);
log.info(isSend==true?"消息已发送到消息队列":"消息发送失败");
return "SUCCESS!";
}
}
pox.xml 依赖导入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hcl</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
配置文件
server.port=8081
spring.application.name=consumer
eureka.instance.hostname=localhost
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
spring.rabbitmq.port=5672
spring.rabbitmq.host=localhost
spring.cloud.stream.bindings.myUserChannel.group=groupA
接口ReceiveService创建,用于获取消息队列
package com.hcl.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @Author: cailong
* @Date: 2021/6/4 23:06
* @Description:
*/
public interface ReceiveService {
@Input("myUserChannel")
SubscribableChannel myUser();
}
启动类,以及消息监听
package com.hcl.consumer;
import lombok.extern.slf4j.Slf4j;
import org.hcl.common.User;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ReceiveService.class)
@Slf4j
public class ConsumerApplication {
@Value("${spring.application.name}")
String applicationName;
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
/**
* 接受消息队列中的消息
* @param user
*/
@StreamListener("myUserChannel")
public void receive(User user){
log.info("I‘m "+applicationName);
log.info("myUserChannel.groupA接收到消息:"+user);
}
}
second Consumer和Third Consumer 同consumer 几乎相同,不同在于配置文件中的
spring.cloud.stream.bindings.myUserChannel.group=groupA
consumer是groupA,而另外两个是groupB
访问Eureka查看服务启动情况
查看RibbitMQ界面,myUserChannel.groupA和myUserChannel.groupB队列的消费者状态和绑定信息
producer控制台输出
dbservice控制台输出
consumer控制台输出
consumer second 控制台输出
consumer third 控制台没有输出消息,原因是被consumer second消费了,这两者通过轮询的方式消费myUserChannel.groupB队列
本次实验中RibbitMQ使用的Work模式
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。
使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展消费者,但是只能有一个消费者获得消息!竞争消费者模式。
原文:https://www.cnblogs.com/cailong/p/14855026.html