
pyspark


To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
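For instance, here is a minimal sketch of creating one (assuming pyspark is installed and no session exists yet; builder.getOrCreate() returns the existing session if one is already running):

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create (or retrieve) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Verify the session by printing it
print(spark)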

 

Using Python version 3.5.2 (default, Nov 23 2017 16:37:01)
SparkSession available as 'spark'.

# Print the tables in the catalog
print(spark.catalog.listTables())
[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
 
Your SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.
 
Running a query on this table is as easy as using the .sql() method on your SparkSession. This method takes a string containing the query and returns a DataFrame with the results!
 
 
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()
 
 
 
Sometimes it makes sense to then take that table and work with it locally using a tool like pandas. Spark DataFrames make that easy with the .toPandas() method. Calling this method on a Spark DataFrame returns the corresponding pandas DataFrame. It's as simple as that!

Note: use .toPandas() when you want to work with Spark data locally in pandas.
 
 
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Run the query
flight_counts = spark.sql(query)
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()
# Print the head of pd_counts
print(pd_counts.head())
 
 
The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame.
The output of this method is stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts.
 

For example, a SQL query (using the .sql() method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the .createTempView() Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.

There is also the method .createOrReplaceTempView(). This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.
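As a rough sketch of that workflow (assuming a pandas DataFrame named pd_temp is already in your workspace; the name is only for illustration):

# Create a Spark DataFrame from a pandas DataFrame (stored locally, not in the catalog)
spark_temp = spark.createDataFrame(pd_temp)

# Register it as a temporary table called "temp" so .sql() queries can see it
spark_temp.createOrReplaceTempView("temp")

# Confirm the table now appears in the catalog
print(spark.catalog.listTables())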

Check out the diagram to see all the different ways your Spark data structures interact with each other.

[diagram omitted]

 

 

 

How to use createOrReplaceTempView:

DataFrame.createOrReplaceTempView(name)

 
 
 
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path)

# Show the data
airports.show()
 
Note: the file has a header row with the columns faa, name, lat, lon, alt, tz, and dst.
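If you want Spark to use that header row as column names instead of treating it as data, you can pass header=True when reading the file (a sketch, not part of the exercise above):

# Re-read the file, using the first row as column names
airports = spark.read.csv(file_path, header=True)
airports.show()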
 
************************************************************************************************************************************************************
In this chapter, you'll learn how to use the methods defined by Spark's DataFrame class to perform common data operations.
 

In Spark you can create new columns using the .withColumn() method, which takes two arguments: first, a string with the name of your new column, and second, the new column itself.

The new column must be an object of class Column. Creating one of these is as easy as extracting a column from your DataFrame using df.colName.

Updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place.

Thus, all these methods return a new DataFrame. To overwrite the original DataFrame, you must reassign the returned DataFrame, like so:

df = df.withColumn("newCol", df.oldCol + 1)

The above code creates a DataFrame with the same columns as df plus a new column, newCol, where every entry is equal to the corresponding entry from oldCol, plus one.

To overwrite an existing column, just pass the name of the column as the first argument!

Remember, a SparkSession called spark is already in your workspace.

 

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|

 
 
# Create the DataFrame flights
flights = spark.table("flights")

# Show the head
flights.show()

# Add duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)
 
 

Filtering Data

Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.

Let's take a look at the .filter() method. As you might suspect, this is the Spark counterpart of SQL's WHERE clause. The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:

flights.filter("air_time > 120").show()
flights.filter(flights.air_time > 120).show()

Notice that in the first case, we pass a string to .filter(). In SQL, we would write this filtering task as SELECT * FROM flights WHERE air_time > 120. Spark's .filter() can accept any expression that could go in the WHERE clause of a SQL query (in this case, "air_time > 120"), as long as it is passed as a string. Notice that in this case, we do not reference the name of the table in the string -- just as we wouldn't in the SQL query.

In the second case, we actually pass a column of boolean values to .filter(). Remember that flights.air_time > 120 returns a column of boolean values that has True in place of those records in flights.air_time that are over 120, and False otherwise.

 

Two ways to filter:

  • pass a string to .filter()
  • pass a column of boolean values to .filter()

 

# Filter flights by passing a string
long_flights1 = flights.filter("distance>1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance>1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()
 
 

Selecting

The Spark variant of SQL's SELECT is the .select() method. This method takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside .withColumn().

The difference between .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use .select() and not .withColumn().

 

  • Select the columns tailnum, origin, and dest from flights by passing the column names as strings. Save this as selected1.
  • Select the columns origin, dest, and carrier using the df.colName syntax and then filter the result using both of the filters already defined for you (filterA and filterB) to only keep flights from SEA to PDX. Save this as selected2. (A sketch of one possible solution follows this list.)
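A sketch of one possible solution, assuming filterA and filterB are boolean Columns defined as shown (the definitions here are illustrative):

# Select the first set of columns by name
selected1 = flights.select("tailnum", "origin", "dest")

# Select columns using the df.colName syntax
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# The filters (hypothetical definitions matching the exercise description)
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"

# Keep only flights from SEA to PDX
selected2 = temp.filter(filterA).filter(filterB)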
 
 
 

Selecting II

Similar to SQL, you can also use the .select() method to perform column-wise operations. When you're selecting a column using the df.colName notation, you can perform any column operation and the .select() method will return the transformed column. For example,

flights.select(flights.air_time/60)

returns a column of flight durations in hours instead of minutes. You can also use the .alias() method to rename a column you're selecting. So if you wanted to .select() the column duration_hrs (which isn't in your DataFrame) you could do

flights.select((flights.air_time/60).alias("duration_hrs"))

The equivalent Spark DataFrame method .selectExpr() takes SQL expressions as a string:

flights.selectExpr("air_time/60 as duration_hrs")

with the SQL as keyword being equivalent to the .alias() method. To select multiple columns, you can pass multiple strings.

 

 

# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
 
 
 

Aggregating

All of the common aggregation methods, like .min(), .max(), and .count(), are GroupedData methods. These are created by calling the .groupBy() DataFrame method. You'll learn exactly what that means in a few exercises. For now, all you have to do to use these functions is call that method on your DataFrame. For example, to find the minimum value of a column, col, in a DataFrame, df, you could do

df.groupBy().min("col").show()

This creates a GroupedData object (so you can use the .min() method), then finds the minimum value in col, and returns it as a DataFrame.

Now you're ready to do some aggregating of your own!

 

  • Find the length of the shortest (in terms of distance) flight that left PDX by first .filter()ing and using the .min() method. Perform the filtering by referencing the column directly, not passing a SQL string.
  • Find the length of the longest (in terms of time) flight that left SEA by .filter()ing and using the .max() method. Perform the filtering by referencing the column directly, not passing a SQL string.

 

 

# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin=="PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin=="SEA").groupBy().max("air_time").show()
 
 
 
 
  • Use the .avg() method to get the average air time of Delta Airlines flights (where the carrier column has the value "DL") that left SEA. The place of departure is stored in the column origin. Then .show() the result.
  • Use the .sum() method to get the total number of hours all planes in this dataset spent in the air by creating a column called duration_hrs from the column air_time. Then .show() the result.

 

# Average duration of Delta flights
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()
 
Note: here .groupBy() is called with nothing inside the parentheses.
 
 
 

Grouping and Aggregating I

Part of what makes aggregating so powerful is the addition of groups. PySpark has a whole class devoted to grouped data frames: pyspark.sql.GroupedData, which you saw in the last two exercises.

You've learned how to create a grouped DataFrame by calling the .groupBy() method on a DataFrame with no arguments.

Now you'll see that when you pass the name of one or more columns in your DataFrame to the .groupBy() method, the aggregation methods behave like when you use a GROUP BY statement in a SQL query!

 

  • .groupBy() with no arguments: creates a grouped DataFrame covering the whole table, so aggregations run over all rows.
  • .groupBy() with one or more column names: the aggregation methods behave like a GROUP BY statement in a SQL query.

 

  • Create a DataFrame called by_plane that is grouped by the column tailnum.
  • Use the .count() method with no arguments to count the number of flights each plane made.
  • Create a DataFrame called by_origin that is grouped by the column origin.
  • Find the .avg() of the air_time column to find average duration of flights from PDX and SEA.

 

# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()
 
 

Grouping and Aggregating II

In addition to the GroupedData methods you've already seen, there is also the .agg() method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.

This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a GroupedData table.

 

 

  • Import the submodule pyspark.sql.functions as F.
  • Create a GroupedData table called by_month_dest that's grouped by both the month and dest columns. Refer to the two columns by passing both strings as separate arguments.
  • Use the .avg() method on the by_month_dest DataFrame to get the average dep_delay in each month for each destination.
  • Find the standard deviation of dep_delay by using the .agg() method with the function F.stddev().
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month","dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()
 
 
A join will combine two different tables along a column that they share. 
 
 
In PySpark, joins are performed using the DataFrame method .join(). This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, on, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, how, specifies the kind of join to perform. In this course we'll always use the value how="leftouter".
 
# Examine the data
airports.show()

# Rename the faa column
airports = airports.withColumnRenamed("faa","dest")

# Join the DataFrames
flights_with_airports = flights.join(airports,on="dest",how="leftouter")

# Examine the new DataFrame
flights_with_airports.show()
 
 
*********************************************************************************************************************************************************
Machine Learning Pipelines

At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.

Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.

Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestModel that uses the random forest algorithm for classification or regression.
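For instance, a minimal sketch of both patterns, assuming the flights DataFrame has a numeric air_time column and a string carrier column (the bucket boundaries here are arbitrary):

from pyspark.ml.feature import Bucketizer, StringIndexer

# Transformer: .transform() returns a new DataFrame with a column appended
bucketizer = Bucketizer(splits=[0.0, 60.0, 120.0, float("inf")],
                        inputCol="air_time", outputCol="air_time_bin")
binned = bucketizer.transform(flights)

# Estimator: .fit() returns a fitted model, which is itself a Transformer
indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
indexer_model = indexer.fit(flights)
indexed = indexer_model.transform(flights)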

 

df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name='Alice'), Row(age2=5, name='Bob')]



  • First, rename the year column of planes to plane_year to avoid duplicate column names.
  • Create a new DataFrame called model_data by joining the flights table with planes using the tailnum column as the key.
# Rename year column
planes = planes.withColumnRenamed("year","plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")

 

 

Data types

Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right, and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.

To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.

The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer", and for decimal numbers you'll use "double".

You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column, just like you did in the previous chapter!

 

String to integer

Now you'll use the .cast() method you learned in the previous exercise to convert all the appropriate columns from your DataFrame model_data to integers!

To convert the type of a column using the .cast() method, you can write code like this:

dataframe = dataframe.withColumn("col", dataframe.col.cast("new_type"))





Use the method .withColumn() to .cast() the following columns to type "integer". Access the columns using the df.col notation:
  • model_data.arr_delay
  • model_data.air_time
  • model_data.month
  • model_data.plane_year
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay",model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year",model_data.plane_year.cast("integer"))
 
  • Create the column plane_age using the .withColumn() method and subtracting the year of manufacture (column plane_year) from the year (column year) of the flight.
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year-model_data.plane_year)
 
 


  • Use the .withColumn() method to create the column is_late. This column is equal to model_data.arr_delay > 0.
  • Convert this column to an integer column so that you can use it in your model and name it label (this is the default name for the response variable in Spark's machine learning routines).
  • Filter out missing values (this has been done for you).

 

# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay>0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
 
 


Strings and factors

As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the airline and the plane's destination as features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this built into the pyspark.ml.feature submodule. You can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).

The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.

 

Carrier

In this exercise you'll create a StringIndexer and a OneHotEncoder to code the carrier column. To do this, you'll call the class constructors with the arguments inputCol and outputCol.

The inputCol is the name of the column you want to index or encode, and the outputCol is the name of the new column that the Transformer should create.

  • Create a StringIndexer called carr_indexer by calling StringIndexer() with inputCol="carrier" and outputCol="carrier_index".
  • Create a OneHotEncoder called carr_encoder by calling OneHotEncoder() with inputCol="carrier_index" and outputCol="carrier_fact".
# Import the feature transformers
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")
 
 
  • Create a StringIndexer called dest_indexer by calling StringIndexer() with inputCol="dest" and outputCol="dest_index".
  • Create a OneHotEncoder called dest_encoder by calling OneHotEncoder() with inputCol="dest_index" and outputCol="dest_fact".
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest",outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index",outputCol="dest_fact")
 
 

Assemble a vector

The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.

 

# Import the VectorAssembler
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
 
  • Import Pipeline from pyspark.ml.
  • Call the Pipeline() constructor with the keyword argument stages to create a Pipeline called flights_pipe.
    • stages should be a list holding all the stages you want your data to go through in the pipeline. Here this is just: [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
 
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
 
 
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)
 
 
# Split the data into training and test sets
training, test = piped_data.randomSplit([0.6,0.4])
 
 
********************************************************************************************************************************************************
Chapter 4
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()
 
You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, elasticNetParam and regParam, and using the cross validation error to compare all the different models so you can choose the best one!
 
 
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
 

Make a grid

Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments; it just returns the grid that you'll use later.

 

  • Import the submodule pyspark.ml.tuning under the alias tune.
  • Call the class constructor ParamGridBuilder() with no arguments. Save this as grid.
  • Call the .addGrid() method on grid with lr.regParam as the first argument and np.arange(0, .1, .01) as the second argument. This second call is a function from the numpy module (imported as np) that creates a list of numbers from 0 to .1, incrementing by .01. Overwrite grid with the result.
  • Update grid again by calling the .addGrid() method a second time to create a grid for lr.elasticNetParam that includes only the values [0, 1].
  • Call the .build() method on grid and overwrite it with the output.

 

# Import the tuning submodule and numpy (np.arange is used below)
import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam,[0,1])

# Build the grid
grid = grid.build()
 
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )
 

Fit the model(s)

You're finally ready to fit the models and select the best one!

Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.

To do this locally you would use the code:

# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.

 

# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))
 
 
 
 



 
 
 

 


Original: https://www.cnblogs.com/lqbzsmdy/p/14791775.html
