基于spark将关系型数据库数据导入hdfs,支持增量追加导入、覆盖导入和去重导入
package com.shenyuchong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;

/**
 * Spark job that imports the result of a SQL query against a relational database into HDFS
 * as parquet files under {@code /user/<outBase>/<outTable>}.
 *
 * <ul>
 *   <li>Supports MySQL and Oracle sources ({@code -base_type}).</li>
 *   <li>Supports append and overwrite write modes ({@code -write_mode}).</li>
 *   <li>Supports incremental import: only rows whose {@code checkColumn} is greater than the
 *       maximum value already stored in the target are fetched.</li>
 *   <li>Supports de-duplicated import: rows whose {@code inKey} already appears as
 *       {@code outKey} in the target are skipped. For composite keys pass an expression such
 *       as {@code concat(a,b)} using the function available in the respective engine.</li>
 * </ul>
 *
 * Incremental and de-duplication filters only take effect when the target directory already
 * exists and contains a {@code _SUCCESS} marker.
 */
public class App {
    public static String ip = "127.0.0.1";
    public static String port = "3306";
    public static String baseType = "mysql";
    public static String inBase = "in_base";
    public static String userName = "un";
    public static String password = "pas";
    public static String sql = "select 1";
    public static String hdfs = "hdfs://127.0.0.1:9000";
    public static String outBase = "base";
    public static String outTable = "table";
    public static String noticeUrl = "http://127.0.0.1:6009/schedule/schedule/donothing";
    public static String writeMode = "append";
    public static String checkColumn = "";
    public static String inKey = "";
    public static String outKey = "";

    /**
     * Entry point. Parses {@code -flag value} pairs, runs the import, then polls
     * {@link #noticeUrl} every two seconds until it answers with HTTP 200.
     *
     * <p>Import failures are logged, not rethrown, so the notification is sent either way.
     *
     * @param args command line arguments as {@code -flag value} pairs
     */
    public static void main(String[] args) {
        parseArgs(args);

        SparkSession spark = SparkSession.builder().getOrCreate();
        String tmpTable = outBase + "_" + outTable;
        String targetDir = "/user/" + outBase + "/" + outTable;
        SaveMode saveMode =
                "overwrite".equalsIgnoreCase(writeMode) ? SaveMode.Overwrite : SaveMode.Append;

        boolean imported = false;
        try {
            String driver = jdbcDriver();
            String url = jdbcUrl();
            String condition = "";
            String outSql = "select * from rdbTmpTable ";
            boolean dedup = false;

            FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");
            boolean targetExists =
                    fs.exists(new Path(targetDir)) && fs.exists(new Path(targetDir + "/_SUCCESS"));

            if (targetExists) {
                // Expose the data already in HDFS so it can be queried for the
                // incremental high-water mark and for the existing keys.
                spark.read().parquet(hdfs + targetDir + "/*").createOrReplaceTempView(tmpTable);

                if (checkColumn != null && !"".equals(checkColumn)) {
                    Object lastValue = spark
                            .sql("select max(" + checkColumn + ") from " + tmpTable)
                            .collectAsList().get(0).get(0);
                    // max() is null when the target holds no non-null values; import everything then.
                    if (lastValue != null) {
                        condition = " where " + checkColumn + " >'" + lastValue + "'";
                    }
                }

                dedup = !"".equals(inKey) && !"".equals(outKey);
                if (dedup) {
                    outSql = "select * from rdbTmpTable where " + inKey
                            + " not in ( select " + outKey + " from " + tmpTable + ")";
                }
            }

            // The source must be registered whether or not the target exists,
            // otherwise the very first import has nothing to select from.
            spark.read().format("jdbc")
                    .option("driver", driver)
                    .option("url", url)
                    .option("user", userName)
                    .option("password", password)
                    .option("dbtable",
                            "(select * from (" + sql + ") tmp_table1 " + condition + ") tmp_table2 ")
                    .load()
                    .createOrReplaceTempView("rdbTmpTable");

            // Debug output.
            spark.sql("select * from rdbTmpTable").show();
            if (dedup) {
                // Only meaningful (and only valid SQL) when a key is set and the target view exists.
                spark.sql("select " + outKey + " from " + tmpTable).show();
            }
            spark.sql(outSql).show();

            // NOTE(review): with -write_mode overwrite and de-duplication active, the job reads
            // from the same path it overwrites; Spark may reject that plan - confirm before use.
            spark.sql(outSql).write().format("parquet").mode(saveMode).save(hdfs + targetDir);
            imported = true;
        } catch (Exception e) {
            spark.log().error("Import into " + hdfs + targetDir + " failed", e);
        }

        // Keep notifying the downstream service until it accepts the request.
        try {
            boolean noticed = false;
            while (!noticed) {
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            spark.log().warn("Interrupted while notifying " + noticeUrl, e);
        }

        if (imported) {
            spark.log().info("---------------:成功!!");
        } else {
            spark.log().error("---------------:失败!!");
        }
    }

    /**
     * Requests the given address and reports whether it answered with HTTP 200.
     *
     * @param path URL to request
     * @return {@code true} if the response code is 200; {@code false} on any other code or error
     */
    public static boolean connectSuccess(String path) {
        HttpURLConnection con = null;
        try {
            con = (HttpURLConnection) new URL(path).openConnection();
            return con.getResponseCode() == HttpURLConnection.HTTP_OK;
        } catch (Exception e) {
            return false;
        } finally {
            if (con != null) {
                con.disconnect();
            }
        }
    }

    /** Copies every recognised {@code -flag value} pair from {@code args} into the static settings. */
    private static void parseArgs(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            String value = args[i + 1];
            switch (args[i]) {
                case "-ip":           ip = value;          break; // source host
                case "-port":         port = value;        break; // source port
                case "-base_type":    baseType = value;    break; // source type: mysql|oracle
                case "-in_base":      inBase = value;      break; // source database name
                case "-in_key":       inKey = value;       break; // source primary key expression
                case "-out_key":      outKey = value;      break; // HDFS table primary key expression
                case "-user_name":    userName = value;    break; // source user
                case "-password":     password = value;    break; // source password
                case "-sql":          sql = value;         break; // export query (plain select)
                case "-hdfs":         hdfs = value;        break; // HDFS address
                case "-out_base":     outBase = value;     break; // output database name
                case "-out_table":    outTable = value;    break; // output table name
                case "-notice_url":   noticeUrl = value;   break; // completion notification URL
                case "-write_mode":   writeMode = value;   break; // overwrite|append
                case "-check_column": checkColumn = value; break; // incremental check column
                default:              break;
            }
        }
    }

