Using Kotlin Coroutines with Spring

  • Language: Kotlin
  • Spring Boot version : 2.2.0.BUILD-SNAPSHOT
  • Dependencies: Web Reactive
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
<version>${kotlinx-coroutines.version}</version>
</dependency>
<kotlinx-coroutines.version>1.2.0</kotlinx-coroutines.version>
<kotlin.version>1.3.30</kotlin.version>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>${spring-data-r2dbc.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
<spring-data-r2dbc.version>1.0.0.BUILD-SNAPSHOT</spring-data-r2dbc.version>
@Configuration
@EnableR2dbcRepositories
class DatabaseConfig : AbstractR2dbcConfiguration() {
override fun connectionFactory(): ConnectionFactory {
return PostgresqlConnectionFactory(
PostgresqlConnectionConfiguration.builder()
.host("localhost")
.database("test")
.username("user")
.password("password")
.build()
)
}
}
@Table("posts")
data class Post(@Id val id: Long? = null,
@Column("title") val title: String? = null,
@Column("content") val content: String? = null
)
@Component
class PostRepository(private val client: DatabaseClient) {
suspend fun count(): Long =
client.execute().sql("SELECT COUNT(*) FROM posts")
.asType<Long>().fetch().awaitOne()
fun findAll(): Flow<Post> =
client.select().from("posts").asType<Post>().fetch().flow()
suspend fun findOne(id: Long): Post? =
client.execute()
.sql("SELECT * FROM posts WHERE id = \$1")
.bind(0, id).asType<Post>()
.fetch()
.awaitOneOrNull()
suspend fun deleteAll() =
client.execute()
.sql("DELETE FROM posts")
.fetch()
.rowsUpdated()
.awaitSingle()
suspend fun save(post: Post) =
client.insert()
.into<Post>()
.table("posts")
.using(post)
.await()
suspend fun init() {
save(Post(title = "My first post title", content = "Content of my first post"))
save(Post(title = "My second post title", content = "Content of my second post"))
}
}
@RestController
@RequestMapping("/posts")
class PostController(
private val postRepository: PostRepository
) {
@GetMapping("")
fun findAll(): Flow<Post> =
postRepository.findAll()
@GetMapping("{id}")
suspend fun findOne(@PathVariable id: Long): Post? =
postRepository.findOne(id) ?: throw PostNotFoundException(id)
@PostMapping("")
suspend fun save(@RequestBody post: Post) =
postRepository.save(post)
@GetMapping("{id}/comments")
fun findCommentsByPostId(@PathVariable id: Long): Flow<Comment> =
commentRepository.findByPostId(id)
}
runBlocking {
val deleted = postRepository.deleteAll()
println(" $deleted posts was cleared.")
postRepository.init()
}
docker-compose up
@Configuration
class RouterConfiguration {
@Bean
fun routes(postHandler: PostHandler) = coRouter {
"/posts".nest {
GET("", postHandler::all)
GET("/{id}", postHandler::get)
POST("", postHandler::create)
PUT("/{id}", postHandler::update)
DELETE("/{id}", postHandler::delete)
}
}
}
@Component
class PostHandler(private val posts: PostRepository) {
suspend fun all(req: ServerRequest): ServerResponse {
return ok().bodyAndAwait(this.posts.findAll())
}
suspend fun create(req: ServerRequest): ServerResponse {
val body = req.awaitBody<Post>()
val createdPost = this.posts.save(body)
return created(URI.create("/posts/$createdPost")).buildAndAwait()
}
suspend fun get(req: ServerRequest): ServerResponse {
println("path variable::${req.pathVariable("id")}")
val foundPost = this.posts.findOne(req.pathVariable("id").toLong())
println("found post:$foundPost")
return when {
foundPost != null -> ok().bodyAndAwait(foundPost)
else -> notFound().buildAndAwait()
}
}
suspend fun update(req: ServerRequest): ServerResponse {
val foundPost = this.posts.findOne(req.pathVariable("id").toLong())
val body = req.awaitBody<Post>()
return when {
foundPost != null -> {
this.posts.update(foundPost.copy(title = body.title, content = body.content))
noContent().buildAndAwait()
}
else -> notFound().buildAndAwait()
}
}
suspend fun delete(req: ServerRequest): ServerResponse {
val deletedCount = this.posts.deleteById(req.pathVariable("id").toLong())
println("$deletedCount posts was deleted")
return notFound().buildAndAwait()
}
}
@RestController
@RequestMapping("/posts")
class PostController(private val client: WebClient) {
@GetMapping("")
suspend fun findAll() =
client.get()
.uri("/posts")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Any>()
/*
@GetMapping("")
suspend fun findAll(): Flow<Post> =
client.get()
.uri("/posts")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody()
*/
@GetMapping("/{id}")
suspend fun findOne(@PathVariable id: Long): PostDetails = withDetails(id)
private suspend fun withDetails(id: Long): PostDetails {
val post =
client.get().uri("/posts/$id")
.accept(APPLICATION_JSON)
.awaitExchange().awaitBody<Post>()
val count =
client.get().uri("/posts/$id/comments/count")
.accept(APPLICATION_JSON)
.awaitExchange().awaitBody<Long>()
return PostDetails(post, count)
}
}
private suspend fun withDetails(id: Long): PostDetails = coroutineScope {
val post = async {
client.get().uri("/posts/$id")
.accept(APPLICATION_JSON)
.awaitExchange().awaitBody<Post>()
}
val count = async {
client.get().uri("/posts/$id/comments/count")
.accept(APPLICATION_JSON)
.awaitExchange().awaitBody<Long>()
}
PostDetails(post.await(), count.await())
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Hantsy

Hantsy

Self-employed technical consultant, solution architect and full-stack developer, open source contributor, freelancer and remote worker