First, here is a mind map I put together after studying the official documentation.
In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality offered by the basic SQLContext. The extra features include writing queries with the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. I won't go into it here; it is left for later study.
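For completeness, here is a minimal sketch of how a HiveContext would be created in the same Java setup. The class name HiveContextSketch and the commented-out query are illustrative assumptions only, and the spark-hive dependency must be on the classpath:
package com.tg.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;
public class HiveContextSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // same JVM memory workaround as in the examples below
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        // A HiveContext is built on the underlying SparkContext and adds the HiveQL parser,
        // Hive UDFs and access to Hive tables on top of what SQLContext offers.
        HiveContext hiveContext = new HiveContext(sc.sc());
        // Queries are issued the same way as with an SQLContext, e.g.:
        // hiveContext.sql("SHOW TABLES").show();
    }
}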
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
For example, the following creates a DataFrame from a JSON file:
package com.tg.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * Create a DataFrame from a JSON file.
 * @author 汤高
 */
public class DataFrameOps {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
//JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// age name
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show();
// age count
// null 1
// 19 1
// 30 1
}
}
Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains objects of a specific type. When you already know the schema while writing your Spark application, this reflection-based approach leads to more concise code and works well.
The second method for creating DataFrames is a programmatic interface that lets you construct a schema and then apply it to an existing RDD. This approach is more verbose, but it lets you build a DataFrame when the columns and their types are not known until runtime.
When the format of the RDD is already known:
The JavaBean class defines the schema of the table; the names of the JavaBean's properties are read via reflection and become the column names.
JavaBean classes can also be nested or contain complex types such as Sequences or Arrays.
Such an RDD can be implicitly converted into a DataFrame and then registered as a table,
and the table can be used in subsequent SQL statements. The Scala interface of Spark SQL supports automatically converting an RDD containing JavaBeans into a DataFrame.
Steps:
1. Define the schema with a JavaBean class.
2. Create an SQLContext.
3. Call createDataFrame on the existing RDD, providing the Class object of the JavaBean, to apply the schema and convert the RDD into a DataFrame.
4. Register the resulting DataFrame as a table.
5. Use the sql method provided by the SQLContext to run SQL queries. The query result comes back as a DataFrame, which supports all the normal RDD operations.
First, write a JavaBean class that implements the Serializable interface and provides getters and setters:
package com.tg.spark.sql;
import java.io.Serializable;
public class Person implements Serializable {
/**
*
*/
private static final long serialVersionUID = 727694963564948838L;
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
Next, write the driver class that converts an RDD of Person objects into a DataFrame and queries it:
package com.tg.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import java.util.List;
import org.apache.spark.SparkConf;
public class CreateDataFrame1 {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("hdfs://master:9000/testFile/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
}
});
//A schema can be applied to an existing RDD by calling createDataFrame and providing the Class object for the JavaBean.
// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
In the code above, the line
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
uses the sql method of SQLContext, which lets an application run SQL queries programmatically and returns the result as a DataFrame.
When the columns of the RDD and their types are not known until runtime:
Steps:
1. Create an RDD of Rows from the original RDD.
2. Create the schema, represented by a StructType, that matches the structure of the Rows in the RDD created in step 1.
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
package com.tg.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
public class CreateDataFrame2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("hdfs://master:9000/testFile/people.txt");
// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
public Row call(String record) throws Exception {
String[] fields = record.split(",");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : schemaString.split(" ")) {
// true means the field is nullable
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal
// RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
results.persist(StorageLevel.MEMORY_ONLY());
System.out.println(names);
}
}
In the simplest form, the default data source (parquet, unless otherwise configured via spark.sql.sources.default) is used for all operations.
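As a hedged sketch of what that configuration knob looks like in practice, the default format can be changed on the SQLContext before using the generic load()/save() calls. The class name DefaultSourceConfig, the "json" value and the paths are assumptions for illustration only:
package com.tg.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class DefaultSourceConfig {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // same JVM memory workaround as elsewhere
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        SQLContext sqlContext = new SQLContext(sc);
        // Change the default data source from parquet to json (illustrative value).
        sqlContext.setConf("spark.sql.sources.default", "json");
        // The generic load()/save() calls now read and write JSON instead of parquet.
        DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/people.json");
        df.select("name").write().save("namesOnly");
    }
}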
package com.tg.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.storage.StorageLevel;
/**
 * Load the default data source format and save it.
 * First way to read parquet files: sqlContext.parquetFile(path)
 * @author Administrator
 */
public class DataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
// Specify a save mode explicitly:
//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
// First way to read parquet files:
DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
parquetFile.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
package com.tg.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
/**
 * Load a specified data source format and save it.
 * Second way to read: sqlContext.read().xxx(path)
 * @author Administrator
 */
public class DataSource2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().format("json").load("hdfs://master:9000/testFile/people.json");
df.select("name", "age").write().format("parquet").save("people.parquet");
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
A save mode can be specified explicitly on the writer, for example:
df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
The available save modes are:
| Scala/Java | Python | Meaning |
| --- | --- | --- |
| SaveMode.ErrorIfExists (default) | "error" (default) | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
| SaveMode.Append | "append" | When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to the existing data. |
| SaveMode.Overwrite | "overwrite" | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
| SaveMode.Ignore | "ignore" | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to `CREATE TABLE IF NOT EXISTS` in SQL. |
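To connect the table to code, here is a small sketch showing both ways of passing a save mode in Java, the SaveMode enum and its string equivalent. The helper class SaveModeSketch and the output path are assumptions; df stands for any existing DataFrame such as the one loaded above:
package com.tg.spark.sql;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
public class SaveModeSketch {
    // df is assumed to be an already-loaded DataFrame, e.g. from sqlContext.read().load(...).
    static void saveWithModes(DataFrame df) {
        // Enum form: overwrite any data already at the target path.
        df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
        // String form: "append" maps to SaveMode.Append and adds to existing data.
        df.select("name", "favorite_color").write().mode("append").save("namesAndFavColors.parquet");
    }
}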
jsonRDD - loads data from an existing RDD, where each element of the RDD is a string containing a JSON object.
Both ways of reading JSON have already appeared in the code above; here is a complete example:
package com.tg.spark.sql;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class DataSource3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot get enough memory otherwise
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
//DataFrame people = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json");
// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// Register this DataFrame as a table.
people.registerTempTable("people");
// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
anotherPeople.show();
}
}
How to use Spark to work with Hive and other databases will be covered in later study.
Writing this up was not easy; if you repost it, please credit the source: http://blog.csdn.net/tanggao1314/article/details/51594942
For more, see the Spark SQL Programming Guide on the official site.