Created
August 13, 2012 13:40
-
-
Save fcroiseaux/3340902 to your computer and use it in GitHub Desktop.
Play 2.0 Reactive File Upload using CSVReader and Iteratee. Parse a CSV file using Iteratee and send each line to a local elasticsearch instance
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import play.api.mvc._ | |
import play.api.libs.iteratee.{Iteratee, Input} | |
import play.api.libs.concurrent.Promise | |
import play.api.libs.iteratee.Input.{El, EOF, Empty} | |
import au.com.bytecode.opencsv.CSVReader | |
import java.io.StringReader | |
import scala.collection.JavaConversions._ | |
import play.api.libs.json.Json._ | |
import play.api.libs.ws.WS | |
/**
 * Controller exposing the CSV upload endpoint.
 *
 * The request body is not buffered: it is streamed straight into a
 * CsvIteratee, which handles the CSV lines while the chunks arrive.
 */
object ReactiveFileUpload extends Controller {

  /**
   * Upload action. The body is consumed by a fresh CsvIteratee created with
   * `isFirst = true`; once the body has been consumed the action answers
   * with a plain "File Processed" response.
   */
  def upload = {
    // One iteratee per request; the request header itself is not needed to build it.
    val csvBodyParser = BodyParser(_ => CsvIteratee(isFirst = true))
    Action(csvBodyParser) { _ =>
      Ok("File Processed")
    }
  }
}
/**
 * Immutable iteratee that consumes an upload chunk by chunk, parses the bytes as
 * ';'-separated CSV and hands every complete line to [[processLine]].
 *
 * Each input produces a new copy of the iteratee carrying the updated state.
 *
 * @param state     'Cont while input is accepted, 'Done once EOF has been received;
 *                  any other symbol makes `fold` report "Unexpected state"
 * @param input     the last input received; passed to the error callback
 * @param lastChunk text of the trailing line of the previous chunk, which may have been
 *                  cut by the chunk boundary; it is prepended to the next chunk and is
 *                  the `Right` value returned once the iteratee is done
 * @param isFirst   true until the first input has been handled; the first data chunk has
 *                  everything up to the first blank line ("\r\n\r\n") removed
 */
case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] {

  /**
   * Dispatches on the current state: 'Done calls `done` with the carried-over text,
   * 'Cont calls `cont` with the function that consumes the next input, and any other
   * state calls `error`.
   */
  def fold[B](
    done: (Either[Result, String], Input[Array[Byte]]) => Promise[B],
    cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B],
    error: (String, Input[Array[Byte]]) => Promise[B]
  ): Promise[B] = state match {
    case 'Done =>
      done(Right(lastChunk), Input.Empty)
    case 'Cont =>
      cont(in => in match {
        case El(bytes) => consumeChunk(in, bytes)
        case Empty => copy(input = in, isFirst = false)
        case EOF => copy(state = 'Done, input = in, isFirst = false)
        case _ => copy(state = 'Error, input = in, isFirst = false)
      })
    case _ =>
      error("Unexpected state", input)
  }

  /**
   * Parses one data chunk, processes every line except the last one and returns the
   * iteratee that will receive the next input.
   */
  private def consumeChunk(in: Input[Array[Byte]], bytes: Array[Byte]): CsvIteratee = {
    // Prepend the part of the previous chunk that has not been processed yet.
    // NOTE(review): the bytes are decoded with the platform default charset, and a
    // multi-byte character split across two chunks would be decoded incorrectly.
    val content = lastChunk + new String(bytes)
    val csvBody = if (isFirst) stripLeadingHeader(content) else content
    val lines = new CSVReader(new StringReader(csvBody), ';').readAll

    // Process all lines except the last one, since it may be cut by the chunk boundary.
    // dropRight/lastOption are used instead of init/last because readAll returns an
    // empty list for an empty body, on which init and last would throw.
    for (line <- lines.dropRight(1))
      processLine(line)

    // Carry the unprocessed last line forward.
    // NOTE(review): re-joining the parsed fields drops any quoting, and a last line that
    // was actually complete gets the next chunk appended to it - confirm this is acceptable.
    val pending = lines.lastOption.map(_.toList.mkString(";")).getOrElse("")
    copy(input = in, lastChunk = pending, isFirst = false)
  }

  /**
   * Removes everything up to and including the first blank line ("\r\n\r\n").
   * The content is returned unchanged when no blank line is present; the previous
   * `indexOf(...) + 4` arithmetic dropped three characters of data in that case.
   */
  private def stripLeadingHeader(content: String): String = {
    val separator = "\r\n\r\n"
    val at = content.indexOf(separator)
    if (at >= 0) content.drop(at + separator.length) else content
  }

  /**
   * Posts one CSV line as a JSON document to http://localhost:9200/affa/na/.
   *
   * The first four fields are sent as "date", "trig", "code" and "nbjours"; the last
   * one is converted to a Double.
   * NOTE(review): a line with fewer than four fields or a non-numeric fourth field
   * throws here - callers get no validation.
   *
   * @param line the fields of one parsed CSV line
   * @return the result of the asynchronous POST
   */
  def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
      Map(
        "date" -> toJson(line(0)),
        "trig" -> toJson(line(1)),
        "code" -> toJson(line(2)),
        "nbjours" -> toJson(line(3).toDouble)
      )
    )
  )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The above code does not compile, as extending Iteratee needs to override fold[B](folder: (Step[Array[Byte], Either[Result, String]]) => Future[B]): Future[B]. Can you please help?