2021-03-27
import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaAdmin; import com.ibm.dsw.quote.preentitlement.base.AbstractBaseTest; public class AdminClientTest extends AbstractBaseTest { @Autowired private KafkaAdmin admin; @Test public void listTopic() throws InterruptedException, ExecutionException { AdminClient client = AdminClient.create(admin.getConfig()); // create topic NewTopic newTopic = new NewTopic("test1", 3, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); CreateTopicsResult createTopicResult = client.createTopics(newTopicList); createTopicResult.all().get(); // list topic ListTopicsResult result = client.listTopics(); Collection<TopicListing> topic = result.listings().get(); topic.forEach(each -> System.out.println(each.name())); //get topic configuration DescribeConfigsResult ret = client.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "TopicName"))); Map<ConfigResource, Config> configs = ret.all().get(); for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) { ConfigResource key = entry.getKey(); Config value = entry.getValue(); System.out.println(String.format("Resource type: %s, resource name: %s", key.type(), key.name())); Collection<ConfigEntry> configEntries = value.entries(); for (ConfigEntry each : configEntries) { System.out.println(each.name() + " = " + each.value()); } } //get cluster information DescribeClusterResult ret2= client.describeCluster(new DescribeClusterOptions()); String clusterId = ret2.clusterId().get(); System.out.println("----------------clusterId------------"+clusterId); Collection<Node> nodes = ret2.nodes().get(); for (Node node: nodes) { System.out.println(node.host()); } client.close(); } }
Spring Boot 中使用kafka AdminClient管理Kafka
原文:https://www.cnblogs.com/Ivyduan/p/14586400.html