
Hadoop Series Notes --- PySpark


【Converting between PySpark RDDs, DataFrames, and Pandas】

 1. RDD => DataFrame

 2. RDD => Pandas

 3. DataFrame => RDD

 4. DataFrame => Pandas

 5. Pandas => DataFrame
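
A minimal sketch of the five conversions, assuming an active SparkSession named spark (and its SparkContext sc):

import pandas as pd
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([Row(name="a", age=1), Row(name="b", age=2)])

# 1. RDD => DataFrame
df = spark.createDataFrame(rdd)              # or rdd.toDF()

# 2. RDD => Pandas (go through a DataFrame first)
pdf = spark.createDataFrame(rdd).toPandas()

# 3. DataFrame => RDD (an RDD of Row objects)
rdd2 = df.rdd

# 4. DataFrame => Pandas (collects everything to the driver; small data only)
pdf2 = df.toPandas()

# 5. Pandas => DataFrame
df2 = spark.createDataFrame(pd.DataFrame({"name": ["c"], "age": [3]}))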

【PySpark and HBase interaction】

  JAR dependency: spark-examples_2.11-2.4.0.jar

  For the HBase filters available on the JVM side, see: https://www.cnblogs.com/frankdeng/p/9310262.html (HBase (7): HBase Filters)
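
The snippets below assume the converter JAR above is on the driver and executor classpath; a sketch of one way to do that when building the SparkContext yourself (the path is a placeholder), equivalent to passing --jars to spark-submit:

from pyspark import SparkConf, SparkContext

# Hypothetical local path to the converter JAR; adjust to your environment.
conf = SparkConf().set("spark.jars", "/path/to/spark-examples_2.11-2.4.0.jar")
sc = SparkContext(conf=conf)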

 1. read

conf = {
    "inputFormatClass": "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "keyClass": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "valueClass": "org.apache.hadoop.hbase.client.Result",
    "keyConverter": "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    "valueConverter": "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    "conf": {
        "hbase.zookeeper.quorum": hosts,         # ZooKeeper quorum, e.g. "zk1,zk2,zk3"
        "hbase.mapreduce.inputtable": table,     # HBase table to scan
        "hbase.mapreduce.scan.columns": columns  # space-separated "cf:qualifier" list
    }
}
rdd = sc.newAPIHadoopRDD(**conf)
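
In Spark's bundled hbase_inputformat.py example, HBaseResultToStringConverter turns each Result into newline-separated JSON strings, one per cell, so the returned (row key, string) pairs are typically unpacked like this (a sketch based on that example):

import json

# (row_key, "json\njson\n...") -> one (row_key, dict) pair per cell
cells = rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
print(cells.take(2))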

 2. write

conf = {
    "keyConverter": "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    "valueConverter": "org.apache.spark.examples.pythonconverters.StringListToPutConverter",
    "conf": {
        "hbase.zookeeper.quorum": hosts,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"
    }
}
# saveAsNewAPIHadoopDataset only accepts conf/keyConverter/valueConverter,
# so there is no "path" key here (unlike saveAsNewAPIHadoopFile).
rdd.saveAsNewAPIHadoopDataset(**conf)
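
With StringListToPutConverter (as in Spark's bundled hbase_outputformat.py example), each RDD element should be a (row_key, [row_key, column_family, qualifier, value]) pair of strings. A minimal sketch with made-up rows:

rows = [("row1", "cf1", "a", "value1"), ("row2", "cf1", "b", "value2")]
rdd = sc.parallelize(rows).map(lambda x: (x[0], list(x)))  # key = row key, value = 4-element list
rdd.saveAsNewAPIHadoopDataset(**conf)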

【PySpark and Elasticsearch interaction】

  JAR dependency: elasticsearch-hadoop-6.4.1.jar

 1. read

conf = {
    "inputFormatClass": "org.elasticsearch.hadoop.mr.EsInputFormat",
    "keyClass": "org.apache.hadoop.io.NullWritable",
    "valueClass": "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    "conf": {
        "es.nodes": hosts,                   # Elasticsearch nodes, e.g. "host1:9200,host2:9200"
        "es.resource": f"{_index}/{_type}",  # index/type to read from
        "es.query": query                    # query DSL as a JSON string
    }
}
rdd = sc.newAPIHadoopRDD(**conf)
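
The placeholders above are plain strings that need to be defined before building conf; a sketch with illustrative values (hosts, index, and type are made up):

hosts = "es-node1:9200,es-node2:9200"   # hypothetical node list
_index, _type = "logs", "doc"           # hypothetical index and mapping type
query = '{"query": {"match_all": {}}}'  # Elasticsearch query DSL, serialized as JSON

Each element of the resulting RDD is a key/document pair, with the document exposed as a dict-like object.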

 2. write

conf = {
    "path": "-",  # unused by EsOutputFormat, but required by saveAsNewAPIHadoopFile
    "outputFormatClass": "org.elasticsearch.hadoop.mr.EsOutputFormat",
    "keyClass": "org.apache.hadoop.io.NullWritable",
    "valueClass": "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    "conf": {
        "es.nodes": hosts,
        "es.resource": f"{_index}/{_type}",
        "es.input.json": "yes",                     # values are pre-serialized JSON strings
        "es.index.auto.create": index_auto_create,  # whether to create the index automatically
        "es.mapping.id": None if not mapping_id or index_auto_create else mapping_id
    }
}
rdd.saveAsNewAPIHadoopFile(**conf)
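
Because es.input.json is set to "yes", each RDD element should be a (key, json_string) pair, where the key is ignored by EsOutputFormat. A minimal sketch (document fields are made up):

import json

docs = [{"user": "a", "count": 1}, {"user": "b", "count": 2}]  # hypothetical documents
rdd = sc.parallelize(docs).map(lambda d: ("ignored", json.dumps(d)))
rdd.saveAsNewAPIHadoopFile(**conf)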

【PySpark and MongoDB interaction】

   JAR dependencies: mongo-java-driver-3.6.2.jar, mongo-hadoop-core-2.0.2.jar, [spark-core_2.11-2.3.4.jar, spark-sql_2.11-2.3.4.jar]

 1. read

conf = {
    "inputFormatClass": "com.mongodb.hadoop.MongoInputFormat",
    "keyClass": "org.apache.hadoop.io.Text",
    "valueClass": "org.apache.hadoop.io.MapWritable",
    "conf": {
        "mongo.input.uri": "mongodb://localhost:27017/db.collection",
        "mongo.input.query": query,  # MongoDB query serialized as a JSON string
        "mongo.input.split.create_input_splits": "false"
    }
}
rdd = sc.newAPIHadoopRDD(**conf)
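
mongo.input.query expects the MongoDB query serialized as JSON text; a one-line sketch with a made-up filter:

query = '{"age": {"$gt": 20}}'  # hypothetical filter pushed down to MongoInputFormat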

 2. write

conf = {
    "path": "-",  # unused by MongoOutputFormat, but required by saveAsNewAPIHadoopFile
    "outputFormatClass": "com.mongodb.hadoop.MongoOutputFormat",
    "keyClass": "org.apache.hadoop.io.Text",
    "valueClass": "org.apache.hadoop.io.MapWritable",
    "conf": {
        "mongo.output.uri": "mongodb://localhost:27017/output.collection"
    }
}
rdd.saveAsNewAPIHadoopFile(**conf)
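
A sketch of the data shape this write expects under the Text/MapWritable classes above: (key, dict) pairs, with PySpark converting each dict to a MapWritable (field names are made up, and exact key handling depends on MongoOutputFormat):

docs = [("id-1", {"name": "alice", "age": 30}),
        ("id-2", {"name": "bob", "age": 25})]
rdd = sc.parallelize(docs)
rdd.saveAsNewAPIHadoopFile(**conf)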


Original post: https://www.cnblogs.com/satansz/p/12903262.html
