
Connecting to Spark SQL from Java via SparkSession

Date: 2019-12-20 15:56:56

Configuring a SparkSession and obtaining the client

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

public class SparkTool implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkTool.class);

    public static String appName = "root";
    private static JavaSparkContext jsc = null;
    private static SparkSession spark = null;

    // Lazily create the shared SparkSession and JavaSparkContext.
    // synchronized guards against double initialization from concurrent callers.
    private static synchronized void initSpark() {
        if (jsc == null || spark == null) {

            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.driver.allowMultipleContexts", "true");
            sparkConf.set("spark.eventLog.enabled", "true");
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
            sparkConf.set("hive.mapred.supports.subdirectories", "true");
            sparkConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");

            // enableHiveSupport() lets spark.sql() query Hive tables directly.
            spark = SparkSession.builder().appName(appName).config(sparkConf).enableHiveSupport().getOrCreate();
            jsc = new JavaSparkContext(spark.sparkContext());
        }
    }

    public static JavaSparkContext getJsc() {
        if (jsc == null) {
            initSpark();
        }
        return jsc;
    }

    public static SparkSession getSession() {
        if (spark == null) {
            initSpark();
        }
        return spark;
    }

}
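A minimal usage sketch of the tool class above (a sketch, assuming Spark with Hive support is on the classpath; `SHOW DATABASES` is just a stand-in statement):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkToolDemo {
    public static void main(String[] args) {
        // Both getters lazily initialize and return the same shared singletons.
        SparkSession spark = SparkTool.getSession();

        // Any Hive-compatible SQL works once enableHiveSupport() is set.
        Dataset<Row> df = spark.sql("SHOW DATABASES");
        df.show();

        spark.stop();  // release the context when the job is done
    }
}
```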

Executing SQL through the SparkSession

 public List<TableInfo> selectTableInfoFromSpark(String abstractSql) {
        List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
        SparkSession spark = SparkTool.getSession();
        Dataset<Row> dataset = spark.sql(abstractSql);
        List<Row> rowList = dataset.collectAsList();
        for (Row row : rowList) {
            // Create a fresh TableInfo per row; reusing a single instance would
            // leave every list element pointing at the last row's values.
            TableInfo tableInfo = new TableInfo();
            // Row fields are 0-based; indices 1..3 assume the query's column order.
            tableInfo.setColumnName(row.getString(1));
            tableInfo.setColumnType(row.getString(2));
            tableInfo.setColumnComment(row.getString(3));
            tableInfoList.add(tableInfo);
        }
        return tableInfoList;
    }

 

      Queries run against Spark SQL from Java or Scala come back as one of three abstractions: RDD, DataFrame, or Dataset.

     The relationships among the three, and how to convert or parse between them, are covered in this post: https://www.jianshu.com/p/71003b152a84
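As a rough illustration of those conversions (a sketch, assuming Spark in local mode; the `Person` bean is made up for the example):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConversionDemo {
    // Simple bean used as the Dataset's element type (hypothetical example class).
    public static class Person implements Serializable {
        private String name;
        private int age;
        public Person() {}
        public Person(String name, int age) { this.name = name; this.age = age; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("conversions").master("local[1]").getOrCreate();

        List<Person> people = Arrays.asList(new Person("a", 30), new Person("b", 25));

        // List -> typed Dataset<Person>, via a bean encoder
        Dataset<Person> ds = spark.createDataset(people, Encoders.bean(Person.class));

        // Dataset<Person> -> DataFrame (Dataset<Row>): drop the typed view
        Dataset<Row> df = ds.toDF();

        // DataFrame -> JavaRDD<Row>
        JavaRDD<Row> rdd = df.javaRDD();

        // DataFrame -> Dataset<Person> again, by re-attaching an encoder
        Dataset<Person> back = df.as(Encoders.bean(Person.class));

        System.out.println(rdd.count());
        spark.stop();
    }
}
```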


Original post: https://www.cnblogs.com/yangcao/p/12073203.html
