Langjian (浪尖), Shenzhen · 浪尖聊大数据 (Langjian Talks Big Data)
val data = new JdbcRDD(sc, getConnection,
  "SELECT id, aa FROM bbb WHERE ? <= ID AND ID <= ?",
  lowerBound = 3, upperBound = 5, numPartitions = 1, mapRow = extractValues)
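Parameters:
1. The SparkContext.
2. A function that creates the JDBC connection.
3. The SQL query. It must contain the two placeholders ? <= ID AND ID <= ?.
4. The lower bound: the smallest id to read (inclusive).
5. The upper bound: the largest id to read (inclusive).
6. The number of partitions.
7. A function that converts each ResultSet row into the desired type.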
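The call above refers to getConnection and extractValues without showing them. A minimal sketch of what they could look like follows; the JDBC URL, the credentials, and the column types (a numeric id and a string aa) are assumptions made for illustration and do not come from the original post.

```scala
import java.sql.{Connection, DriverManager, ResultSet}

object JdbcRddHelpers {
  // Hypothetical connection settings; replace them with real ones.
  val url = "jdbc:mysql://localhost:3306/test"
  val user = "user"
  val password = "password"

  // Parameter 2: opens a new JDBC connection each time it is called.
  def getConnection(): Connection =
    DriverManager.getConnection(url, user, password)

  // Parameter 7: maps the current row of the ResultSet to an (id, aa) pair.
  // Assumes column 1 (id) is numeric and column 2 (aa) is a string.
  def extractValues(rs: ResultSet): (Long, String) =
    (rs.getLong(1), rs.getString(2))
}
```

With these in scope, JdbcRddHelpers.getConnection and JdbcRddHelpers.extractValues can be passed as the second and seventh arguments; the resulting RDD would then have element type (Long, String).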
override def getPartitions: Array[Partition] = {
  // bounds are inclusive, hence the + 1 here and - 1 on end
  val length = BigInt(1) + upperBound - lowerBound
  (0 until numPartitions).map(i => {
    val start = lowerBound + ((i * length) / numPartitions)
    val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
    new JdbcPartition(i, start.toLong, end.toLong)
  }).toArray
}
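To see what ranges this arithmetic produces, the same formula can be run outside Spark. The sketch below copies the computation into a plain function; the object name PartitionBounds and the function name bounds are made up for this illustration.

```scala
object PartitionBounds {
  // Same arithmetic as the getPartitions method quoted above, without Spark.
  // Returns one inclusive (start, end) id range per partition.
  def bounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // Both bounds are inclusive, so the number of ids is upper - lower + 1.
    val length = BigInt(1) + upperBound - lowerBound
    (0 until numPartitions).map { i =>
      val start = lowerBound + ((i * length) / numPartitions)
      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
      (start.toLong, end.toLong)
    }
  }
}
```

For the call shown earlier (lowerBound = 3, upperBound = 5, numPartitions = 1) this gives the single range 3 to 5. With ids 1 to 100 split into 3 partitions it gives 1 to 33, 34 to 66, and 67 to 100, so the ranges are contiguous and do not overlap.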
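Each partition then simply fetches its own range of rows over JDBC, binding the partition's lower and upper bounds to the two placeholders: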
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
To partition the data differently, it is enough to override the partitioning method.
For example:
CustomizedJdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  getCustomizedPartitions: () => Array[Partition],
  prepareStatement: (PreparedStatement, CustomizedJdbcPartition) => PreparedStatement,
  mapRow: (ResultSet) => T = CustomizedJdbcRDD.resultSetToObjectArray _)
Then override the getPartitions method as follows:
override def getPartitions: Array[Partition] = {
  getCustomizedPartitions()
}
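As an illustration of what could be passed for getCustomizedPartitions and prepareStatement, here is a sketch that creates one partition per value of a string key, something the numeric bounds of JdbcRDD cannot express. Everything in it is an assumption: the fields of CustomizedJdbcPartition, the region values, and a query of the form SELECT id, aa FROM bbb WHERE region = ? are hypothetical. So that the sketch compiles without Spark on the classpath, it declares a local stand-in for org.apache.spark.Partition; a real job would import Spark's trait instead.

```scala
import java.sql.PreparedStatement

// Local stand-in for org.apache.spark.Partition (assumption: used only so the
// sketch compiles without Spark; import the real trait in an actual job).
trait Partition extends Serializable { def index: Int }

// Hypothetical partition type: one partition per value of a string key.
final case class CustomizedJdbcPartition(index: Int, key: String) extends Partition

object CustomPartitioning {
  // Hypothetical key values; in practice these might be read from configuration.
  val regions: Seq[String] = Seq("north", "south", "east")

  // Candidate for the getCustomizedPartitions argument:
  // builds partition i for the i-th key value.
  def getCustomizedPartitions(): Array[Partition] =
    regions.zipWithIndex.map { case (r, i) => CustomizedJdbcPartition(i, r): Partition }.toArray

  // Candidate for the prepareStatement argument: binds the partition's key
  // to the single ? placeholder assumed to be in the SQL.
  def prepareStatement(stmt: PreparedStatement, part: CustomizedJdbcPartition): PreparedStatement = {
    stmt.setString(1, part.key)
    stmt
  }
}
```

The design point is that the partition object carries whatever the query needs to select its slice, and prepareStatement is the only place that knows how to bind it.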
https://v.qq.com/x/page/t0700p2vwyg.html
Original: https://blog.51cto.com/15127544/2664510