import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import object NetWorkStream { def main(args: Array[String]): Unit = { //创建sparkConf对象 var conf=new SparkConf().setMaster("spark://").setAppName("netWorkStream"); //创建streamingContext:是所有数据流的一个主入口 //Seconds(1)代表每一秒,批量执行一次结果 var ssc=new StreamingContext(conf,Seconds(1)); //从192.168.99.143接受到输入数据 var lines= ssc.socketTextStream("", 9999); //计算出传入单词的个数 var words=lines.flatMap { line => line.split(" ")} var wordCount= { w => (w,1) }.reduceByKey(_+_); //打印结果 wordCount.print(); ssc.start();//启动进程 ssc.awaitTermination();//等待计算终止 }
nc -lk 9999 zhang xing sheng zhang
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 128.0 (TID 134) in 30 ms on (1/1) 17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 128.0, whose tasks have all completed, from pool 17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 128 (print at NetWorkStream.scala:18) finished in 0.031 s 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 64 finished: print at NetWorkStream.scala:18, took 0.080836 s 17/03/25 14:10:33 INFO spark.SparkContext: Starting job: print at NetWorkStream.scala:18 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Got job 65 (print at NetWorkStream.scala:18) with 1 output partitions 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 130 (print at NetWorkStream.scala:18) 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 129) 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Missing parents: List() 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17), which has no missing parents 17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67 stored as values in memory (estimated size 2.8 KB, free 366.2 MB) 17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67_piece0 stored as bytes in memory (estimated size 1711.0 B, free 366.2 MB) 17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on (size: 1711.0 B, free: 366.3 MB) 17/03/25 14:10:33 INFO spark.SparkContext: Created broadcast 67 from broadcast at DAGScheduler.scala:1012 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17) 17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Adding task set 130.0 with 1 tasks 17/03/25 14:10:33 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 130.0 (TID 135,, partition 1, NODE_LOCAL, 6468 bytes) 17/03/25 14:10:33 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 135 on executor id: 0 hostname: 17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on (size: 1711.0 B, free: 366.3 MB) 17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 130.0 (TID 135) in 14 ms on (1/1) 17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 130.0, whose tasks have all completed, from pool 17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 130 (print at NetWorkStream.scala:18) finished in 0.014 s 17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 65 finished: print at NetWorkStream.scala:18, took 0.022658 s ------------------------------------------- Time: 1490422233000 ms ------------------------------------------- (xing,1) (zhang,2) (sheng,1)
After a context is defined, you have to do the following.
Define the streaming computations by applying transformation and output operations to DStreams.
Start receiving data and processing it using streamingContext.start().
Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
The processing can be manually stopped using streamingContext.stop().
Once a context has been started, no new streaming computations can be set up or added to it.
Once a context has been stopped, it cannot be restarted.
Only one StreamingContext can be active in a JVM at the same time.
//ssc.stop(false)stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
sparkstreaming+socket workCount 小案例