Apache Spark SQL и машинное обучение для классификаций генетических вариантов

Джордж Джен, Jen Tek LLC

Введение

Covid-19 вызвал интерес к биоинформатике, науке о данных, связанной со сбором и анализом сложных биологических данных, таких как генетические коды и последовательности, сочетание математики и биологии.

Контекст

ClinVar — общедоступный ресурс, содержащий аннотации о генетических вариантах человека. Подробности можно найти на сайте Национального института здравоохранения.

https://www.ncbi.nlm.nih.gov/clinvar/

Эти варианты обычно вручную классифицируются клиническими лабораториями по категориальному спектру, начиная от доброкачественных, вероятно доброкачественных, неопределенного значения, вероятно патогенных и патогенных. Варианты, которые имеют противоречивые классификации (от лаборатории к лаборатории), могут вызвать путаницу, когда клиницисты или исследователи пытаются интерпретировать, влияет ли вариант на заболевание данного пациента.

Цель

Цель состоит в том, чтобы предсказать, будет ли вариант ClinVar иметь конфликтующие классификации. Это представлено как проблема бинарной классификации, каждая запись в наборе данных является генетическим вариантом.

Конфликтующие классификации - это когда две из любой из следующих трех категорий присутствуют для одного варианта, две заявки одной категории не считаются конфликтующими.

· Вероятная доброкачественная или доброкачественная

· VUS (изменение неопределенной значимости. Изменение в генетической последовательности, для которого связь с риском заболевания неясна)

· Вероятно патогенный или патогенный

Метка класса

Столбцу CLASS присвоена конфликтующая классификация. Это двоичное представление того, имеет ли вариант конфликтующие классификации, где 0 представляет согласованные классификации, а 1 представляет конфликтующие классификации.

Набор данных генетических вариантов является общественным достоянием, доступным на Kaggle.

https://www.kaggle.com/kevinarvai/clinvar-conflicting

Шаги проекта машинного обучения

Получение данных

Загрузите файл csv с Kaggle, поместите в папку на узле драйвера Apache Spark:

https://www.kaggle.com/kevinarvai/clinvar-conflicting?select=clinvar_conflicting.csv

Загрузите файл csv в Spark SQL DataFrame.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
Logger.getLogger("org").setLevel(Level.ERROR)
 
val spark = SparkSession
    .builder
    .appName("genetic classifier")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///tmp")
  .getOrCreate()
/*
Load the csv file into SparkSQL DataFrame, with options:
inferSchema=true, set the datatype of the feature columns based upon value data types
header=true, set the DataFrame column names from the headers of the csv file.
*/ 
val ds = spark.read.format("csv").option("inferSchema", "true").option("header", "true")
  .option("quote", "\"")
    .load("file:///home/bigdata2/dataset/clinvar_conflicting.csv")
val df: DataFrame = ds.toDF()

Исследование данных:

