Apache Spark是一个开源、分布式、通用的分析引擎。多年来,它一直是大数据生态系统中对大型数据集进行批量和实时处理的主要工具。尽管对该平台的本地支持仅限于JVM语言集,但其他通常用于数据处理和分析的语言(如Python和R)已经加入了Spark的互操作层,以利用其功能。在2019年的Build大会上,微软发布了Spark.NET。Spark.NET提供了为Spark互操作层编写的绑定,允许您在.NET应用程序中使用诸如Spark SQL和Spark Streaming之类的组件。因为Spark.NET与.NET Standard 2.0兼容,可以运行Windows、Mac和Linux等操作系统。Spark.NET是Mobius项目的衍生版,该项目为Spark提供了.NET绑定。
这个示例从NYC Open Data门户获取餐馆违规数据集,并使用Spark.NET处理它。然后,处理后的数据被用来训练一个机器学习模型,该模型试图预测一个机构在检查后将获得的等级。该模型将使用一个开源、跨平台的机器学习框架ML.NET进行训练。最后,使用经过训练的模型来指定一个期望的等级,从而丰富当前不存在等级的数据。
这个示例的源代码可以在GitHub lqdev/RestaurantInspectionsSparkMLNET中找到。
这个项目是用Ubuntu 18.04构建的,但是应该可以在Windows和Mac设备上运行。
因为Spark是在JVM上运行的,所以您的PC上需要Java。所需的最小版本是Java 8。在终端输入以下命令:
sudo apt install openjdk-8-jdk openjdk-8-jre
然后,确保最新安装的版本是默认的。
sudo update-alternatives --config java
使用Hadoop 2.7将Spark 2.4.1下载到您的计算机上。在本例中,我将它放在Downloads文件夹中。
wget https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz -O ~/Downloads/spark-2.4.1-bin-hadoop2.7.tgz
将最近下载的文件的内容解压缩到/usr/bin/local目录。
sudo tar -xvf ~/Downloads/spark-2.4.1-bin-hadoop2.7.tgz --directory /usr/local/bin
下载.Net Spark worker到您的计算机上。在本例中,我将它放在Downloads文件夹中。
wget https://github.com/dotnet/spark/releases/download/v0.4.0/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz -O ~/Downloads/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz
将最新下载的文件的内容解压缩到/usr/bin/local目录。
sudo tar -xvf ~/Downloads/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz --directory /usr/local/bin
最后,提高Microsoft.Spark.Worker的权限。这是执行用户定义函数(UDF)所必需的。
sudo chmod +x /usr/local/bin/Microsoft.Spark.Worker-0.4.0/Microsoft.Spark.Worker
下载并配置这些必备条件之后,将它们在系统中的位置配置为环境变量。打开/.bashrc文件并在文件末尾添加以下内容。
export SPARK_PATH=/usr/local/bin/spark-2.4.1-bin-hadoop2.7 export PATH=$SPARK_PATH/bin:$PATH export HADOOP_HOME=$SPARK_PATH export SPARK_HOME=$SPARK_PATH export DOTNET_WORKER_DIR=/usr/local/bin/Microsoft.Spark.Worker-0.4.0
此解决方案中使用的数据集是DOHMH New York City Restaurant Inspection Results,来自NYC Open Data门户。它每天更新,包含餐馆和大学食堂指定的和未决的检查结果和违规情况。该数据集不包括已倒闭的企业。尽管数据集包含多个列,但在此解决方案中仅使用其中的一个子集。有关数据集的详细描述,请访问数据集网站。
这个解决方案由四个不同的 .NET Core应用程序组成:
为您的项目创建一个名为RestaurantInspectionsSparkMLNET的新目录,并使用以下命令导航到该目录。
mkdir RestaurantInspectionsSparkMLNET && cd RestaurantInspectionsSparkMLNET
然后,使用dotnet cli创建一个解决方案。
dotnet new sln
为了确保使用.Net Core SDK 2.1版本作为目标框架,特别是如果您安装了多个版本的.Net SDK,请在RestaurantInspectionsSparkMLNET解决方案目录中创建一个名为globals.json的文件。
touch global.json
在global.json文件中添加以下内容。确保使用安装在计算机上的特定版本的SDK。在本例中,我的计算机上安装了版本2.1.801。可以使用 dotnet --list-sdks 命令列出安装的SDK版本。
{
"sdk": {
"version": "2.1.801"
}
}
ETL项目负责获取原始源数据,并使用Spark应用一系列转换来准备数据,以训练机器学习模型,以及用缺失的分数来丰富数据。
在RestaurantInspectionsSparkMLNET解决方案目录中,使用dotnet cli创建一个名为RestaurantInspectionsETL的新控制台应用程序。
dotnet new console -o RestaurantInspectionsETL
使用dotnet cli将新创建的项目添加到解决方案中。
dotnet sln add ./RestaurantInspectionsETL/
因为这个项目使用Microsoft.Spark NuGet包,使用dotnet cli安装它。
dotnet add ./RestaurantInspectionsETL/ package Microsoft.Spark --version 0.4.0
ML模型类库将包含定义输入和输出的域模型,以及经过训练的模型本身。
在RestaurantInspectionsSparkMLNET解决方案目录中,使用dotnet cli创建一个名为RestaurantInspectionsML的新类库。
dotnet new classlib -o RestaurantInspectionsML
使用dotnet cli将新创建的项目添加到解决方案中。
dotnet sln add ./RestaurantInspectionsML/
因为这个项目使用Microsoft.ML NuGet包,使用dotnet cli安装它。
dotnet add ./RestaurantInspectionsML/ package Microsoft.ML --version 1.3.1
训练项目的目的是使用RestaurantInspectionsETL项目预先处理过的分级数据作为输入,利用ML.NET的Auto ML API来训练一个多级分类模型。训练后的模型将保存在RestaurantInspectionsML目录中。
在RestaurantInspectionsSparkMLNET解决方案目录中,使用dotnet cli创建一个名为RestaurantInspectionsTraining的新控制台应用程序。
dotnet new console -o RestaurantInspectionsTraining
使用dotnet cli将新创建的项目添加到解决方案中。
dotnet sln add ./RestaurantInspectionsTraining/
此项目依赖于在RestaurantInspectionsML项目中创建的域模型,因此需要向其添加引用。
dotnet add ./RestaurantInspectionsTraining/ reference ./RestaurantInspectionsML/
因为这个项目使用的是Microsoft.Auto.ML NuGet包,使用dotnet cli安装它。
dotnet add ./RestaurantInspectionsTraining/ package Microsoft.ML.AutoML --version 0.15.1
丰富化数据应用程序使用RestaurantInspectionsTraining应用程序创建的机器学习模型训练数据,并使用它预测RestaurantInspectionsETL应用创建的pre-processed未分类数据最有可能获得根据检查中发现的违法情况得到的等级。
在RestaurantInspectionsSparkMLNET解决方案目录中,使用dotnet cli创建一个名为RestaurantInspectionsEnrichment的新控制台应用程序。
dotnet new console -o RestaurantInspectionsEnrichment
使用dotnet cli将新创建的项目添加到解决方案中。
dotnet sln add ./RestaurantInspectionsEnrichment/
此项目依赖于在RestaurantInspectionsML项目中创建的域模型,因此需要向其添加引用。
dotnet add ./RestaurantInspectionsEnrichment/ reference ./RestaurantInspectionsML/
请使用以下NuGet包:
使用以下命令安装软件包:
dotnet add ./RestaurantInspectionsEnrichment/ package Microsoft.Spark --version 0.4.0 dotnet add ./RestaurantInspectionsEnrichment/ package Microsoft.ML.LightGBM --version 1.3.1
第一步是准备数据。为此使用Spark.NET转换集合。
导航到RestaurantInspectionsETL项目并创建一个数据目录。
mkdir Data
然后,将数据下载到新创建的Data目录中。
wget https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD -O Data/NYC-Restaurant-Inspections.csv
将下列using添加到Program.cs文件中。
using System; using System.IO; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
并不是所有的列都相关。在Program.cs文件的Main方法中,定义要删除的列。
string[] dropCols = new string[] { "CAMIS", "CUISINE DESCRIPTION", "VIOLATION DESCRIPTION", "BORO", "BUILDING", "STREET", "ZIPCODE", "PHONE", "ACTION", "GRADE DATE", "RECORD DATE", "Latitude", "Longitude", "Community Board", "Council District", "Census Tract", "BIN", "BBL", "NTA" };
Spark应用程序的入口点是SparkSession。在Program.cs文件的Main方法中创建SparkSession。
var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_ETL") .GetOrCreate();
然后,将存储在NYC-Restaurant-Inspections.csv文件中的数据加载到一个DataFrame中。
DataFrame df = sc .Read() .Option("header", "true") .Option("inferSchema", "true") .Csv("Data/NYC-Restaurant-Inspections.csv");
可以将DataFrames看作数据库中的表或Excel中的表。Spark有各种表示数据的方法,但是数据流是Spark.NET支持的格式。此外,DataFrame API更高级,更容易使用。
加载数据之后,通过创建一个新的DataFrame来删除不需要的数据,这个DataFrame排除了dropCols和缺失的值。
DataFrame cleanDf = df .Drop(dropCols) .WithColumnRenamed("INSPECTION DATE","INSPECTIONDATE") .WithColumnRenamed("INSPECTION TYPE","INSPECTIONTYPE") .WithColumnRenamed("CRITICAL FLAG","CRITICALFLAG") .WithColumnRenamed("VIOLATION CODE","VIOLATIONCODE") .Na() .Drop();
通常,机器学习模型期望值是数值,因此在ETL步骤中,尝试将尽可能多的值转换为数值。CRITICALFLAG列包含可以编码为0/1的“Y/N”值。
DataFrame labeledFlagDf = cleanDf .WithColumn("CRITICALFLAG", When(Functions.Col("CRITICALFLAG") == "Y",1) .Otherwise(0));
该数据集每行包含一个违规项,对应不同的检查。因此,所有的违规行为都需要通过业务和检查进行汇总。
DataFrame groupedDf = labeledFlagDf .GroupBy("DBA", "INSPECTIONDATE", "INSPECTIONTYPE", "CRITICALFLAG", "SCORE", "GRADE") .Agg(Functions.CollectSet(Functions.Col("VIOLATIONCODE")).Alias("CODES")) .Drop("DBA", "INSPECTIONDATE") .WithColumn("CODES", Functions.ArrayJoin(Functions.Col("CODES"), ",")) .Select("INSPECTIONTYPE", "CODES", "CRITICALFLAG", "SCORE", "GRADE");
既然数据是用于训练和预测的格式,那么将清理后的数据流分成两个新的数据流,分级的和未分级的。分级数据集是用来训练机器学习模型的数据。未分级的数据将被用来完整丰富化。
DataFrame gradedDf = groupedDf .Filter( Col("GRADE") == "A" | Col("GRADE") == "B" | Col("GRADE") == "C" ); DataFrame ungradedDf = groupedDf .Filter( Col("GRADE") != "A" & Col("GRADE") != "B" & Col("GRADE") != "C" );
将DataFrames保存为csv文件供以后使用。
var timestamp = ((DateTimeOffset) DateTime.UtcNow).ToUnixTimeSeconds().ToString(); var saveDirectory = Path.Join("Output",timestamp); if(!Directory.Exists(saveDirectory)) { Directory.CreateDirectory(saveDirectory); } gradedDf.Write().Csv(Path.Join(saveDirectory,"Graded")); ungradedDf.Write().Csv(Path.Join(saveDirectory,"Ungraded"));
最终的Program.cs文件应该如下所示:
using System; using System.IO; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; namespace RestaurantInspectionsETL { class Program { static void Main(string[] args) { // Define columns to remove string[] dropCols = new string[] { "CAMIS", "CUISINE DESCRIPTION", "VIOLATION DESCRIPTION", "BORO", "BUILDING", "STREET", "ZIPCODE", "PHONE", "ACTION", "GRADE DATE", "RECORD DATE", "Latitude", "Longitude", "Community Board", "Council District", "Census Tract", "BIN", "BBL", "NTA" }; // Create SparkSession var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_ETL") .GetOrCreate(); // Load data DataFrame df = sc .Read() .Option("header", "true") .Option("inferSchema", "true") .Csv("Data/NYC-Restaurant-Inspections.csv"); //Remove columns and missing values DataFrame cleanDf = df .Drop(dropCols) .WithColumnRenamed("INSPECTION DATE","INSPECTIONDATE") .WithColumnRenamed("INSPECTION TYPE","INSPECTIONTYPE") .WithColumnRenamed("CRITICAL FLAG","CRITICALFLAG") .WithColumnRenamed("VIOLATION CODE","VIOLATIONCODE") .Na() .Drop(); // Encode CRITICAL FLAG column DataFrame labeledFlagDf = cleanDf .WithColumn("CRITICALFLAG", When(Functions.Col("CRITICALFLAG") == "Y",1) .Otherwise(0)); // Aggregate violations by business and inspection DataFrame groupedDf = labeledFlagDf .GroupBy("DBA", "INSPECTIONDATE", "INSPECTIONTYPE", "CRITICALFLAG", "SCORE", "GRADE") .Agg(Functions.CollectSet(Functions.Col("VIOLATIONCODE")).Alias("CODES")) .Drop("DBA", "INSPECTIONDATE") .WithColumn("CODES", Functions.ArrayJoin(Functions.Col("CODES"), ",")) .Select("INSPECTIONTYPE", "CODES", "CRITICALFLAG", "SCORE", "GRADE"); // Split into graded and ungraded DataFrames DataFrame gradedDf = groupedDf .Filter( Col("GRADE") == "A" | Col("GRADE") == "B" | Col("GRADE") == "C" ); DataFrame ungradedDf = groupedDf .Filter( Col("GRADE") != "A" & Col("GRADE") != "B" & Col("GRADE") != "C" ); // Save DataFrames var timestamp = ((DateTimeOffset) DateTime.UtcNow).ToUnixTimeSeconds().ToString(); var saveDirectory = Path.Join("Output",timestamp); if(!Directory.Exists(saveDirectory)) { Directory.CreateDirectory(saveDirectory); } gradedDf.Write().Csv(Path.Join(saveDirectory,"Graded")); ungradedDf.Write().Csv(Path.Join(saveDirectory,"Ungraded")); } } }
使用以下命令发布应用程序。
dotnet publish -f netcoreapp2.1 -r ubuntu.18.04-x64
使用spark-submit运行应用程序。
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish/microsoft-spark-2.4.x-0.4.0.jar dotnet bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish/RestaurantInspectionsETL.dll
导航到RestaurantInspectionsTraining目录并创建一个名为ModelInput.cs的新文件。
touch ModelInput.cs
打开ModelInput.cs文件并添加以下代码。
using Microsoft.ML.Data; namespace RestaurantInspectionsML { public class ModelInput { [LoadColumn(0)] public string InspectionType { get; set; } [LoadColumn(1)] public string Codes { get; set; } [LoadColumn(2)] public float CriticalFlag { get; set; } [LoadColumn(3)] public float InspectionScore { get; set; } [LoadColumn(4)] [ColumnName("Label")] public string Grade { get; set; } } }
在模型中使用Attributes,定义了五个属性:
LoadColumn属性定义了该列在文件中的位置。最后一列中的数据被分配给Grade属性,然后在IDataView中作为标签引用。ML.NET算法使用ColumnName属性指定默认的列名,并在模型类中保持这个命名,这样就不需要在训练管道中将特性和标签列定义为参数。
在RestaurantInspectionsTraining目录中创建一个名为ModelOutput.cs的新文件。
touch ModelOutput.cs
打开ModelOutput.cs文件并添加以下代码。
namespace RestaurantInspectionsML { public class ModelOutput { public float[] Scores { get; set; } public string PredictedLabel { get; set; } } }
对于输出模型,ModelOutput类使用模型训练过程生成的输出的默认列名的属性:
该应用程序训练了一个多级分类算法。找到具有正确参数的“最佳”算法需要进行实验。幸运的是,如果您为ML.NET提供了您想要训练的算法类型,那么ML.NET的Auto ML就可以为您做到这一点。
导航到restaurantinspectionproject目录,并将以下using语句添加到Program.cs类中。
using System; using System.IO; using System.Linq; using Microsoft.ML; using static Microsoft.ML.DataOperationsCatalog; using Microsoft.ML.AutoML; using RestaurantInspectionsML;
在Program.cs文件的Main方法中,定义存储数据文件的路径。
string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output");
ML.NET应用程序的入口点有MLContext,需要初始化一个MLContext实例。
MLContext mlContext = new MLContext();
接下来,获取数据文件的路径。RestaurantInspectionsETL应用程序生成的输出既包含csv文件,也包含关于创建它们的分区的信息。对于训练,只需要csv文件。
var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => Path.Join(directory.FullName,"Graded")) .First(); var dataFilePaths = Directory .GetFiles(latestOutput) .Where(file => file.EndsWith("csv")) .ToArray();
然后,将数据加载到IDataView中。IDataView类似于DataFrame,因为它可以将数据表示为行、列及其模式。
var dataLoader = mlContext.Data.CreateTextLoader<ModelInput>(separatorChar:‘,‘, hasHeader:false, allowQuoting:true, trimWhitespace:true); IDataView data = dataLoader.Load(dataFilePaths);
将数据分成训练集和测试集进行评估是一个很好的实践。将数据分成80%的训练集和20%的测试集。
TrainTestData dataSplit = mlContext.Data.TrainTestSplit(data,testFraction:0.2); IDataView trainData = dataSplit.TrainSet; IDataView testData = dataSplit.TestSet;
Auto ML获取数据,使用不同的模型和超参数进行实验,以寻找“最佳”模型。定义实验的设置。在这种情况下,模型将运行600秒或10分钟,并尝试找到日志损失最低的模型。
var experimentSettings = new MulticlassExperimentSettings(); experimentSettings.MaxExperimentTimeInSeconds = 600; experimentSettings.OptimizingMetric = MulticlassClassificationMetric.LogLoss;
然后,创建实验。
var experiment = mlContext.Auto().CreateMulticlassClassificationExperiment(experimentSettings);
创建实验之后,运行它。
var experimentResults = experiment.Execute(data, progressHandler: new ProgressHandler());
默认情况下,运行应用程序不会显示进度信息。但是,可以将ProgressHandler对象传递到调用已实现Report方法的实验的Execute方法中。
在restaurantinspectionstrain项目目录中,创建一个名为ProgressHandler.cs的新文件。
touch ProgressHandler.cs
然后,添加以下代码:
using System; using Microsoft.ML.Data; using Microsoft.ML.AutoML; namespace RestaurantInspectionsTraining { public class ProgressHandler : IProgress<RunDetail<MulticlassClassificationMetrics>> { public void Report(RunDetail<MulticlassClassificationMetrics> run) { Console.WriteLine($"Trained {run.TrainerName} with Log Loss {run.ValidationMetrics.LogLoss:0.####} in {run.RuntimeInSeconds:0.##} seconds"); } } }
ProgressHandler类派生自IProgress<T>接口,该接口需要实现Report方法。每次运行后传递到Report方法中的对象是一个RunDetail<MulticlassClassificationMetrics>对象。每次运行完成时,都会调用Report方法并执行其中的代码。
一旦实验完成运行,从最佳运行中获取模型。将下列代码添加到Program.cs的Main方法中。
var bestModel = experimentResults.BestRun.Model;
使用测试数据集评估模型的性能并测量其微精度度量。
IDataView scoredTestData = bestModel.Transform(testData); var metrics = mlContext.MulticlassClassification.Evaluate(scoredTestData); Console.WriteLine($"MicroAccuracy: {metrics.MicroAccuracy}");
最后,将训练好的模型保存到RestaurantInspectionsML中。
string modelSavePath = Path.Join(solutionDirectory,"RestaurantInspectionsML","model.zip"); mlContext.Model.Save(bestModel, data.Schema, modelSavePath);
正常情况会在RestaurantInspectionsML项目中创建一个名为model.zip的文件。
通过向RestaurantInspectionsML.csproj添加以下内容,确保将训练后的模型文件复制并输出到RestaurantInspectionsML目录中。
<ItemGroup> <None Include="model.zip"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> </ItemGroup>
将其复制到RestaurantInspectionsML的输出目录可以更容易地从RestaurantInspectionsEnrichment项目中引用,因为该项目已经包含了对RestaurantInspectionsML类库的引用。
最终的Program.cs文件应该如下所示:
using System; using System.IO; using System.Linq; using Microsoft.ML; using static Microsoft.ML.DataOperationsCatalog; using Microsoft.ML.AutoML; using RestaurantInspectionsML; namespace RestaurantInspectionsTraining { class Program { static void Main(string[] args) { // Define source data directory paths string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output"); // Initialize MLContext MLContext mlContext = new MLContext(); // Get directory name of most recent ETL output var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => Path.Join(directory.FullName,"Graded")) .First(); var dataFilePaths = Directory .GetFiles(latestOutput) .Where(file => file.EndsWith("csv")) .ToArray(); // Load the data var dataLoader = mlContext.Data.CreateTextLoader<ModelInput>(separatorChar:‘,‘, hasHeader:false, allowQuoting:true, trimWhitespace:true); IDataView data = dataLoader.Load(dataFilePaths); // Split the data TrainTestData dataSplit = mlContext.Data.TrainTestSplit(data,testFraction:0.2); IDataView trainData = dataSplit.TrainSet; IDataView testData = dataSplit.TestSet; // Define experiment settings var experimentSettings = new MulticlassExperimentSettings(); experimentSettings.MaxExperimentTimeInSeconds = 600; experimentSettings.OptimizingMetric = MulticlassClassificationMetric.LogLoss; // Create experiment var experiment = mlContext.Auto().CreateMulticlassClassificationExperiment(experimentSettings); // Run experiment var experimentResults = experiment.Execute(data, progressHandler: new ProgressHandler()); // Best Run Results var bestModel = experimentResults.BestRun.Model; // Evaluate Model IDataView scoredTestData = bestModel.Transform(testData); var metrics = mlContext.MulticlassClassification.Evaluate(scoredTestData); Console.WriteLine($"MicroAccuracy: {metrics.MicroAccuracy}"); // Save Model string modelSavePath = Path.Join(solutionDirectory,"RestaurantInspectionsML","model.zip"); mlContext.Model.Save(bestModel, data.Schema, modelSavePath); } } }
完成所有代码和配置后,从restaurantinspectionstrain目录中,使用dotnet cli运行应用程序。注意,这将运行10来分钟。
dotnet run
控制台输出应该类似于下面的内容:
Trained LightGbmMulti with Log Loss 0.1547 in 1.55 seconds Trained FastTreeOva with Log Loss 0.0405 in 65.58 seconds Trained FastForestOva with Log Loss 0.0012 in 53.37 seconds Trained LightGbmMulti with Log Loss 0.0021 in 4.55 seconds Trained FastTreeOva with Log Loss 0.8315 in 5.22 seconds MicroAccuracy: 0.999389615839469
该模型经过训练后,可用于丰富化未分级数据。
导航到RestaurantInspectionsEnrichment项目目录,并将以下using语句添加到Program.cs类中。
using System.IO; using System.Linq; using Microsoft.ML; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; using RestaurantInspectionsML;
要进行预测,必须将模型加载到应用程序中,因为预测一次只生成一行,所以还需要创建一个PredictionEngine。
在Program类中,定义PredictionEngine。
private static readonly PredictionEngine<ModelInput,ModelOutput> _predictionEngine;
然后,创建一个构造函数来加载模型并初始化它。
static Program() { MLContext mlContext = new MLContext(); ITransformer model = mlContext.Model.Load("model.zip",out DataViewSchema schema); _predictionEngine = mlContext.Model.CreatePredictionEngine<ModelInput,ModelOutput>(model); }
在Program类的Main方法中,定义数据文件的位置。
string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output");
然后,获取由RestaurantInspectionsETL应用程序生成的最新未分级数据的路径。
var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => directory.FullName) .First();
为您的丰富化数据的应用程序初始化一个SparkSession。
var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_Enrichment") .GetOrCreate();
由RestaurantInspectionsETL生成的数据没有headers。但是,可以在加载数据时定义和设置架构。
var schema = @" INSPECTIONTYPE string, CODES string, CRITICALFLAG int, INSPECTIONSCORE int, GRADE string"; DataFrame df = sc .Read() .Schema(schema) .Csv(Path.Join(latestOutput,"Ungraded"));
Spark中没有允许您使用PredictionEngine的内置函数。但是,Spark可以通过udf扩展。请记住,udf没有像内置函数那样进行优化。因此,尽可能多地使用内置函数。
在Program类中,创建一个名为PredictGrade的新方法,它接受一组特性,这些特性组成了训练模型所期望的ModelInput。
public static string PredictGrade( string inspectionType, string violationCodes, int criticalFlag, int inspectionScore) { ModelInput input = new ModelInput { InspectionType=inspectionType, Codes=violationCodes, CriticalFlag=(float)criticalFlag, InspectionScore=(float)inspectionScore }; ModelOutput prediction = _predictionEngine.Predict(input); return prediction.PredictedLabel; }
然后,在Main方法中,将PredictGrade方法注册为SparkSession中的UDF。
sc.Udf().Register<string,string,int,int,string>("PredictGrade",PredictGrade);
一旦注册了UDF,就可以在Select语句中使用它,Select语句将创建一个新的DataFrame,其中包括输入特性以及经过训练模型的预测等级输出。
var enrichedDf = df .Select( Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE"), CallUDF("PredictGrade", Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE") ).Alias("PREDICTEDGRADE") );
最后,保存丰富化的DataFrame。
string outputId = new DirectoryInfo(latestOutput).Name; string enrichedOutputPath = Path.Join(solutionDirectory,"RestaurantInspectionsEnrichment","Output"); string savePath = Path.Join(enrichedOutputPath,outputId); if(!Directory.Exists(savePath)) { Directory.CreateDirectory(enrichedOutputPath); } enrichedDf.Write().Csv(savePath);
发布并运行丰富化数据的应用程序
最终的Program.cs文件应该如下所示。
using System.IO; using System.Linq; using Microsoft.ML; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; using RestaurantInspectionsML; namespace RestaurantInspectionsEnrichment { class Program { private static readonly PredictionEngine<ModelInput,ModelOutput> _predictionEngine; static Program() { MLContext mlContext = new MLContext(); ITransformer model = mlContext.Model.Load("model.zip",out DataViewSchema schema); _predictionEngine = mlContext.Model.CreatePredictionEngine<ModelInput,ModelOutput>(model); } static void Main(string[] args) { // Define source data directory paths string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output"); var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => directory.FullName) .First(); var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_Enrichment") .GetOrCreate(); var schema = @" INSPECTIONTYPE string, CODES string, CRITICALFLAG int, INSPECTIONSCORE int, GRADE string"; DataFrame df = sc .Read() .Schema(schema) .Csv(Path.Join(latestOutput,"Ungraded")); sc.Udf().Register<string,string,int,int,string>("PredictGrade",PredictGrade); var enrichedDf = df .Select( Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE"), CallUDF("PredictGrade", Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE") ).Alias("PREDICTEDGRADE") ); string outputId = new DirectoryInfo(latestOutput).Name; string enrichedOutputPath = Path.Join(solutionDirectory,"RestaurantInspectionsEnrichment","Output"); string savePath = Path.Join(enrichedOutputPath,outputId); if(!Directory.Exists(savePath)) { Directory.CreateDirectory(enrichedOutputPath); } enrichedDf.Write().Csv(savePath); } public static string PredictGrade( string inspectionType, string violationCodes, int criticalFlag, int inspectionScore) { ModelInput input = new ModelInput { InspectionType=inspectionType, Codes=violationCodes, CriticalFlag=(float)criticalFlag, InspectionScore=(float)inspectionScore }; ModelOutput prediction = _predictionEngine.Predict(input); return prediction.PredictedLabel; } } }
从RestaurantInspectionsEnrichment项目中使用以下命令发布应用程序。
dotnet publish -f netcoreapp2.1 -r ubuntu.18.04-x64
导航到发布目录。在本例中,它是bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish。
从发布目录中,使用spark-submit运行应用程序。
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.4.0.jar dotnet RestaurantInspectionsEnrichment.dll
文件输出应该类似于下面的内容:
Cycle Inspection / Initial Inspection,04N,1,13,A Cycle Inspection / Re-inspection,08A,0,9,A Cycle Inspection / Initial Inspection,"10B,10H",0,10,A Cycle Inspection / Initial Inspection,10F,0,10,A Cycle Inspection / Reopening Inspection,10F,0,3,C
这个解决方案展示了如何在.NET应用程序中使用Spark。因为它是.NET生态系统的一部分,所以可以利用其他组件和框架(如ML.NET)来扩展系统的功能。虽然这个示例是在本地单节点集群上开发和运行的,但Spark是按比例运行的。因此,可以通过设置集群并在其中运行ETL和工作负载来进一步改进该应用程序。
基于Spark.NET和ML.NET Automated ML (自动学习)进行餐厅等级的检查预测
原文:https://www.cnblogs.com/BeanHsiang/p/11739886.html