
2. Running Spark Streaming


2.1 Writing the Program in IDEA

Add the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version> 
    <scope>provided</scope>
</dependency>
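
Here ${spark.version} is a Maven property assumed to be defined in the pom's <properties> section, for example (version illustrative; any Spark 2.x release built for Scala 2.11 matches the _2.11 artifact):

<properties>
    <spark.version>2.4.3</spark.version>
</properties>

The provided scope keeps the streaming jar out of the packaged application, because spark-submit supplies it at runtime.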

A complete example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by huicheng on 25/07/2019.
  */
object WorldCount {
  def main(args: Array[String]): Unit = {

    // local[2]: at least two local threads are needed, one to receive data and one to process it
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, like master01:9999
    val lines = ssc.socketTextStream("master01", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }
}

Package the program the same way as in the Spark Core chapter, upload it to the Spark machine, and run:

bin/spark-submit --class com.c.streaming.WorldCount ~/wordcount-jar-with-dependencies.jar
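
The jar-with-dependencies suffix suggests the jar was built as an assembly ("fat") jar. A minimal sketch of a maven-assembly-plugin block that produces one is below; this plugin configuration is an assumption, not part of the original pom:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.c.streaming.WorldCount</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

With this in place, mvn package writes the assembled jar under target/.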

Then send data to it with Netcat:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
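
Once a line such as hello world is typed into the Netcat terminal, each one-second batch should print counts along these lines in the application console (timestamp illustrative):

-------------------------------------------
Time: 1563000001000 ms
-------------------------------------------
(hello,1)
(world,1)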

If the program produces too much log output while running, you can change the log level to WARN in the log4j file under Spark's conf directory.
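
For example, in conf/log4j.properties (copy it from log4j.properties.template if only the template exists), change the root logger level from INFO to WARN:

# was: log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console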

 

Source: https://www.cnblogs.com/zhanghuicheng/p/11227372.html
