首页 > 系统服务 > 详细

Spark教程——(4)Spark-shell基于Phoenix访问HBase数据

时间:2019-07-04 00:11:12      阅读:164      评论:0      收藏:0      [点我收藏+]

 

package statistics

import common.util.timeUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, count, split}


class costMonth {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    //      .setAppName("finance test")
    //      .setMaster("local")
    val sc = new SparkContext(conf)
    //    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.load(
      "org.apache.phoenix.spark"
      , Map("table" -> "ASSET_NORMAL"
        , "zkUrl" -> "node3,node4,node5:2181")
    )
    df.registerTempTable("asset_normal")
    def costingWithin(originalValue: Float, years: Int): Double =  (originalValue*0.95)/(years*365)
    sqlContext.udf.register("costingWithin", costingWithin _)

    def costingBeyond(originalValue: Float): Double = originalValue*0.05/365
    sqlContext.udf.register("costingBeyond", costingBeyond _)

    def expire(acceptanceDate: String, years: Int): Boolean = timeUtil.dateStrAddYears2TimeStamp(acceptanceDate, timeUtil.SECOND_TIME_FORMAT, years) > System.currentTimeMillis()
    sqlContext.udf.register("expire", expire _)

    def monthSpace(stDate: String, endDate: String): Int = timeUtil.getMonthSpace(stDate, endDate)
    sqlContext.udf.register("monthSpace", monthSpace _)

    val costDay = sqlContext
      .sql(
        "select ID, ASSET_ID, ASSET_NAME, ACCEPTANCE_DATE, FIRST_DEPARTMENT_ID, SECOND_DEPARTMENT_ID, case when expire(ACCEPTANCE_DATE, DEPRECIABLE_LIVES_NAME) then costingWithin(ORIGINAL_VALUE, DEPRECIABLE_LIVES_NAME) else costingBeyond(ORIGINAL_VALUE) end as ACTUAL_COST, current_timestamp() as GENERATION_TIME from asset_normal "
      )


    //    df.show(false)
    costDay.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "ASSET_FINANCIAL_DETAIL")
      .option("zkUrl", "node3,node4,node5:2181")
      .save()

  }
}

 

ln -s /etc/hbase/conf.cloudera.hbase/hbase-site.xml /etc/spark/conf.cloudera.spark_on_yarn/hbase-site.xml

 

spark-shell --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/spark/lib/cf/phoenix-spark-4.14.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/spark/lib/cf/phoenix-client-4.14.1-HBase-1.4.jar" --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/spark/lib/cf/phoenix-spark-4.14.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/spark/lib/cf/phoenix-client-4.14.1-HBase-1.4.jar"

 

val df = sqlContext.load("org.apache.phoenix.spark",Map("table"->"ASSET_NORMAL","zkUrl"->"node3,node4,node5:2181"))

 

参考:

https://blog.csdn.net/dingyuanpu/article/details/52623655

https://www.cnblogs.com/feiyudemeng/p/9254046.html

http://dequn.github.io/2016/11/08/phoenix-spark-setting/

https://community.hortonworks.com/questions/212315/spark2-phoenix-plugin-with-zeppelin.html

https://community.hortonworks.com/content/supportkb/150591/how-to-connect-hbase-and-phoenix-tables-in-secure.html

https://community.hortonworks.com/articles/179762/how-to-connect-to-phoenix-tables-using-spark2.html

https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark/4.14.0-cdh5.14.2

https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-client/4.14.1-HBase-1.4

https://blogs.apache.org/phoenix/entry/spark_integration_in_apache_phoenix

http://phoenix.apache.org/phoenix_spark.html#

https://www.cnblogs.com/skyEva/p/5859742.html

Spark教程——(4)Spark-shell基于Phoenix访问HBase数据

原文:https://www.cnblogs.com/ratels/p/11129744.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!