Machine Learning with Spark from Nick Pentreath.

## Running Spark
We use the Spark version provided with HDP 2.2.4.
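If you want to double-check which Spark build you actually ended up with, a quick way is to ask the SparkContext itself from a running Spark shell (see the shell notes at the end of this post). A minimal sketch:

```scala
// Print the Spark version the shell is running against
// (sc is the SparkContext the shell creates for you).
println(sc.version)
```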
We already have some datasets in `hdfs://shared`; we'll use this one to save some time (and some space) while playing with MLlib on Spark.

The data is available at `hdfs://daplab-rt-12:8020/shared/20_newsgroups/`.
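Before loading anything into Spark, it can be worth a quick check that the dataset is really where we expect it. A minimal sketch, run from the Spark shell; the path is the one used in the code below, so adjust it to your cluster:

```scala
// Sanity check: list the newsgroup directories under the shared dataset path.
import org.apache.hadoop.fs.Path

val dataDir = new Path("hdfs://daplab-rt-12:8020/shared/20_newsgroups")
val fs = dataDir.getFileSystem(sc.hadoopConfiguration)
fs.listStatus(dataDir).foreach(status => println(status.getPath))
```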
```scala
val path = "hdfs://daplab-rt-12:8020/shared/20_newsgroups/*"
val rdd = sc.wholeTextFiles(path)
// The newsgroup a message belongs to is the name of its parent directory
val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
val countByGroups = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")

// Tokenizing the text data
val text = rdd.map { case (file, text) => text }
val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
// Keep only tokens that contain no digits
val regex = """[^0-9]*""".r
val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
// Remove stopwords and tokens shorter than two characters
val stopwords = Set("the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not", "with", "as", "was")
val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
// Filter out rare tokens with total occurrence < 2
val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
```

```scala
// Create a function to tokenize each document
def tokenize(line: String): Seq[String] = {
  line.split("""\W+""")
    .map(_.toLowerCase)
    .filter(token => regex.pattern.matcher(token).matches)
    .filterNot(token => stopwords.contains(token))
    .filterNot(token => rareTokens.contains(token))
    .filter(token => token.size >= 2)
    .toSeq
}
val tokens = text.map(doc => tokenize(doc))
```

```scala
// Train a Word2Vec model on the tokenized documents
import org.apache.spark.mllib.feature.Word2Vec
val word2vec = new Word2Vec()
word2vec.setSeed(42) // we do this to generate the same results each time
val word2vecModel = word2vec.fit(tokens)
word2vecModel.findSynonyms("research", 10).foreach(println)
```

```scala
word2vecModel.findSynonyms("france", 10).foreach(println)
```

This code is intended to be run in the Scala shell. Launch the Scala Spark shell by running `./bin/spark-shell` from the Spark directory. You can enter each line in the shell and see the result immediately. The expected output in the Spark console is presented as commented lines following the relevant code.
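Coming back to the model itself: besides `findSynonyms`, the fitted model can also hand back the learned vector for a single word. A minimal sketch; `research` is just an example token here, and the word has to be present in the model's vocabulary or the lookup will fail:

```scala
// Look up the learned embedding for one word.
// Throws if the word is not in the model's vocabulary.
val vector = word2vecModel.transform("research")
println(vector.size) // dimensionality of the embedding (100 by default)
```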
The Scala shell creates a SparkContext variable, available to us as `sc`.
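For example, a trivial job is enough to confirm the context is alive (just an illustration, not part of the walkthrough):

```scala
// Run a trivial job through the pre-built SparkContext
val n = sc.parallelize(1 to 100).count() // n: Long = 100
```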
Ensure you start your Spark shell with enough memory: `./bin/spark-shell --driver-memory 4g`
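If you are unsure whether the extra memory was actually picked up, the setting can be read back from inside the shell; a small sketch (depending on how the shell was launched, the property may or may not show up in the conf):

```scala
// Read back the driver memory setting, if it was recorded in the Spark conf.
// getOption returns None when the property is not present.
println(sc.getConf.getOption("spark.driver.memory"))
```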