this repo has no description
1package associatekbc
2
3import java.nio.file.Files
4
5import cats.Show
6import cats.implicits._
7import monix.execution.Scheduler.Implicits.global
8import monix.reactive.{Consumer, Observable}
9
10import scala.io.Source
11import scala.util.{Failure, Success}
12
/** Command-line entry point that rewrites the configured comma-separated
  * input file as a two-column, tab-separated output file.
  *
  * For every input line except the first one, the last comma-separated
  * field becomes the first output column and the first two fields, joined
  * by `$$$`, become the second output column.
  */
object App extends App {
  import common.Config._

  /** Renders pairs as `first<TAB>second` rows, each terminated by a newline. */
  implicit val tsvShow: Show[List[(String, String)]] = {
    case x :: xs => s"${x._1.show}\t${x._2.show}\n${xs.show}"
    case Nil     => ""
  }

  /** Renders strings one after another, each followed by a newline. */
  implicit val tsvShow1: Show[List[String]] = {
    case x :: xs => s"${x.show}\n${xs.show}"
    case Nil     => ""
  }

  val file = conf.associatekbc.inputPath

  /** Lines of the input file, minus the first line (presumably a header row).
    *
    * The file is opened lazily on subscription and closed again through the
    * `onFinish` callback, so the handle is released once the stream ends
    * instead of being leaked.
    */
  val source = Observable.defer {
    val input = Source.fromFile(file.value.toFile)
    Observable
      .fromIterator(input.getLines(), () => input.close())
      .drop(1)
  }

  /** Folds input lines into one TSV string, appending one row per line. */
  val consumer =
    Consumer.foldLeft[String, String](List[(String, String)]().show) {
      (acc, line) =>
        val columns = line.split(",").toList
        // `split` drops trailing empty fields, so a line made only of commas
        // yields no columns at all; `lastOption` keeps such a line from
        // aborting the whole run and emits an empty key instead.
        val key   = columns.lastOption.fold("")(_.trim)
        val value = columns.take(2).mkString("$$$").trim
        acc + List((key, value)).show
    }

  // Spreads the lines over 10 parallel copies of `consumer`; the result is one
  // partial TSV string per copy. Rows are therefore grouped per worker, not
  // kept in input order.
  val lbConsumer = Consumer.loadBalance(10, consumer)

  // NOTE(review): `runOnComplete` only registers a callback; confirm that the
  // scheduler keeps the JVM alive until the output file has been written.
  source
    .consumeWith(lbConsumer)
    .runOnComplete {
      case Success(v) =>
        // `tsvShow1` puts a newline after every partial result, which already
        // ends in a newline, so partial results are separated by blank lines.
        Files.write(conf.associatekbc.outputPath, v.show.getBytes)
        ()
      case Failure(e) => System.err.println(s"ERROR::\n${e.getMessage}")
    }
}