Apache Spark SQL и машинное обучение для классификаций генетических вариантов
Джордж Джен, Jen Tek LLC
Введение
Covid-19 вызвал интерес к биоинформатике, науке о данных, связанной со сбором и анализом сложных биологических данных, таких как генетические коды и последовательности, сочетание математики и биологии.
Контекст
ClinVar — общедоступный ресурс, содержащий аннотации о генетических вариантах человека. Подробности можно найти на сайте Национального института здравоохранения.
https://www.ncbi.nlm.nih.gov/clinvar/
Эти варианты обычно вручную классифицируются клиническими лабораториями по категориальному спектру, начиная от доброкачественных, вероятно доброкачественных, неопределенного значения, вероятно патогенных и патогенных. Варианты, которые имеют противоречивые классификации (от лаборатории к лаборатории), могут вызвать путаницу, когда клиницисты или исследователи пытаются интерпретировать, влияет ли вариант на заболевание данного пациента.
Цель
Цель состоит в том, чтобы предсказать, будет ли вариант ClinVar иметь конфликтующие классификации. Это представлено как проблема бинарной классификации, каждая запись в наборе данных является генетическим вариантом.
Конфликтующие классификации - это когда две из любой из следующих трех категорий присутствуют для одного варианта, две заявки одной категории не считаются конфликтующими.
· Вероятная доброкачественная или доброкачественная
· VUS (изменение неопределенной значимости. Изменение в генетической последовательности, для которого связь с риском заболевания неясна)
· Вероятно патогенный или патогенный
Метка класса
Столбцу CLASS присвоена конфликтующая классификация. Это двоичное представление того, имеет ли вариант конфликтующие классификации, где 0 представляет согласованные классификации, а 1 представляет конфликтующие классификации.
Набор данных генетических вариантов является общественным достоянием, доступным на Kaggle.
https://www.kaggle.com/kevinarvai/clinvar-conflicting
Шаги проекта машинного обучения
Получение данных
Загрузите файл csv с Kaggle, поместите в папку на узле драйвера Apache Spark:
https://www.kaggle.com/kevinarvai/clinvar-conflicting?select=clinvar_conflicting.csv
Загрузите файл csv в Spark SQL DataFrame.
import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.util.LongAccumulator import org.apache.log4j._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql._ Logger.getLogger("org").setLevel(Level.ERROR) val spark = SparkSession .builder .appName("genetic classifier") .master("local[*]") .config("spark.sql.warehouse.dir", "file:///tmp") .getOrCreate() /* Load the csv file into SparkSQL DataFrame, with options: inferSchema=true, set the datatype of the feature columns based upon value data types header=true, set the DataFrame column names from the headers of the csv file. */ val ds = spark.read.format("csv").option("inferSchema", "true").option("header", "true") .option("quote", "\"") .load("file:///home/bigdata2/dataset/clinvar_conflicting.csv") val df: DataFrame = ds.toDF()
Исследование данных:
df.show(3) |CHROM| POS|REF|ALT|AF_ESP|AF_EXAC|AF_TGP| CLNDISDB|CLNDISDBINCL| CLNDN|CLNDNINCL| CLNHGVS|CLNSIGINCL| CLNVC| CLNVI| MC|ORIGIN|SSR|CLASS|Allele| Consequence| IMPACT| SYMBOL|Feature_type| Feature| BIOTYPE|EXON|INTRON|cDNA_position|CDS_position|Protein_position|Amino_acids| Codons|DISTANCE|STRAND|BAM_EDIT| SIFT| PolyPhen|MOTIF_NAME|MOTIF_POS|HIGH_INF_POS|MOTIF_SCORE_CHANGE|LoFtool|CADD_PHRED| CADD_RAW|BLOSUM62| | 1|1168180| G| C|0.0771| 0.1002|0.1066| MedGen:CN169374| nan| not_specified| nan|NC_000001.10:g.11...| nan|single_nucleotide...|UniProtKB_(protei...|SO:0001583|missen...| 1|nan| 0| C|missense_variant|MODERATE|B3GALT6| Transcript| NM_080605.3|protein_coding| 1/1| null| 552| 522| 174| E/D|gaG/gaC| null| 1| null| tolerated| benign| null| null| null| null| null| 1.053|-0.208682| 2| | 1|1470752| G| A| 0.0| 0.0| 0.0|MedGen:C1843891,O...| nan|Spinocerebellar_a...| nan|NC_000001.10:g.14...| nan|single_nucleotide...|OMIM_Allelic_Vari...|SO:0001583|missen...| 1|nan| 0| A|missense_variant|MODERATE|TMEM240| Transcript|NM_001114748.1|protein_coding| 4/4| null| 523| 509| 170| P/L|cCg/cTg| null| -1| OK|deleterious_low_c...| benign| null| null| null| null| null| 31.0| 6.517838| -3| | 1|1737942| A| G| 0.0| 1.0E-5| 0.0|Human_Phenotype_O...| nan|Strabismus|Nystag...| nan|NC_000001.10:g.17...| nan|single_nucleotide...|OMIM_Allelic_Vari...|SO:0001583|missen...| 35|nan| 1| G|missense_variant|MODERATE| GNB1| Transcript| NM_002074.4|protein_coding|6/12| null| 632| 239| 80| I/T|aTc/aCc| null| -1| OK| deleterious|probably_damaging| null| null| null| null| null| 28.1| 6.061752| -1| only showing top 3 rows
Обратите внимание, что в наборе данных много пропущенных значений, т. е. нулевых значений. Набор данных со значительными пропущенными значениями обычно влияет на точность обучения и прогнозирования.
Некоторые значения столбца большие, например 6 цифр, некоторые значения столбца меньше 1. Без адресации это может сбить с толку алгоритм ML, который может рассматривать некоторые небольшие значения как статистически незначимые и игнорировать результат. Это можно решить при предварительной обработке данных с помощью скейлера. Я планирую использовать MinMaxScaler, который сделает значения всех столбцов функций между 0 и 1.
Оценить схему:
df.printSchema root |-- CHROM: string (nullable = true) |-- POS: integer (nullable = true) |-- REF: string (nullable = true) |-- ALT: string (nullable = true) |-- AF_ESP: double (nullable = true) |-- AF_EXAC: double (nullable = true) |-- AF_TGP: double (nullable = true) |-- CLNDISDB: string (nullable = true) |-- CLNDISDBINCL: string (nullable = true) |-- CLNDN: string (nullable = true) |-- CLNDNINCL: string (nullable = true) |-- CLNHGVS: string (nullable = true) |-- CLNSIGINCL: string (nullable = true) |-- CLNVC: string (nullable = true) |-- CLNVI: string (nullable = true) |-- MC: string (nullable = true) |-- ORIGIN: integer (nullable = true) |-- SSR: string (nullable = true) |-- CLASS: integer (nullable = true) |-- Allele: string (nullable = true) |-- Consequence: string (nullable = true) |-- IMPACT: string (nullable = true) |-- SYMBOL: string (nullable = true) |-- Feature_type: string (nullable = true) |-- Feature: string (nullable = true) |-- BIOTYPE: string (nullable = true) |-- EXON: string (nullable = true) |-- INTRON: string (nullable = true) |-- cDNA_position: string (nullable = true) |-- CDS_position: string (nullable = true) |-- Protein_position: string (nullable = true) |-- Amino_acids: string (nullable = true) |-- Codons: string (nullable = true) |-- DISTANCE: integer (nullable = true) |-- STRAND: integer (nullable = true) |-- BAM_EDIT: string (nullable = true) |-- SIFT: string (nullable = true) |-- PolyPhen: string (nullable = true) |-- MOTIF_NAME: string (nullable = true) |-- MOTIF_POS: integer (nullable = true) |-- HIGH_INF_POS: string (nullable = true) |-- MOTIF_SCORE_CHANGE: double (nullable = true) |-- LoFtool: double (nullable = true) |-- CADD_PHRED: double (nullable = true) |-- CADD_RAW: double (nullable = true) |-- BLOSUM62: integer (nullable = true)
DataFrame имеет 46 столбцов, один столбец CLASS в качестве цели/метки и 45 столбцов в качестве функций.
Некоторые из них представляют собой строковые типы данных, которые необходимо закодировать в числовой тип данных, чтобы алгоритм ML мог их обрабатывать.
Распределение количества строк КЛАССА 1 (генетически конфликтующие) и 0 (генетически непротиворечивые) выглядит следующим образом:
df.groupBy("CLASS").count().show() +-----+-----+ |CLASS|count| +-----+-----+ | 1|16434| | 0|48754| +-----+-----+
Предварительная обработка данных
НУЛЕВАЯ замена
Замените нулевые значения в столбцах String на «ABCD» и 0 в числовых столбцах.
Закодируйте значение String в целое число, используя StringIndexer
Перестройте DataFrame после кодирования, первый столбец результирующего DataFrame — это CLASS, метка.
import org.apache.spark.ml.feature.StringIndexer var df1=df.select("CLASS") val featureDF=df.drop("CLASS") val typeArray=featureDF.dtypes for (i<-0 until featureDF.columns.size) { if (typeArray(i)._2=="StringType") { var indexer = new StringIndexer().setInputCol(typeArray(i)._1).setOutputCol(typeArray(i)._1+"Index") var indexed = indexer.fit(featureDF.na.fill("ABCD").select(typeArray(i)._1)).transform(featureDF.na.fill("ABCD").select(typeArray(i)._1)) var temp=indexed.select(typeArray(i)._1+"index") df1=df1.withColumn("id", monotonically_increasing_id()) .join(temp.withColumn("id", monotonically_increasing_id()), Seq("id")) } else { var temp=featureDF.na.fill(0).select(typeArray(i)._1) df1=df1.withColumn("id", monotonically_increasing_id()).join(temp.withColumn("id", monotonically_increasing_id()), Seq("id")) } } df1=df1.drop("id") df1.printSchema root |-- CLASS: integer (nullable = true) |-- CHROMindex: double (nullable = false) |-- POS: integer (nullable = false) |-- REFindex: double (nullable = false) |-- ALTindex: double (nullable = false) |-- AF_ESP: double (nullable = false) |-- AF_EXAC: double (nullable = false) |-- AF_TGP: double (nullable = false) |-- CLNDISDBindex: double (nullable = false) |-- CLNDISDBINCLindex: double (nullable = false) |-- CLNDNindex: double (nullable = false) |-- CLNDNINCLindex: double (nullable = false) |-- CLNHGVSindex: double (nullable = false) |-- CLNSIGINCLindex: double (nullable = false) |-- CLNVCindex: double (nullable = false) |-- CLNVIindex: double (nullable = false) |-- MCindex: double (nullable = false) |-- ORIGIN: integer (nullable = false) |-- SSRindex: double (nullable = false) |-- Alleleindex: double (nullable = false) |-- Consequenceindex: double (nullable = false) |-- IMPACTindex: double (nullable = false) |-- SYMBOLindex: double (nullable = false) |-- Feature_typeindex: double (nullable = false) |-- Featureindex: double (nullable = false) |-- BIOTYPEindex: double (nullable = false) |-- EXONindex: double (nullable = false) |-- INTRONindex: double (nullable = false) |-- cDNA_positionindex: double (nullable = false) |-- CDS_positionindex: double (nullable = false) |-- Protein_positionindex: double (nullable = false) |-- Amino_acidsindex: double (nullable = false) |-- Codonsindex: double (nullable = false) |-- DISTANCE: integer (nullable = false) |-- STRAND: integer (nullable = false) |-- BAM_EDITindex: double (nullable = false) |-- SIFTindex: double (nullable = false) |-- PolyPhenindex: double (nullable = false) |-- MOTIF_NAMEindex: double (nullable = false) |-- MOTIF_POS: integer (nullable = false) |-- HIGH_INF_POSindex: double (nullable = false) |-- MOTIF_SCORE_CHANGE: double (nullable = false) |-- LoFtool: double (nullable = false) |-- CADD_PHRED: double (nullable = false) |-- CADD_RAW: double (nullable = false) |-- BLOSUM62: integer (nullable = false)
Сделать все целочисленные столбцы двойными
import org.apache.spark.sql.types._ val newDf = df1.select(df1.columns.map(c => col(c).cast(DoubleType)) : _*) newDf.printSchema root |-- CLASS: double (nullable = true) |-- CHROMindex: double (nullable = false) |-- POS: double (nullable = false) |-- REFindex: double (nullable = false) |-- ALTindex: double (nullable = false) |-- AF_ESP: double (nullable = false) |-- AF_EXAC: double (nullable = false) |-- AF_TGP: double (nullable = false) |-- CLNDISDBindex: double (nullable = false) |-- CLNDISDBINCLindex: double (nullable = false) |-- CLNDNindex: double (nullable = false) |-- CLNDNINCLindex: double (nullable = false) |-- CLNHGVSindex: double (nullable = false) |-- CLNSIGINCLindex: double (nullable = false) |-- CLNVCindex: double (nullable = false) |-- CLNVIindex: double (nullable = false) |-- MCindex: double (nullable = false) |-- ORIGIN: double (nullable = false) |-- SSRindex: double (nullable = false) |-- Alleleindex: double (nullable = false) |-- Consequenceindex: double (nullable = false) |-- IMPACTindex: double (nullable = false) |-- SYMBOLindex: double (nullable = false) |-- Feature_typeindex: double (nullable = false) |-- Featureindex: double (nullable = false) |-- BIOTYPEindex: double (nullable = false) |-- EXONindex: double (nullable = false) |-- INTRONindex: double (nullable = false) |-- cDNA_positionindex: double (nullable = false) |-- CDS_positionindex: double (nullable = false) |-- Protein_positionindex: double (nullable = false) |-- Amino_acidsindex: double (nullable = false) |-- Codonsindex: double (nullable = false) |-- DISTANCE: double (nullable = false) |-- STRAND: double (nullable = false) |-- BAM_EDITindex: double (nullable = false) |-- SIFTindex: double (nullable = false) |-- PolyPhenindex: double (nullable = false) |-- MOTIF_NAMEindex: double (nullable = false) |-- MOTIF_POS: double (nullable = false) |-- HIGH_INF_POSindex: double (nullable = false) |-- MOTIF_SCORE_CHANGE: double (nullable = false) |-- LoFtool: double (nullable = false) |-- CADD_PHRED: double (nullable = false) |-- CADD_RAW: double (nullable = false) |-- BLOSUM62: double (nullable = false)
Векторизация признаков
Apache Spark ML требует, чтобы данные объектов были в форме вектора, т. Е. Поместите все столбцы объектов в один вектор, следующий код предназначен для создания столбца, который представляет собой вектор, содержащий все столбцы объектов.
import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors val assembler = new VectorAssembler() .setInputCols(Array("CHROMindex", "POS", "REFindex", "ALTindex", "AF_ESP", "AF_EXAC", "AF_TGP", "CLNDISDBindex" , "CLNDISDBINCLindex", "CLNDNindex", "CLNDNINCLindex", "CLNHGVSindex", "CLNSIGINCLindex", "CLNVCindex" , "CLNVIindex", "MCindex", "ORIGIN", "SSRindex", "Alleleindex", "Consequenceindex", "IMPACTindex" , "SYMBOLindex", "Feature_typeindex", "Featureindex", "BIOTYPEindex", "EXONindex", "INTRONindex" , "cDNA_positionindex", "CDS_positionindex", "Protein_positionindex", "Amino_acidsindex", "Codonsindex" , "DISTANCE", "STRAND", "BAM_EDITindex", "SIFTindex", "PolyPhenindex", "MOTIF_NAMEindex", "MOTIF_POS" , "HIGH_INF_POSindex", "MOTIF_SCORE_CHANGE", "LoFtool", "CADD_PHRED", "CADD_RAW", "BLOSUM62")) .setOutputCol("features") val output = assembler.transform(newDf) Resultant output DataFrame now has a new column features that is the vector containing all feature columns. output.select("features").show(2,false) |features |(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,42,43,44],[3.0,1168180.0,1.0,3.0,0.0771,0.1002,0.1066,326.0,27451.0,1.0,3.0,1614.0,1728.0,15.0,618.0,471.0,304.0,56.0,129.0,1.0,2.0,1.0,1.053,-0.208682,2.0])| |(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,33,34,35,36,42,43,44],[3.0,1470752.0,1.0,1.0,7922.0,8822.0,467.0,25430.0,1.0,1.0,2293.0,2054.0,9.0,1534.0,1465.0,116.0,13.0,17.0,-1.0,1.0,4.0,1.0,31.0,6.517838,-3.0]) |
Извлечение из выходного DataFrame в новый DataFrame, который содержит только функции векторного столбца и столбец меток CLASS.
val whole=output.select("features","CLASS") whole.printSchema root |-- features: vector (nullable = true) |-- CLASS: double (nullable = true)
Нормализация значений функций в функциях векторного столбца с использованием MinMax Scaler
import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.ml.linalg.Vectors val scaler = new MinMaxScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // Compute summary statistics and generate MinMaxScalerModel val scalerModel = scaler.fit(whole) // rescale each feature to range [min, max]. val scaledData = scalerModel.transform(whole) println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]") scaledData.select("features", "scaledFeatures").show(2,false) Features scaled to range: [0.0, 1.0] |features |scaledFeatures |(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,42,43,44],[3.0,1168180.0,1.0,3.0,0.0771,0.1002,0.1066,326.0,27451.0,1.0,3.0,1614.0,1728.0,15.0,618.0,471.0,304.0,56.0,129.0,1.0,2.0,1.0,1.053,-0.208682,2.0])|(45,[0,1,2,3,4,5,6,11,14,16,18,21,23,25,27,28,29,30,31,33,35,36,40,42,43,44],[0.13043478260869565,0.004713998164155383,0.0011560693641618498,0.006564551422319475,0.15450901803607214,0.2004440977014943,0.21328531412565024,0.005000997131329867,0.9926592897953279,0.001949317738791423,0.00804289544235925,0.6932989690721649,0.729421696918531,0.004595588235294118,0.044237652111667865,0.03447266339749689,0.041422537130399235,0.044374009508716325,0.05810810810810811,1.0,0.5,0.25,0.9999999999999999,0.010636363636363637,0.10125579884341004,0.8333333333333333])| |(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,33,34,35,36,42,43,44],[3.0,1470752.0,1.0,1.0,7922.0,8822.0,467.0,25430.0,1.0,1.0,2293.0,2054.0,9.0,1534.0,1465.0,116.0,13.0,17.0,-1.0,1.0,4.0,1.0,31.0,6.517838,-3.0]) |(45,[0,1,2,3,7,9,11,14,16,18,21,23,25,27,28,29,30,31,34,35,36,40,42,43],[0.13043478260869565,0.0059359829438109775,0.0011560693641618498,0.002188183807439825,0.8580093144156828,0.9528026784749973,0.007164005093040024,0.9195776379547261,0.001949317738791423,0.002680965147453083,0.9849656357388316,0.8670325031658928,0.0027573529411764703,0.1098067287043665,0.10722388933616336,0.015805968115547075,0.010301109350237718,0.0076576576576576575,0.5,1.0,0.25,0.9999999999999999,0.31313131313131315,0.2305282934974466]) |
Train/Test Split, случайное разделение 70% строк для обучения, 30% для теста
// Prepare training and test data. val wholeScaledData=scaledData.select("scaledFeatures","CLASS") val Array(training, test) = wholeScaledData.randomSplit(Array(0.7, 0.3), seed = 12345)
Обучение и прогнозирование
Выбор алгоритма
Поскольку это бинарная классификация, я планирую использовать многослойный классификатор персептрона и логистическую регрессию.
Многослойный классификатор Perceptron, есть 45 столбцов признаков и бинарная классификация, я строю нейронную сеть из 4 слоев, как показано ниже:
val layers = Array[Int](45, 45, 45, 2) import org.apache.spark.ml.classification.MultilayerPerceptronClassifier import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val trainer = new MultilayerPerceptronClassifier() .setLayers(layers) .setBlockSize(128) .setSeed(1234L) .setMaxIter(100) .setFeaturesCol("scaledFeatures") .setLabelCol("CLASS")
Обучить и протестировать нейронную сеть
val model=trainer.fit(training.select("scaledFeatures","CLASS")) and Test val result = model.transform(test) Evaluate accuracy metrics val predictionAndLabels = result.select("prediction", "CLASS") val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy").setLabelCol("CLASS") println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}") Test set accuracy = 0.7482470955524848
Классификатор логистической регрессии
import org.apache.spark.ml.{Pipeline, PipelineStage} import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} import scala.collection.mutable val maxIter=100 val lr = new LogisticRegression() .setFeaturesCol("scaledFeatures") .setLabelCol("CLASS") .setMaxIter(maxIter) val model_lr = lr.fit(training) val prediction_lr = model_lr.transform(test) import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val my_mc_accu = new MulticlassClassificationEvaluator().setPredictionCol("prediction").setLabelCol("CLASS").setMetricName("accuracy") my_mc_accu.evaluate(prediction_lr) my_mc_accu: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_74ebac4de5a0 res29: Double = 0.7463534469522494
Забрать:
Как многоуровневый классификатор персептрона, так и логистическая регрессия обеспечивают одинаковую точность прогнозирования около 75%, учитывая, что набор данных имеет значительное количество пропущенных значений (нулевые значения). Чтобы повысить производительность машинного обучения, может помочь уменьшение пропущенных значений и возможное добавление дополнительных столбцов функций.
Как всегда, код, используемый в этом письме, находится в моем репозитории GitHub:
https://github.com/geyungjen/jentekllc
Спасибо, что уделили время просмотру этого письма.