
Connecting to MongoDB and Kafka: asynchronous processing code

Date: 2016-01-01 16:57:26

1. MongoDB asynchronous processing

Dependency:

        <dependencies>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongodb-driver-async</artifactId>
                <version>3.0.4</version>
            </dependency>
        </dependencies>    

Code:

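import java.util.ArrayList;
import java.util.List;
import org.bson.Document;
import com.mongodb.ServerAddress;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.connection.ClusterSettings;

// Class name is illustrative; the original post shows only main().
public class MongoAsyncInsert {
    public static void main(String[] args) {
        // Addresses of the cluster members to connect to.
        List<ServerAddress> address = new ArrayList<>();
        address.add(new ServerAddress("172.16.4.90", 3000));
        address.add(new ServerAddress("172.16.4.91", 3000));
        address.add(new ServerAddress("172.16.4.92", 3000));
        ClusterSettings clusterSettings = ClusterSettings.builder().hosts(address).build();
        MongoClientSettings settings = MongoClientSettings.builder().clusterSettings(clusterSettings).build();
        MongoClient mongoClient = MongoClients.create(settings);

        MongoDatabase database = mongoClient.getDatabase("shardb");
        MongoCollection<Document> collection = database.getCollection("shardtable");

        Document doc = new Document("name", "MongoDB")
                .append("type", "database")
                .append("count", 1)
                .append("info", new Document("x", 203).append("y", 102));
        final long start = System.currentTimeMillis();
        // insertOne returns immediately; the callback is invoked later on a driver thread.
        collection.insertOne(doc, new SingleResultCallback<Void>() {
            @Override
            public void onResult(final Void result, final Throwable t) {
                if (t != null) {
                    t.printStackTrace();
                }
                System.out.println("Inserted, elapsed ms=" + (System.currentTimeMillis() - start));
            }
        });
        // main does not wait for the callback before returning.
        System.out.println("insertOne returned, elapsed ms=" + (System.currentTimeMillis() - start));
    }
}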

 

 

2. Kafka asynchronous processing

Dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>

 

Code:

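import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Class name is illustrative; the original post shows only main().
public class KafkaAsyncSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.4.93:9092,172.16.4.94:9092,172.16.4.95:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            // send() is asynchronous; it also returns a Future<RecordMetadata>,
            // which is ignored here in favor of the callback.
            producer.send(new ProducerRecord<String, String>("davidwang456", Integer.toString(i), Integer.toString(i)),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                // The send failed, so there is no offset to report.
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        }
        System.out.println("send loop elapsed ms=" + (System.currentTimeMillis() - start));
        // close() blocks until previously sent records have completed, so the callbacks get to run.
        producer.close();
    }
}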
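
Besides the callback, send() returns a Future<RecordMetadata>. Checking isDone() immediately after send() will usually report false, because the record has only been queued at that point; calling get() blocks until the send has been acknowledged. The sketch below illustrates that difference with plain java.util.concurrent types and does not use the Kafka client. sendAsync, the class name, the fake offset and the 50 ms delay are all hypothetical stand-ins, not part of the original post.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSendSketch {

    // Hypothetical stand-in for producer.send(record):
    // completes with a fake offset after a simulated network delay.
    static Future<Long> sendAsync(final long fakeOffset) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return fakeOffset;
        });
    }

    // Blocks until the simulated send has completed and returns its offset.
    static long sendAndWait(long fakeOffset) {
        Future<Long> response = sendAsync(fakeOffset);
        try {
            return response.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Future<Long> response = sendAsync(7L);
        // Usually false here: the work has been handed off but has not finished yet.
        System.out.println("isDone right after send: " + response.isDone());
        // get() inside sendAndWait blocks until the result is available.
        System.out.println("offset after get(): " + sendAndWait(42L));
    }
}
```

Blocking on get() for every record makes the send effectively synchronous, so with the real producer the callback form is normally the better fit for high-throughput loops.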

Results

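1. The Kafka callback output is printed. producer.close() waits for the outstanding sends to complete, which gives the callbacks time to run.

2. The MongoDB callback output is not printed. The most likely reason is that main returns right after insertOne without waiting for the callback, so the JVM can exit before the driver invokes it.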
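
One way to make the MongoDB callback output visible is to have main wait for the callback before it returns. The sketch below shows that waiting pattern with a CountDownLatch; it does not use the MongoDB driver. insertAsync is a hypothetical stand-in for an asynchronous call such as collection.insertOne(doc, callback), and the class name, the 50 ms delay and the timeout are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class AwaitCallbackSketch {

    // Hypothetical stand-in for an async driver call: returns immediately
    // and invokes the callback later on a daemon thread.
    static void insertAsync(String doc, BiConsumer<Void, Throwable> callback) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated network round trip
                callback.accept(null, null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                callback.accept(null, e);
            }
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();
    }

    // Starts the async insert and waits for its callback.
    // Returns true if the callback ran within the timeout.
    static boolean insertAndWait(String doc, long timeoutMillis) {
        final CountDownLatch done = new CountDownLatch(1);
        final long start = System.currentTimeMillis();
        insertAsync(doc, (result, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println("Inserted, elapsed ms=" + (System.currentTimeMillis() - start));
            }
            done.countDown(); // signal the waiting thread in both cases
        });
        System.out.println("Call returned, elapsed ms=" + (System.currentTimeMillis() - start));
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean completed = insertAndWait("{name: 'MongoDB'}", 5000);
        System.out.println("callback completed=" + completed);
    }
}
```

Applied to the MongoDB example, the same idea would mean counting the latch down inside onResult and awaiting it at the end of main, then closing the client.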


Original: http://www.cnblogs.com/davidwang456/p/5093430.html
