this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Initial Commit

Signed-off-by: Kaushik Chakraborty <git@kaushikc.org>

authored by

Kaushik Chakraborty and committed by
Kaushik Chakraborty
705c44a1

+548
+19
.gitignore
··· 1 + # Created by .ignore support plugin (hsz.mobi) 2 + dist/* 3 + target/ 4 + lib_managed/ 5 + src_managed/ 6 + project/boot/ 7 + project/plugins/project/ 8 + .history 9 + .cache 10 + .lib/ 11 + *.class 12 + *.log 13 + /*.tsv 14 + /.idea/ 15 + /src/main/resources/* 16 + !/src/main/resources/application.conf 17 + *.sc 18 + /.ensime 19 + /.ensime_cache/
+29
README.md
··· 1 + ## Data Pre-Processing Tasks for Knowledge Base Creation 2 + 3 + This project consists of different pre-processing tasks required on input file(s) before they can be used for respective Knowledge Base Creation (KBC). 4 + 5 + ### Technologies Used 6 + 7 + These are all `Scala` based scripts / programs each representing individual pre-processing tasks built using `sbt` 8 + 9 + __Dependencies__ 10 + 11 + - `cats` - for typeclasses & data types 12 + - `monix` - for observables, non-blocking Task and parallel processing; in other words for all the side-effects 13 + - `pureconfig` - for typed configuration (if and when required) 14 + 15 + __how to run__ 16 + 17 + - make relevant changes to `application.conf` for the respective module (like associatekbc or domain) 18 + - `sbt run` command will ask you to select the `App` you want to run 19 + 20 + ### TODO 21 + 22 + - [ ] replace current multiple main classes by multiple `sbt` projects 23 + - [ ] better way to do parallel & non-blocking IO for huge files without non-daemonic threads 24 + - [ ] TODOS from Domain 25 + - [ ] refactor regex(s) and keep them in one place 26 + - [ ] introduce free monads for actions and make the current implementation of parsing text as part of an interpretor there by making the whole parsing action extensible to any kind of input data 27 + - [ ] once a free monad structure is introduced for domain objects, create new interpretors with Akka Stream or FS2 as effects to see if they help improve the performance 28 + 29 + _there will be bugs_
+33
build.sbt
··· 1 + name := "kbc-data-processing" 2 + 3 + version := "1.0" 4 + 5 + scalaVersion := "2.12.1" 6 + 7 + libraryDependencies ++= Seq( 8 + "org.typelevel" %% "cats" % "0.9.0", 9 + "com.github.melrief" %% "pureconfig" % "0.6.0", 10 + "io.monix" %% "monix" % "2.2.4" 11 + ) 12 + 13 + scalacOptions ++= Seq( 14 + "-unchecked", 15 + "-deprecation", 16 + "-feature", 17 + "-encoding", 18 + "UTF-8", 19 + "-language:higherKinds", 20 + "-language:implicitConversions", 21 + "-language:existentials", 22 + "-Ywarn-dead-code", 23 + "-Ywarn-unused" 24 + ) 25 + 26 + def latestScalafmt = "0.7.0-RC1" 27 + commands += Command.args("scalafmt", "Run scalafmt cli.") { 28 + case (state, args) => 29 + lazy val Right(scalafmt) = 30 + org.scalafmt.bootstrap.ScalafmtBootstrap.fromVersion(latestScalafmt) 31 + scalafmt.main("--non-interactive" +: args.toArray) 32 + state 33 + }
+1
project/build.properties
··· 1 + sbt.version = 0.13.13
+2
project/plugins.sbt
··· 1 + logLevel := Level.Warn 2 + libraryDependencies += "com.geirsson" %% "scalafmt-bootstrap" % "0.6.8"
+20
src/main/resources/application.conf
··· 1 + associatekbc = { 2 + input-path = "" 3 + output-path = "" 4 + } 5 + 6 + domain = { 7 + chat = { 8 + input-path = "" 9 + output-path = "" 10 + } 11 + faq = { 12 + input-path = "" 13 + output-path = "" 14 + } 15 + } 16 + adhoc = { 17 + inputs = [], 18 + output-path = "" 19 + } 20 +
+76
src/main/scala/adhoc/Main.scala
··· 1 + package adhoc 2 + 3 + import java.nio.file.Files 4 + import java.util.UUID 5 + 6 + import cats.Show 7 + import cats.data.ReaderT 8 + import cats.implicits._ 9 + import common.Config.{AppConfig, InputPath} 10 + import monix.eval.Task 11 + import monix.execution.Scheduler 12 + 13 + import scala.collection.JavaConverters._ 14 + import scala.util.{Failure, Success, Try} 15 + 16 + object Main extends App { 17 + sealed trait ContentType 18 + final case class Content(id: UUID, c: String) extends ContentType 19 + 20 + implicit val contentShow: Show[ContentType] = { 21 + case Content(id, c) => s"$id\t$c" 22 + } 23 + 24 + implicit val listContentShow: Show[List[ContentType]] = { 25 + case Nil => "" 26 + case x :: xs => s"${x.show}\n${xs.show}" 27 + } 28 + 29 + type Error = Throwable 30 + 31 + def rawText: InputPath => Either[Error, String] = { in => 32 + Either.fromTry { 33 + Try { 34 + Files.readAllLines(in.value).asScala.foldLeft("")((a,b) => s"$a $b") 35 + } 36 + } 37 + } 38 + 39 + def cleanse: String => Either[Error, String] = {str => 40 + Right { 41 + str.replaceAll("[\n\t]"," ").trim 42 + } 43 + } 44 + def genUUID: String => Either[Error, (UUID, String)] = {str => 45 + Either.fromTry { 46 + Try { 47 + (java.util.UUID.randomUUID, str) 48 + } 49 + } 50 + } 51 + def content: ((UUID, String)) => Either[Error, ContentType] = 52 + t => Right(Content(t._1, t._2)) 53 + 54 + def process: InputPath => Either[Error, ContentType] = 55 + rawText(_) >>= cleanse >>= genUUID >>= content 56 + 57 + def prog: ReaderT[Task, AppConfig, Unit] = ReaderT { conf => 58 + Task { 59 + val processed: List[ContentType] = 60 + conf.adhoc.inputs 61 + .map(process) 62 + .filter(_.isRight) 63 + .map(_.right.get) 64 + 65 + Files.write(conf.adhoc.outputPath, processed.show.getBytes("utf-8")) 66 + () 67 + } 68 + } 69 + 70 + import common.Config._ 71 + implicit lazy val ioSched = Scheduler.io("my-io", false) 72 + prog.run(conf).runOnComplete { 73 + case Success(_) => println("DONE") 74 + case Failure(e) => println(s"ERROR:\n${e.getMessage}") 75 + } 76 + }
+52
src/main/scala/associatekbc/App.scala
··· 1 + package associatekbc 2 + 3 + import java.nio.file.Files 4 + 5 + import cats.Show 6 + import cats.implicits._ 7 + import monix.execution.Scheduler.Implicits.global 8 + import monix.reactive.{Consumer, Observable} 9 + 10 + import scala.io.Source 11 + import scala.util.{Failure, Success} 12 + 13 + object App extends App { 14 + import common.Config._ 15 + 16 + implicit val tsvShow: Show[List[(String, String)]] = { 17 + case x :: xs => s"${x._1.show}\t${x._2.show}\n${xs.show}" 18 + case Nil => "" 19 + } 20 + 21 + implicit val tsvShow1: Show[List[String]] = { 22 + case x :: xs => s"${x.show}\n${xs.show}" 23 + case Nil => "" 24 + } 25 + 26 + val file = conf.associatekbc.inputPath 27 + 28 + val source = Observable.defer { 29 + Observable 30 + .fromIterator(Source.fromFile(file.value.toFile).getLines()) 31 + .drop(1) 32 + } 33 + 34 + val consumer = 35 + Consumer.foldLeft[String, String](List[(String, String)]().show) { 36 + (xs, s) => 37 + val ss = s.split(",").toList 38 + xs + 39 + List((ss.reverse.head.trim, ss.take(2).mkString("$$$").trim)).show 40 + } 41 + 42 + val lbConsumer = Consumer.loadBalance(10, consumer) 43 + 44 + source 45 + .consumeWith(lbConsumer) 46 + .runOnComplete { 47 + case Success(v) => 48 + Files.write(conf.associatekbc.outputPath, v.show.getBytes) 49 + () 50 + case Failure(e) => System.err.println(s"ERROR::\n${e.getMessage}") 51 + } 52 + }
+36
src/main/scala/common/Config.scala
··· 1 + package common 2 + 3 + import java.nio.file.{FileSystems, Path, Paths} 4 + 5 + import pureconfig.ConfigConvert 6 + 7 + import scala.util.Try 8 + 9 + object Config { 10 + 11 + case class InputPath(value: Path) extends AnyVal 12 + case class PathConfig(inputPath: InputPath, outputPath: Path) 13 + case class DomainConfig(chat: PathConfig, faq: PathConfig) 14 + case class AdhocConfig(inputs: List[InputPath], outputPath: Path) 15 + case class AppConfig(associatekbc: PathConfig, 16 + domain: DomainConfig, 17 + adhoc: AdhocConfig) 18 + 19 + private implicit val inpathConvert: ConfigConvert[InputPath] = 20 + ConfigConvert.fromStringConvertTry[InputPath]( 21 + s => 22 + Try( 23 + InputPath( 24 + Paths.get(Config.getClass.getClassLoader.getResource(s).getPath))) 25 + .orElse(Try(InputPath(FileSystems.getDefault.getPath(s)))), 26 + _.value.toString 27 + ) 28 + 29 + private implicit val pathConvert: ConfigConvert[Path] = 30 + ConfigConvert.fromStringConvertTry[Path]( 31 + s => Try(FileSystems.getDefault.getPath(s)), 32 + _.toString) 33 + 34 + lazy val conf: AppConfig = pureconfig.loadConfigOrThrow[AppConfig] 35 + 36 + }
+65
src/main/scala/domain/ChatMain.scala
··· 1 + package domain 2 + 3 + import java.nio.file.{Files, StandardOpenOption} 4 + 5 + import cats.implicits._ 6 + import monix.eval.Task 7 + import monix.execution.Scheduler 8 + import monix.execution.Scheduler.Implicits.global 9 + import monix.reactive.{Consumer, Observable} 10 + 11 + import scala.io.Source 12 + import scala.util.{Failure, Success} 13 + 14 + object ChatMain extends App { 15 + 16 + // 17 + // TODO: 18 + // - improve the latency of the overall process by figuring out better way to parallelise Monix 19 + // tasks & observables 20 + // - once a free monad structure is introduced for domain objects, create a new interpretor 21 + // with Akka stream or FS2 to see if they help improve the performance 22 + // 23 + 24 + import common.Config._ 25 + import domain.Domain.Transcript._ 26 + 27 + val file = conf.domain.chat.inputPath 28 + lazy val io = Scheduler.io("custom-io", daemonic = false) 29 + 30 + val source = Observable.defer { 31 + Observable 32 + .fromIterator(Source.fromFile(file.value.toAbsolutePath.toFile).getLines) 33 + .filter(_.startsWith("""<p align="center"""")) 34 + .map(transcript) 35 + .map(_.show) 36 + } 37 + 38 + val consumer = 39 + Consumer 40 + .foreachParallelAsync[String](300) { o => 41 + for { 42 + _ <- Task.fork( 43 + Task.eval( 44 + Files.write(conf.domain.chat.outputPath, 45 + s"$o".getBytes, 46 + StandardOpenOption.APPEND, 47 + StandardOpenOption.CREATE, 48 + StandardOpenOption.WRITE)), 49 + io 50 + ) 51 + } yield () 52 + } 53 + 54 + source 55 + .consumeWith(consumer) 56 + .runOnComplete { 57 + case Success(_) => 58 + io.shutdown() 59 + println("DONE") 60 + case Failure(e) => 61 + io.shutdown() 62 + e.printStackTrace() 63 + } 64 + 65 + }
+128
src/main/scala/domain/Domain.scala
··· 1 + package domain 2 + 3 + import java.time.ZonedDateTime 4 + import java.time.format.DateTimeFormatter 5 + import java.util.UUID 6 + 7 + import cats.Show 8 + import cats.implicits._ 9 + 10 + import scala.collection.mutable 11 + import scala.util.matching.Regex 12 + 13 + object Domain { 14 + 15 + // 16 + // TODO: 17 + // - refactor regex(s) and keep them in one place 18 + // - introduce free monads and make the current implementation of parsing text as one of the 19 + // interpretor 20 + // 21 + 22 + final case class ChatOrigin(value: String) extends AnyVal 23 + final case class ChatText(value: String) extends AnyVal 24 + 25 + sealed trait Party 26 + final case class User(value: String) extends Party 27 + final case class Agent(value: String) extends Party 28 + final case class Blank() extends Party 29 + 30 + final case class Chat(from: Party, to: Party, text: String) 31 + final case class Conversation(cs: mutable.LinkedHashSet[Chat]) 32 + 33 + final case class Transcript(id: UUID, 34 + origDate: ZonedDateTime, 35 + origin: ChatOrigin, 36 + agent: Agent, 37 + text: Conversation) 38 + 39 + object Conversation { 40 + def conversation: String => Agent => Conversation = { str => a => 41 + val chatTextRegex = """(?=[ms]\s+\)\s*(.*?)\s*(?:\(\s+\d+[hms]|$))""".r 42 + val chats = for (t <- chatTextRegex.findAllMatchIn(str)) yield t.group(1) 43 + val chatMap: mutable.LinkedHashSet[Chat] = for { 44 + c <- chats.to[mutable.LinkedHashSet] 45 + s <- c.split(":").toList match { 46 + case (x :: y :: xs) if x == a.value => 47 + Option(Chat(a, Blank(), (y :: xs).mkString(": "))) 48 + case (x :: y :: xs) if x != a.value => 49 + Option(Chat(User(x), a, (y :: xs).mkString(": "))) 50 + case _ => None 51 + } 52 + } yield s 53 + 54 + val users: mutable.LinkedHashSet[Party] = for { 55 + Chat(x, _, _) <- chatMap 56 + } yield x 57 + 58 + Conversation { 59 + chatMap.collect { 60 + case Chat(ag, Blank(), t) => 61 + Chat(ag, (users - ag).headOption.getOrElse(Blank()), t) 62 + case c => c 63 + } 64 + } 65 + } 66 + 67 + private def escapeString: String => String = 68 + _.replaceAll("\t", " ").replaceAll("\\\\", "\\\\\\\\") 69 + 70 + implicit val chatShow: Show[Chat] = { 71 + case Chat(Agent(a), _, text) => 72 + s"[Agent($a)]# ${escapeString(text)}" 73 + case Chat(User(a), _, text) => 74 + s"[User($a)]# ${escapeString(text)}" 75 + case _ => "" 76 + } 77 + 78 + implicit val hashSetShow: Show[mutable.LinkedHashSet[Chat]] = { 79 + case f: mutable.LinkedHashSet[Chat] if f.nonEmpty && f.tail.nonEmpty => 80 + s"${f.head.show} ${f.tail.show}" 81 + case f: mutable.LinkedHashSet[Chat] if f.nonEmpty && f.tail.isEmpty => 82 + s"${f.head.show}" 83 + case _ => "" 84 + } 85 + } 86 + 87 + object Transcript { 88 + import Conversation._ 89 + 90 + implicit def optShow: Show[Option[Transcript]] = { 91 + case Some(a) => a.show 92 + case None => "" 93 + } 94 + 95 + implicit val transcriptShow: Show[Transcript] = (f: Transcript) => 96 + s"\n${f.id.toString}\t${f.origDate.toString}\t${f.origin.value}\t${f.agent.value}\t${f.text.cs.show}" 97 + 98 + def transcript: String => Option[Transcript] = { s => 99 + def firstMatchFirstGroup: Regex => Option[String] = 100 + r => for (m <- r.findFirstMatchIn(s)) yield m.group(1) 101 + 102 + lazy val origDate = firstMatchFirstGroup( 103 + """<p align="center">Chat Started: (.*?)</p>""".r) 104 + lazy val origin = firstMatchFirstGroup( 105 + """<p align="center">Chat Origin: (.*?)</p>""".r) 106 + lazy val agent = firstMatchFirstGroup( 107 + """<p align="center">Agent (.*?)</p>""".r) 108 + lazy val text = firstMatchFirstGroup("""</p>(\( (?:\d[hms]\s)+.*)$""".r) 109 + 110 + for { 111 + dt <- origDate 112 + ori <- origin 113 + a <- agent 114 + tex <- text 115 + } yield 116 + Transcript( 117 + java.util.UUID.randomUUID, 118 + ZonedDateTime.parse(dt, 119 + DateTimeFormatter 120 + .ofPattern("EEEE, MMMM d, y, H:m:ss (X)")), 121 + ChatOrigin(ori), 122 + Agent(a), 123 + conversation(tex)(Agent(a)) 124 + ) 125 + } 126 + } 127 + 128 + }
+87
src/main/scala/domain/FAQMain.scala
··· 1 + package domain 2 + 3 + import cats.Show 4 + import java.nio.file.Files 5 + 6 + import common.Config.AppConfig 7 + import java.util.UUID 8 + import monix.eval.Task 9 + import monix.execution.Scheduler 10 + import monix.reactive.{Consumer, Observable} 11 + import cats.implicits._ 12 + import scala.util.{Failure, Success} 13 + 14 + object FAQMain extends App { 15 + 16 + sealed trait FAQType 17 + final case class FAQ(id: UUID, q: String, a: String) extends FAQType 18 + 19 + implicit val faqShow: Show[FAQType] = { 20 + case FAQ(id, q, a) => s"$id\t$q\t$a" 21 + } 22 + 23 + implicit val faqsShow: Show[List[FAQType]] = { 24 + case Nil => "" 25 + case x :: xs => s"${x.show}\n${xs.show}" 26 + } 27 + 28 + def parse: String => List[FAQType] = { line => 29 + val qaReg = 30 + """(?=question":"(.*?)","answer":"\[(.*?)\]".*?(?=question|$))""".r 31 + 32 + def cleanse: String => String = _.trim.replaceAll("\t", " ") 33 + 34 + val faqs = for { 35 + qa <- qaReg.findAllMatchIn(line).map(m => (m.group(1), m.group(2))) 36 + // (q, a) = qa 37 + } yield 38 + FAQ( 39 + java.util.UUID.randomUUID, 40 + cleanse(qa._1), 41 + cleanse(qa._2) 42 + ) 43 + 44 + faqs.toList 45 + } 46 + 47 + def prog: AppConfig => Unit = { conf => 48 + implicit lazy val ioSched = Scheduler.io("my-io", false) 49 + 50 + val in = Files.newBufferedReader(conf.domain.faq.inputPath.value) 51 + val ob = Observable.fromLinesReader(in) 52 + val defOb = Observable.defer(ob) 53 + 54 + val source: Observable[FAQType] = 55 + defOb 56 + .mapTask { line => 57 + Task { 58 + parse(line) 59 + } 60 + } 61 + .foldLeftF[List[FAQType]](List[FAQType]()) { (acc, ls) => 62 + acc ++ ls 63 + } 64 + .flatMap(Observable.fromIterable) 65 + 66 + val sink: Consumer[FAQType, List[FAQType]] = 67 + Consumer.foldLeftAsync[List[FAQType], FAQType](List[FAQType]()) { 68 + (acc, f) => 69 + Task { 70 + f :: acc 71 + } 72 + } 73 + 74 + source.consumeWith(sink).runOnComplete { 75 + case Success(fs) => { 76 + Files.write(conf.domain.faq.outputPath, fs.show.getBytes("utf-8")) 77 + ioSched.shutdown() 78 + println("Done") 79 + } 80 + case Failure(e) => ioSched.shutdown(); e.printStackTrace 81 + } 82 + () 83 + } 84 + 85 + import common.Config._ 86 + prog(conf) 87 + }