// Create a streaming filter that prints lines containing "error".
// The context groups incoming data into batches using a one-second batch interval.
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream from data received after connecting to port 7777 on the local machine
val lines = ssc.socketTextStream("localhost", 7777)
// Keep only the lines that contain "error"
val errorLines = lines.filter(_.contains("error"))
// Print out the lines with errors
errorLines.print()
// Start the streaming context. Everything above only declares the computation;
// all transformations and output operations must be set up before start() is called.
ssc.start()
// Wait for the job to finish
ssc.awaitTermination()

// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
// Pair each parsed entry's IP address with a count of 1
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
// Add up the counts for each IP address
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)

/**
 * Adds the number of newly arrived values to a running count.
 *
 * Has the `(Seq[V], Option[S]) => Option[S]` shape expected by `updateStateByKey`.
 *
 * @param values the new values for a key; only how many there are is used, not their contents
 * @param state  the previous running count, or `None` when there is no previous count
 * @return the updated running count; always defined
 */
def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] = {
    // A missing previous state counts as zero.
    Some(state.getOrElse(0L) + values.size)
}
// Pair each log entry's response code with a count of 1L (Long, matching updateRunningSum)
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
// Keep a running count per response code, updated with updateRunningSum
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
// Skeleton for pushing results to an external storage system.
// NOTE(review): `ipAddressRequeseCount` is not defined in this section and looks like a
// misspelling of `ipAddressRequestCount` — confirm against its definition before renaming.
// foreachRDD replaces DStream.foreach, which was deprecated and later removed from Spark.
ipAddressRequeseCount.foreachRDD { rdd =>
    // Work per partition so one connection can serve every item in that partition
    rdd.foreachPartition { partition =>
        // Open connection to storage system (e.g. a database connection)
        partition.foreach { item =>
            // Use connection to push item to system
        }
        // Close connection
    }
}

    // Set the checkpoint directory; stateful operations such as updateStateByKey require one
    ssc.checkpoint("hdfs://...")
// Original article: http://www.cnblogs.com/wttttt/p/6852102.html