    /** Returns the JDBC driver class name for the configured source type. */
    private static String jdbcDriver() {
        if ("mysql".equalsIgnoreCase(baseType)) {
            return "com.mysql.cj.jdbc.Driver";
        }
        if ("oracle".equalsIgnoreCase(baseType)) {
            return "oracle.jdbc.driver.OracleDriver";
        }
        throw new IllegalArgumentException("Unsupported -base_type: " + baseType);
    }

    /** Builds the JDBC URL of the source database ({@code inBase}) for the configured source type. */
    private static String jdbcUrl() {
        if ("mysql".equalsIgnoreCase(baseType)) {
            return "jdbc:mysql://" + ip + ":" + port + "/" + inBase
                    + "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        }
        if ("oracle".equalsIgnoreCase(baseType)) {
            return "jdbc:oracle:thin:@" + ip + ":" + port + ":" + inBase;
        }
        throw new IllegalArgumentException("Unsupported -base_type: " + baseType);
    }
}
maven打包后使用:
sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs --class com.shenyuchong.App --master spark://127.0.0.1:7077 --deploy-mode client --executor-memory 8G --total-executor-cores 4 /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" -in_base ulanqab -sql "select t.* from table1 t where datetime >=CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <=CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00') " -notice_url http://127.0.0.1:6009/schedule/schedule/donothing
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the sparkrdbms2hdfs job; packages a single runnable jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>gbd</groupId>
  <artifactId>sparkrdbms2hdfs</artifactId>
  <version>2.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- All dependencies are "provided": none of them is bundled into the assembled jar,
       so the JDBC drivers and Spark must be available on the cluster classpath at runtime. -->
  <dependencies>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.11</version><!--$NO-MVN-MAN-VER$ -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.oracle</groupId>
      <artifactId>ojdbc8</artifactId>
      <version>12.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.3</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.shenyuchong.App</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" is the goal intended for lifecycle binding; the former
                   "assembly" goal forks the build and no longer exists in plugin 3.x. -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
原文:https://www.cnblogs.com/shenyuchong/p/11624604.html