1. First, load a large set of news articles from MySQL.
val df = sqlContext.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("dbtable", "news")
  .option("user", user)
  .option("password", pwd)
  .load()
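The tokenizing step later matches each row as a single string, so it may help to read only the text column rather than the whole table. Below is a minimal sketch of that idea, assuming the article text lives in a column called `content` (a hypothetical name, as are `JdbcOptionsSketch`, `newsOptions`, and the `news_text` alias). It relies on Spark's JDBC source accepting a parenthesised subquery with an alias wherever a table name is expected.

```scala
object JdbcOptionsSketch {
  // Builds the option map for Spark's JDBC reader.
  // `table` and `textColumn` are hypothetical names; adjust them to the real schema.
  def newsOptions(url: String, driver: String, user: String, pwd: String,
                  table: String, textColumn: String): Map[String, String] =
    Map(
      "url" -> url,
      "driver" -> driver,
      // A parenthesised subquery with an alias stands in for a plain table name.
      "dbtable" -> s"(SELECT $textColumn FROM $table) AS news_text",
      "user" -> user,
      "password" -> pwd
    )
}
```

With a running Spark context, the map could be passed to the reader as `sqlContext.read.format("jdbc").options(JdbcOptionsSketch.newsOptions(url, driver, user, pwd, "news", "content")).load()`.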
2. Tokenize the documents and count terms.
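import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Assumes each row holds a single string column (the article text).
val corpus: RDD[String] = df.map { case Row(document: String) => document }

// Lowercase, split on whitespace, and keep alphabetic tokens longer than two characters.
val tokenized: RDD[Seq[String]] = corpus
  .map(_.toLowerCase.split("\\s"))
  .map(_.filter(_.length > 2).filter(_.forall(java.lang.Character.isLetter)))

// Count each term across the corpus, sorted by descending frequency.
val termCounts: Array[(String, Long)] = tokenized
  .flatMap(_.map(_ -> 1L))
  .reduceByKey(_ + _)
  .collect()
  .sortBy(-_._2)

// Treat the most frequent 30% of terms as stop words; 0.3 is an arbitrary fraction.
val numStopwords = (termCounts.length * 0.3).toInt
val vocabArray: Array[String] = termCounts.takeRight(termCounts.length - numStopwords).map(_._1)
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap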
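To see what the vocabulary step does without a cluster, here is a small sketch of the same idea on plain Scala collections. `VocabSketch`, `tokenize`, `buildVocab`, and `stopFraction` are names made up for this illustration; ties between equally frequent terms are broken alphabetically here, which the Spark version does not guarantee.

```scala
object VocabSketch {
  // Lowercase, split on whitespace, keep alphabetic tokens longer than two characters.
  def tokenize(doc: String): Seq[String] =
    doc.toLowerCase.split("\\s+").toSeq
      .filter(_.length > 2)
      .filter(_.forall(_.isLetter))

  // Count terms, drop the most frequent `stopFraction` of them, and index the rest.
  def buildVocab(docs: Seq[String], stopFraction: Double): Map[String, Int] = {
    val counts: Seq[(String, Long)] = docs.flatMap(tokenize)
      .groupBy(identity)
      .map { case (term, hits) => term -> hits.size.toLong }
      .toSeq
      .sortBy { case (term, count) => (-count, term) } // most frequent first
    val numStopwords = (counts.length * stopFraction).toInt
    counts.drop(numStopwords).map(_._1).zipWithIndex.toMap
  }
}
```

For the three documents "the cat sat", "the dog sat", and "the bird flew" with a fraction of 0.3, there are six distinct terms, so one term ("the", the most frequent) is dropped and the remaining five are indexed.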
3. Convert the documents into term-count vectors and fit the model.
import scala.collection.mutable
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Build one sparse term-count vector per document, keyed by a document id.
val documents: RDD[(Long, Vector)] = tokenized.zipWithIndex().map { case (tokens, id) =>
  val counts = new mutable.HashMap[Int, Double]()
  tokens.foreach { term =>
    if (vocab.contains(term)) {
      val idx = vocab(term)
      counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
    }
  }
  (id, Vectors.sparse(vocab.size, counts.toSeq))
}

// Set LDA parameters
val lda = new LDA().setK(10).setMaxIterations(10)
val ldaModel = lda.run(documents)

// Print topics, showing top-weighted 3 terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 3)
topicIndices.foreach { case (terms, termWeights) =>
  println("TOPIC:")
  terms.zip(termWeights).foreach { case (term, weight) =>
    println(s"${vocabArray(term.toInt)}\t$weight")
  }
  println()
}
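The term-count vector is the part of this step that is easiest to get wrong, so here is a sketch of just that piece on plain Scala collections. `TermCountSketch` and `termCounts` are hypothetical names for this illustration. It returns the sorted indices and their counts, the two arrays that the `Vectors.sparse(size, indices, values)` overload expects.

```scala
object TermCountSketch {
  // Returns (sorted vocabulary indices, counts) for one tokenized document.
  def termCounts(tokens: Seq[String], vocab: Map[String, Int]): (Array[Int], Array[Double]) = {
    val counts: Seq[(Int, Double)] = tokens
      .flatMap(t => vocab.get(t)) // drop out-of-vocabulary tokens
      .groupBy(identity)
      .map { case (idx, hits) => idx -> hits.size.toDouble }
      .toSeq
      .sortBy(_._1) // ascending index order
    (counts.map(_._1).toArray, counts.map(_._2).toArray)
  }
}
```

Tokens missing from the vocabulary (for example, the dropped stop words) simply contribute nothing to the vector.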
4. Finally, the program prints the three highest-weighted terms for each topic:
TOPIC:
offshore	0.009812238949505817
media	0.009100067491152785
soccer	0.007214104190610413
Each document is associated with multiple topics, which the approach above discovers by clustering. This clustering can help organize or summarize a document collection.
The LDA result can also be used as features for other ML algorithms.
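As one illustration of using the result as a feature, a document's topic distribution can be reduced to its single dominant topic. The sketch below works on a plain array of topic weights; `TopicFeatureSketch` and `dominantTopic` are hypothetical names. In Spark MLlib, the per-document distributions should be available from `DistributedLDAModel.topicDistributions` when the default EM optimizer is used, but treat that as an assumption to check against the Spark version in use.

```scala
object TopicFeatureSketch {
  // Index of the highest-weighted topic in one document's topic distribution.
  // Assumes a non-empty array; the first index wins on ties.
  def dominantTopic(distribution: Array[Double]): Int =
    distribution.indices.maxBy(i => distribution(i))
}
```

The full distribution usually carries more information than the dominant topic alone, so a downstream classifier may do better with the whole vector.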
fixed
java.sql.SQLException: Value '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp
jdbc:mysql://yourserver:3306/yourdatabase?zeroDateTimeBehavior=convertToNull
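If the JDBC URL is assembled in code, the parameter from the URL above can be appended with a small helper. This is only a sketch: `JdbcUrlSketch` and `withParam` are hypothetical names, and the helper does no escaping of the key or value.

```scala
object JdbcUrlSketch {
  // Appends key=value to a JDBC URL, using '?' for the first parameter and '&' afterwards.
  def withParam(url: String, key: String, value: String): String = {
    val sep = if (url.contains("?")) "&" else "?"
    s"$url$sep$key=$value"
  }
}
```

Calling `JdbcUrlSketch.withParam("jdbc:mysql://yourserver:3306/yourdatabase", "zeroDateTimeBehavior", "convertToNull")` yields the URL shown above, which can then be passed as the `url` option in step 1.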