// Create a streaming filter that prints lines containing "error".
// Assumes a SparkConf named `conf` is in scope; batches are formed every 1 second.
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream from text received after connecting to port 7777 on the local machine.
val lines = ssc.socketTextStream("localhost", 7777)
// Keep only the lines that contain "error".
val errorLines = lines.filter(_.contains("error"))
// Print the lines with errors for each batch.
errorLines.print()

// Start the streaming context; nothing is processed until start() is called.
ssc.start()
// Block the driver until the streaming job terminates.
ssc.awaitTermination()
// Parse raw log lines into access-log entries, then count requests per IP address per batch.
// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs, and that
// `logData` is a DStream of raw log lines defined elsewhere — TODO confirm.
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
// Pair each entry's IP address with 1 so that summing by key yields a request count.
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey(_ + _)
/** Update function for `updateStateByKey`: adds a key's values from the current batch
  * to its running total.
  *
  * @param values the values seen for the key in the current batch
  * @param state  the key's previous running total, or `None` if the key has no state yet
  * @return the updated running total; always `Some`, so the key's state is never removed
  */
def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
  // Summing (rather than counting) the values keeps the "running sum" contract; with the
  // 1L-per-record values produced below, this is the same as counting records.
  Some(state.getOrElse(0L) + values.sum)

// Map each access-log entry to (response code, 1L) so summing values counts occurrences.
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
// Maintain a running count of each response code across all batches seen so far.
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
// Push every record of each batch to an external storage system.
// The connection is opened once per partition (outside the per-item loop) rather than once
// per record; the connection handling below is placeholder pseudo-code.
// NOTE(review): `ipAddressRequeseCount` is defined outside this view and looks like a typo
// for `ipAddressRequestCount` — confirm against its definition.
// foreachRDD replaces the deprecated DStream.foreach.
ipAddressRequeseCount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open connection to storage system (e.g. a database connection)
    partition.foreach { item =>
      // Use connection to push item to system
    }
    // Close connection
  }
}
// Enable checkpointing to a fault-tolerant directory (the path is elided here).
// Stateful transformations such as updateStateByKey require a checkpoint directory.
ssc.checkpoint("hdfs://...")
// Original article (原文): http://www.cnblogs.com/wttttt/p/6852102.html