Word analysis, by domain, on the Common Crawl dataset to find industry trends
Knowledge Sharing Interview: Using Open Data to Predict Market Movements https://youtu.be/qjlOMoAYKmg?t=134
Knowledge Sharing Paper: Using Open Data to Predict Market Movements https://education.emc.com/content/dam/dell-emc/documents/en-us/2017KS_Ravinder-Using_Open_Data_to_Predict_Market_Movements.pdf
- Start one EC2 m4.xlarge instance with 30GB SSD volume
- SSH to the instance with user: ec2-user
```bash
sudo yum -y install git
git clone https://github.com/CI-Research/cdx-index-client
cd cdx-index-client
sudo pip install -r requirements.txt
./cdx-index-client.py -c CC-MAIN-2016-30 "*.netapp.com" --json
cat domain-* > CC-MAIN-2016-30_July_Netapp
cd ~
git clone https://github.com/CI-Research/CommonCrawlDocumentDownload
cd CommonCrawlDocumentDownload
sudo yum install java-1.8.0-openjdk-devel
sudo update-alternatives --config java
./gradlew check
cp ~/cdx-index-client/CC-MAIN-2016-30_July_Netapp ~/CommonCrawlDocumentDownload
mv CC-MAIN-2016-30_July_Netapp commoncrawl-CC-MAIN.txt
nohup ./gradlew downloadDocuments &
```
- Upload data to S3
```bash
cd ~/download
aws configure
aws s3 sync . s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/
```
- Run wget from a laptop Linux virtual machine
```bash
wget -r -nc -np "http://www.netapp.com/"
# FINISHED --2017-04-28 05:03:37--
# Total wall clock time: 9h 14m 12s
# Downloaded: 10255 files, 1.1G in 4h 51m 33s (67.6 KB/s)
# After zip/unzip: 10,000 files, 1,429 folders, 1.11 GB
zip -r NetApp_April_2017 www.netapp.com/
find . -type f -exec cat {} + > Netapp_April_2017.txt
```
- Upload the files to the AWS S3 bucket for use.
- Start a 1-node AWS EMR cluster (Advanced config: Hadoop only, Network "EC2-Classic", Master "m1.large", Core "0")
- SSH to the instance: ec2-54-90-80-85.compute-1.amazonaws.com (change to your master node's hostname), user: hadoop
- sudo yum install -y git
- wget http://www-eu.apache.org/dist/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
- tar zxvf apache-maven-3.5.2-bin.tar.gz
- sudo vi .bashrc
```bash
export MAVEN_HOME=/home/hadoop/apache-maven-3.5.2
export M2_HOME=/home/hadoop/apache-maven-3.5.2
export M2=/home/hadoop/apache-maven-3.5.2
export PATH=/home/hadoop/apache-maven-3.5.2/bin:$PATH
```
- source .bashrc
- git clone https://github.com/dkpro/dkpro-c4corpus
- aws s3 sync s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/ /var/tmp/CC-MAIN-2016-30_July_Netapp
- For StackOverflow: aws s3 sync s3://CommonCrawl/data/StackOverflow/ /var/tmp/StackOverflow/
- Build the dkpro-c4corpus-boilerplate project using the two commands below:
```bash
cd dkpro-c4corpus/dkpro-c4corpus-boilerplate/
mvn package
```
- mkdir /var/tmp/boiler
- Create a script to process the files (replace the CC-MAIN directory name with the one you are working on; for StackOverflow I used "/var/tmp/StackOverflow/*"):
vi boiler.sh
```bash
#!/bin/bash
for filename in /var/tmp/CC-MAIN*/*; do
  java -jar target/dkpro-c4corpus-boilerplate-1.0.1-SNAPSHOT.jar "$filename" "/var/tmp/boiler/$(basename "$filename" .txt)" false
done
```
- chmod +x boiler.sh
- nohup ./boiler.sh
- cd /var/tmp/boiler
- aws s3 sync . s3://CommonCrawl/boilerplate/netapp/CC-MAIN-2016-30_July_Netapp/
- spark-shell

IBM Wordcount process:
val file = sc.textFile("s3://CommonCrawl/ibm_boiler") val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _) val sorted_counts = counts.collect().sortBy(wc => -wc._2) // 1mins sc.parallelize(sorted_counts.take(60000)).saveAsTextFile("s3://CommonCrawl/boilerplate/ibm_boiler _top60000") sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/boilerplate/wordcount-ibm_bolier")Netapp Wordcount process:
val file = sc.textFile("s3://CommonCrawl/boilerplate/netapp_boiler") val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _) val sorted_counts = counts.collect().sortBy(wc => -wc._2) // 3mins sc.parallelize(sorted_counts.take(20000)).saveAsTextFile("s3://CommonCrawl/top20000_netapp_boiler") sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/wordcount-netapp_boiler")Top 10 words
| Word | Count |
|---|---|
| (empty token) | 4327791 |
| the | 2103578 |
| 0 | 1159355 |
| to | 1097568 |
| and | 1057336 |
| a | 856529 |
| of | 811647 |
| for | 737729 |
| in | 646580 |
| ibm | 623663 |
Convert text to Parquet. Converting to Parquet files in Spark 2.0 is much more efficient than in Spark 1.6.
```bash
aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp
hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/
```
Spark 1.4+:
```bash
spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```
```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/netapp_boiler_top20000_np.csv")
val selectedData = df.select("words", "count")
selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("netappparquet.csv")
```
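Note that the Spark 1.4+ path above actually writes CSV output despite the parquet-style file name. A minimal Spark 2.x sketch of a true Parquet conversion, assuming the same input file (the output path is illustrative):

```scala
// Spark 2.x: CSV is a built-in source, and `spark` (SparkSession) is predefined in spark-shell
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/hadoop/netapp_boiler_top20000_np.csv")

// Write the selected columns as Parquet (columnar, compressed) instead of CSV
df.select("words", "count").write.parquet("/user/hadoop/netapp_top20000.parquet")

// Reading it back is just as simple
val parquetDF = spark.read.parquet("/user/hadoop/netapp_top20000.parquet")
parquetDF.show(5)
```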
The cleaning steps are:
- Remove punctuation by replacing "[^A-Za-z0-9\s]+" with "", or use "[^A-Za-z\s]+" to also drop numbers
- Trim all spaces
- Lowercase all words
- Remove stop words
```bash
aws s3 cp s3://CommonCrawl/netapp_boiler_top20000.txt /var/tmp
hdfs dfs -mkdir /user/hadoop/data/
hdfs dfs -put /var/tmp/netapp_boiler_top20000.txt /user/hadoop/data/
spark-shell
```
```scala
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.{split, udf}

// val reg = raw"[^A-Za-z0-9\s]+" // remove punctuation, keep numbers
val reg = raw"[^A-Za-z\s]+" // remove punctuation and numbers
val lines = sc.textFile("/user/hadoop/netapp_boiler_top20000_np.csv").map(_.replaceAll(reg, "").trim.toLowerCase).toDF("line")
val words = lines.select(split($"line", " ").alias("words"))
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
val noStopWords = remover.transform(words)
noStopWords.show(15)

// val counts = noStopWords.select(explode($"filtered")).map(word => (word, 1)).reduceByKey(_ + _)
// from word -> num to num -> word
// val mostCommon = counts.map(p => (p._2, p._1)).sortByKey(false, 1)
// mostCommon.take(5)

// Dump the DataFrame to CSV
val stringify = udf((vs: Seq[String]) => s"""[${vs.mkString(",")}]""")
words.withColumn("words", stringify($"words")).write.csv("/netapp_filtered.csv")
```
```bash
hdfs dfs -get /netapp_filtered.csv .
```
Introducing the TF-IDF method for vectorizing a "bag of words":
TF: "Term Frequency"
- normalized for the length of the document
- hashed into a fixed-length set of buckets ("the hashing trick") so that we don't have an extremely high number of dimensions (count of all distinct tokens)
- downside: there will be some hash collisions, where unrelated words get mapped to the same "dimension"
IDF: "Inverse Document Frequency"
- Normalize word counts based on how frequently a word occurs in the corpus.
- Logarithmic transformation so that words which occur in literally every document (document frequency of 100%, or 1.0) are weighted down to 0 (ln 1 = 0)
- Rare words are weighted heavily
- Helpful where rare, technical vocabulary constitutes distinguishing features
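A minimal Spark ML sketch of the idea, run in spark-shell on a toy corpus (the sentences, bucket size, and column names are illustrative, not from the project data):

```scala
import org.apache.spark.ml.feature.{Tokenizer, HashingTF, IDF}

// Three tiny toy "documents"
val docs = Seq("netapp storage cloud", "ibm cloud analytics", "cloud cloud cloud").toDF("text")

// Split each document into a bag of words
val tokens = new Tokenizer().setInputCol("text").setOutputCol("words").transform(docs)

// TF via the hashing trick: each word is hashed into one of 1000 buckets,
// so dimensionality stays fixed regardless of vocabulary size (collisions possible)
val tf = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1000).transform(tokens)

// IDF: Spark weights each term by ln((numDocs + 1) / (docFreq + 1)),
// so "cloud", which appears in all three documents, gets ln(4 / 4) = 0
val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("features").fit(tf)
idfModel.transform(tf).select("words", "features").show(false)
```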
In Spark 2.0, CSV is a built-in source, so we can create DataFrames from CSV files directly.
```bash
sudo yum install -y git
git clone https://github.com/phatak-dev/spark-two-migration
aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp
hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/
spark-shell
```
```scala
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val sqlContext = new SQLContext(sc)
val netappDF = sqlContext.read.format("csv").option("header", "true").load("netapp_boiler_top20000_np.csv")
netappDF.columns
netappDF.show(15)
netappDF.printSchema()
netappDF.select("count").show()
netappDF.select($"words", $"count").show()
netappDF.filter($"count" > 10000).show()
netappDF.groupBy("count").count().show()
netappDF.groupBy("words").count().show()

// Try a SQL query to display a specific word:
// netappDF.createOrReplaceTempView("netappsql")
// spark.sql("SELECT words, count FROM netappsql WHERE words = 'database'").show(20)
```
Lower-case the text:
```scala
import org.apache.spark.sql.functions.lower

val netappLoweredDF = netappDF.select($"*", lower($"words").as("lowerText"))
netappLoweredDF.show(2)
```
Set up the ML Pipeline:
```scala
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}
```
Step 1: Natural Language Processing with RegexTokenizer: convert the lowerText column to a bag of words
```scala
val tokenizer = new RegexTokenizer().setInputCol("lowerText").setOutputCol("netappwords").setPattern("""\W+""")
val netappWordsDF = tokenizer.transform(netappLoweredDF.na.drop(Array("lowerText")))
netappWordsDF.printSchema
netappWordsDF.select("netappwords").first
```
Step 2: Natural Language Processing with StopWordsRemover: remove stop words
val remover = new StopWordsRemover().setInputCol("netappwords").setOutputCol("noStopWords") val noStopWordsListDF = remover.transform(netappWordsDF) noStopWordsListDF.printSchema noStopWordsListDF.select("words", "count", "netappwords", "noStopWords").show(20) noStopWordsListDF.show(15)Step 3: HashingTF// More features = more complexity and computational time and accuracy
```scala
val hashingTF = new HashingTF().setInputCol("noStopWords").setOutputCol("hashingTF").setNumFeatures(20000)
val featurizedDataDF = hashingTF.transform(noStopWordsListDF)
featurizedDataDF.printSchema
featurizedDataDF.select("words", "count", "netappwords", "noStopWords").show(7)
```
Step 4: IDF (this will take 30 seconds or so to run)
```scala
val idf = new IDF().setInputCol("hashingTF").setOutputCol("idf")
val idfModel = idf.fit(featurizedDataDF)
```
Step 5: Normalizer. A normalizer is a common operation for text classification: it simply puts all of the data on the same scale. For example, if one article is much longer than another, it normalizes the scales for the different features; without normalization, an article with more words would be weighted differently.
```scala
val normalizer = new Normalizer().setInputCol("idf").setOutputCol("features")
```
Step 6: k-means, and tie it all together...
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans().setFeaturesCol("features").setPredictionCol("prediction").setK(8).setSeed(0) // seed for reproducibility
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, normalizer, kmeans))

// This can take more than 1 hour to run!
val model = pipeline.fit(netappLoweredDF.na.drop(Array("lowerText")))
```
Prediction
```bash
aws s3 cp s3://CommonCrawl/ibm_boiler_top60000.csv /var/tmp
hdfs dfs -put /var/tmp/ibm_boiler_top60000.csv /user/hadoop/
```
```scala
// val model2 = org.apache.spark.ml.PipelineModel.load("netapp_boiler_top20000_np.csv")
// val model2 = org.apache.spark.ml.PipelineModel.load("saves_parquet.csv") // input path error
```
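The load attempts above fail because PipelineModel.load expects a directory previously written by the model's own writer, not a CSV file. A minimal sketch of persisting and reloading the fitted pipeline, assuming a hypothetical HDFS path:

```scala
import org.apache.spark.ml.PipelineModel

// Hypothetical output directory; ML persistence writes a directory of metadata and stage data, not a CSV
val modelPath = "/user/hadoop/netapp_kmeans_pipeline_model"

// Save the fitted pipeline from the training session
model.write.overwrite().save(modelPath)

// Later, or in another spark-shell session, reload it for predictions
val model2 = PipelineModel.load(modelPath)
```

Let's take a look at a sample of the data to see if we can see a pattern between the predicted clusters and the words.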
```scala
val rawPredictionsDF = model.transform(netappLoweredDF.na.drop(Array("lowerText")))
rawPredictionsDF.columns
rawPredictionsDF.show(10)

val predictionsDF = rawPredictionsDF.select($"words", $"prediction").cache
predictionsDF.show(15)

// This could take up to 5 minutes.
predictionsDF.groupBy("prediction").count().orderBy($"count".desc).show(100)

// Inspect a few clusters (use show() in spark-shell; display() is only available in Databricks notebooks)
predictionsDF.filter("prediction = 3").select("words", "prediction").show(30)
predictionsDF.filter("prediction = 4").select("words", "prediction").show(30)
predictionsDF.filter("prediction = 2").select("words", "prediction").show(30)

// Note: predictionsDF has no "title" column, and with k = 8 there is no cluster 25,
// so the following lines would fail or return nothing:
// predictionsDF.filter($"title" === "Apache Spark").show(10)
// predictionsDF.filter("prediction = 25").limit(25).show()
```



