this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 52 lines 1.3 kB view raw
1package associatekbc 2 3import java.nio.file.Files 4 5import cats.Show 6import cats.implicits._ 7import monix.execution.Scheduler.Implicits.global 8import monix.reactive.{Consumer, Observable} 9 10import scala.io.Source 11import scala.util.{Failure, Success} 12 13object App extends App { 14 import common.Config._ 15 16 implicit val tsvShow: Show[List[(String, String)]] = { 17 case x :: xs => s"${x._1.show}\t${x._2.show}\n${xs.show}" 18 case Nil => "" 19 } 20 21 implicit val tsvShow1: Show[List[String]] = { 22 case x :: xs => s"${x.show}\n${xs.show}" 23 case Nil => "" 24 } 25 26 val file = conf.associatekbc.inputPath 27 28 val source = Observable.defer { 29 Observable 30 .fromIterator(Source.fromFile(file.value.toFile).getLines()) 31 .drop(1) 32 } 33 34 val consumer = 35 Consumer.foldLeft[String, String](List[(String, String)]().show) { 36 (xs, s) => 37 val ss = s.split(",").toList 38 xs + 39 List((ss.reverse.head.trim, ss.take(2).mkString("$$$").trim)).show 40 } 41 42 val lbConsumer = Consumer.loadBalance(10, consumer) 43 44 source 45 .consumeWith(lbConsumer) 46 .runOnComplete { 47 case Success(v) => 48 Files.write(conf.associatekbc.outputPath, v.show.getBytes) 49 () 50 case Failure(e) => System.err.println(s"ERROR::\n${e.getMessage}") 51 } 52}