diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/controller/APIKeyController.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/controller/APIKeyController.kt index 7a616d2d0..e85c0495a 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/controller/APIKeyController.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/controller/APIKeyController.kt @@ -15,13 +15,13 @@ import java.security.Principal class APIKeyController(private val apiKeyService: APIKeyServiceImpl) { @GetMapping - suspend fun getKeys(principal: Principal): List { + fun getKeys(principal: Principal): List { return apiKeyService.getKeysByUserId(principal.name) .map { APIKeyResponse(it.label, it.expirationTime, it.allowedIPs, it.key, it.isEnabled) } } @PostMapping - suspend fun create( + fun create( @RequestBody request: CreateAPIKeyRequest, @CurrentSecurityContext securityContext: SecurityContext ): Any { @@ -40,17 +40,17 @@ class APIKeyController(private val apiKeyService: APIKeyServiceImpl) { } @PutMapping("/{key}/enable") - suspend fun enableKey(principal: Principal, @PathVariable key: String) { + fun enableKey(principal: Principal, @PathVariable key: String) { apiKeyService.changeKeyState(principal.name, key, true) } @PutMapping("/{key}/disable") - suspend fun disableKey(principal: Principal, @PathVariable key: String) { + fun disableKey(principal: Principal, @PathVariable key: String) { apiKeyService.changeKeyState(principal.name, key, false) } @DeleteMapping("/{key}") - suspend fun deleteKey(principal: Principal, @PathVariable key: String) { + fun deleteKey(principal: Principal, @PathVariable key: String) { apiKeyService.deleteKey(principal.name, key) } diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt index f015a0ffd..ad16a7572 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt @@ -2,33 +2,54 @@ package co.nilin.opex.api.app.interceptor import co.nilin.opex.api.app.service.APIKeyServiceImpl import co.nilin.opex.api.core.spi.APIKeyFilter -import kotlinx.coroutines.runBlocking +import jakarta.servlet.FilterChain +import jakarta.servlet.http.HttpServletRequest +import jakarta.servlet.http.HttpServletRequestWrapper +import jakarta.servlet.http.HttpServletResponse import org.springframework.stereotype.Component -import org.springframework.web.server.ServerWebExchange -import org.springframework.web.server.WebFilter -import org.springframework.web.server.WebFilterChain -import reactor.core.publisher.Mono +import org.springframework.web.filter.OncePerRequestFilter +import java.util.* @Component -class APIKeyFilterImpl(private val apiKeyService: APIKeyServiceImpl) : APIKeyFilter, WebFilter { +class APIKeyFilterImpl(private val apiKeyService: APIKeyServiceImpl) : APIKeyFilter, OncePerRequestFilter() { - override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono { - val request = exchange.request - val key = request.headers["X-API-KEY"] + override fun doFilterInternal( + request: HttpServletRequest, + response: HttpServletResponse, + chain: FilterChain + ) { + val key = request.getHeader("X-API-KEY") if (!key.isNullOrEmpty()) { - val secret = request.headers["X-API-SECRET"] - if (secret.isNullOrEmpty()) - return chain.filter(exchange) + val secret = request.getHeader("X-API-KEY") + if (secret.isNullOrEmpty()) { + chain.doFilter(request, response) + return + } - val apiKey = runBlocking { apiKeyService.getAPIKey(key[0], secret[0]) } + val apiKey = apiKeyService.getAPIKey(key, secret) if (apiKey != null && apiKey.isEnabled && apiKey.accessToken != null && !apiKey.isExpired) { - val req = exchange.request.mutate() - .header("Authorization", "Bearer ${apiKey.accessToken}") - .build() - return chain.filter(exchange.mutate().request(req).build()) + val wrappedReq = RequestWrapper(request) + wrappedReq.addHeader("Authorization", "Bearer ${apiKey.accessToken}") + chain.doFilter(wrappedReq, response) + return } } - return chain.filter(exchange) + chain.doFilter(request, response) } +} + +class RequestWrapper(request: HttpServletRequest) : HttpServletRequestWrapper(request) { + + private val customHeaders = hashMapOf() + + fun addHeader(key: String, value: String) { + customHeaders[key] = value + } + + override fun getHeaderNames(): Enumeration { + val names = HashSet(Collections.list(super.getHeaderNames())) + names.addAll(customHeaders.keys) + return Collections.enumeration(names) + } } \ No newline at end of file diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/proxy/AuthProxy.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/proxy/AuthProxy.kt index 6dcb89d25..8d85aa426 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/proxy/AuthProxy.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/proxy/AuthProxy.kt @@ -1,25 +1,28 @@ package co.nilin.opex.api.app.proxy import co.nilin.opex.api.app.data.AccessTokenResponse -import kotlinx.coroutines.reactor.awaitSingle +import co.nilin.opex.common.OpexError import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value +import org.springframework.http.HttpEntity +import org.springframework.http.HttpHeaders +import org.springframework.http.HttpMethod import org.springframework.http.MediaType import org.springframework.stereotype.Component +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.exchange import org.springframework.web.reactive.function.BodyInserters -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToMono @Component class AuthProxy( @Value("\${app.auth.token-url}") - private val tokenUrl: String + private val tokenUrl: String, + private val restTemplate: RestTemplate ) { private val logger = LoggerFactory.getLogger(AuthProxy::class.java) - private val client = WebClient.create() - suspend fun exchangeToken(clientSecret: String, token: String): AccessTokenResponse { + fun exchangeToken(clientSecret: String, token: String): AccessTokenResponse { val body = BodyInserters.fromFormData("client_id", "opex-api-key") .with("client_secret", clientSecret) .with("subject_token", token) @@ -27,32 +30,28 @@ class AuthProxy( .with("scope", "offline_access") logger.info("Request token exchange for user") - return client.post() - .uri(tokenUrl) - .accept(MediaType.APPLICATION_JSON) - .header("Content-Type", "application/x-www-form-urlencoded") - .body(body) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingle() + + val headers = HttpHeaders().apply { + contentType = MediaType.APPLICATION_FORM_URLENCODED + accept = listOf(MediaType.APPLICATION_JSON) + } + val response = restTemplate.exchange(tokenUrl, HttpMethod.POST, HttpEntity(body, headers)) + return response.body ?: throw OpexError.InternalServerError.exception() } - suspend fun refreshToken(clientSecret: String, refreshToken: String): AccessTokenResponse { + fun refreshToken(clientSecret: String, refreshToken: String): AccessTokenResponse { val body = BodyInserters.fromFormData("client_id", "opex-api-key") .with("client_secret", clientSecret) .with("refresh_token", refreshToken) .with("grant_type", "refresh_token") logger.info("Refreshing token") - return client.post() - .uri(tokenUrl) - .accept(MediaType.APPLICATION_JSON) - .header("Content-Type", "application/x-www-form-urlencoded") - .body(body) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingle() + + val headers = HttpHeaders().apply { + contentType = MediaType.APPLICATION_FORM_URLENCODED + accept = listOf(MediaType.APPLICATION_JSON) + } + val response = restTemplate.exchange(tokenUrl, HttpMethod.POST, HttpEntity(body, headers)) + return response.body ?: throw OpexError.InternalServerError.exception() } } \ No newline at end of file diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/APIKeyServiceImpl.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/APIKeyServiceImpl.kt index a95ae30f5..6e0441f9d 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/APIKeyServiceImpl.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/APIKeyServiceImpl.kt @@ -6,12 +6,6 @@ import co.nilin.opex.api.core.spi.APIKeyService import co.nilin.opex.api.ports.postgres.dao.APIKeyRepository import co.nilin.opex.api.ports.postgres.model.APIKeyModel import co.nilin.opex.common.OpexError -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactor.awaitSingle -import kotlinx.coroutines.reactor.awaitSingleOrNull import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.cache.Cache @@ -37,14 +31,14 @@ class APIKeyServiceImpl( private val logger = LoggerFactory.getLogger(APIKeyServiceImpl::class.java) - override suspend fun createAPIKey( + override fun createAPIKey( userId: String, label: String, expirationTime: LocalDateTime?, allowedIPs: String?, currentToken: String ): Pair { - if (apiKeyRepository.countByUserId(userId).awaitFirstOrElse { 0 } >= 10) + if ((apiKeyRepository.countByUserId(userId) ?: 0) >= 10) throw OpexError.APIKeyLimitReached.exception() val secret = generateSecret() @@ -60,7 +54,7 @@ class APIKeyServiceImpl( allowedIPs, tokenExpiration(tokenResponse.expires_in) ) - ).awaitSingle() + ) return Pair( secret, @@ -70,13 +64,12 @@ class APIKeyServiceImpl( ) } - override suspend fun getAPIKey(key: String, secret: String): APIKey? = coroutineScope { - val apiKey = getFromCache(key) - ?: apiKeyRepository.findByKey(key).awaitSingleOrNull()?.apply { putCache(this) } + override fun getAPIKey(key: String, secret: String): APIKey? { + val apiKey = getFromCache(key) ?: apiKeyRepository.findByKey(key)?.apply { putCache(this) } - with(apiKey) { + return with(apiKey) { if (this != null) { - launch { checkupAPIKey(this@with, secret) } + checkupAPIKey(this@with, secret) APIKey( userId, label, @@ -92,8 +85,8 @@ class APIKeyServiceImpl( } } - override suspend fun getKeysByUserId(userId: String): List { - return apiKeyRepository.findAllByUserId(userId).collectList().awaitFirstOrElse { emptyList() } + override fun getKeysByUserId(userId: String): List { + return apiKeyRepository.findAllByUserId(userId) .map { APIKey( it.userId, @@ -108,22 +101,22 @@ class APIKeyServiceImpl( } } - override suspend fun changeKeyState(userId: String, key: String, isEnabled: Boolean) { - val apiKey = apiKeyRepository.findByKey(key).awaitSingleOrNull() ?: throw OpexError.NotFound.exception() + override fun changeKeyState(userId: String, key: String, isEnabled: Boolean) { + val apiKey = apiKeyRepository.findByKey(key) ?: throw OpexError.NotFound.exception() if (apiKey.userId != userId) throw OpexError.Forbidden.exception() apiKey.isEnabled = isEnabled - apiKeyRepository.save(apiKey).awaitSingle() + apiKeyRepository.save(apiKey) } - override suspend fun deleteKey(userId: String, key: String) { - val apiKey = apiKeyRepository.findByKey(key).awaitSingleOrNull() ?: throw OpexError.NotFound.exception() + override fun deleteKey(userId: String, key: String) { + val apiKey = apiKeyRepository.findByKey(key) ?: throw OpexError.NotFound.exception() if (apiKey.userId != userId) throw OpexError.Forbidden.exception() - apiKeyRepository.delete(apiKey).awaitFirstOrNull() + apiKeyRepository.delete(apiKey) } - private suspend fun checkupAPIKey(apiKey: APIKeyModel, secret: String) { + private fun checkupAPIKey(apiKey: APIKeyModel, secret: String) { if (apiKey.isExpired || !apiKey.isEnabled) return @@ -132,7 +125,7 @@ class APIKeyServiceImpl( if (apiKey.expirationTime?.isBefore(now) == true) { logger.info("Expiring api key ${apiKey.key}") apiKey.isExpired = true - apiKeyRepository.save(apiKey).awaitSingle().apply { updateCache(this) } + apiKeyRepository.save(apiKey).apply { updateCache(this) } logger.info("API key ${apiKey.key} is expired") return } @@ -144,7 +137,7 @@ class APIKeyServiceImpl( accessToken = encryptAES(response.access_token, secret) tokenExpirationTime = tokenExpiration(response.expires_in) } - apiKeyRepository.save(apiKey).awaitSingle().apply { updateCache(this) } + apiKeyRepository.save(apiKey).apply { updateCache(this) } logger.info("API key ${apiKey.key} token refreshed") } } catch (e: Exception) { diff --git a/api/api-app/src/main/resources/application.yml b/api/api-app/src/main/resources/application.yml index 64c273602..d193c173e 100644 --- a/api/api-app/src/main/resources/application.yml +++ b/api/api-app/src/main/resources/application.yml @@ -6,11 +6,14 @@ spring: main: allow-bean-definition-overriding: true allow-circular-references: true - r2dbc: - url: r2dbc:postgresql://${DB_IP_PORT:localhost}/opex + datasource: + url: jdbc:postgresql://${DB_IP_PORT:localhost}/opex username: ${dbusername:opex} password: ${dbpassword:hiopex} - initialization-mode: always + driver-class-name: org.postgresql.Driver + sql: + init: + mode: always cloud: bootstrap: enabled: true diff --git a/api/api-core/pom.xml b/api/api-core/pom.xml index 3bd667b98..f6ac2e013 100644 --- a/api/api-core/pom.xml +++ b/api/api-core/pom.xml @@ -27,14 +27,6 @@ io.projectreactor.kotlin reactor-kotlin-extensions - - org.jetbrains.kotlinx - kotlinx-coroutines-reactor - - - org.jetbrains.kotlinx - kotlinx-coroutines-core - org.springframework spring-tx diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/APIKeyService.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/APIKeyService.kt index f5329c6fa..8db96913f 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/APIKeyService.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/APIKeyService.kt @@ -5,7 +5,7 @@ import java.time.LocalDateTime interface APIKeyService { - suspend fun createAPIKey( + fun createAPIKey( userId: String, label: String, expirationTime: LocalDateTime?, @@ -13,12 +13,12 @@ interface APIKeyService { currentToken: String ): Pair - suspend fun getAPIKey(key: String, secret: String): APIKey? + fun getAPIKey(key: String, secret: String): APIKey? - suspend fun getKeysByUserId(userId: String): List + fun getKeysByUserId(userId: String): List - suspend fun changeKeyState(userId: String, key: String, isEnabled: Boolean) + fun changeKeyState(userId: String, key: String, isEnabled: Boolean) - suspend fun deleteKey(userId: String, key: String) + fun deleteKey(userId: String, key: String) } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt index 6885d6c23..14124a738 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt @@ -5,10 +5,10 @@ import co.nilin.opex.api.core.inout.PairConfigResponse interface AccountantProxy { - suspend fun getPairConfigs(): List + fun getPairConfigs(): List - suspend fun getFeeConfigs(): List + fun getFeeConfigs(): List - suspend fun getFeeConfig(symbol: String): PairFeeResponse + fun getFeeConfig(symbol: String): PairFeeResponse } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/BlockchainGatewayProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/BlockchainGatewayProxy.kt index abc249a26..e92e6fecb 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/BlockchainGatewayProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/BlockchainGatewayProxy.kt @@ -6,11 +6,7 @@ import co.nilin.opex.api.core.inout.DepositDetails interface BlockchainGatewayProxy { - suspend fun assignAddress(uuid: String, currency: String, chain: String): AssignResponse? - - suspend fun getDepositDetails(refs: List): List - -// suspend fun getCurrencyImplementations(currency: String? = null): List - + fun assignAddress(uuid: String, currency: String, chain: String): AssignResponse? + fun getDepositDetails(refs: List): List } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/GlobalMarketProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/GlobalMarketProxy.kt index bbb1ead57..72ea390ff 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/GlobalMarketProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/GlobalMarketProxy.kt @@ -4,6 +4,6 @@ import co.nilin.opex.api.core.inout.GlobalPrice interface GlobalMarketProxy { - suspend fun getPrices(symbols: List): List + fun getPrices(symbols: List): List } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketDataProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketDataProxy.kt index f58daca30..5aaea7a3b 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketDataProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketDataProxy.kt @@ -5,23 +5,23 @@ import co.nilin.opex.common.utils.Interval interface MarketDataProxy { - suspend fun getTradeTickerData(interval: Interval): List + fun getTradeTickerData(interval: Interval): List - suspend fun getTradeTickerDataBySymbol(symbol: String, interval: Interval): PriceChange + fun getTradeTickerDataBySymbol(symbol: String, interval: Interval): PriceChange - suspend fun openBidOrders(symbol: String, limit: Int): List + fun openBidOrders(symbol: String, limit: Int): List - suspend fun openAskOrders(symbol: String, limit: Int): List + fun openAskOrders(symbol: String, limit: Int): List - suspend fun lastOrder(symbol: String): Order? + fun lastOrder(symbol: String): Order? - suspend fun recentTrades(symbol: String, limit: Int): List + fun recentTrades(symbol: String, limit: Int): List - suspend fun lastPrice(symbol: String?): List + fun lastPrice(symbol: String?): List - suspend fun getBestPriceForSymbols(symbols: List): List + fun getBestPriceForSymbols(symbols: List): List - suspend fun getCandleInfo( + fun getCandleInfo( symbol: String, interval: String, startTime: Long?, @@ -29,14 +29,14 @@ interface MarketDataProxy { limit: Int ): List - suspend fun getMarketCurrencyRates(quote: String, base: String? = null): List + fun getMarketCurrencyRates(quote: String, base: String? = null): List - suspend fun getExternalCurrencyRates(quote: String, base: String? = null): List + fun getExternalCurrencyRates(quote: String, base: String? = null): List - suspend fun countActiveUsers(interval: Interval): Long + fun countActiveUsers(interval: Interval): Long - suspend fun countTotalOrders(interval: Interval): Long + fun countTotalOrders(interval: Interval): Long - suspend fun countTotalTrades(interval: Interval): Long + fun countTotalTrades(interval: Interval): Long } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketStatProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketStatProxy.kt index 120b27f63..8790b2a07 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketStatProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketStatProxy.kt @@ -6,12 +6,12 @@ import co.nilin.opex.common.utils.Interval interface MarketStatProxy { - suspend fun getMostIncreasedInPricePairs(interval: Interval, limit: Int): List + fun getMostIncreasedInPricePairs(interval: Interval, limit: Int): List - suspend fun getMostDecreasedInPricePairs(interval: Interval, limit: Int): List + fun getMostDecreasedInPricePairs(interval: Interval, limit: Int): List - suspend fun getHighestVolumePair(interval: Interval): TradeVolumeStat? + fun getHighestVolumePair(interval: Interval): TradeVolumeStat? - suspend fun getTradeCountPair(interval: Interval): TradeVolumeStat? + fun getTradeCountPair(interval: Interval): TradeVolumeStat? } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketUserDataProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketUserDataProxy.kt index 828a0c210..aee0132b2 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketUserDataProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MarketUserDataProxy.kt @@ -10,11 +10,11 @@ import java.util.* interface MarketUserDataProxy { - suspend fun queryOrder(principal: Principal, symbol: String, orderId: Long?, origClientOrderId: String?): Order? + fun queryOrder(principal: Principal, symbol: String, orderId: Long?, origClientOrderId: String?): Order? - suspend fun openOrders(principal: Principal, symbol: String?, limit: Int?): List + fun openOrders(principal: Principal, symbol: String?, limit: Int?): List - suspend fun allOrders( + fun allOrders( principal: Principal, symbol: String?, startTime: Date?, @@ -22,7 +22,7 @@ interface MarketUserDataProxy { limit: Int? ): List - suspend fun allTrades( + fun allTrades( principal: Principal, symbol: String?, fromTrade: Long?, @@ -31,7 +31,7 @@ interface MarketUserDataProxy { limit: Int? ): List - suspend fun getOrderHistory( + fun getOrderHistory( uuid : String, symbol: String?, startTime: Long?, @@ -42,7 +42,7 @@ interface MarketUserDataProxy { offset: Int?, ): List - suspend fun getOrderHistoryCount( + fun getOrderHistoryCount( uuid : String, symbol: String?, startTime: Long?, @@ -51,7 +51,7 @@ interface MarketUserDataProxy { direction: OrderDirection?, ): Long - suspend fun getTradeHistory( + fun getTradeHistory( uuid : String, symbol: String?, startTime: Long?, @@ -61,7 +61,7 @@ interface MarketUserDataProxy { offset: Int?, ): List - suspend fun getTradeHistoryCount( + fun getTradeHistoryCount( uuid : String, symbol: String?, startTime: Long?, diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MatchingGatewayProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MatchingGatewayProxy.kt index c615e3d3b..bbafda7b4 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MatchingGatewayProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/MatchingGatewayProxy.kt @@ -10,7 +10,7 @@ import java.math.BigDecimal interface MatchingGatewayProxy { - suspend fun createNewOrder( + fun createNewOrder( uuid: String?, pair: String, price: BigDecimal, @@ -22,7 +22,7 @@ interface MatchingGatewayProxy { token: String? ): OrderSubmitResult? - suspend fun cancelOrder( + fun cancelOrder( ouid: String, uuid: String, orderId: Long, @@ -30,5 +30,5 @@ interface MatchingGatewayProxy { token: String? ): OrderSubmitResult? - suspend fun getPairSettings(): List + fun getPairSettings(): List } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/SymbolMapper.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/SymbolMapper.kt index ce66de33f..bd5473e9f 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/SymbolMapper.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/SymbolMapper.kt @@ -2,9 +2,9 @@ package co.nilin.opex.api.core.spi interface SymbolMapper { - suspend fun fromInternalSymbol(symbol: String?): String? + fun fromInternalSymbol(symbol: String?): String? - suspend fun toInternalSymbol(alias: String?): String? + fun toInternalSymbol(alias: String?): String? - suspend fun symbolToAliasMap(): Map + fun symbolToAliasMap(): Map } diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt index 7f83cb162..62f687285 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt @@ -4,13 +4,13 @@ import co.nilin.opex.api.core.inout.* interface WalletProxy { - suspend fun getWallets(uuid: String?, token: String?): List + fun getWallets(uuid: String?, token: String?): List - suspend fun getWallet(uuid: String?, token: String?, symbol: String): Wallet + fun getWallet(uuid: String?, token: String?, symbol: String): Wallet - suspend fun getOwnerLimits(uuid: String?, token: String?): OwnerLimitsResponse + fun getOwnerLimits(uuid: String?, token: String?): OwnerLimitsResponse - suspend fun getDepositTransactions( + fun getDepositTransactions( uuid: String, token: String, currency: String?, @@ -21,7 +21,7 @@ interface WalletProxy { ascendingByTime: Boolean?, ): List - suspend fun getDepositTransactionsCount( + fun getDepositTransactionsCount( uuid: String, token: String, currency: String?, @@ -29,7 +29,7 @@ interface WalletProxy { endTime: Long?, ): Long - suspend fun getWithdrawTransactions( + fun getWithdrawTransactions( uuid: String, token: String, currency: String?, @@ -40,7 +40,7 @@ interface WalletProxy { ascendingByTime: Boolean?, ): List - suspend fun getWithdrawTransactionsCount( + fun getWithdrawTransactionsCount( uuid: String, token: String, currency: String?, @@ -48,7 +48,7 @@ interface WalletProxy { endTime: Long?, ): Long - suspend fun getTransactions( + fun getTransactions( uuid: String, token: String, currency: String?, @@ -60,7 +60,7 @@ interface WalletProxy { ascendingByTime: Boolean?, ): List - suspend fun getTransactionsCount( + fun getTransactionsCount( uuid: String, token: String, currency: String?, @@ -69,14 +69,14 @@ interface WalletProxy { endTime: Long?, ): Long - suspend fun getGateWays( + fun getGateWays( includeOffChainGateways: Boolean, includeOnChainGateways: Boolean, ): List - suspend fun getCurrencies(): List + fun getCurrencies(): List - suspend fun getUserTradeTransactionSummary( + fun getUserTradeTransactionSummary( uuid: String, token: String, startTime: Long?, @@ -84,7 +84,7 @@ interface WalletProxy { limit: Int?, ): List - suspend fun getUserDepositSummary( + fun getUserDepositSummary( uuid: String, token: String, startTime: Long?, @@ -92,7 +92,7 @@ interface WalletProxy { limit: Int?, ): List - suspend fun getUserWithdrawSummary( + fun getUserWithdrawSummary( uuid: String, token: String, startTime: Long?, @@ -100,29 +100,29 @@ interface WalletProxy { limit: Int?, ): List - suspend fun deposit( + fun deposit( request: RequestDepositBody ): TransferResult? - suspend fun requestWithdraw( + fun requestWithdraw( token: String, request: RequestWithdrawBody ): WithdrawActionResult - suspend fun cancelWithdraw( + fun cancelWithdraw( token: String, withdrawId: Long - ): Void? + ) - suspend fun findWithdraw( + fun findWithdraw( token: String, withdrawId: Long ): WithdrawResponse - suspend fun submitVoucher(code: String, token: String): SubmitVoucherResponse + fun submitVoucher(code: String, token: String): SubmitVoucherResponse - suspend fun getQuoteCurrencies(): List + fun getQuoteCurrencies(): List - suspend fun getSwapTransactions(token: String, request: UserTransactionRequest): List - suspend fun getSwapTransactionsCount(token: String, request: UserTransactionRequest): Long + fun getSwapTransactions(token: String, request: UserTransactionRequest): List + fun getSwapTransactionsCount(token: String, request: UserTransactionRequest): Long } \ No newline at end of file diff --git a/api/api-ports/api-binance-rest/pom.xml b/api/api-ports/api-binance-rest/pom.xml index fd5fdffe3..476a33d46 100644 --- a/api/api-ports/api-binance-rest/pom.xml +++ b/api/api-ports/api-binance-rest/pom.xml @@ -34,7 +34,7 @@ org.springframework.boot - spring-boot-starter-webflux + spring-boot-starter-web 3.4.3 @@ -49,20 +49,6 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-data-r2dbc - - - org.postgresql - r2dbc-postgresql - runtime - - - org.postgresql - postgresql - runtime - io.projectreactor.kotlin reactor-kotlin-extensions @@ -84,10 +70,6 @@ bcprov-jdk15on 1.60 - - org.jetbrains.kotlinx - kotlinx-coroutines-core - io.projectreactor reactor-test diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt index 6e504eddd..cac1c5e06 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt @@ -1,68 +1,64 @@ package co.nilin.opex.api.ports.binance.config import co.nilin.opex.api.core.spi.APIKeyFilter -import co.nilin.opex.common.security.ReactiveCustomJwtConverter +import co.nilin.opex.common.security.CustomJwtConverter +import jakarta.servlet.Filter import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.http.HttpMethod -import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity -import org.springframework.security.config.web.server.SecurityWebFiltersOrder -import org.springframework.security.config.web.server.ServerHttpSecurity -import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder -import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder -import org.springframework.security.web.server.SecurityWebFilterChain -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.server.WebFilter +import org.springframework.security.config.annotation.web.builders.HttpSecurity +import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity +import org.springframework.security.oauth2.jwt.JwtDecoder +import org.springframework.security.oauth2.jwt.NimbusJwtDecoder +import org.springframework.security.web.SecurityFilterChain +import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter -@EnableWebFluxSecurity -@Configuration("binanceSecurityConfig") +@EnableWebSecurity +@Configuration class SecurityConfig( - private val webClient: WebClient, private val apiKeyFilter: APIKeyFilter, @Value("\${app.auth.cert-url}") private val jwkUrl: String ) { @Bean - fun springSecurityFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain { + fun filterChain(http: HttpSecurity): SecurityFilterChain { return http.csrf { it.disable() } - .authorizeExchange { - it.pathMatchers("/actuator/**").permitAll() - .pathMatchers("/swagger-ui/**").permitAll() - .pathMatchers("/swagger-resources/**").permitAll() - .pathMatchers("/v2/api-docs").permitAll() - .pathMatchers("/v3/depth").permitAll() - .pathMatchers("/v3/trades").permitAll() - .pathMatchers("/v3/ticker/**").permitAll() - .pathMatchers("/v3/exchangeInfo").permitAll() - .pathMatchers("/v3/currencyInfo/**").permitAll() - .pathMatchers("/v3/klines").permitAll() - .pathMatchers("/socket").permitAll() - .pathMatchers("/v1/landing/**").permitAll() - .pathMatchers(HttpMethod.POST, "/v3/order").hasAuthority("PERM_order:write") - .pathMatchers(HttpMethod.DELETE, "/v3/order").hasAuthority("PERM_order:write") + .authorizeHttpRequests { + it.requestMatchers("/actuator/**").permitAll() + .requestMatchers("/swagger-ui/**").permitAll() + .requestMatchers("/swagger-resources/**").permitAll() + .requestMatchers("/v2/api-docs").permitAll() + .requestMatchers("/v3/depth").permitAll() + .requestMatchers("/v3/trades").permitAll() + .requestMatchers("/v3/ticker/**").permitAll() + .requestMatchers("/v3/exchangeInfo").permitAll() + .requestMatchers("/v3/currencyInfo/**").permitAll() + .requestMatchers("/v3/klines").permitAll() + .requestMatchers("/socket").permitAll() + .requestMatchers("/v1/landing/**").permitAll() + .requestMatchers(HttpMethod.POST, "/v3/order").hasAuthority("PERM_order:write") + .requestMatchers(HttpMethod.DELETE, "/v3/order").hasAuthority("PERM_order:write") // Opex endpoints - .pathMatchers("/opex/v1/deposit/**").hasAuthority("DEPOSIT_deposit:write") - .pathMatchers(HttpMethod.POST, "/opex/v1/order").hasAuthority("PERM_order:write") - .pathMatchers(HttpMethod.PUT, "/opex/v1/order").hasAuthority("PERM_order:write") - .pathMatchers(HttpMethod.POST, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write") - .pathMatchers(HttpMethod.PUT, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write") - .pathMatchers("/opex/v1/voucher").hasAuthority("PERM_voucher:submit") - .pathMatchers("/opex/v1/market/**").permitAll() - .anyExchange().authenticated() + .requestMatchers("/opex/v1/deposit/**").hasAuthority("DEPOSIT_deposit:write") + .requestMatchers(HttpMethod.POST, "/opex/v1/order").hasAuthority("PERM_order:write") + .requestMatchers(HttpMethod.PUT, "/opex/v1/order").hasAuthority("PERM_order:write") + .requestMatchers(HttpMethod.POST, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write") + .requestMatchers(HttpMethod.PUT, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write") + .requestMatchers("/opex/v1/voucher").hasAuthority("PERM_voucher:submit") + .requestMatchers("/opex/v1/market/**").permitAll() + .anyRequest().authenticated() } - .addFilterBefore(apiKeyFilter as WebFilter, SecurityWebFiltersOrder.AUTHENTICATION) - .oauth2ResourceServer { it.jwt { jwt -> jwt.jwtAuthenticationConverter(ReactiveCustomJwtConverter()) } } + .addFilterBefore(apiKeyFilter as Filter, UsernamePasswordAuthenticationFilter::class.java) + .oauth2ResourceServer { it.jwt { jwt -> jwt.jwtAuthenticationConverter(CustomJwtConverter()) } } .build() } @Bean @Throws(Exception::class) - fun reactiveJwtDecoder(): ReactiveJwtDecoder? { - return NimbusReactiveJwtDecoder.withJwkSetUri(jwkUrl) - .webClient(WebClient.create()) - .build() + fun jwtDecoder(): JwtDecoder? { + return NimbusJwtDecoder.withJwkSetUri(jwkUrl).build() } } diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/WebClientConfig.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/WebClientConfig.kt deleted file mode 100644 index 63a5e11aa..000000000 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/WebClientConfig.kt +++ /dev/null @@ -1,36 +0,0 @@ -package co.nilin.opex.api.ports.binance.config - -import io.netty.channel.ChannelOption -import org.springframework.cloud.client.loadbalancer.LoadBalanced -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.http.client.reactive.ReactorClientHttpConnector -import org.springframework.web.reactive.function.client.WebClient -import reactor.netty.http.client.HttpClient -import reactor.netty.resources.ConnectionProvider -import java.time.Duration - -@Configuration("binanceWebClientConfig") -class WebClientConfig { - - @Bean - @LoadBalanced - fun webClientBuilder(): WebClient.Builder { - return WebClient.builder() - } - - @Bean - fun webClient(webclientBuilder: WebClient.Builder): WebClient { - val cp = ConnectionProvider.builder("apiBinanceWebclientConnectionPool") - .maxConnections(5000) - .maxIdleTime(Duration.ofSeconds(20)) - .maxLifeTime(Duration.ofMinutes(2)) - .pendingAcquireTimeout(Duration.ofSeconds(10)) - .evictInBackground(Duration.ofSeconds(30)) - .build() - val client = HttpClient.create(cp) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) - .responseTimeout(Duration.ofSeconds(10)) - return webclientBuilder.clientConnector(ReactorClientHttpConnector(client)).build() - } -} diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt index f64a43fd2..ee6d24215 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/AccountController.kt @@ -49,7 +49,7 @@ class AccountController( ) ) ) - suspend fun createNewOrder( + fun createNewOrder( @RequestParam symbol: String, @RequestParam @@ -125,7 +125,7 @@ class AccountController( consumes = [MediaType.APPLICATION_FORM_URLENCODED_VALUE], produces = [MediaType.APPLICATION_JSON_VALUE] ) - suspend fun cancelOrder( + fun cancelOrder( principal: Principal, @RequestParam symbol: String, @@ -203,7 +203,7 @@ class AccountController( ) ) ) - suspend fun queryOrder( + fun queryOrder( principal: Principal, @RequestParam symbol: String, @@ -246,7 +246,7 @@ class AccountController( ) ) ) - suspend fun fetchOpenOrders( + fun fetchOpenOrders( principal: Principal, @RequestParam(required = false) symbol: String?, @@ -284,7 +284,7 @@ class AccountController( ) ) ) - suspend fun fetchAllOrders( + fun fetchAllOrders( principal: Principal, @RequestParam(required = false) symbol: String?, @@ -328,7 +328,7 @@ class AccountController( ) ) ) - suspend fun fetchAllTrades( + fun fetchAllTrades( principal: Principal, @RequestParam symbol: String?, @@ -385,7 +385,7 @@ class AccountController( ) ) ) - suspend fun accountInfo( + fun accountInfo( @CurrentSecurityContext securityContext: SecurityContext, @ApiParam(value = "The value cannot be greater than 60000") @RequestParam(required = false) diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/LandingController.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/LandingController.kt index b6524c586..50f3f7833 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/LandingController.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/LandingController.kt @@ -31,7 +31,7 @@ class LandingController( private val logger = LoggerFactory.getLogger(LandingController::class.java) @GetMapping("/globalPrices") - suspend fun getCurrencyPrices(@RequestParam usdSymbol: String): GlobalPriceResponse { + fun getCurrencyPrices(@RequestParam usdSymbol: String): GlobalPriceResponse { val irtUSDPrice = marketDataProxy.getExternalCurrencyRates("IRT", usdSymbol) val globalPrice = try { globalMarketProxy.getPrices(symbolMapper.symbolToAliasMap().entries.map { it.value }) @@ -44,48 +44,40 @@ class LandingController( } @GetMapping("/marketStats") - suspend fun getMarketStats( + fun getMarketStats( @RequestParam interval: String, @RequestParam(required = false) limit: Int? - ): MarketStatResponse = coroutineScope { + ): MarketStatResponse { val intervalEnum = Interval.findByLabel(interval) ?: Interval.Week val validLimit = getValidLimit(limit) val symbols = symbolMapper.symbolToAliasMap() - val mostIncreased = async { - marketStatProxy.getMostIncreasedInPricePairs(intervalEnum, validLimit) - .onEach { symbols[it.symbol]?.let { s -> it.symbol = s } } - .ifEmpty { symbols.entries.map { PriceStat(it.value, BigDecimal.ZERO, 0.0) } } - } + val mostIncreased = marketStatProxy.getMostIncreasedInPricePairs(intervalEnum, validLimit) + .onEach { symbols[it.symbol]?.let { s -> it.symbol = s } } + .ifEmpty { symbols.entries.map { PriceStat(it.value, BigDecimal.ZERO, 0.0) } } - val mostDecreased = async { - marketStatProxy.getMostDecreasedInPricePairs(intervalEnum, validLimit) - .onEach { symbols[it.symbol]?.let { s -> it.symbol = s } } - .ifEmpty { symbols.entries.map { PriceStat(it.value, BigDecimal.ZERO, 0.0) } } - } + val mostDecreased = marketStatProxy.getMostDecreasedInPricePairs(intervalEnum, validLimit) + .onEach { symbols[it.symbol]?.let { s -> it.symbol = s } } + .ifEmpty { symbols.entries.map { PriceStat(it.value, BigDecimal.ZERO, 0.0) } } - val highestVolume = async { - marketStatProxy.getHighestVolumePair(intervalEnum) - ?.apply { symbols[symbol]?.let { symbol = it } } - ?: TradeVolumeStat(symbols.entries.random().value, BigDecimal.ZERO, BigDecimal.ZERO, 0.0) - } + val highestVolume = marketStatProxy.getHighestVolumePair(intervalEnum) + ?.apply { symbols[symbol]?.let { symbol = it } } + ?: TradeVolumeStat(symbols.entries.random().value, BigDecimal.ZERO, BigDecimal.ZERO, 0.0) - val mostTrades = async { - marketStatProxy.getTradeCountPair(intervalEnum) - ?.apply { symbols[symbol]?.let { symbol = it } } - ?: TradeVolumeStat(symbols.entries.random().value, BigDecimal.ZERO, BigDecimal.ZERO, 0.0) - } + val mostTrades = marketStatProxy.getTradeCountPair(intervalEnum) + ?.apply { symbols[symbol]?.let { symbol = it } } + ?: TradeVolumeStat(symbols.entries.random().value, BigDecimal.ZERO, BigDecimal.ZERO, 0.0) - MarketStatResponse( - mostIncreased.await(), - mostDecreased.await(), - highestVolume.await(), - mostTrades.await() + return MarketStatResponse( + mostIncreased, + mostDecreased, + highestVolume, + mostTrades ) } @GetMapping("/exchangeInfo") - suspend fun marketInfo(@RequestParam interval: String): MarketInfoResponse { + fun marketInfo(@RequestParam interval: String): MarketInfoResponse { val intervalEnum = Interval.findByLabel(interval) ?: Interval.ThreeMonth return MarketInfoResponse( marketDataProxy.countActiveUsers(intervalEnum), diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/MarketController.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/MarketController.kt index c83f41bac..3441c0c16 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/MarketController.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/MarketController.kt @@ -36,7 +36,7 @@ class MarketController( // 1000 - 10 // 5000 - 50 @GetMapping("/v3/depth") - suspend fun orderBook( + fun orderBook( @RequestParam symbol: String, @RequestParam(required = false) @@ -74,7 +74,7 @@ class MarketController( } @GetMapping("/v3/trades") - suspend fun recentTrades( + fun recentTrades( principal: Principal, @RequestParam symbol: String, @@ -101,7 +101,7 @@ class MarketController( } @GetMapping("/v3/ticker/{duration:24h|7d|1M}") - suspend fun priceChange( + fun priceChange( @PathVariable duration: String, @RequestParam(required = false) symbol: String?, @RequestParam(required = false) quote: String? @@ -142,7 +142,7 @@ class MarketController( // 1 for a single symbol // 2 when the symbol parameter is omitted @GetMapping("/v3/ticker/price") - suspend fun priceTicker(@RequestParam(required = false) symbol: String?): List { + fun priceTicker(@RequestParam(required = false) symbol: String?): List { val symbols = symbolMapper.symbolToAliasMap() val localSymbol = if (symbol == null) null @@ -152,32 +152,30 @@ class MarketController( } @GetMapping("/v3/exchangeInfo") - suspend fun pairInfo( + fun pairInfo( @RequestParam(required = false) symbol: String?, @RequestParam(required = false) symbols: String? - ): ExchangeInfoResponse = coroutineScope { + ): ExchangeInfoResponse { val symbolsMap = symbolMapper.symbolToAliasMap() - val fee = async { accountantProxy.getFeeConfigs() } - val pairConfigs = async { - accountantProxy.getPairConfigs().map { - ExchangeInfoSymbol( - symbolsMap[it.pair] ?: it.pair, - "TRADING", - it.leftSideWalletSymbol.uppercase(), - it.leftSideFraction.scale(), - it.rightSideWalletSymbol.uppercase(), - it.rightSideFraction.scale() - ) - } + val fee = accountantProxy.getFeeConfigs() + val pairConfigs = accountantProxy.getPairConfigs().map { + ExchangeInfoSymbol( + symbolsMap[it.pair] ?: it.pair, + "TRADING", + it.leftSideWalletSymbol.uppercase(), + it.leftSideFraction.scale(), + it.rightSideWalletSymbol.uppercase(), + it.rightSideFraction.scale() + ) } - ExchangeInfoResponse(fees = fee.await(), symbols = pairConfigs.await()) + return ExchangeInfoResponse(fees = fee, symbols = pairConfigs) } // Custom service // @GetMapping("/v3/currencyInfo") -// suspend fun getNetworks(@RequestParam(required = false) currency: String?): List { +// fun getNetworks(@RequestParam(required = false) currency: String?): List { // return blockchainGatewayProxy.getCurrencyImplementations(currency) // .groupBy { it.currency } // .toList() @@ -201,7 +199,7 @@ class MarketController( // Custom service @GetMapping("/v3/currencyInfo/quotes") - suspend fun getQuoteCurrencies(): List { + fun getQuoteCurrencies(): List { return accountantProxy.getPairConfigs() .map { it.rightSideWalletSymbol } .distinct() @@ -209,7 +207,7 @@ class MarketController( // Weight(IP): 1 @GetMapping("/v3/klines") - suspend fun klines( + fun klines( @RequestParam symbol: String, @RequestParam diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/WalletController.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/WalletController.kt index 2a857546c..f601a756f 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/WalletController.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/controller/WalletController.kt @@ -25,7 +25,7 @@ class WalletController( ) { @GetMapping("/v1/capital/deposit/address") - suspend fun assignAddress( + fun assignAddress( @RequestParam coin: String, @RequestParam @@ -189,7 +189,7 @@ class WalletController( // } @GetMapping("/v1/asset/tradeFee") - suspend fun getPairFees( + fun getPairFees( @RequestParam(required = false) symbol: String?, @RequestParam(required = false) @@ -223,7 +223,7 @@ class WalletController( } @GetMapping("/v1/asset/getUserAsset") - suspend fun getUserAssets( + fun getUserAssets( @CurrentSecurityContext securityContext: SecurityContext, @RequestParam(required = false) diff --git a/api/api-ports/api-opex-rest/pom.xml b/api/api-ports/api-opex-rest/pom.xml index 50a08fdce..f5de9082f 100644 --- a/api/api-ports/api-opex-rest/pom.xml +++ b/api/api-ports/api-opex-rest/pom.xml @@ -34,7 +34,7 @@ org.springframework.boot - spring-boot-starter-webflux + spring-boot-starter-web org.springframework.cloud @@ -44,28 +44,6 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-data-r2dbc - - - org.postgresql - r2dbc-postgresql - runtime - - - org.postgresql - postgresql - runtime - - - io.projectreactor.kotlin - reactor-kotlin-extensions - - - org.jetbrains.kotlinx - kotlinx-coroutines-reactor - org.springframework.boot spring-boot-starter-security diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/RestTemplateConfig.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/RestTemplateConfig.kt new file mode 100644 index 000000000..231a15e3d --- /dev/null +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/RestTemplateConfig.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.api.ports.opex.config + +import org.springframework.cloud.client.loadbalancer.LoadBalanced +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.http.client.SimpleClientHttpRequestFactory +import org.springframework.web.client.RestTemplate + +@Configuration +class RestTemplateConfig { + + @Bean + @LoadBalanced + fun restTemplate(): RestTemplate { + val factory = SimpleClientHttpRequestFactory().apply { + setConnectTimeout(100000) + setReadTimeout(10000) + } + return RestTemplate(factory) + } +} \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/SecurityConfig.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/SecurityConfig.kt deleted file mode 100644 index d74c85756..000000000 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/SecurityConfig.kt +++ /dev/null @@ -1,49 +0,0 @@ -package co.nilin.opex.api.ports.opex.config - -import co.nilin.opex.api.core.spi.APIKeyFilter -import co.nilin.opex.common.security.ReactiveCustomJwtConverter -import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.context.annotation.Profile -import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity -import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity -import org.springframework.security.config.web.server.ServerHttpSecurity -import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder -import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder -import org.springframework.security.web.server.SecurityWebFilterChain -import org.springframework.web.reactive.function.client.WebClient - -//@EnableWebFluxSecurity -//@EnableMethodSecurity -//@Configuration("opexSecurityConfig") -class SecurityConfig( - private val apiKeyFilter: APIKeyFilter, - @Value("\${app.auth.cert-url}") - private val jwkUrl: String -) { - - //@Bean - fun springSecurityFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain { - return http.csrf { it.disable() } - .authorizeExchange { - it.pathMatchers("/actuator/**").permitAll() - .pathMatchers("/swagger-ui/**").permitAll() - .pathMatchers("/opex/v1/market/**").permitAll() - .pathMatchers("/opex/v1/order/**").hasAuthority("PERM_order:write") - .pathMatchers("/**").hasAuthority("SCOPE_trust") - .anyExchange().authenticated() - } -// .addFilterBefore(apiKeyFilter as WebFilter, SecurityWebFiltersOrder.AUTHENTICATION) - .oauth2ResourceServer { it.jwt { jwt -> jwt.jwtAuthenticationConverter(ReactiveCustomJwtConverter()) } } - .build() - } - - //@Bean - @Throws(Exception::class) - fun reactiveJwtDecoder(): ReactiveJwtDecoder? { - return NimbusReactiveJwtDecoder.withJwkSetUri(jwkUrl) - .webClient(WebClient.builder().build()) - .build() - } -} diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/WebClientConfig.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/WebClientConfig.kt deleted file mode 100644 index 09ccc6f79..000000000 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/config/WebClientConfig.kt +++ /dev/null @@ -1,22 +0,0 @@ -package co.nilin.opex.api.ports.opex.config - -import org.springframework.cloud.client.loadbalancer.LoadBalanced -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.web.reactive.function.client.WebClient - -@Configuration("opexWebClientConfig") -class WebClientConfig { - - @Bean - @LoadBalanced - fun webClientBuilder(): WebClient.Builder { - return WebClient.builder() - } - - @Bean - fun webClient(webclientBuilder: WebClient.Builder): WebClient { - return webclientBuilder.build() - } - -} diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/DepositController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/DepositController.kt index fd055219b..4fea27849 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/DepositController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/DepositController.kt @@ -13,7 +13,7 @@ import org.springframework.web.bind.annotation.RestController class DepositController(private val walletProxy: WalletProxy) { @PostMapping - suspend fun deposit(@RequestBody request: RequestDepositBody): TransferResult? { + fun deposit(@RequestBody request: RequestDepositBody): TransferResult? { return walletProxy.deposit(request) } } \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt index 291268fe3..da9cedfef 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt @@ -27,12 +27,12 @@ class MarketController( private val validDurations = arrayListOf("24h", "7d", "1M") @GetMapping("/currency") - suspend fun getCurrencies(): List { + fun getCurrencies(): List { return walletProxy.getCurrencies() } @GetMapping("/pair") - suspend fun getPairs(): List { + fun getPairs(): List { val pairSettings = matchingGatewayProxy.getPairSettings().associateBy { it.pair } return accountantProxy.getPairConfigs().mapNotNull { config -> @@ -51,7 +51,7 @@ class MarketController( } @GetMapping("/currency/gateway") - suspend fun getCurrencyGateways( + fun getCurrencyGateways( @RequestParam(defaultValue = "true") includeOffChainGateways: Boolean, @RequestParam(defaultValue = "true") includeOnChainGateways: Boolean, ): List { @@ -59,44 +59,36 @@ class MarketController( } @GetMapping("/pair/fee") - suspend fun getPairFees(): List { + fun getPairFees(): List { return accountantProxy.getFeeConfigs() } @GetMapping("/stats") - suspend fun getMarketStats( + fun getMarketStats( @RequestParam interval: String, @RequestParam(required = false) limit: Int? - ): MarketStatResponse = coroutineScope { + ): MarketStatResponse { val intervalEnum = Interval.findByLabel(interval) ?: Interval.Week val validLimit = getValidLimit(limit) - val mostIncreased = async { - marketStatProxy.getMostIncreasedInPricePairs(intervalEnum, validLimit) - } + val mostIncreased = marketStatProxy.getMostIncreasedInPricePairs(intervalEnum, validLimit) - val mostDecreased = async { - marketStatProxy.getMostDecreasedInPricePairs(intervalEnum, validLimit) - } + val mostDecreased = marketStatProxy.getMostDecreasedInPricePairs(intervalEnum, validLimit) - val highestVolume = async { - marketStatProxy.getHighestVolumePair(intervalEnum) - } + val highestVolume = marketStatProxy.getHighestVolumePair(intervalEnum) - val mostTrades = async { - marketStatProxy.getTradeCountPair(intervalEnum) - } + val mostTrades = marketStatProxy.getTradeCountPair(intervalEnum) - MarketStatResponse( - mostIncreased.await(), - mostDecreased.await(), - highestVolume.await(), - mostTrades.await() + return MarketStatResponse( + mostIncreased, + mostDecreased, + highestVolume, + mostTrades ) } @GetMapping("/info") - suspend fun getMarketInfo(@RequestParam interval: String): MarketInfoResponse { + fun getMarketInfo(@RequestParam interval: String): MarketInfoResponse { val intervalEnum = Interval.findByLabel(interval) ?: Interval.ThreeMonth return MarketInfoResponse( marketDataProxy.countActiveUsers(intervalEnum), @@ -106,7 +98,7 @@ class MarketController( } @GetMapping("/depth") - suspend fun orderBook( + fun orderBook( @RequestParam symbol: String, @RequestParam(required = false) @@ -143,7 +135,7 @@ class MarketController( } @GetMapping("/trades") - suspend fun recentTrades( + fun recentTrades( @RequestParam symbol: String, @RequestParam(required = false) @@ -168,7 +160,7 @@ class MarketController( } @GetMapping("/ticker/{duration:24h|7d|1M}") - suspend fun priceChange( + fun priceChange( @PathVariable duration: String, @RequestParam(required = false) symbol: String?, @RequestParam(required = false) quote: String? @@ -195,17 +187,17 @@ class MarketController( } @GetMapping("/ticker/price") - suspend fun priceTicker(@RequestParam(required = false) symbol: String?): List { + fun priceTicker(@RequestParam(required = false) symbol: String?): List { return marketDataProxy.lastPrice(symbol) } @GetMapping("/currencyInfo/quotes") - suspend fun getQuoteCurrencies(): List { + fun getQuoteCurrencies(): List { return walletProxy.getQuoteCurrencies().map { it.currency } } @GetMapping("/klines") - suspend fun klines( + fun klines( @RequestParam symbol: String, @RequestParam diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/OrderController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/OrderController.kt index 57289f3da..b7916afea 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/OrderController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/OrderController.kt @@ -24,7 +24,7 @@ class OrderController( val matchingGatewayProxy: MatchingGatewayProxy, ) { @PostMapping - suspend fun createNewOrder( + fun createNewOrder( @RequestParam symbol: String, @RequestParam @@ -61,7 +61,7 @@ class OrderController( } @PutMapping - suspend fun cancelOrder( + fun cancelOrder( principal: Principal, @RequestParam symbol: String, @@ -110,7 +110,7 @@ class OrderController( } @GetMapping - suspend fun queryOrder( + fun queryOrder( principal: Principal, @RequestParam symbol: String, @@ -126,7 +126,7 @@ class OrderController( } @GetMapping("/open") - suspend fun fetchOpenOrders( + fun fetchOpenOrders( principal: Principal, @RequestParam(required = false) symbol: String?, diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserHistoryController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserHistoryController.kt index 8ea58b43a..8920699d0 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserHistoryController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserHistoryController.kt @@ -19,7 +19,7 @@ class UserHistoryController( ) { @GetMapping("/history/order") - suspend fun getOrderHistory( + fun getOrderHistory( @RequestParam symbol: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -42,7 +42,7 @@ class UserHistoryController( } @GetMapping("/history/order/count") - suspend fun getOrderHistoryCount( + fun getOrderHistoryCount( @RequestParam symbol: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -61,7 +61,7 @@ class UserHistoryController( } @GetMapping("/history/trade") - suspend fun getTradeHistory( + fun getTradeHistory( @RequestParam symbol: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -76,7 +76,7 @@ class UserHistoryController( } @GetMapping("/history/trade/count") - suspend fun getTradeHistoryCount( + fun getTradeHistoryCount( @RequestParam symbol: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -89,7 +89,7 @@ class UserHistoryController( } @GetMapping("/history/withdraw") - suspend fun getWithdrawHistory( + fun getWithdrawHistory( @RequestParam currency: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -111,7 +111,7 @@ class UserHistoryController( } @GetMapping("/history/withdraw/count") - suspend fun getWithdrawHistoryCount( + fun getWithdrawHistoryCount( @RequestParam currency: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -127,7 +127,7 @@ class UserHistoryController( } @GetMapping("/history/deposit") - suspend fun getDepositHistory( + fun getDepositHistory( @RequestParam currency: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -149,7 +149,7 @@ class UserHistoryController( } @GetMapping("/history/deposit/count") - suspend fun getDepositHistoryCount( + fun getDepositHistoryCount( @RequestParam currency: String?, @RequestParam startTime: Long?, @RequestParam endTime: Long?, @@ -165,7 +165,7 @@ class UserHistoryController( } @GetMapping("/history/transaction") - suspend fun getTransactionHistory( + fun getTransactionHistory( @RequestParam currency: String?, @RequestParam category: UserTransactionCategory?, @RequestParam startTime: Long?, @@ -189,7 +189,7 @@ class UserHistoryController( } @GetMapping("/history/transaction/count") - suspend fun getTransactionHistoryCount( + fun getTransactionHistoryCount( @RequestParam currency: String?, @RequestParam category: UserTransactionCategory?, @RequestParam startTime: Long?, @@ -207,7 +207,7 @@ class UserHistoryController( } @GetMapping("/summary/trade") - suspend fun getTradeTransactionSummary( + fun getTradeTransactionSummary( @RequestParam startTime: Long?, @RequestParam endTime: Long?, @RequestParam limit: Int?, @@ -223,7 +223,7 @@ class UserHistoryController( } @GetMapping("/summary/deposit") - suspend fun getDepositSummary( + fun getDepositSummary( @RequestParam startTime: Long?, @RequestParam endTime: Long?, @RequestParam limit: Int?, @@ -239,7 +239,7 @@ class UserHistoryController( } @GetMapping("/summary/withdraw") - suspend fun getWithdrawSummary( + fun getWithdrawSummary( @RequestParam startTime: Long?, @RequestParam endTime: Long?, @RequestParam limit: Int?, @@ -255,7 +255,7 @@ class UserHistoryController( } @PostMapping("/history/swap") - suspend fun getSwapHistory( + fun getSwapHistory( @CurrentSecurityContext securityContext: SecurityContext, @RequestBody request: UserTransactionRequest ): List { @@ -263,7 +263,7 @@ class UserHistoryController( } @PostMapping("/history/swap/count") - suspend fun getSwapHistoryCount( + fun getSwapHistoryCount( @CurrentSecurityContext securityContext: SecurityContext, @RequestBody request: UserTransactionRequest ): Long { diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/VoucherController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/VoucherController.kt index 44605ec35..0cb236a47 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/VoucherController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/VoucherController.kt @@ -16,7 +16,7 @@ import org.springframework.web.bind.annotation.RestController class VoucherController(private val walletProxy: WalletProxy) { @PutMapping("/{code}") - suspend fun submitVoucher( + fun submitVoucher( @PathVariable code: String, @CurrentSecurityContext securityContext: SecurityContext ): SubmitVoucherResponse { diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WalletController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WalletController.kt index 007f0375b..17878ce8b 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WalletController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WalletController.kt @@ -19,7 +19,7 @@ class WalletController( ) { @GetMapping("/asset") - suspend fun getUserAssets( + fun getUserAssets( @CurrentSecurityContext securityContext: SecurityContext, @RequestParam(required = false) symbol: String?, ): List { @@ -39,7 +39,7 @@ class WalletController( } @GetMapping("/limits") - suspend fun getWalletOwnerLimits(@CurrentSecurityContext securityContext: SecurityContext): OwnerLimitsResponse { + fun getWalletOwnerLimits(@CurrentSecurityContext securityContext: SecurityContext): OwnerLimitsResponse { return walletProxy.getOwnerLimits( securityContext.jwtAuthentication().name, securityContext.jwtAuthentication().tokenValue(), diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WithdrawController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WithdrawController.kt index aa2e6a0b7..be93e2953 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WithdrawController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/WithdrawController.kt @@ -17,7 +17,7 @@ class WithdrawController( ) { @PostMapping - suspend fun requestWithdraw( + fun requestWithdraw( @CurrentSecurityContext securityContext: SecurityContext, @RequestBody request: RequestWithdrawBody ): WithdrawActionResult? { @@ -28,7 +28,7 @@ class WithdrawController( } @PutMapping("/{withdrawId}/cancel") - suspend fun cancelWithdraw( + fun cancelWithdraw( @CurrentSecurityContext securityContext: SecurityContext, @PathVariable withdrawId: Long ) { @@ -39,7 +39,7 @@ class WithdrawController( } @GetMapping("/{withdrawId}") - suspend fun findWithdraw( + fun findWithdraw( @CurrentSecurityContext securityContext: SecurityContext, @PathVariable withdrawId: Long ): WithdrawResponse { diff --git a/api/api-ports/api-persister-postgres/pom.xml b/api/api-ports/api-persister-postgres/pom.xml index 695c5d793..2dd30e8cb 100644 --- a/api/api-ports/api-persister-postgres/pom.xml +++ b/api/api-ports/api-persister-postgres/pom.xml @@ -30,12 +30,7 @@ org.springframework.boot - spring-boot-starter-data-r2dbc - - - org.postgresql - r2dbc-postgresql - runtime + spring-boot-starter-data-jdbc org.postgresql @@ -46,23 +41,10 @@ io.projectreactor.kotlin reactor-kotlin-extensions - - org.jetbrains.kotlinx - kotlinx-coroutines-reactor - - - org.jetbrains.kotlinx - kotlinx-coroutines-core - com.google.code.gson gson - - io.projectreactor - reactor-test - test - io.mockk mockk diff --git a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/config/PostgresConfig.kt b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/config/PostgresConfig.kt index 47e7038ae..dd3a63d38 100644 --- a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/config/PostgresConfig.kt +++ b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/config/PostgresConfig.kt @@ -3,22 +3,27 @@ package co.nilin.opex.api.ports.postgres.config import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Configuration import org.springframework.core.io.Resource -import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories -import org.springframework.r2dbc.core.DatabaseClient +import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories +import org.springframework.jdbc.core.JdbcTemplate +import java.nio.charset.StandardCharsets @Configuration -@EnableR2dbcRepositories(basePackages = ["co.nilin.opex"]) +@EnableJdbcRepositories(basePackages = ["co.nilin.opex"]) class PostgresConfig( - db: DatabaseClient, + template: JdbcTemplate, @Value("classpath:schema.sql") private val schemaResource: Resource ) { init { - val schemaReader = schemaResource.inputStream.reader() - val schema = schemaReader.readText().trim() - schemaReader.close() - val initDb = db.sql { schema } - initDb // initialize the database - .then() - .subscribe() // execute + val schema = schemaResource.inputStream + .bufferedReader(StandardCharsets.UTF_8) + .use { it.readText().trim() } + + val statements = schema.split(";") + .map { it.trim() } + .filter { it.isNotBlank() } + + statements.forEach { + template.execute(it) + } } } diff --git a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/APIKeyRepository.kt b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/APIKeyRepository.kt index 21fa5d9fc..7f5eb79ea 100644 --- a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/APIKeyRepository.kt +++ b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/APIKeyRepository.kt @@ -1,18 +1,16 @@ package co.nilin.opex.api.ports.postgres.dao import co.nilin.opex.api.ports.postgres.model.APIKeyModel -import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.data.repository.CrudRepository import org.springframework.stereotype.Repository -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono @Repository -interface APIKeyRepository : ReactiveCrudRepository { +interface APIKeyRepository : CrudRepository { - fun findAllByUserId(userId: String): Flux + fun findAllByUserId(userId: String): List - fun findByKey(key: String): Mono + fun findByKey(key: String): APIKeyModel? - fun countByUserId(userId: String): Mono + fun countByUserId(userId: String): Long? } \ No newline at end of file diff --git a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/SymbolMapRepository.kt b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/SymbolMapRepository.kt index 1ecc358fc..516d853f3 100644 --- a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/SymbolMapRepository.kt +++ b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/dao/SymbolMapRepository.kt @@ -1,21 +1,19 @@ package co.nilin.opex.api.ports.postgres.dao import co.nilin.opex.api.ports.postgres.model.SymbolMapModel -import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.jdbc.repository.query.Query +import org.springframework.data.repository.CrudRepository import org.springframework.data.repository.query.Param -import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono @Repository -interface SymbolMapRepository : ReactiveCrudRepository { +interface SymbolMapRepository : CrudRepository { @Query("select * from symbol_maps where symbol = :symbol and alias_key = :aliasKey") - fun findByAliasKeyAndSymbol(aliasKey: String, @Param("symbol") symbol: String): Mono + fun findByAliasKeyAndSymbol(aliasKey: String, @Param("symbol") symbol: String): SymbolMapModel? @Query("select * from symbol_maps where alias_key = :aliasKey and alias = :alias") - fun findByAliasKeyAndAlias(aliasKey: String, @Param("alias") alias: String): Mono + fun findByAliasKeyAndAlias(aliasKey: String, @Param("alias") alias: String): SymbolMapModel? - fun findAllByAliasKey(aliasKey: String): Flux + fun findAllByAliasKey(aliasKey: String): List } diff --git a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperImpl.kt b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperImpl.kt index 0aa8abe91..b9dfa6d98 100644 --- a/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperImpl.kt +++ b/api/api-ports/api-persister-postgres/src/main/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperImpl.kt @@ -11,22 +11,19 @@ class SymbolMapperImpl(val symbolMapRepository: SymbolMapRepository) : SymbolMap private var symbolsCache: Map? = null - override suspend fun fromInternalSymbol(symbol: String?): String? { + override fun fromInternalSymbol(symbol: String?): String? { if (symbol == null) return null - return symbolMapRepository.findByAliasKeyAndSymbol("binance", symbol).awaitFirstOrNull()?.alias + return symbolMapRepository.findByAliasKeyAndSymbol("binance", symbol)?.alias } - override suspend fun toInternalSymbol(alias: String?): String? { + override fun toInternalSymbol(alias: String?): String? { if (alias == null) return null - return symbolMapRepository.findByAliasKeyAndAlias("binance", alias).awaitFirstOrNull()?.symbol + return symbolMapRepository.findByAliasKeyAndAlias("binance", alias)?.symbol } - override suspend fun symbolToAliasMap(): Map { + override fun symbolToAliasMap(): Map { if (symbolsCache.isNullOrEmpty()) { - symbolsCache = symbolMapRepository.findAllByAliasKey("binance") - .collectList() - .awaitFirstOrElse { emptyList() } - .associate { it.symbol to it.alias } + symbolsCache = symbolMapRepository.findAllByAliasKey("binance").associate { it.symbol to it.alias } } return symbolsCache!! } diff --git a/api/api-ports/api-persister-postgres/src/main/resources/schema.sql b/api/api-ports/api-persister-postgres/src/main/resources/schema.sql index 5f4a12367..1d4db9878 100644 --- a/api/api-ports/api-persister-postgres/src/main/resources/schema.sql +++ b/api/api-ports/api-persister-postgres/src/main/resources/schema.sql @@ -1,54 +1,23 @@ CREATE TABLE IF NOT EXISTS symbol_maps ( - id - SERIAL - PRIMARY - KEY, - symbol - VARCHAR -( - 72 -) NOT NULL, - alias_key VARCHAR -( - 72 -) NOT NULL, - alias VARCHAR -( - 72 -) NOT NULL, - UNIQUE -( - symbol, - alias_key, - alias -) - ); + id SERIAL PRIMARY KEY, + symbol VARCHAR(72) NOT NULL, + alias_key VARCHAR(72) NOT NULL, + alias VARCHAR(72) NOT NULL, + UNIQUE (symbol, alias_key, alias) +); CREATE TABLE IF NOT EXISTS api_key ( - id - SERIAL - PRIMARY - KEY, - user_id - VARCHAR -( - 36 -) NOT NULL, - label VARCHAR -( - 200 -) NOT NULL, - access_token TEXT NOT NULL, - refresh_token TEXT NOT NULL, - expiration_time TIMESTAMP, - allowed_ips TEXT, - token_expiration_time TIMESTAMP NOT NULL, - key VARCHAR -( - 36 -) NOT NULL UNIQUE, - is_enabled BOOLEAN NOT NULL DEFAULT true, - is_expired BOOLEAN NOT NULL DEFAULT false - ); + id SERIAL PRIMARY KEY, + user_id VARCHAR(36) NOT NULL, + label VARCHAR(200) NOT NULL, + access_token TEXT NOT NULL, + refresh_token TEXT NOT NULL, + expiration_time TIMESTAMP, + allowed_ips TEXT, + token_expiration_time TIMESTAMP NOT NULL, + key VARCHAR(36) NOT NULL UNIQUE, + is_enabled BOOLEAN NOT NULL DEFAULT true, + is_expired BOOLEAN NOT NULL DEFAULT false +); diff --git a/api/api-ports/api-persister-postgres/src/test/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperTest.kt b/api/api-ports/api-persister-postgres/src/test/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperTest.kt index 2283712ca..a2a65491c 100644 --- a/api/api-ports/api-persister-postgres/src/test/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperTest.kt +++ b/api/api-ports/api-persister-postgres/src/test/kotlin/co/nilin/opex/api/ports/postgres/impl/SymbolMapperTest.kt @@ -9,8 +9,6 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono @TestInstance(TestInstance.Lifecycle.PER_CLASS) class SymbolMapperTest { @@ -21,16 +19,16 @@ class SymbolMapperTest { fun setUp() { every { symbolMapRepository.findByAliasKeyAndAlias("binance", "ETHUSDT") - } returns Mono.just(VALID.SYMBOL_MAP_MODEL) + } returns VALID.SYMBOL_MAP_MODEL every { symbolMapRepository.findByAliasKeyAndSymbol("binance", VALID.ETH_USDT) - } returns Mono.just(VALID.SYMBOL_MAP_MODEL) + } returns VALID.SYMBOL_MAP_MODEL every { symbolMapRepository.findAllByAliasKey("binance") - } returns Flux.just(VALID.SYMBOL_MAP_MODEL) + } returns listOf(VALID.SYMBOL_MAP_MODEL) every { symbolMapRepository.findAll() - } returns Flux.just(VALID.SYMBOL_MAP_MODEL) + } returns listOf(VALID.SYMBOL_MAP_MODEL) } @Test diff --git a/api/api-ports/api-proxy-rest/pom.xml b/api/api-ports/api-proxy-rest/pom.xml index 9ddf1d7be..a44dcec75 100644 --- a/api/api-ports/api-proxy-rest/pom.xml +++ b/api/api-ports/api-proxy-rest/pom.xml @@ -28,27 +28,10 @@ co.nilin.opex.utility error-handler - - org.springframework.boot - spring-boot-starter-data-r2dbc - - - io.projectreactor.kotlin - reactor-kotlin-extensions - - - org.jetbrains.kotlinx - kotlinx-coroutines-reactor - org.jetbrains.kotlinx kotlinx-coroutines-core - - io.projectreactor - reactor-test - test - io.mockk mockk diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/config/ProxyDispatchers.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/config/ProxyDispatchers.kt deleted file mode 100644 index b82f269d9..000000000 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/config/ProxyDispatchers.kt +++ /dev/null @@ -1,11 +0,0 @@ -package co.nilin.opex.api.ports.proxy.config - -import kotlinx.coroutines.reactor.asCoroutineDispatcher -import reactor.core.scheduler.Schedulers - -object ProxyDispatchers { - - val general = Schedulers.newBoundedElastic(10, 20, "general").asCoroutineDispatcher() - val market = Schedulers.newBoundedElastic(30, 60, "market").asCoroutineDispatcher() - val wallet = Schedulers.newBoundedElastic(10, 20, "wallet").asCoroutineDispatcher() -} \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt index c7ca50130..6d14e8dd1 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt @@ -1,65 +1,37 @@ package co.nilin.opex.api.ports.proxy.impl -import co.nilin.opex.api.core.inout.PairFeeResponse import co.nilin.opex.api.core.inout.PairConfigResponse +import co.nilin.opex.api.core.inout.PairFeeResponse import co.nilin.opex.api.core.spi.AccountantProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers +import co.nilin.opex.api.ports.proxy.utils.defaultHeaders import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.getForObject @Component -class AccountantProxyImpl(private val webClient: WebClient) : AccountantProxy { +class AccountantProxyImpl(private val restTemplate: RestTemplate) : AccountantProxy { private val logger by LoggerDelegate() @Value("\${app.accountant.url}") private lateinit var baseUrl: String - override suspend fun getPairConfigs(): List { + override fun getPairConfigs(): List { logger.info("fetching pair configs") - return withContext(ProxyDispatchers.general) { - webClient.get() - .uri("$baseUrl/config/all") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitSingle() - } + return restTemplate.getForObject>("$baseUrl/config/all", defaultHeaders()).toList() } - override suspend fun getFeeConfigs(): List { + override fun getFeeConfigs(): List { logger.info("fetching fee configs") - return withContext(ProxyDispatchers.general) { - webClient.get() - .uri("$baseUrl/config/fee") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitSingle() - } + return restTemplate.getForObject>("$baseUrl/config/fee", defaultHeaders()).toList() } - override suspend fun getFeeConfig(symbol: String): PairFeeResponse { + override fun getFeeConfig(symbol: String): PairFeeResponse { logger.info("fetching fee configs for $symbol") - return withContext(ProxyDispatchers.general) { - webClient.get() - .uri("$baseUrl/config/fee/$symbol") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingle() - } + return restTemplate.getForObject("$baseUrl/config/fee/$symbol", defaultHeaders()) } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BinanceGlobalMarketProxy.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BinanceGlobalMarketProxy.kt index 25f79f59c..c93a2b62e 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BinanceGlobalMarketProxy.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BinanceGlobalMarketProxy.kt @@ -2,27 +2,22 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.GlobalPrice import co.nilin.opex.api.core.spi.GlobalMarketProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlux +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.getForEntity import org.springframework.web.util.UriComponentsBuilder import java.net.URLEncoder @Component class BinanceGlobalMarketProxy( @Value("\${app.binance.api-url}") - val baseUrl: String + private val baseUrl: String, ) : GlobalMarketProxy { - private val webClient = WebClient.builder().build() + private val restTemplate = RestTemplate() - override suspend fun getPrices(symbols: List): List { + override fun getPrices(symbols: List): List { // Binance encoding requires to change some of the Java's encoding model // https://binance-docs.github.io/apidocs/spot/en/#symbol-price-ticker val param = symbols.map { s -> "\"$s\"" }.toString().replace(" ", "") @@ -31,16 +26,6 @@ class BinanceGlobalMarketProxy( .build(true) .toUri() - return withContext(ProxyDispatchers.general) { - webClient.get() - .uri(uri) - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.getForEntity>(uri).body?.toList() ?: emptyList() } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BlockchainGatewayProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BlockchainGatewayProxyImpl.kt index a7d53ac6f..fb981ceb1 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BlockchainGatewayProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BlockchainGatewayProxyImpl.kt @@ -1,62 +1,39 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.AssignResponse -import co.nilin.opex.api.core.inout.CurrencyImplementation import co.nilin.opex.api.core.inout.DepositDetails import co.nilin.opex.api.core.spi.BlockchainGatewayProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.api.ports.proxy.data.AssignAddressRequest import co.nilin.opex.api.ports.proxy.data.DepositDetailsRequest +import co.nilin.opex.api.ports.proxy.utils.body import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactor.awaitSingleOrNull -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.body -import org.springframework.web.reactive.function.client.bodyToFlux -import reactor.core.publisher.Mono -import java.net.URI +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.postForObject @Component -class BlockchainGatewayProxyImpl(private val client: WebClient) : BlockchainGatewayProxy { +class BlockchainGatewayProxyImpl(private val restTemplate: RestTemplate) : BlockchainGatewayProxy { private val logger by LoggerDelegate() @Value("\${app.opex-bc-gateway.url}") private lateinit var baseUrl: String - override suspend fun assignAddress(uuid: String, currency: String, chain: String): AssignResponse? { + override fun assignAddress(uuid: String, currency: String, chain: String): AssignResponse? { logger.info("calling bc-gateway assign") - return withContext(ProxyDispatchers.general) { - client.post() - .uri(URI.create("$baseUrl/v1/address/assign")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .body(Mono.just(AssignAddressRequest(uuid, currency, chain))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono(AssignResponse::class.java) - .awaitSingleOrNull() - } + return restTemplate.postForObject( + "$baseUrl/v1/address/assign", + body(AssignAddressRequest(uuid, currency, chain)) + ) } - override suspend fun getDepositDetails(refs: List): List { + override fun getDepositDetails(refs: List): List { logger.info("calling bc-gateway deposit details") - return withContext(ProxyDispatchers.general) { - client.post() - .uri(URI.create("$baseUrl/deposit/find/all")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .body(Mono.just(DepositDetailsRequest(refs))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.postForObject>( + "$baseUrl/deposit/find/all", + body(DepositDetailsRequest(refs)) + ).toList() } // override suspend fun getCurrencyImplementations(currency: String?): List { diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketDataProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketDataProxyImpl.kt index 742a1a848..31c32d58c 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketDataProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketDataProxyImpl.kt @@ -2,258 +2,145 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* import co.nilin.opex.api.core.spi.MarketDataProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers +import co.nilin.opex.api.ports.proxy.utils.defaultHeaders +import co.nilin.opex.api.ports.proxy.utils.noBody import co.nilin.opex.common.utils.Interval import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactor.awaitSingleOrNull -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType +import org.springframework.http.HttpMethod import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.exchange +import org.springframework.web.client.getForObject +import org.springframework.web.util.UriComponentsBuilder import java.util.* @Component -class MarketDataProxyImpl(private val webClient: WebClient) : MarketDataProxy { +class MarketDataProxyImpl(private val restTemplate: RestTemplate) : MarketDataProxy { private val logger by LoggerDelegate() @Value("\${app.market.url}") private lateinit var baseUrl: String - override suspend fun getTradeTickerData(interval: Interval): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/ticker") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getTradeTickerData(interval: Interval): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/ticker") + .queryParam("interval", interval) + .build().toUri() + + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getTradeTickerDataBySymbol(symbol: String, interval: Interval): PriceChange { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/$symbol/ticker") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - ?: PriceChange(symbol, openTime = Date().time, closeTime = interval.getTime()) - } + override fun getTradeTickerDataBySymbol(symbol: String, interval: Interval): PriceChange { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/$symbol/ticker") + .queryParam("interval", interval) + .build().toUri() + + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body ?: PriceChange( + symbol, + openTime = Date().time, + closeTime = interval.getTime() + ) } - override suspend fun openBidOrders(symbol: String, limit: Int): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/$symbol/order-book") { - it.queryParam("limit", limit) - it.queryParam("direction", OrderDirection.BID) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun openBidOrders(symbol: String, limit: Int): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/$symbol/order-book") + .queryParam("limit", limit) + .queryParam("direction", OrderDirection.BID) + .build().toUri() + + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun openAskOrders(symbol: String, limit: Int): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/$symbol/order-book") { - it.queryParam("limit", limit) - it.queryParam("direction", OrderDirection.ASK) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun openAskOrders(symbol: String, limit: Int): List { + + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/$symbol/order-book") + .queryParam("limit", limit) + .queryParam("direction", OrderDirection.ASK) + .build().toUri() + + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun lastOrder(symbol: String): Order? { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/$symbol/last-order") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } + override fun lastOrder(symbol: String): Order? { + return restTemplate.getForObject("$baseUrl/v1/market/$symbol/last-order", defaultHeaders()) } - override suspend fun recentTrades(symbol: String, limit: Int): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/$symbol/recent-trades") { - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun recentTrades(symbol: String, limit: Int): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/$symbol/recent-trades") + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun lastPrice(symbol: String?): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/prices") { - it.queryParam("symbol", symbol) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun lastPrice(symbol: String?): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/prices") + .queryParam("symbol", symbol) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getBestPriceForSymbols(symbols: List): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/best-prices") { - it.queryParam("symbols", symbols) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getBestPriceForSymbols(symbols: List): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/best-price") + .queryParam("symbols", symbols) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getCandleInfo( + override fun getCandleInfo( symbol: String, interval: String, startTime: Long?, endTime: Long?, limit: Int ): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/chart/$symbol/candle") { - it.queryParam("interval", interval) - it.queryParam("since", startTime) - it.queryParam("until", endTime) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/chart/$symbol/candle") + .queryParam("interval", interval) + .queryParam("since", startTime) + .queryParam("until", endTime) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getMarketCurrencyRates(quote: String, base: String?): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri(if (base.isNullOrEmpty()) "$baseUrl/v1/rate/EXTERNAL" else "$baseUrl/v1/rate/$base/EXTERNAL") { - it.queryParam("quote", quote) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getMarketCurrencyRates(quote: String, base: String?): List { + val url = if (base.isNullOrEmpty()) "$baseUrl/v1/rate/EXTERNAL" else "$baseUrl/v1/rate/$base/EXTERNAL" + val uri = UriComponentsBuilder.fromUriString(url) + .queryParam("quote", quote) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getExternalCurrencyRates(quote: String, base: String?): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri(if (base.isNullOrEmpty()) "$baseUrl/v1/rate/EXTERNAL" else "$baseUrl/v1/rate/$base/EXTERNAL") { - it.queryParam("quote", quote) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getExternalCurrencyRates(quote: String, base: String?): List { + + val url = if (base.isNullOrEmpty()) "$baseUrl/v1/rate/EXTERNAL" else "$baseUrl/v1/rate/$base/EXTERNAL" + val uri = UriComponentsBuilder.fromUriString(url) + .queryParam("quote", quote) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun countActiveUsers(interval: Interval): Long { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/active-users") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - ?.value ?: 0 - } + override fun countActiveUsers(interval: Interval): Long { + + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/active-users") + .queryParam("interval", interval) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body?.value ?: 0 } - override suspend fun countTotalOrders(interval: Interval): Long { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/orders-count") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - ?.value ?: 0 - } + override fun countTotalOrders(interval: Interval): Long { + + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/orders-count") + .queryParam("interval", interval) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body?.value ?: 0 } - override suspend fun countTotalTrades(interval: Interval): Long { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/market/trades-count") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - ?.value ?: 0 - } + override fun countTotalTrades(interval: Interval): Long { + + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/market/trades-count") + .queryParam("interval", interval) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body?.value ?: 0 } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketStatProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketStatProxyImpl.kt index 369ac556e..7e94c821e 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketStatProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketStatProxyImpl.kt @@ -3,87 +3,49 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.PriceStat import co.nilin.opex.api.core.inout.TradeVolumeStat import co.nilin.opex.api.core.spi.MarketStatProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers +import co.nilin.opex.api.ports.proxy.utils.noBody import co.nilin.opex.common.utils.Interval -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactor.awaitSingleOrNull -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType +import org.springframework.http.HttpMethod import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.exchange +import org.springframework.web.util.UriComponentsBuilder @Component class MarketStatProxyImpl( - private val webClient: WebClient, + private val restTemplate: RestTemplate, @Value("\${app.market.url}") private val baseUrl: String ) : MarketStatProxy { - override suspend fun getMostIncreasedInPricePairs(interval: Interval, limit: Int): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/stats/price/most-increased") { - it.queryParam("interval", interval) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getMostIncreasedInPricePairs(interval: Interval, limit: Int): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/stats/price/most-increased") + .queryParam("interval", interval) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getMostDecreasedInPricePairs(interval: Interval, limit: Int): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/stats/price/most-decreased") { - it.queryParam("interval", interval) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getMostDecreasedInPricePairs(interval: Interval, limit: Int): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/stats/price/most-decreased") + .queryParam("interval", interval) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getHighestVolumePair(interval: Interval): TradeVolumeStat? { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/stats/volume/highest") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } + override fun getHighestVolumePair(interval: Interval): TradeVolumeStat? { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/stats/volume/highest") + .queryParam("interval", interval) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body } - override suspend fun getTradeCountPair(interval: Interval): TradeVolumeStat? { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/stats/most-trades") { - it.queryParam("interval", interval) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } + override fun getTradeCountPair(interval: Interval): TradeVolumeStat? { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/stats/most-trades") + .queryParam("interval", interval) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketUserDataProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketUserDataProxyImpl.kt index c3b06e2ec..d519d0066 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketUserDataProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MarketUserDataProxyImpl.kt @@ -2,91 +2,61 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* import co.nilin.opex.api.core.spi.MarketUserDataProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers -import co.nilin.opex.api.ports.proxy.data.AllOrderRequest import co.nilin.opex.api.ports.proxy.data.QueryOrderRequest import co.nilin.opex.api.ports.proxy.data.TradeRequest +import co.nilin.opex.api.ports.proxy.utils.body +import co.nilin.opex.api.ports.proxy.utils.defaultHeaders +import co.nilin.opex.api.ports.proxy.utils.noBody import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactor.awaitSingleOrNull -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType +import org.springframework.http.HttpMethod import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.body -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono -import reactor.core.publisher.Mono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.exchange +import org.springframework.web.client.postForObject +import org.springframework.web.util.UriComponentsBuilder import java.security.Principal import java.util.* @Component -class MarketUserDataProxyImpl(private val webClient: WebClient) : MarketUserDataProxy { +class MarketUserDataProxyImpl(private val restTemplate: RestTemplate) : MarketUserDataProxy { private val logger by LoggerDelegate() @Value("\${app.market.url}") private lateinit var baseUrl: String - override suspend fun queryOrder( + override fun queryOrder( principal: Principal, symbol: String, orderId: Long?, origClientOrderId: String?, ): Order? { - return withContext(ProxyDispatchers.market) { - webClient.post() - .uri("$baseUrl/v1/user/${principal.name}/order/query") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .body(Mono.just(QueryOrderRequest(symbol, orderId, origClientOrderId))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } + return restTemplate.postForObject( + "$baseUrl/v1/user/${principal.name}/order/query", + body(QueryOrderRequest(symbol, orderId, origClientOrderId)) + ) } - override suspend fun openOrders(principal: Principal, symbol: String?, limit: Int?): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/user/${principal.name}/orders/$symbol/open") { - it.queryParam("limit", limit ?: 100) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun openOrders(principal: Principal, symbol: String?, limit: Int?): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/user/${principal.name}/orders/$symbol/open") + .queryParam("limit", limit ?: 100) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun allOrders( + override fun allOrders( principal: Principal, symbol: String?, startTime: Date?, endTime: Date?, limit: Int?, ): List { - return withContext(ProxyDispatchers.market) { - webClient.post() - .uri("$baseUrl/v1/user/${principal.name}/orders") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .body(Mono.just(AllOrderRequest(symbol, startTime, endTime, limit ?: 500))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.postForObject>("$baseUrl/v1/user/${principal.name}/orders", defaultHeaders()) + .toList() } - override suspend fun allTrades( + override fun allTrades( principal: Principal, symbol: String?, fromTrade: Long?, @@ -94,21 +64,13 @@ class MarketUserDataProxyImpl(private val webClient: WebClient) : MarketUserData endTime: Date?, limit: Int?, ): List { - return withContext(ProxyDispatchers.market) { - webClient.post() - .uri("$baseUrl/v1/user/${principal.name}/trades") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .body(Mono.just(TradeRequest(symbol, fromTrade, startTime, endTime, limit ?: 500))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.postForObject>( + "$baseUrl/v1/user/${principal.name}/trades", + body(TradeRequest(symbol, fromTrade, startTime, endTime, limit ?: 500)) + ).toList() } - override suspend fun getOrderHistory( + override fun getOrderHistory( uuid: String, symbol: String?, startTime: Long?, @@ -118,28 +80,19 @@ class MarketUserDataProxyImpl(private val webClient: WebClient) : MarketUserData limit: Int?, offset: Int?, ): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/user/order/history/$uuid") { - it.queryParam("symbol", symbol) - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("orderType", orderType) - it.queryParam("direction", direction) - it.queryParam("limit", limit) - it.queryParam("offset", offset) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/user/order/history/$uuid") + .queryParam("symbol", symbol) + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("orderType", orderType) + .queryParam("direction", direction) + .queryParam("limit", limit) + .queryParam("offset", offset) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getOrderHistoryCount( + override fun getOrderHistoryCount( uuid: String, symbol: String?, startTime: Long?, @@ -147,25 +100,17 @@ class MarketUserDataProxyImpl(private val webClient: WebClient) : MarketUserData orderType: MatchingOrderType?, direction: OrderDirection?, ): Long { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/user/order/history/count/$uuid") { - it.queryParam("symbol", symbol) - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("orderType", orderType) - it.queryParam("direction", direction) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/user/order/history/count/$uuid") + .queryParam("symbol", symbol) + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("orderType", orderType) + .queryParam("direction", direction) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body ?: 0 } - override suspend fun getTradeHistory( + override fun getTradeHistory( uuid: String, symbol: String?, startTime: Long?, @@ -174,47 +119,30 @@ class MarketUserDataProxyImpl(private val webClient: WebClient) : MarketUserData limit: Int?, offset: Int?, ): List { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/user/trade/history/$uuid") { - it.queryParam("symbol", symbol) - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("direction", direction) - it.queryParam("limit", limit) - it.queryParam("offset", offset) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/user/trade/history/$uuid") + .queryParam("symbol", symbol) + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("direction", direction) + .queryParam("limit", limit) + .queryParam("offset", offset) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getTradeHistoryCount( + override fun getTradeHistoryCount( uuid: String, symbol: String?, startTime: Long?, endTime: Long?, direction: OrderDirection?, ): Long { - return withContext(ProxyDispatchers.market) { - webClient.get() - .uri("$baseUrl/v1/user/trade/history/count/$uuid") { - it.queryParam("symbol", symbol) - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("direction", direction) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v1/user/trade/history/count/$uuid") + .queryParam("symbol", symbol) + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("direction", direction) + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.GET, noBody()).body ?: 0 } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MatchingGatewayProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MatchingGatewayProxyImpl.kt index 92bed3034..cd6ec38c4 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MatchingGatewayProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/MatchingGatewayProxyImpl.kt @@ -2,34 +2,27 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* import co.nilin.opex.api.core.spi.MatchingGatewayProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.api.ports.proxy.data.CancelOrderRequest import co.nilin.opex.api.ports.proxy.data.CreateOrderRequest +import co.nilin.opex.api.ports.proxy.utils.body +import co.nilin.opex.api.ports.proxy.utils.defaultHeaders import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactor.awaitSingleOrNull -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.body -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono -import reactor.core.publisher.Mono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.getForObject +import org.springframework.web.client.postForObject import java.math.BigDecimal -import java.net.URI @Component -class MatchingGatewayProxyImpl(private val client: WebClient) : MatchingGatewayProxy { +class MatchingGatewayProxyImpl(private val restTemplate: RestTemplate) : MatchingGatewayProxy { private val logger by LoggerDelegate() @Value("\${app.matching-gateway.url}") private lateinit var baseUrl: String - override suspend fun createNewOrder( + override fun createNewOrder( uuid: String?, pair: String, price: BigDecimal, @@ -41,23 +34,11 @@ class MatchingGatewayProxyImpl(private val client: WebClient) : MatchingGatewayP token: String?, ): OrderSubmitResult? { logger.info("calling matching-gateway order create") - val body = CreateOrderRequest(uuid, pair, price, quantity, direction, matchConstraint, orderType, userLevel) - return withContext(ProxyDispatchers.general) { - client.post() - .uri(URI.create("$baseUrl/order")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .header("Authorization", "Bearer $token") - .body(Mono.just(body)) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } - + val request = CreateOrderRequest(uuid, pair, price, quantity, direction, matchConstraint, orderType, userLevel) + return restTemplate.postForObject("$baseUrl/order", body(request)) } - override suspend fun cancelOrder( + override fun cancelOrder( ouid: String, uuid: String, orderId: Long, @@ -65,31 +46,13 @@ class MatchingGatewayProxyImpl(private val client: WebClient) : MatchingGatewayP token: String?, ): OrderSubmitResult? { logger.info("calling matching-gateway order cancel") - return withContext(ProxyDispatchers.general) { - client.post() - .uri(URI.create("$baseUrl/order/cancel")) - .accept(MediaType.APPLICATION_JSON) - .contentType(MediaType.APPLICATION_JSON) - .header("Authorization", "Bearer $token") - .body(Mono.just(CancelOrderRequest(ouid, uuid, orderId, symbol))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingleOrNull() - } + return restTemplate.postForObject( + "$baseUrl/order/cancel", + body(CancelOrderRequest(ouid, uuid, orderId, symbol)) + ) } - override suspend fun getPairSettings(): List { - return withContext(ProxyDispatchers.wallet) { - client.get() - .uri("$baseUrl/pair-setting") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getPairSettings(): List { + return restTemplate.getForObject>("$baseUrl/pair-setting", defaultHeaders()).toList() } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt index c934ad904..b9955e1dc 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt @@ -2,76 +2,56 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* import co.nilin.opex.api.core.spi.WalletProxy -import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.api.ports.proxy.data.TransactionRequest +import co.nilin.opex.api.ports.proxy.utils.body +import co.nilin.opex.api.ports.proxy.utils.defaultHeaders +import co.nilin.opex.api.ports.proxy.utils.noBody +import co.nilin.opex.api.ports.proxy.utils.withAuth import co.nilin.opex.common.OpexError import co.nilin.opex.common.utils.LoggerDelegate -import kotlinx.coroutines.reactive.awaitFirstOrElse -import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.withContext import org.springframework.beans.factory.annotation.Value -import org.springframework.http.HttpHeaders -import org.springframework.http.MediaType +import org.springframework.http.HttpMethod import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.body -import org.springframework.web.reactive.function.client.bodyToFlux -import org.springframework.web.reactive.function.client.bodyToMono -import reactor.core.publisher.Mono +import org.springframework.web.client.RestTemplate +import org.springframework.web.client.exchange +import org.springframework.web.client.getForObject +import org.springframework.web.client.postForObject +import org.springframework.web.util.UriComponentsBuilder @Component -class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { +class WalletProxyImpl(private val restTemplate: RestTemplate) : WalletProxy { private val logger by LoggerDelegate() @Value("\${app.wallet.url}") private lateinit var baseUrl: String - override suspend fun getWallets(uuid: String?, token: String?): List { + override fun getWallets(uuid: String?, token: String?): List { logger.info("fetching wallets for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/v1/owner/$uuid/wallets") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitSingle() - } + return restTemplate.getForObject>( + "$baseUrl/v1/owner/$uuid/wallets", + defaultHeaders().withAuth(token) + ).toList() } - override suspend fun getWallet(uuid: String?, token: String?, symbol: String): Wallet { + override fun getWallet(uuid: String?, token: String?, symbol: String): Wallet { logger.info("fetching wallet for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/v1/owner/$uuid/wallets/$symbol") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingle() - } + return restTemplate.getForObject( + "$baseUrl/v1/owner/$uuid/wallets/$symbol", + defaultHeaders().withAuth(token) + ) } - override suspend fun getOwnerLimits(uuid: String?, token: String?): OwnerLimitsResponse { + override fun getOwnerLimits(uuid: String?, token: String?): OwnerLimitsResponse { logger.info("fetching owner limits for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/v1/owner/$uuid/limits") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitSingle() - } + return restTemplate.getForObject( + "$baseUrl/v1/owner/$uuid/limits", + defaultHeaders().withAuth(token) + ) + } - override suspend fun getDepositTransactions( + override fun getDepositTransactions( uuid: String, token: String, currency: String?, @@ -82,21 +62,13 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { ascendingByTime: Boolean?, ): List { logger.info("fetching deposit transaction history for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.post() - .uri("$baseUrl/v1/deposit/history") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(TransactionRequest(currency, startTime, endTime, limit, offset, ascendingByTime))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.postForObject>( + "$baseUrl/v1/deposit/history", + body(TransactionRequest(currency, startTime, endTime, limit, offset, ascendingByTime), token) + ).toList() } - override suspend fun getDepositTransactionsCount( + override fun getDepositTransactionsCount( uuid: String, token: String, currency: String?, @@ -104,20 +76,13 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { endTime: Long?, ): Long { logger.info("fetching deposit transaction count for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.post() - .uri("$baseUrl/v1/deposit/history/count") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(TransactionRequest(currency, startTime, endTime, null, null))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } - } + return restTemplate.getForObject( + "$baseUrl/v1/deposit/history/count", + body(TransactionRequest(currency, startTime, endTime, null, null), token) + ) } - override suspend fun getWithdrawTransactions( + override fun getWithdrawTransactions( uuid: String, token: String, currency: String?, @@ -128,21 +93,13 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { ascendingByTime: Boolean?, ): List { logger.info("fetching withdraw transaction history for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.post() - .uri("$baseUrl/withdraw/history") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(TransactionRequest(currency, startTime, endTime, limit, offset, ascendingByTime))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + return restTemplate.postForObject>( + "$baseUrl/withdraw/history", + body(TransactionRequest(currency, startTime, endTime, limit, offset, ascendingByTime), token) + ).toList() } - override suspend fun getWithdrawTransactionsCount( + override fun getWithdrawTransactionsCount( uuid: String, token: String, currency: String?, @@ -150,20 +107,14 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { endTime: Long?, ): Long { logger.info("fetching withdraw transaction count for $uuid") - return withContext(ProxyDispatchers.wallet) { - webClient.post() - .uri("$baseUrl/withdraw/history/count") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(TransactionRequest(currency, startTime, endTime, null, null))) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } - } + return restTemplate.postForObject( + "$baseUrl/withdraw/history/count", + TransactionRequest(currency, startTime, endTime, null, null), + token + ) } - override suspend fun getTransactions( + override fun getTransactions( uuid: String, token: String, currency: String?, @@ -174,35 +125,26 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { offset: Int, ascendingByTime: Boolean? ): List { - return webClient.post() - .uri("$baseUrl/v2/transaction") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body( - Mono.just( - UserTransactionRequest( - null, - currency, - null, - null, - category, - startTime, - endTime, - limit, - offset, - ascendingByTime == true, - null - ) - ) - ) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } + val request = UserTransactionRequest( + null, + currency, + null, + null, + category, + startTime, + endTime, + limit, + offset, + ascendingByTime == true, + null + ) + return restTemplate.postForObject>( + "$baseUrl/v2/transaction", + body(request, token) + ).toList() } - override suspend fun getTransactionsCount( + override fun getTransactionsCount( uuid: String, token: String, currency: String?, @@ -210,248 +152,135 @@ class WalletProxyImpl(private val webClient: WebClient) : WalletProxy { startTime: Long?, endTime: Long?, ): Long { - return webClient.post() - .uri("$baseUrl/v2/transaction/count") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body( - Mono.just( - UserTransactionRequest( - null, - currency, - null, - null, - category, - startTime, - endTime, - null - ) - ) - ) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } + val request = UserTransactionRequest(null, currency, null, null, category, startTime, endTime, null) + return restTemplate.postForObject("$baseUrl/v2/transaction/count", body(request, token)) } - override suspend fun getGateWays( + override fun getGateWays( includeOffChainGateways: Boolean, includeOnChainGateways: Boolean, ): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/currency/gateways") { - it.queryParam("includeOffChainGateways", includeOffChainGateways) - it.queryParam("includeOnChainGateways", includeOnChainGateways) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/currency/gateways") + .queryParam("includeOffChainGateways", includeOffChainGateways) + .queryParam("includeOnChainGateways", includeOnChainGateways) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() + ?: emptyList() } - override suspend fun getCurrencies(): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/currency/all") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getCurrencies(): List { + return restTemplate.getForObject>("$baseUrl/currency/all", defaultHeaders()).toList() } - override suspend fun getUserTradeTransactionSummary( + override fun getUserTradeTransactionSummary( uuid: String, token: String, startTime: Long?, endTime: Long?, limit: Int?, ): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/v2/transaction/trade/summary/$uuid") { - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/v2/transaction/trade/summary/$uuid") + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() + ?: emptyList() } - override suspend fun getUserDepositSummary( + override fun getUserDepositSummary( uuid: String, token: String, startTime: Long?, endTime: Long?, limit: Int?, ): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/deposit/summary/$uuid") { - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/deposit/summary/$uuid") + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() + ?: emptyList() + } - override suspend fun getUserWithdrawSummary( + override fun getUserWithdrawSummary( uuid: String, token: String, startTime: Long?, endTime: Long?, limit: Int?, ): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/withdraw/summary/$uuid") { - it.queryParam("startTime", startTime) - it.queryParam("endTime", endTime) - it.queryParam("limit", limit) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + val uri = UriComponentsBuilder.fromUriString("$baseUrl/withdraw/summary/$uuid") + .queryParam("startTime", startTime) + .queryParam("endTime", endTime) + .queryParam("limit", limit) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() + ?: emptyList() } - override suspend fun deposit( + override fun deposit( request: RequestDepositBody ): TransferResult? { - return withContext(ProxyDispatchers.wallet) { - webClient.post() - .uri("$baseUrl/deposit/${request.amount}_${request.chain}_${request.symbol}/${request.receiverUuid}_${request.receiverWalletType}") { - it.apply { - request.description?.let { description -> queryParam("description", description) } - request.transferRef?.let { transferRef -> queryParam("transferRef", transferRef) } - request.gatewayUuid?.let { gatewayUuid -> queryParam("gatewayUuid", gatewayUuid) } - }.build() - }.accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrNull() - } + val uri = + UriComponentsBuilder.fromUriString("$baseUrl/deposit/${request.amount}_${request.chain}_${request.symbol}/${request.receiverUuid}_${request.receiverWalletType}") + .apply { + request.description?.let { description -> queryParam("description", description) } + request.transferRef?.let { transferRef -> queryParam("transferRef", transferRef) } + request.gatewayUuid?.let { gatewayUuid -> queryParam("gatewayUuid", gatewayUuid) } + } + .build().toUri() + return restTemplate.exchange(uri, HttpMethod.POST, noBody()).body } - override suspend fun requestWithdraw( + override fun requestWithdraw( token: String, request: RequestWithdrawBody ): WithdrawActionResult { - return webClient.post() - .uri("$baseUrl/withdraw") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(request)) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { throw OpexError.BadRequest.exception() } + return restTemplate.postForObject("$baseUrl/withdraw", body(request, token)) + ?: throw OpexError.BadRequest.exception() } - override suspend fun cancelWithdraw(token: String, withdrawId: Long): Void? { - return webClient.post() - .uri("$baseUrl/withdraw/$withdrawId/cancel") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono(Void::class.java) - .awaitFirstOrNull() + override fun cancelWithdraw(token: String, withdrawId: Long) { + restTemplate.postForObject("$baseUrl/withdraw/$withdrawId/cancel", defaultHeaders().withAuth(token)) } - override suspend fun findWithdraw(token: String, withdrawId: Long): WithdrawResponse { - return webClient.get() - .uri("$baseUrl/withdraw/$withdrawId") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { throw OpexError.WithdrawNotFound.exception() } + override fun findWithdraw(token: String, withdrawId: Long): WithdrawResponse { + return restTemplate.postForObject( + "$baseUrl/withdraw/$withdrawId", + defaultHeaders().withAuth(token) + ) ?: throw OpexError.WithdrawNotFound.exception() } - override suspend fun submitVoucher( + override fun submitVoucher( code: String, token: String ): SubmitVoucherResponse { - return webClient.put() - .uri("$baseUrl/voucher/$code") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { throw OpexError.BadRequest.exception() } + return restTemplate.postForObject( + "$baseUrl/voucher/$code", + defaultHeaders().withAuth(token) + ) ?: throw OpexError.BadRequest.exception() } - override suspend fun getQuoteCurrencies(): List { - return withContext(ProxyDispatchers.wallet) { - webClient.get() - .uri("$baseUrl/currency/quotes") { - it.queryParam("isActive", true) - it.build() - }.accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } - } + override fun getQuoteCurrencies(): List { + val uri = UriComponentsBuilder.fromUriString("$baseUrl/currency/quotes") + .queryParam("isActive", true) + .build().toUri() + return restTemplate.exchange>(uri, HttpMethod.GET, noBody()).body?.toList() ?: emptyList() } - override suspend fun getSwapTransactions(token: String, request: UserTransactionRequest): List { - return webClient.post() - .uri("$baseUrl/v1/swap/history") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(request)) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToFlux() - .collectList() - .awaitFirstOrElse { emptyList() } + override fun getSwapTransactions(token: String, request: UserTransactionRequest): List { + return restTemplate.postForObject>("$baseUrl/v1/swap/history", body(request, token)) + .toList() } - override suspend fun getSwapTransactionsCount( + override fun getSwapTransactionsCount( token: String, request: UserTransactionRequest ): Long { - return webClient.post() - .uri("$baseUrl/v1/swap/history/count") - .accept(MediaType.APPLICATION_JSON) - .header(HttpHeaders.AUTHORIZATION, "Bearer $token") - .body(Mono.just(request)) - .retrieve() - .onStatus({ t -> t.isError }, { it.createException() }) - .bodyToMono() - .awaitFirstOrElse { 0L } + return restTemplate.postForObject("$baseUrl/v1/swap/history/count", body(request, token)) ?: 0 } } diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/utils/Extensions.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/utils/Extensions.kt new file mode 100644 index 000000000..38cc7d9da --- /dev/null +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/utils/Extensions.kt @@ -0,0 +1,43 @@ +package co.nilin.opex.api.ports.proxy.utils + +import org.springframework.http.* + +//fun tryRest(action: () -> T): T { +// return try { +// action() +// } +// catch (e: HttpClientErrorException) { +// logger.error("Client error fetching pair configs from $url: ${e.statusCode} - ${e.responseBodyAsString}") +// throw e +// } catch (e: HttpServerErrorException) { +// logger.error("Server error fetching pair configs from $url: ${e.statusCode} - ${e.responseBodyAsString}") +// throw e +// } catch (e: ResourceAccessException) { +// logger.error("Network or connection error fetching pair configs from $url: ${e.message}") +// throw e +// } catch (e: Exception) { +// logger.error("Unexpected error fetching pair configs from $url: ${e.message}", e) +// throw e +// } +//} + +internal fun defaultHeaders() = HttpHeaders().apply { + add(HttpHeaders.ACCEPT, "application/json") + add(HttpHeaders.CONTENT_TYPE, "application/json") +} + +internal fun defaultHeaders(contentType: MediaType) = HttpHeaders().apply { + add(HttpHeaders.ACCEPT, "application/json") + this.contentType = contentType +} + +internal fun body(body: Any) = HttpEntity(body, defaultHeaders()) + +internal fun body(body: Any, auth: String) = HttpEntity(body, defaultHeaders().withAuth(auth)) + +internal fun noBody() = HttpEntity(defaultHeaders()) + +internal fun HttpHeaders.withAuth(auth: String?): HttpHeaders { + add("Authorization", "Bearer $auth") + return this +} \ No newline at end of file diff --git a/api/pom.xml b/api/pom.xml index 236297638..679b969aa 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -45,7 +45,12 @@ org.zalando - logbook-spring-boot-webflux-autoconfigure + logbook-spring-boot-starter + 3.9.0 + + + org.zalando + logbook-spring-boot-autoconfigure 3.9.0 diff --git a/common/src/main/kotlin/co/nilin/opex/common/security/CustomJwtAuthConverter.kt b/common/src/main/kotlin/co/nilin/opex/common/security/CustomJwtAuthConverter.kt index a08de7b5e..4dae0c156 100644 --- a/common/src/main/kotlin/co/nilin/opex/common/security/CustomJwtAuthConverter.kt +++ b/common/src/main/kotlin/co/nilin/opex/common/security/CustomJwtAuthConverter.kt @@ -22,4 +22,17 @@ class ReactiveCustomJwtConverter : Converter { + + override fun convert(source: Jwt): AbstractAuthenticationToken { + val permissions = source.getClaimAsStringList("permissions") + ?.map { SimpleGrantedAuthority("PERM_${it}") } + ?.toList() ?: emptyList() + val roles = source.getClaimAsStringList("roles") + ?.map { SimpleGrantedAuthority("ROLE_${it}") } + ?.toList() ?: emptyList() + return JwtAuthenticationToken(source, roles + permissions) + } } \ No newline at end of file