df.show(3)
|CHROM|    POS|REF|ALT|AF_ESP|AF_EXAC|AF_TGP|            CLNDISDB|CLNDISDBINCL|               CLNDN|CLNDNINCL|             CLNHGVS|CLNSIGINCL|               CLNVC|               CLNVI|                  MC|ORIGIN|SSR|CLASS|Allele|     Consequence|  IMPACT| SYMBOL|Feature_type|       Feature|       BIOTYPE|EXON|INTRON|cDNA_position|CDS_position|Protein_position|Amino_acids| Codons|DISTANCE|STRAND|BAM_EDIT|                SIFT|         PolyPhen|MOTIF_NAME|MOTIF_POS|HIGH_INF_POS|MOTIF_SCORE_CHANGE|LoFtool|CADD_PHRED| CADD_RAW|BLOSUM62|
|    1|1168180|  G|  C|0.0771| 0.1002|0.1066|     MedGen:CN169374|         nan|       not_specified|      nan|NC_000001.10:g.11...|       nan|single_nucleotide...|UniProtKB_(protei...|SO:0001583|missen...|     1|nan|    0|     C|missense_variant|MODERATE|B3GALT6|  Transcript|   NM_080605.3|protein_coding| 1/1|  null|          552|         522|             174|        E/D|gaG/gaC|    null|     1|    null|           tolerated|           benign|      null|     null|        null|              null|   null|     1.053|-0.208682|       2|
|    1|1470752|  G|  A|   0.0|    0.0|   0.0|MedGen:C1843891,O...|         nan|Spinocerebellar_a...|      nan|NC_000001.10:g.14...|       nan|single_nucleotide...|OMIM_Allelic_Vari...|SO:0001583|missen...|     1|nan|    0|     A|missense_variant|MODERATE|TMEM240|  Transcript|NM_001114748.1|protein_coding| 4/4|  null|          523|         509|             170|        P/L|cCg/cTg|    null|    -1|      OK|deleterious_low_c...|           benign|      null|     null|        null|              null|   null|      31.0| 6.517838|      -3|
|    1|1737942|  A|  G|   0.0| 1.0E-5|   0.0|Human_Phenotype_O...|         nan|Strabismus|Nystag...|      nan|NC_000001.10:g.17...|       nan|single_nucleotide...|OMIM_Allelic_Vari...|SO:0001583|missen...|    35|nan|    1|     G|missense_variant|MODERATE|   GNB1|  Transcript|   NM_002074.4|protein_coding|6/12|  null|          632|         239|              80|        I/T|aTc/aCc|    null|    -1|      OK|         deleterious|probably_damaging|      null|     null|        null|              null|   null|      28.1| 6.061752|      -1|
 
only showing top 3 rows

Обратите внимание, что в наборе данных много пропущенных значений, т. е. нулевых значений. Набор данных со значительными пропущенными значениями обычно влияет на точность обучения и прогнозирования.

Некоторые значения столбца большие, например 6 цифр, некоторые значения столбца меньше 1. Без адресации это может сбить с толку алгоритм ML, который может рассматривать некоторые небольшие значения как статистически незначимые и игнорировать результат. Это можно решить при предварительной обработке данных с помощью скейлера. Я планирую использовать MinMaxScaler, который сделает значения всех столбцов функций между 0 и 1.

Оценить схему:

df.printSchema
root
 |-- CHROM: string (nullable = true)
 |-- POS: integer (nullable = true)
 |-- REF: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- AF_ESP: double (nullable = true)
 |-- AF_EXAC: double (nullable = true)
 |-- AF_TGP: double (nullable = true)
 |-- CLNDISDB: string (nullable = true)
 |-- CLNDISDBINCL: string (nullable = true)
 |-- CLNDN: string (nullable = true)
 |-- CLNDNINCL: string (nullable = true)
 |-- CLNHGVS: string (nullable = true)
 |-- CLNSIGINCL: string (nullable = true)
 |-- CLNVC: string (nullable = true)
 |-- CLNVI: string (nullable = true)
 |-- MC: string (nullable = true)
 |-- ORIGIN: integer (nullable = true)
 |-- SSR: string (nullable = true)
 |-- CLASS: integer (nullable = true)
 |-- Allele: string (nullable = true)
 |-- Consequence: string (nullable = true)
 |-- IMPACT: string (nullable = true)
 |-- SYMBOL: string (nullable = true)
 |-- Feature_type: string (nullable = true)
 |-- Feature: string (nullable = true)
 |-- BIOTYPE: string (nullable = true)
 |-- EXON: string (nullable = true)
 |-- INTRON: string (nullable = true)
 |-- cDNA_position: string (nullable = true)
 |-- CDS_position: string (nullable = true)
 |-- Protein_position: string (nullable = true)
 |-- Amino_acids: string (nullable = true)
 |-- Codons: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- STRAND: integer (nullable = true)
 |-- BAM_EDIT: string (nullable = true)
 |-- SIFT: string (nullable = true)
 |-- PolyPhen: string (nullable = true)
 |-- MOTIF_NAME: string (nullable = true)
 |-- MOTIF_POS: integer (nullable = true)
 |-- HIGH_INF_POS: string (nullable = true)
 |-- MOTIF_SCORE_CHANGE: double (nullable = true)
 |-- LoFtool: double (nullable = true)
 |-- CADD_PHRED: double (nullable = true)
 |-- CADD_RAW: double (nullable = true)
 |-- BLOSUM62: integer (nullable = true)

DataFrame имеет 46 столбцов, один столбец CLASS в качестве цели/метки и 45 столбцов в качестве функций.

Некоторые из них представляют собой строковые типы данных, которые необходимо закодировать в числовой тип данных, чтобы алгоритм ML мог их обрабатывать.

Распределение количества строк КЛАССА 1 (генетически конфликтующие) и 0 (генетически непротиворечивые) выглядит следующим образом:

df.groupBy("CLASS").count().show()
+-----+-----+
|CLASS|count|
+-----+-----+
|    1|16434|
|    0|48754|
+-----+-----+

Предварительная обработка данных

НУЛЕВАЯ замена

Замените нулевые значения в столбцах String на «ABCD» и 0 в числовых столбцах.

Закодируйте значение String в целое число, используя StringIndexer

Перестройте DataFrame после кодирования, первый столбец результирующего DataFrame — это CLASS, метка.

import org.apache.spark.ml.feature.StringIndexer
var df1=df.select("CLASS")
val featureDF=df.drop("CLASS")
val typeArray=featureDF.dtypes
for (i<-0 until featureDF.columns.size)
  {
    if (typeArray(i)._2=="StringType")
      {
          var indexer = new StringIndexer().setInputCol(typeArray(i)._1).setOutputCol(typeArray(i)._1+"Index")
          var indexed = indexer.fit(featureDF.na.fill("ABCD").select(typeArray(i)._1)).transform(featureDF.na.fill("ABCD").select(typeArray(i)._1)) 
          var temp=indexed.select(typeArray(i)._1+"index")
          df1=df1.withColumn("id", monotonically_increasing_id())
          .join(temp.withColumn("id", monotonically_increasing_id()), Seq("id"))
      }
      else
      {
          var temp=featureDF.na.fill(0).select(typeArray(i)._1)
          df1=df1.withColumn("id", monotonically_increasing_id()).join(temp.withColumn("id", monotonically_increasing_id()), Seq("id"))
      }
  }
df1=df1.drop("id")
df1.printSchema
root
 |-- CLASS: integer (nullable = true)
 |-- CHROMindex: double (nullable = false)
 |-- POS: integer (nullable = false)
 |-- REFindex: double (nullable = false)
 |-- ALTindex: double (nullable = false)
 |-- AF_ESP: double (nullable = false)
 |-- AF_EXAC: double (nullable = false)
 |-- AF_TGP: double (nullable = false)
 |-- CLNDISDBindex: double (nullable = false)
 |-- CLNDISDBINCLindex: double (nullable = false)
 |-- CLNDNindex: double (nullable = false)
 |-- CLNDNINCLindex: double (nullable = false)
 |-- CLNHGVSindex: double (nullable = false)
 |-- CLNSIGINCLindex: double (nullable = false)
 |-- CLNVCindex: double (nullable = false)
 |-- CLNVIindex: double (nullable = false)
 |-- MCindex: double (nullable = false)
 |-- ORIGIN: integer (nullable = false)
 |-- SSRindex: double (nullable = false)
 |-- Alleleindex: double (nullable = false)
 |-- Consequenceindex: double (nullable = false)
 |-- IMPACTindex: double (nullable = false)
 |-- SYMBOLindex: double (nullable = false)
 |-- Feature_typeindex: double (nullable = false)
 |-- Featureindex: double (nullable = false)
 |-- BIOTYPEindex: double (nullable = false)
 |-- EXONindex: double (nullable = false)
 |-- INTRONindex: double (nullable = false)
 |-- cDNA_positionindex: double (nullable = false)
 |-- CDS_positionindex: double (nullable = false)
 |-- Protein_positionindex: double (nullable = false)
 |-- Amino_acidsindex: double (nullable = false)
 |-- Codonsindex: double (nullable = false)
 |-- DISTANCE: integer (nullable = false)
 |-- STRAND: integer (nullable = false)
 |-- BAM_EDITindex: double (nullable = false)
 |-- SIFTindex: double (nullable = false)
 |-- PolyPhenindex: double (nullable = false)
 |-- MOTIF_NAMEindex: double (nullable = false)
 |-- MOTIF_POS: integer (nullable = false)
 |-- HIGH_INF_POSindex: double (nullable = false)
 |-- MOTIF_SCORE_CHANGE: double (nullable = false)
 |-- LoFtool: double (nullable = false)
 |-- CADD_PHRED: double (nullable = false)
 |-- CADD_RAW: double (nullable = false)
 |-- BLOSUM62: integer (nullable = false)

Сделать все целочисленные столбцы двойными

import org.apache.spark.sql.types._
val newDf = df1.select(df1.columns.map(c => col(c).cast(DoubleType)) : _*)
newDf.printSchema
root
 |-- CLASS: double (nullable = true)
 |-- CHROMindex: double (nullable = false)
 |-- POS: double (nullable = false)
 |-- REFindex: double (nullable = false)
 |-- ALTindex: double (nullable = false)
 |-- AF_ESP: double (nullable = false)
 |-- AF_EXAC: double (nullable = false)
 |-- AF_TGP: double (nullable = false)
 |-- CLNDISDBindex: double (nullable = false)
 |-- CLNDISDBINCLindex: double (nullable = false)
 |-- CLNDNindex: double (nullable = false)
 |-- CLNDNINCLindex: double (nullable = false)
 |-- CLNHGVSindex: double (nullable = false)
 |-- CLNSIGINCLindex: double (nullable = false)
 |-- CLNVCindex: double (nullable = false)
 |-- CLNVIindex: double (nullable = false)
 |-- MCindex: double (nullable = false)
 |-- ORIGIN: double (nullable = false)
 |-- SSRindex: double (nullable = false)
 |-- Alleleindex: double (nullable = false)
 |-- Consequenceindex: double (nullable = false)
 |-- IMPACTindex: double (nullable = false)
 |-- SYMBOLindex: double (nullable = false)
 |-- Feature_typeindex: double (nullable = false)
 |-- Featureindex: double (nullable = false)
 |-- BIOTYPEindex: double (nullable = false)
 |-- EXONindex: double (nullable = false)
 |-- INTRONindex: double (nullable = false)
 |-- cDNA_positionindex: double (nullable = false)
 |-- CDS_positionindex: double (nullable = false)
 |-- Protein_positionindex: double (nullable = false)
 |-- Amino_acidsindex: double (nullable = false)
 |-- Codonsindex: double (nullable = false)
 |-- DISTANCE: double (nullable = false)
 |-- STRAND: double (nullable = false)
 |-- BAM_EDITindex: double (nullable = false)
 |-- SIFTindex: double (nullable = false)
 |-- PolyPhenindex: double (nullable = false)
 |-- MOTIF_NAMEindex: double (nullable = false)
 |-- MOTIF_POS: double (nullable = false)
 |-- HIGH_INF_POSindex: double (nullable = false)
 |-- MOTIF_SCORE_CHANGE: double (nullable = false)
 |-- LoFtool: double (nullable = false)
 |-- CADD_PHRED: double (nullable = false)
 |-- CADD_RAW: double (nullable = false)
 |-- BLOSUM62: double (nullable = false)

Векторизация признаков

Apache Spark ML требует, чтобы данные объектов были в форме вектора, т. Е. Поместите все столбцы объектов в один вектор, следующий код предназначен для создания столбца, который представляет собой вектор, содержащий все столбцы объектов.

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val assembler = new VectorAssembler()
.setInputCols(Array("CHROMindex", "POS", "REFindex", "ALTindex", "AF_ESP", "AF_EXAC", "AF_TGP", "CLNDISDBindex"
        , "CLNDISDBINCLindex", "CLNDNindex", "CLNDNINCLindex", "CLNHGVSindex", "CLNSIGINCLindex", "CLNVCindex"
        , "CLNVIindex", "MCindex", "ORIGIN", "SSRindex", "Alleleindex", "Consequenceindex", "IMPACTindex"
        , "SYMBOLindex", "Feature_typeindex", "Featureindex", "BIOTYPEindex", "EXONindex", "INTRONindex"
        , "cDNA_positionindex", "CDS_positionindex", "Protein_positionindex", "Amino_acidsindex", "Codonsindex"
        , "DISTANCE", "STRAND", "BAM_EDITindex", "SIFTindex", "PolyPhenindex", "MOTIF_NAMEindex", "MOTIF_POS"
        , "HIGH_INF_POSindex", "MOTIF_SCORE_CHANGE", "LoFtool", "CADD_PHRED", "CADD_RAW", "BLOSUM62"))
.setOutputCol("features")
val output = assembler.transform(newDf)
Resultant output DataFrame now has a new column features that is the vector containing all feature columns.
output.select("features").show(2,false)
 
|features                                                                                                                                                                                                                     
|(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,42,43,44],[3.0,1168180.0,1.0,3.0,0.0771,0.1002,0.1066,326.0,27451.0,1.0,3.0,1614.0,1728.0,15.0,618.0,471.0,304.0,56.0,129.0,1.0,2.0,1.0,1.053,-0.208682,2.0])|
|(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,33,34,35,36,42,43,44],[3.0,1470752.0,1.0,1.0,7922.0,8822.0,467.0,25430.0,1.0,1.0,2293.0,2054.0,9.0,1534.0,1465.0,116.0,13.0,17.0,-1.0,1.0,4.0,1.0,31.0,6.517838,-3.0])  |

