1.导出指定表
#!/usr/bin/env python #-*- coding:utf8 -*- # 从mysql中提取hive建表语句-指定的表 import os,sys import fileinput import datetime import mysql.connector reload(sys) sys.setdefaultencoding("utf8") # 获取参数 if (len(sys.argv) == 3): Db_name = sys.argv[1].lower() Tab_Name = sys.argv[2].lower() else: print "Parameters is illegal." print "python export_tabname.py db tablename1,tablename2" sys.exit(1) #IP host_valuse="xx.xx.xxx.xx" #用户 user_valuse="hive" #密码 passwd_valuse="xxxxxxxxxxx" #数据库名称 database_valuse="hive" #表名称 TabName = "‘" + Tab_Name.replace(",","‘,‘") + "‘" def hive_create_table(host_valuse,user_valuse,passwd_valuse,database_valuse): conn = mysql.connector.connect(host=host_valuse,user=user_valuse,passwd=passwd_valuse,database=database_valuse,charset=‘utf8‘) mycursor = conn.cursor() # 获取DB_ID select_DB_ID = "select DB_ID from DBS where NAME=‘{0}‘;".format(Db_name) mycursor.execute(select_DB_ID) result_DB_ID = mycursor.fetchall() fo = open("create_tab.sql", "w") for dir_DB_ID in result_DB_ID : # 获取数据库名 DB_ID = str(dir_DB_ID)[1:].split(‘,‘)[0] print(DB_ID) select_DB_NAME = "select NAME from DBS where DB_ID="+DB_ID+";" print(select_DB_NAME ) mycursor.execute(select_DB_NAME) result_DB_NAME = mycursor.fetchone() fo.write("\n===========数据库:"+str(result_DB_NAME).split(‘\‘‘)[1]+"===========\n") DBname=str(result_DB_NAME).split(‘\‘‘)[1] print ‘数据库名字:‘ + DBname print(result_DB_NAME) # 获取表名 select_table_name_sql = "select TBL_NAME from TBLS where DB_ID=‘"+ DB_ID+"‘ and TBL_NAME in ({0})".format(TabName) mycursor.execute(select_table_name_sql) result_table_names = mycursor.fetchall() for table_name in result_table_names : fo.write("\nDROP TABLE IF EXISTS "+DBname +‘.`‘+str(table_name).split(‘\‘‘)[1]+"`;") fo.write("\nCREATE TABLE "+DBname +‘.`‘+str(table_name).split(‘\‘‘)[1]+"(\n") # 根据表名获取SD_ID select_table_SD_ID = "select SD_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+";" print(select_table_SD_ID) mycursor.execute(select_table_SD_ID) result_SD_ID = mycursor.fetchone() print(result_SD_ID ) # 根据SD_ID获取CD_ID SD_ID=str(result_SD_ID)[1:].split(‘,‘)[0] select_table_CD_ID = "select CD_ID from SDS where SD_ID="+str(result_SD_ID)[1:].split(‘,‘)[0]+";" print(select_table_CD_ID) mycursor.execute(select_table_CD_ID) result_CD_ID = mycursor.fetchone() print(result_CD_ID) # 根据CD_ID获取表的列 CD_ID=str(result_CD_ID)[1:].split(‘,‘)[0] select_table_COLUMN_NAME = "select COLUMN_NAME,TYPE_NAME,COMMENT from COLUMNS_V2 where CD_ID="+str(result_CD_ID)[1:].split(‘,‘)[0]+" order by INTEGER_IDX;" print(select_table_COLUMN_NAME) mycursor.execute(select_table_COLUMN_NAME) result_COLUMN_NAME = mycursor.fetchall() print(result_COLUMN_NAME) index=0 for col,col_type,col_name in result_COLUMN_NAME: print(col) print(col_type) print(col_name) print(len(result_COLUMN_NAME) ) # 写入表的列和列的类型到文件 if col_name is None: fo.write(" `"+str(col)+"` "+str(col_type)) else: fo.write(" `"+str(col)+"` "+str(col_type) + " COMMENT ‘" + str(col_name) + "‘") if index < len(result_COLUMN_NAME)-1: index = index + 1 fo.write(",\n") elif index == len(result_COLUMN_NAME)-1: fo.write("\n)") # 根据表名获取TBL_ID select_table_SD_ID = "select TBL_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+";" print(select_table_SD_ID) mycursor.execute(select_table_SD_ID) result_TBL_ID = mycursor.fetchone() print(result_TBL_ID) # 根据TBL_ID获取分区信息 select_table_PKEY_NAME_TYPE = "select PKEY_NAME,PKEY_TYPE,PKEY_COMMENT from PARTITION_KEYS where TBL_ID="+str(result_TBL_ID)[1:].split(‘,‘)[0]+" order by INTEGER_IDX;" print(select_table_PKEY_NAME_TYPE) mycursor.execute(select_table_PKEY_NAME_TYPE) result_PKEY_NAME_TYPE = mycursor.fetchall() print(result_PKEY_NAME_TYPE) if len(result_PKEY_NAME_TYPE) > 0: #if result_PKEY_NAME_TYPE is not None: fo.write("\nPARTITIONED BY (\n") #elif len(result_PKEY_NAME_TYPE) == 0: else : fo.write("\n") i=0 for pkey_name,pkey_type,PKEY_COMMENT in result_PKEY_NAME_TYPE: if str(PKEY_COMMENT) is None: fo.write(" `"+str(pkey_name)+"` "+str(pkey_type)) else: fo.write(" `"+str(pkey_name)+"` "+str(pkey_type) + " COMMENT ‘" + str(PKEY_COMMENT) + "‘\n") if i < len(result_PKEY_NAME_TYPE)- 1: i = i + 1 fo.write(",") elif i == len(result_PKEY_NAME_TYPE) - 1: fo.write(")\n") # 根据表TBL_ID 获得中文名称 select_PARAM_VALUE01 = "select PARAM_VALUE from TABLE_PARAMS WHERE TBL_ID=( select TBL_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+") and PARAM_KEY=‘comment‘;" print(select_PARAM_VALUE01) mycursor.execute(select_PARAM_VALUE01) result_PARAM_VALUE01 = mycursor.fetchone() print result_PARAM_VALUE01 if result_PARAM_VALUE01 is None: print ‘未设置表名‘ elif not result_PARAM_VALUE01[0]: print ‘表名为空‘ else: fo.write("COMMENT ‘" + str(result_PARAM_VALUE01[0]) +"‘ \n" ) # 根据SD_ID和CD_ID获取SERDE_ID select_SERDE_ID = "select SERDE_ID from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";" print(select_SERDE_ID) mycursor.execute(select_SERDE_ID) result_SERDE_ID = mycursor.fetchone() print(result_SERDE_ID) # 根据SERDE_ID获取PARAM_VALUE(列分隔符) select_PARAM_VALUE = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY=‘field.delim‘;" print(select_PARAM_VALUE) mycursor.execute(select_PARAM_VALUE) result_PARAM_VALUE = mycursor.fetchone() print(result_PARAM_VALUE) if result_PARAM_VALUE is not None: fo.write("ROW FORMAT DELIMITED\n") fo.write("FIELDS TERMINATED BY ‘"+str(result_PARAM_VALUE).split(‘\‘‘)[1]+"‘\n") # 根据SERDE_ID获取PARAM_VALUE(行分隔符) select_PARAM_HNAG = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY=‘line.delim‘;" print(select_PARAM_HNAG) mycursor.execute(select_PARAM_HNAG) RESULT_PARAM_HNAG = mycursor.fetchone() print(RESULT_PARAM_HNAG) if RESULT_PARAM_HNAG is not None: fo.write("LINES TERMINATED BY ‘"+str(RESULT_PARAM_HNAG).split(‘\‘‘)[1]+"‘\n") # 根据SD_ID和CD_ID获取输入输出格式 select_table_STORE_FORMAT = "select INPUT_FORMAT from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";" print(select_table_STORE_FORMAT) mycursor.execute(select_table_STORE_FORMAT) result_table_STORE_FORMAT= mycursor.fetchall() print(result_table_STORE_FORMAT) for store_format in result_table_STORE_FORMAT: if "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat" in str(store_format): fo.write("STORED AS ORC;\n") elif "org.apache.hadoop.mapred.TextInputFormat" in str(store_format): fo.write("STORED AS TEXTFILE;\n") elif "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" in str(store_format): fo.write("STORED AS PARQUET;\n") elif "org.apache.kudu.mapreduce.KuduTableInputFormat" in str(store_format): fo.write("STORED AS KuduTable;\n") else : fo.write("STORED AS null;\n") fo.close() if __name__ == "__main__": hive_create_table(host_valuse,user_valuse,passwd_valuse,database_valuse)
2.导出一个库下的所有表
#!/usr/bin/env python #-*- coding:utf8 -*- # 从mysql中提取hive建表语句-指定数据库的所有表 import os,sys import fileinput import datetime import mysql.connector reload(sys) sys.setdefaultencoding("utf8") # 获取参数 if (len(sys.argv) == 2): Db_name = sys.argv[1].lower() else: print "Parameters is illegal." print "python export_tabname.py db_name" sys.exit(1) #IP host_valuse="xx.xx.xxx.xx" #用户 user_valuse="hive" #密码 passwd_valuse="xxxxxxxxx" #数据库名称 database_valuse="hive" def hive_create_table(host_valuse,user_valuse,passwd_valuse,database_valuse): conn = mysql.connector.connect(host=host_valuse,user=user_valuse,passwd=passwd_valuse,database=database_valuse,charset=‘utf8‘) mycursor = conn.cursor() # 获取DB_ID select_DB_ID = "select DB_ID from DBS where NAME=‘{0}‘;".format(Db_name) mycursor.execute(select_DB_ID) result_DB_ID = mycursor.fetchall() fo = open("create_tab.sql", "w") for dir_DB_ID in result_DB_ID : # 获取数据库名 DB_ID = str(dir_DB_ID)[1:].split(‘,‘)[0] print(DB_ID) select_DB_NAME = "select NAME from DBS where DB_ID="+DB_ID+";" print(select_DB_NAME ) mycursor.execute(select_DB_NAME) result_DB_NAME = mycursor.fetchone() fo.write("\n===========数据库:"+str(result_DB_NAME).split(‘\‘‘)[1]+"===========\n") DBname=str(result_DB_NAME).split(‘\‘‘)[1] print ‘数据库名字:‘ + DBname print(result_DB_NAME) # 获取表名 select_table_name_sql = "select TBL_NAME from TBLS where DB_ID=‘"+ DB_ID+"‘;" mycursor.execute(select_table_name_sql) result_table_names = mycursor.fetchall() for table_name in result_table_names : fo.write("\nDROP TABLE IF EXISTS "+DBname +‘.`‘+str(table_name).split(‘\‘‘)[1]+"`;") fo.write("\nCREATE TABLE IF NOT EXISTS "+DBname +‘.`‘+str(table_name).split(‘\‘‘)[1]+"`(\n") # 根据表名获取SD_ID select_table_SD_ID = "select SD_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+";" print(select_table_SD_ID) mycursor.execute(select_table_SD_ID) result_SD_ID = mycursor.fetchone() print(result_SD_ID ) # 根据SD_ID获取CD_ID SD_ID=str(result_SD_ID)[1:].split(‘,‘)[0] select_table_CD_ID = "select CD_ID from SDS where SD_ID="+str(result_SD_ID)[1:].split(‘,‘)[0]+";" print(select_table_CD_ID) mycursor.execute(select_table_CD_ID) result_CD_ID = mycursor.fetchone() print(result_CD_ID) # 根据CD_ID获取表的列 CD_ID=str(result_CD_ID)[1:].split(‘,‘)[0] select_table_COLUMN_NAME = "select COLUMN_NAME,TYPE_NAME,COMMENT from COLUMNS_V2 where CD_ID="+str(result_CD_ID)[1:].split(‘,‘)[0]+" order by INTEGER_IDX;" print(select_table_COLUMN_NAME) mycursor.execute(select_table_COLUMN_NAME) result_COLUMN_NAME = mycursor.fetchall() print(result_COLUMN_NAME) index=0 for col,col_type,col_name in result_COLUMN_NAME: print(col) print(col_type) print(col_name) print(len(result_COLUMN_NAME) ) # 写入表的列和列的类型到文件 if col_name is None: fo.write(" `"+str(col)+"` "+str(col_type)) else: fo.write(" `"+str(col)+"` "+str(col_type) + " COMMENT ‘" + str(col_name) + "‘") if index < len(result_COLUMN_NAME)-1: index = index + 1 fo.write(",\n") elif index == len(result_COLUMN_NAME)-1: fo.write("\n)") # 根据表名获取TBL_ID select_table_SD_ID = "select TBL_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+";" print(select_table_SD_ID) mycursor.execute(select_table_SD_ID) result_TBL_ID = mycursor.fetchone() print(result_TBL_ID) # 根据TBL_ID获取分区信息 select_table_PKEY_NAME_TYPE = "select PKEY_NAME,PKEY_TYPE,PKEY_COMMENT from PARTITION_KEYS where TBL_ID="+str(result_TBL_ID)[1:].split(‘,‘)[0]+" order by INTEGER_IDX;" print(select_table_PKEY_NAME_TYPE) mycursor.execute(select_table_PKEY_NAME_TYPE) result_PKEY_NAME_TYPE = mycursor.fetchall() print(result_PKEY_NAME_TYPE) if len(result_PKEY_NAME_TYPE) > 0: #if result_PKEY_NAME_TYPE is not None: fo.write("\nPARTITIONED BY (\n") #elif len(result_PKEY_NAME_TYPE) == 0: else : fo.write("\n") i=0 for pkey_name,pkey_type,PKEY_COMMENT in result_PKEY_NAME_TYPE: if str(PKEY_COMMENT) is None: fo.write(" `"+str(pkey_name)+"` "+str(pkey_type)) else: fo.write(" `"+str(pkey_name)+"` "+str(pkey_type) + " COMMENT ‘" + str(PKEY_COMMENT) + "‘\n") if i < len(result_PKEY_NAME_TYPE)- 1: i = i + 1 fo.write(",") elif i == len(result_PKEY_NAME_TYPE) - 1: fo.write(")\n") # 根据表TBL_ID 获得中文名称 select_PARAM_VALUE01 = "select PARAM_VALUE from TABLE_PARAMS WHERE TBL_ID=( select TBL_ID from TBLS where tbl_name=‘"+str(table_name).split(‘\‘‘)[1]+"‘ and DB_ID="+DB_ID+") and PARAM_KEY=‘comment‘;" print(select_PARAM_VALUE01) mycursor.execute(select_PARAM_VALUE01) result_PARAM_VALUE01 = mycursor.fetchone() print result_PARAM_VALUE01 if result_PARAM_VALUE01 is None: print ‘未设置表名‘ elif not result_PARAM_VALUE01[0]: print ‘表名为空‘ else: fo.write("COMMENT ‘" + str(result_PARAM_VALUE01[0]) +"‘ \n" ) # 根据SD_ID和CD_ID获取SERDE_ID select_SERDE_ID = "select SERDE_ID from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";" print(select_SERDE_ID) mycursor.execute(select_SERDE_ID) result_SERDE_ID = mycursor.fetchone() print(result_SERDE_ID) # 根据SERDE_ID获取PARAM_VALUE(列分隔符) select_PARAM_VALUE = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY=‘field.delim‘;" print(select_PARAM_VALUE) mycursor.execute(select_PARAM_VALUE) result_PARAM_VALUE = mycursor.fetchone() print(result_PARAM_VALUE) if result_PARAM_VALUE is not None: fo.write("ROW FORMAT DELIMITED\n") fo.write("FIELDS TERMINATED BY ‘"+str(result_PARAM_VALUE).split(‘\‘‘)[1]+"‘\n") # 根据SERDE_ID获取PARAM_VALUE(行分隔符) select_PARAM_HNAG = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY=‘line.delim‘;" print(select_PARAM_HNAG) mycursor.execute(select_PARAM_HNAG) RESULT_PARAM_HNAG = mycursor.fetchone() print(RESULT_PARAM_HNAG) if RESULT_PARAM_HNAG is not None: fo.write("LINES TERMINATED BY ‘"+str(RESULT_PARAM_HNAG).split(‘\‘‘)[1]+"‘\n") # 根据SD_ID和CD_ID获取输入输出格式 select_table_STORE_FORMAT = "select INPUT_FORMAT from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";" print(select_table_STORE_FORMAT) mycursor.execute(select_table_STORE_FORMAT) result_table_STORE_FORMAT= mycursor.fetchall() print(result_table_STORE_FORMAT) for store_format in result_table_STORE_FORMAT: if "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat" in str(store_format): fo.write("STORED AS ORC;\n") elif "org.apache.hadoop.mapred.TextInputFormat" in str(store_format): fo.write("STORED AS TEXTFILE;\n") elif "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" in str(store_format): fo.write("STORED AS PARQUET;\n") elif "org.apache.kudu.mapreduce.KuduTableInputFormat" in str(store_format): fo.write("STORED AS KuduTable;\n") else : fo.write("STORED AS null;\n") fo.close() if __name__ == "__main__": hive_create_table(host_valuse,user_valuse,passwd_valuse,database_valuse)
需要注意:
视图也会同时导出,STORED AS null 的存储格式。
分隔符需要判断是否正确。
原文:https://www.cnblogs.com/hello-wei/p/13672038.html