Skip to content

LibSVM

Overview

The LibSVM project defines a text-based data format for representing numeric datasets.

Apache Spark uses the LibSVM data format to enable "split pipeline" workflows, where data pre-processing and modeling happen in separate pipelines.

The LibSVM data format specializes in storing data pre-processing results. It outperforms the alternatives as data size and sparsity increase (eg. high-cardinality categorical features, TF-IDF features).

However, a dataset in LibSVM data format should be regarded as a short-lived artifact for internal consumption. It needs backing by code or documentation to explain its contents. A casual observer cannot independently tell which raw features existed in the original dataset, or whether (and how) they were transformed.

This opacity carries over to trained models, where it poses a serious challenge when converting them to the PMML representation.

The PMML converter processes the core model object easily, because it is independent of the data format. The confusion arises when wrapping this model with an appropriate data input and output interface (ie. model schema). The fallback assumes dummy features like <featuresCol>[<index>] .

The PMML converter can be assisted by supplying an ad-hoc pipeline model instead, which prepends a compatible data pre-processor to the model:

import org.apache.spark.ml.{PipelineModel, PipelineStage}

val preProcPipelineModel: PipelineModel = ???
val libsvmPipelineModel: PipelineModel = ???

val pmmlStages: Array[PipelineStage] = preProcPipelineModel.stages ++ libsvmPipelineModel.stages

// Create a pipeline model from pre-fitted pipeline stages
val pmmlPipelineModel = PipelineModelFactory.create("pmmlPipelineModel", pmmlStages)

Dataset

The LibSVM reader loads and aggregates all features into a singular VectorUDT column in one pass.

Loading a LibSVM dataset from file:

val df = spark.read
    .format("libsvm")
    .option("numFeatures", "4")
    .option("vectorType", "dense")
    .load("Iris.libsvm")

df.printSchema()

Be sure to configure the expected vector size and sparsity to achieve maximal performance.

Workflow

A LibSVM dataset has exactly two columns, label and features , which makes it compatible with any Apache Spark predictor as-is:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression

val classifier = new LogisticRegression()

val libsvmPipeline = new Pipeline()
    .setStages(Array(classifier))

val libsvmPipelineModel = libsvmPipeline.fit(df)

Historically, the JPMML-SparkML library has rejected pipeline models that begin with a VectorUDT column.

Bypass this limitation using the "split pipeline" workflow approach.

First, create a data schema that lists all fields available for reading or writing in the deployment environment. Convey as much detail about each field as possible.

Second, create a data pre-processing pipeline that implements data flows between this new data schema and the reference LibSVM data schema.

The minimal implementation consists of a VectorAssembler stage that shows which scalar columns and in which order combine into the VectorUDT column. The PMML converter uses this information to resolve vector indices to data schema fields, a prerequisite for generating correctly named and typed DataField elements.

Emulating the label field:

import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.sql.types.{StringType, StructType}

val irisLabelSchema = new StructType()
    .add("Species", StringType, nullable = false)

val speciesIndexerModel = new StringIndexerModel("speciesIndexerModel", Array("setosa", "versicolor", "virginica"))
    .setInputCol("Species")
    .setOutputCol("label")

Emulating the features field:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{DoubleType, StructType}

val irisFeaturesSchema = new StructType()
    .add("Sepal.Length", DoubleType, nullable = false)
    .add("Sepal.Width", DoubleType, nullable = false)
    .add("Petal.Length", DoubleType, nullable = false)
    .add("Petal.Width", DoubleType, nullable = false)

val featuresAssembler = new VectorAssembler("featuresAssembler")
    .setInputCols(Array("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width"))
    .setOutputCol("features")

The most challenging operation in Scala (and Java) is creating the final pipeline model object, because the PipelineModel(Array[PipelineStage]) constructor is private and thus only invokable using reflection. PySpark has no such obstacle.

Final assembly:

import org.apache.spark.ml.{PipelineModel, Transformer}
import org.apache.spark.sql.types.StructType

// Create new data schema, completely ignoring the LibSVM data schema
val irisSchema = new StructType(irisLabelSchema.fields ++ irisFeaturesSchema.fields)

// Create new pipeline model, by prepending new pipeline stages to LibSVM pipeline stages
val irisStages: Array[Transformer] = Array(speciesIndexerModel, featuresAssembler) ++ libsvmPipelineModel.stages

val irisPipelineModel = PipelineModelFactory.create("irisPipelineModel", irisStages)

Export to PMML

Convert Apache Spark pipeline models to PMML using the JPMML-SparkML library.

import org.jpmml.sparkml.PMMLBuilder
import org.jpmml.sparkml.model.HasPredictionModelOptions

val irisPmmlBuilder = new PMMLBuilder(irisSchema, irisPipelineModel)
    // Suppress redundant output fields
    .putOption(HasPredictionModelOptions.OPTION_KEEP_PREDICTIONCOL, false)

println(irisPmmlBuilder.buildString)

Resources