Извлечение из выходного DataFrame в новый DataFrame, который содержит только функции векторного столбца и столбец меток CLASS.

val whole=output.select("features","CLASS")
whole.printSchema
root
 |-- features: vector (nullable = true)
 |-- CLASS: double (nullable = true)

Нормализация значений функций в функциях векторного столбца с использованием MinMax Scaler

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(whole)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(whole)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show(2,false)
Features scaled to range: [0.0, 1.0]
|features                                                                                                                                                                                                                     |scaledFeatures                                                                                                                                                                                                               
|(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,42,43,44],[3.0,1168180.0,1.0,3.0,0.0771,0.1002,0.1066,326.0,27451.0,1.0,3.0,1614.0,1728.0,15.0,618.0,471.0,304.0,56.0,129.0,1.0,2.0,1.0,1.053,-0.208682,2.0])|(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,40,42,43,44],[0.13043478260869565,0.004713998164155383,0.0011560693641618498,0.006564551422319475,0.15450901803607214,0.2004440977014943,0.21328531412565024,0.005000997131329867,0.9926592897953279,0.001949317738791423,0.00804289544235925,0.6932989690721649,0.729421696918531,0.004595588235294118,0.044237652111667865,0.03447266339749689,0.041422537130399235,0.044374009508716325,0.05810810810810811,1.0,0.5,0.25,0.9999999999999999,0.010636363636363637,0.10125579884341004,0.8333333333333333])|
|(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,33,34,35,36,42,43,44],[3.0,1470752.0,1.0,1.0,7922.0,8822.0,467.0,25430.0,1.0,1.0,2293.0,2054.0,9.0,1534.0,1465.0,116.0,13.0,17.0,-1.0,1.0,4.0,1.0,31.0,6.517838,-3.0])  |(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,34,35,36,40,42,43],[0.13043478260869565,0.0059359829438109775,0.0011560693641618498,0.002188183807439825,0.8580093144156828,0.9528026784749973,0.007164005093040024,0.9195776379547261,0.001949317738791423,0.002680965147453083,0.9849656357388316,0.8670325031658928,0.0027573529411764703,0.1098067287043665,0.10722388933616336,0.015805968115547075,0.010301109350237718,0.0076576576576576575,0.5,1.0,0.25,0.9999999999999999,0.31313131313131315,0.2305282934974466])                                           |

