In Python, PySpark offers an API dedicated to machine learning, known as MLlib. The MLlib component of PySpark supports a variety of machine learning algorithms, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, along with essential optimization primitives.
PySpark MLlib - Machine Learning Concepts and Algorithms
The following are several machine learning concepts and utilities provided by PySpark MLlib:
Classification
The pyspark.mllib library provides support for various classification techniques, including binary classification, multiclass classification, and regression analysis. In classification, each object is assigned to one of a set of known classes; the main aim is to categorize data according to specific characteristics. Among the most widely used classification algorithms are Random Forest, Naive Bayes, and Decision Tree.
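As a brief illustration of the DataFrame-based classification API, the following sketch fits a logistic regression model on a tiny, hypothetical dataset (the values and the app name are invented for demonstration):
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName('ClassificationSketch').getOrCreate()
# A tiny labeled dataset: a binary label and a two-dimensional feature vector per row
train = spark.createDataFrame([
    (0.0, Vectors.dense([0.0, 1.1])),
    (1.0, Vectors.dense([2.0, 1.0])),
    (0.0, Vectors.dense([0.1, 1.2])),
    (1.0, Vectors.dense([2.1, 0.9]))], ["label", "features"])
# Fit the model and show the predicted class for each row
lr = LogisticRegression(maxIter=10)
model = lr.fit(train)
model.transform(train).select("label", "prediction").show()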
Clustering
Clustering represents a challenge within unsupervised machine learning. This approach is employed when the classification of data is unknown; in such cases, we depend on the algorithm to identify patterns and categorize the data appropriately. Among the well-known algorithms for clustering are K-means clustering, Gaussian mixture models, and Hierarchical clustering methods.
FPM
FPM stands for frequent pattern mining, a technique employed for the extraction of frequent items, itemsets, subsequences, or other substructures. It is primarily used on extensive datasets.
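As a minimal, hedged sketch of frequent pattern mining with the DataFrame-based FPGrowth estimator (the transaction data is invented for illustration, and an active SparkSession named spark is assumed):
from pyspark.ml.fpm import FPGrowth
# Hypothetical market-basket transactions: an id and a list of purchased items
transactions = spark.createDataFrame([
    (0, ["bread", "milk"]),
    (1, ["bread", "butter"]),
    (2, ["bread", "milk", "butter"])], ["id", "items"])
fp = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
fp_model = fp.fit(transactions)
# Itemsets appearing in at least 50% of transactions, and rules with confidence >= 0.6
fp_model.freqItemsets.show()
fp_model.associationRules.show()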
linalg
The mllib.linalg library provides functionalities for performing linear algebra operations.
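For example, dense and sparse vectors can be created as follows (a minimal sketch using the Vectors factory):
from pyspark.mllib.linalg import Vectors
# A dense vector stores every element explicitly
dense_vec = Vectors.dense([1.0, 0.0, 3.0])
# A sparse vector stores only the non-zero entries: size 3, values at indices 0 and 2
sparse_vec = Vectors.sparse(3, [0, 2], [1.0, 3.0])
print(dense_vec)
print(sparse_vec)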
Recommendation
The mllib.recommendation module provides the building blocks of a recommender system. Such a system anticipates users' future preferences and suggests the most suitable items. For instance, the online streaming service Netflix offers an extensive library of films, and users often struggle to choose what to watch; this is precisely where recommendations become essential.
mllib regression
Regression analysis is employed to identify the connections and dependencies among various variables. It determines the correlation between each data feature and forecasts future values based on these relationships.
The mllib package encompasses a variety of additional algorithms, classes, and functions. In this section, we will explore the fundamental principles of pyspark.mllib.
MLlib Features
PySpark MLlib is well suited to iterative algorithms. Its feature utilities are the following:
- Extraction: It extracts features from "raw" data.
- Transformation: It is used for scaling, converting, or modifying features (see the sketch after this list).
- Selection: Selecting a useful subset from a larger set of features.
- Locality Sensitive Hashing (LSH): It combines aspects of feature transformation with other algorithms.
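As an example of the transformation utilities listed above, the following sketch standardizes a vector column with StandardScaler (the input values are hypothetical, and an active SparkSession named spark is assumed):
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
# A tiny DataFrame with a single vector column
df = spark.createDataFrame([
    (Vectors.dense([10.0, 0.1]),),
    (Vectors.dense([20.0, 0.2]),),
    (Vectors.dense([30.0, 0.3]),)], ["features"])
# Rescale each feature to zero mean and unit standard deviation
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df)
scaler_model.transform(df).show(truncate=False)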
Let us explore the fundamental libraries within PySpark MLlib.
MLlib Linear Regression
Linear regression serves the purpose of identifying the relationship and interdependencies among variables. Take a look at the code snippet below:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Customer').getOrCreate()
from pyspark.ml.regression import LinearRegression
# Read the CSV with a header and an inferred schema so the columns keep their names and numeric types
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv', header=True, inferSchema=True)
dataset.show(10)
Output:
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
| Email| Address| Avatar|Avg Session Length| Time on App| Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...| Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616| 4.0826206329529615| 587.9510539684005|
|[email protected]|4547 Archer Commo...| DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744| 2.66403418213262| 392.2049334443264|
|[email protected]|24645 Valerie Uni...| Bisque|33.000914755642675|11.330278057777512|37.110597442120856| 4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...| SaddleBrown| 34.30555662975554|13.717513665142507| 36.72128267790313| 3.120178782748092| 581.8523440352177|
|mstephens@davidso...|14023 Rodriguez P...|MediumAquaMarine| 33.33067252364639|12.795188551078114| 37.53665330059473| 4.446308318351434| 599.4060920457634|
|alvareznancy@luca...|645 Martha Park A...| FloralWhite|33.871037879341976|12.026925339755056| 34.47687762925054| 5.493507201364199| 637.102447915074|
|katherine20@yahoo...|68388 Reyes Light...| DarkSlateBlue| 32.02159550138701|11.366348309710526| 36.68377615286961| 4.685017246570912| 521.5721747578274|
|[email protected]|Unit 6538 Box 898...| Aqua|32.739142938380326| 12.35195897300293| 37.37335885854755| 4.4342734348999375| 549.9041461052942|
|vchurch@walter-ma...|860 Lee KeyWest D...| Salmon| 33.98777289568564|13.386235275676436|37.534497341555735| 3.2734335777477144| 570.2004089636196|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
only showing top 10 rows
In the code snippet below, we use the VectorAssembler feature transformer to combine the Avg Session Length, Time on App, and Time on Website columns into a single vector column named Independent Features:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols = ["Avg Session Length", "Time on App", "Time on Website"], outputCol = "Independent Features")
output = featureassembler.transform(dataset)
output.select("Independent Features").show()
Output:
+--------------------+
|Independent Features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
+--------------------+
finalized_data = output.select("Independent Features", "Yearly Amount Spent")
finalized_data.show()
Output:
+--------------------+-------------------+
|Independent Features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
+--------------------+-------------------+
PySpark offers the LinearRegression estimator to fit a linear model that predicts a label column from a features column. The syntax is outlined as follows:
regressor = LinearRegression(featuresCol='column_name1', labelCol='column_name2')
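Putting this together with the finalized_data DataFrame prepared above, a minimal sketch of fitting and inspecting the model could look as follows (the 80/20 split is an arbitrary choice for illustration):
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Yearly Amount Spent')
# Split the prepared data into training and test sets
train_data, test_data = finalized_data.randomSplit([0.8, 0.2])
lr_model = regressor.fit(train_data)
# Inspect the learned coefficients and intercept
print(lr_model.coefficients)
print(lr_model.intercept)
# Evaluate the fitted model on the held-out test data
test_results = lr_model.evaluate(test_data)
print(test_results.rootMeanSquaredError)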
MLlib K-Means Clustering
The K-Means clustering algorithm is among the most widely used methods in data analysis. It groups data points into a number of clusters that the user specifies beforehand. The example below applies the MLlib K-Means API to the Iris dataset; note that the feature column names used when assembling the input vector are an assumption about the file's layout and may need adjusting:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
# Load the Iris data from CSV with a header and an inferred schema.
dataset = spark.read.csv(r"C:\Users\DEVANSH SHARMA\Iris.csv", header=True, inferSchema=True)
# KMeans expects a single vector column named "features".
# The input column names below assume the usual Iris.csv layout and may need adjusting.
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features")
dataset = assembler.transform(dataset)
# Trains a k-means model with k = 2 clusters and a fixed seed.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions (adds a "prediction" column holding each row's cluster).
predictions = model.transform(dataset)
# Evaluate clustering by computing the Silhouette score.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Show the cluster centers.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
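To see how the rows are distributed across the two clusters, the predictions DataFrame from above can be summarized (a small usage sketch):
# Count how many rows were assigned to each cluster
predictions.groupBy("prediction").count().show()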
Parameters of PySpark MLlib
A few important parameters of PySpark MLlib's ALS recommendation API (pyspark.mllib.recommendation) are given below; a usage sketch follows the list:
- Ratings: An RDD of Rating objects or (userID, productID, rating) tuples.
- Rank: The rank of the computed feature matrices (the number of latent features).
- Iterations: The number of ALS iterations to run. (default: 5)
- Lambda: The regularization parameter. (default: 0.01)
- Blocks: The number of blocks used to parallelize the computation. (default: -1, auto-configured)
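These parameters map onto the RDD-based ALS API in pyspark.mllib.recommendation; below is a minimal sketch with hypothetical ratings (an active SparkSession named spark is assumed):
from pyspark.mllib.recommendation import ALS, Rating
# Hypothetical (userID, productID, rating) tuples
ratings_rdd = spark.sparkContext.parallelize([
    Rating(1, 10, 4.0),
    Rating(1, 20, 3.0),
    Rating(2, 10, 5.0),
    Rating(2, 30, 1.0)])
# Train a matrix factorization model with rank 10, 5 iterations, and regularization 0.01
model = ALS.train(ratings_rdd, rank=10, iterations=5, lambda_=0.01)
# Predict the rating user 2 would give product 20
print(model.predict(2, 20))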
Collaborative Filtering (mllib.recommendation)
Collaborative filtering is a methodology commonly employed in recommender systems. This approach aims to populate the absent entries within a user-item association matrix. The spark.ml library at present offers support for model-based collaborative filtering. Within the realm of collaborative filtering, both users and items are characterized by a limited set of latent factors, which can be utilized to forecast the missing entries.
Scaling of the regularization parameter
The regularization parameter, denoted regParam, is scaled when solving each least-squares problem: by the number of ratings the user generated when updating the user factors, or by the number of ratings the product received when updating the product factors. This makes regParam less dependent on the scale of the dataset.
Cold-start strategy
The ALS model, which stands for Alternating Least Squares, is commonly used to predict missing ratings. A significant issue arises when users or items that appear in the test dataset were absent during the model's training phase. This situation can occur in two distinct scenarios, outlined below:
- When making predictions, the model has not been trained on users or items that lack any prior rating history; this is known as the cold-start problem, and the coldStartStrategy parameter controls how such rows are handled.
- During cross-validation, the data is divided into training and evaluation sets, and it is quite common to find users and items in the evaluation set that do not appear in the training set.
Let us examine the subsequent example in which we import ratings data from the MovieLens dataset. Each entry consists of a user, a movie, a rating, and a timestamp.
# Importing the libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
no_of_lines = spark.read.text(r"C:\Users\DEVANSH SHARMA\MovieLens.csv").rdd
no_of_parts = no_of_lines.map(lambda row: row.value.split("::"))
ratingsRDD = no_of_parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                           rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Develop the recommendation model using ALS on the training data
# Note we set cold start strategy to make sure that we don't get NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
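The resulting recommendation DataFrames can then be inspected, for example (a small usage sketch):
# Display the top recommendations generated for the first few users
userRecs.show(5, truncate=False)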