首页 > 其他 > 详细

Spark中foreachRDD的正确使用

时间:2019-09-09 00:12:17      阅读:83      评论:0      收藏:0      [点我收藏+]

常出现的使用误区:


误区一:在driver上创建连接对象(比如网络连接或数据库连接)
    如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // 数据库连接在driver上执行
  rdd.foreach { record =>
  connection.send(record) // 在worker上执行
  }
}

 

误区二:为每一条记录都创建一个连接对象
    通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

dstream.foreachRDD { rdd =>
rdd.foreach { record =>
    val connection = createNewConnection() //每插入一条数据,创建一个连接
    connection.send(record)
    connection.close()
    }
}

  比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。

正确做法一:为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    }
}

 

正确做法二:为每个RDD分区使用一个连接池中的连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    // 从数据库连接池中获取连接
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // 用完以后将连接返    回给连接池,进行复用
}
}

 

Spark中foreachRDD的正确使用

原文:https://www.cnblogs.com/oush/p/11489126.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!