Train/Test Split, случайное разделение 70% строк для обучения, 30% для теста

// Prepare training and test data.
val wholeScaledData=scaledData.select("scaledFeatures","CLASS")
val Array(training, test) = wholeScaledData.randomSplit(Array(0.7, 0.3), seed = 12345)

Обучение и прогнозирование

Выбор алгоритма

Поскольку это бинарная классификация, я планирую использовать многослойный классификатор персептрона и логистическую регрессию.

Многослойный классификатор Perceptron, есть 45 столбцов признаков и бинарная классификация, я строю нейронную сеть из 4 слоев, как показано ниже:

val layers = Array[Int](45, 45, 45, 2)
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("CLASS")

Обучить и протестировать нейронную сеть

val model=trainer.fit(training.select("scaledFeatures","CLASS"))
and Test
val result = model.transform(test)
Evaluate accuracy metrics
val predictionAndLabels = result.select("prediction", "CLASS")
val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy").setLabelCol("CLASS")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
Test set accuracy = 0.7482470955524848

Классификатор логистической регрессии

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import scala.collection.mutable
val maxIter=100
val lr = new LogisticRegression()
      .setFeaturesCol("scaledFeatures")
      .setLabelCol("CLASS")
      .setMaxIter(maxIter)
val model_lr = lr.fit(training)
val prediction_lr = model_lr.transform(test)
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val my_mc_accu = new MulticlassClassificationEvaluator().setPredictionCol("prediction").setLabelCol("CLASS").setMetricName("accuracy")
my_mc_accu.evaluate(prediction_lr)
my_mc_accu: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_74ebac4de5a0
res29: Double = 0.7463534469522494

Забрать:

Как многоуровневый классификатор персептрона, так и логистическая регрессия обеспечивают одинаковую точность прогнозирования около 75%, учитывая, что набор данных имеет значительное количество пропущенных значений (нулевые значения). Чтобы повысить производительность машинного обучения, может помочь уменьшение пропущенных значений и возможное добавление дополнительных столбцов функций.

Как всегда, код, используемый в этом письме, находится в моем репозитории GitHub:

https://github.com/geyungjen/jentekllc

Спасибо, что уделили время просмотру этого письма.