Created
June 4, 2017 12:04
-
-
Save harmeetsingh0013/1c9e89ef7061caba46fe9363bb13a2c9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Full - Stack Trace | |
- should add new user *** FAILED *** | |
[info] akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/jdbcCreateTables-singletonProxy#-322546342]] after [20000 ms]. Sender[null] sent message of type "com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$". | |
[info] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) | |
[info] at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) | |
[info] at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) | |
[info] at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) | |
[info] at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) | |
[info] at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) | |
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) | |
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) | |
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) | |
[info] at java.lang.Thread.run(Thread.java:748) | |
Event Processor Test Case: | |
``` | |
class UserEventProcessorSpec extends AsyncWordSpec with BeforeAndAfterAll | |
with Matchers with MockitoSugar { | |
var producerStub: ProducerStub[api.UserEvent] = _ | |
private lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup) { ctx => | |
new LagomApplication(ctx) with UserComponents with AhcWSComponents with LagomKafkaComponents { | |
val stubFactory = new ProducerStubFactory(actorSystem, materializer) | |
producerStub = stubFactory.producer[api.UserEvent](UserKafkaTopic.NAME) | |
override def serviceLocator = NoServiceLocator | |
override lazy val readSide: ReadSideTestDriver = new ReadSideTestDriver | |
} | |
} | |
override def afterAll() = server.stop() | |
private val testDriver = server.application.readSide | |
private val userRepo = server.application.userRepository | |
private val offset = new AtomicInteger() | |
"The user event processor" should { | |
"add new user " in { | |
val id = UUID.randomUUID() | |
val user = User( | |
id = Some(id), | |
date = Some(new Timestamp(System.currentTimeMillis())), | |
name = "James", | |
mobile1 = "147852369", | |
company = "James ENT", | |
vatNo = "5558746", | |
pinNo = "142001" | |
) | |
for { | |
_ <- feed(user.id.get, UserAdded(user)) | |
repoUser <- userRepo.findUserById(id) | |
} yield { | |
repoUser should === (user) | |
} | |
} | |
} | |
private def feed(itemId: UUID, event: UserEvent) = { | |
testDriver.feed(itemId.toString, event, Sequence(offset.getAndIncrement)) | |
} | |
} | |
``` | |
Event Processor Code: | |
``` | |
class UserEventProcessor(readSide: JdbcReadSide, userRepository: UserRepository) | |
(implicit ec: ExecutionContext) | |
extends ReadSideProcessor[UserEvent] { | |
override def buildHandler(): ReadSideHandler[UserEvent] = { | |
readSide.builder[UserEvent]("read_side_offsets") | |
.setGlobalPrepare(createUserTable) | |
.setEventHandler[UserAdded](insertNewUser) | |
.setEventHandler[UserUpdated](updateUserRecord) | |
.setEventHandler[UserDeleted](deactivateUser) | |
.build() | |
} | |
override def aggregateTags: Set[AggregateEventTag[UserEvent]] = Set(UserEvent.INSTANCE) | |
private def createUserTable(connection: Connection): Unit = { | |
val userTable = | |
""" | |
|CREATE TABLE IF NOT EXISTS users ( | |
| id uuid NOT NULL, | |
| date timestamp DEFAULT CURRENT_TIMESTAMP, | |
| name varchar(100) NOT NULL, | |
| mobile_1 varchar(15) NOT NULL, | |
| mobile_2 varchar(45) DEFAULT NULL, | |
| email varchar(70) DEFAULT NULL, | |
| company varchar(150) NOT NULL, | |
| vat_no varchar(45) NOT NULL, | |
| status varchar(45) NOT NULL DEFAULT 'active', | |
| address text DEFAULT NULL, | |
| country varchar(45) DEFAULT NULL, | |
| state varchar(45) DEFAULT NULL, | |
| city varchar(45) DEFAULT NULL, | |
| pin_no varchar(8) NOT NULL, | |
| PRIMARY KEY (id) | |
| ) | |
""".stripMargin | |
val mobile1Index = "CREATE UNIQUE INDEX IF NOT EXISTS mobile_1 on users (mobile_1)" | |
val vatNoIndex = "CREATE UNIQUE INDEX IF NOT EXISTS vat_no on users (vat_no);" | |
val pinNoIndex = "CREATE UNIQUE INDEX IF NOT EXISTS pinNoIndex on users (pin_no);" | |
executePrepareStatements(connection, | |
Vector(userTable, mobile1Index, vatNoIndex, pinNoIndex)) | |
} | |
private def executePrepareStatements(connection: Connection, statements: Vector[String]): Unit = { | |
statements.foreach(stm => tryWith(connection.prepareStatement(stm))(ps => ps.execute())) | |
} | |
private def insertNewUser(connection: Connection, started: EventStreamElement[UserAdded]) = { | |
userRepository.addNewUser(started.event.user) | |
} | |
private def updateUserRecord(connection: Connection, started: EventStreamElement[UserUpdated]) = { | |
userRepository.updateUserDetail(started.event.user) | |
} | |
private def deactivateUser(connection: Connection, started: EventStreamElement[UserDeleted]) = { | |
userRepository.deactivateUser(started.event.id) | |
} | |
} | |
``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment