From 76ed86b104ab7247ddd901ba9ae6c0e2a4c12545 Mon Sep 17 00:00:00 2001 From: Bhuvanesh S Date: Thu, 4 Jun 2026 22:12:15 +0530 Subject: [PATCH 1/3] perf: EPG performance optimization implementation --- .../tv/data/repository/IptvRepository.kt | 145 ++++++++++++++++-- 1 file changed, 128 insertions(+), 17 deletions(-) diff --git a/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt b/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt index 3baf1d08..b92f8b4b 100644 --- a/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt +++ b/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt @@ -185,6 +185,32 @@ class IptvRepository @Inject constructor( private val emptyShortEpgCooldownUntil = ConcurrentHashMap() private val visibleXmlEpgCooldownUntil = ConcurrentHashMap() + private val guideKeyCandidatesCache = java.util.Collections.synchronizedMap( + object : java.util.LinkedHashMap>(512, 0.75f, true) { + override fun removeEldestEntry(eldest: Map.Entry>?): Boolean = size > 16384 + } + ) + + internal class EpgNotModifiedException : Exception("EPG content has not modified (HTTP 304)") + + private fun getEpgHttpCachePrefs() = context.getSharedPreferences("arvio_epg_http_cache", Context.MODE_PRIVATE) + + private fun getEpgCachedEtag(url: String): String? { + return getEpgHttpCachePrefs().getString("${url}_etag", null) + } + + private fun getEpgCachedLastModified(url: String): String? { + return getEpgHttpCachePrefs().getString("${url}_last_modified", null) + } + + private fun saveEpgHttpCacheHeaders(url: String, etag: String?, lastModified: String?) { + getEpgHttpCachePrefs().edit().apply { + if (etag != null) putString("${url}_etag", etag) else remove("${url}_etag") + if (lastModified != null) putString("${url}_last_modified", lastModified) else remove("${url}_last_modified") + apply() + } + } + @Volatile private var cachedPlaylistAt: Long = 0L @@ -1545,6 +1571,7 @@ class IptvRepository @Inject constructor( val epgCandidatesToTry = epgCandidates var bestCoverage = epgCoverageRatio(channels, resolvedNowNext) val mergedXmlNowNext = ConcurrentHashMap(resolvedNowNext) + var xmltvChanged = false for ((index, candidate) in epgCandidatesToTry.withIndex()) { val epgUrl = candidate.url val candidateChannels = channelsForScopedEpgCandidate(candidate, channels) @@ -1562,6 +1589,7 @@ class IptvRepository @Inject constructor( val parsed = attempt.getOrDefault(emptyMap()) val parsedHasPrograms = hasAnyProgramData(parsed) if (parsedHasPrograms) { + xmltvChanged = true parsed.forEach { (channelId, nowNext) -> val current = mergedXmlNowNext[channelId] if (!hasProgramData(current) && hasProgramData(nowNext)) { @@ -1582,8 +1610,28 @@ class IptvRepository @Inject constructor( } } } else { - epgFailureMessage = attempt.exceptionOrNull()?.message - System.err.println("[EPG] XMLTV attempt ${index + 1} failed: ${epgFailureMessage}") + val exception = attempt.exceptionOrNull() + if (exception is EpgNotModifiedException) { + System.err.println("[EPG] XMLTV candidate ${index + 1} is unchanged (HTTP 304). Loading existing index...") + val existing = runCatching { + epgIndex.loadNowNext( + sourceKey = currentEpgIndexKey(config), + channelIds = candidateChannels.map { it.id }.toSet() + ) + }.getOrDefault(emptyMap()) + existing.forEach { (channelId, nowNext) -> + val current = mergedXmlNowNext[channelId] + if (!hasProgramData(current) && hasProgramData(nowNext)) { + mergedXmlNowNext[channelId] = nowNext + } else if (current == null) { + mergedXmlNowNext[channelId] = nowNext + } + } + resolved = true + } else { + epgFailureMessage = exception?.message + System.err.println("[EPG] XMLTV attempt ${index + 1} failed: ${epgFailureMessage}") + } } } if (resolved) { @@ -1591,7 +1639,11 @@ class IptvRepository @Inject constructor( resolvedNowNext = mergedXmlNowNext cachedNowNext = ConcurrentHashMap(mergedXmlNowNext) cachedEpgAt = System.currentTimeMillis() - persistEpgIndexAll(config, mergedXmlNowNext, cachedEpgAt) + if (xmltvChanged) { + persistEpgIndexAll(config, mergedXmlNowNext, cachedEpgAt) + } else { + System.err.println("[EPG] Skipping persistEpgIndexAll because XMLTV index is unchanged") + } epgUpdated = true System.err.println("[EPG] Final merged EPG coverage=${(epgCoverageRatio(channels, mergedXmlNowNext) * 100).toInt()}% for ${channels.size} channels") } @@ -2141,8 +2193,12 @@ class IptvRepository @Inject constructor( !hasProgramData(mergedNowNext[channel.id]) } val hasXtreamRequestedChannels = channelsByCredentials.isNotEmpty() + var xmlFallback: Map = emptyMap() + var isXmlCached = false if (missingXmlChannels.isNotEmpty() && !preferFullCatchupHistory && !hasXtreamRequestedChannels) { - val xmlFallback = fetchVisibleXmlEpgForChannels(config, missingXmlChannels) + val result = fetchVisibleXmlEpgForChannels(config, missingXmlChannels) + xmlFallback = result.first + isXmlCached = result.second if (xmlFallback.isNotEmpty()) { mergedNowNext.putAll(xmlFallback) System.err.println( @@ -2165,7 +2221,15 @@ class IptvRepository @Inject constructor( } cachedNowNext.putAll(mergedForCache) cachedEpgAt = System.currentTimeMillis() - persistEpgIndexChannels(config, mergedForCache, cachedEpgAt) + + val toPersist = if (isXmlCached && xmlFallback.isNotEmpty()) { + mergedForCache.filterKeys { it !in xmlFallback.keys } + } else { + mergedForCache + } + if (toPersist.isNotEmpty()) { + persistEpgIndexChannels(config, toPersist, cachedEpgAt) + } System.err.println( "[EPG-Refresh] Updated ${mergedForCache.size} channels in cache " + @@ -2191,10 +2255,10 @@ class IptvRepository @Inject constructor( private suspend fun fetchVisibleXmlEpgForChannels( config: IptvConfig, channels: List - ): Map { - if (channels.isEmpty()) return emptyMap() + ): Pair, Boolean> { + if (channels.isEmpty()) return Pair(emptyMap(), false) val candidates = resolveScopedEpgCandidates(config) - if (candidates.isEmpty()) return emptyMap() + if (candidates.isEmpty()) return Pair(emptyMap(), false) val playlistKey = channels .asSequence() @@ -2217,7 +2281,7 @@ class IptvRepository @Inject constructor( System.err.println( "[EPG-Refresh] Skipping XMLTV visible fallback for ${(cooldownUntil - nowMs) / 1000}s" ) - return emptyMap() + return Pair(emptyMap(), false) } visibleXmlEpgCooldownUntil[cooldownKey] = nowMs + 90_000L @@ -2229,22 +2293,36 @@ class IptvRepository @Inject constructor( "[EPG-Refresh] XMLTV visible fallback ${index + 1}/${visibleCandidates.size} " + "for ${candidateChannels.size} channels" ) + var isCached = false val parsed = runCatching { withTimeoutOrNull(12_000L) { fetchAndParseEpg(candidate.url, candidateChannels) } ?: emptyMap() + }.recover { error -> + if (error is EpgNotModifiedException) { + System.err.println("[EPG-Refresh] XMLTV visible fallback candidate is unchanged (HTTP 304). Loading existing index...") + isCached = true + runCatching { + epgIndex.loadNowNext( + sourceKey = currentEpgIndexKey(config), + channelIds = candidateChannels.map { it.id }.toSet() + ) + }.getOrDefault(emptyMap()) + } else { + throw error + } }.onFailure { error -> System.err.println("[EPG-Refresh] XMLTV visible fallback failed: ${error.message}") }.getOrDefault(emptyMap()) if (parsed.isNotEmpty() && hasAnyProgramData(parsed)) { visibleXmlEpgCooldownUntil[cooldownKey] = System.currentTimeMillis() + 30_000L - return parsed + return Pair(parsed, isCached) } } visibleXmlEpgCooldownUntil[cooldownKey] = System.currentTimeMillis() + 5 * 60_000L - return emptyMap() + return Pair(emptyMap(), false) } private fun persistCurrentCacheSnapshot(config: IptvConfig, loadedAtMs: Long = System.currentTimeMillis()) { @@ -5097,27 +5175,55 @@ class IptvRepository @Inject constructor( } private fun fetchAndParseEpg(url: String, channels: List): Map { - fun epgRequest(targetUrl: String, userAgent: String): Request { - return Request.Builder() + val hasDbEntries = currentEpgIndexKey.isNotBlank() && runCatching { + epgIndex.countPrograms(currentEpgIndexKey) + }.getOrDefault(0) > 0 + + fun epgRequest(targetUrl: String, userAgent: String, forceFull: Boolean = false): Request { + val builder = Request.Builder() .url(targetUrl) .header("User-Agent", userAgent) .header("Accept", "*/*") .header("Accept-Language", "en-US,en;q=0.9") .header("Cache-Control", "no-cache") - .get() - .build() + + if (hasDbEntries && !forceFull) { + getEpgCachedEtag(targetUrl)?.let { etag -> + builder.header("If-None-Match", etag) + } + getEpgCachedLastModified(targetUrl)?.let { lm -> + builder.header("If-Modified-Since", lm) + } + } + + return builder.get().build() } val primaryUserAgent = OkHttpProvider.userAgentOr(IPTV_USER_AGENT) val fallbackUserAgent = OkHttpProvider.userAgentOr(BROWSER_USER_AGENT) var response = iptvHttpClient.newCall(epgRequest(url, primaryUserAgent)).execute() + if (response.code == 304) { + response.close() + throw EpgNotModifiedException() + } if (!response.isSuccessful && response.code in setOf(511, 403, 401)) { response.close() response = iptvHttpClient.newCall( epgRequest(url, fallbackUserAgent) ).execute() + if (response.code == 304) { + response.close() + throw EpgNotModifiedException() + } } response.use { safeResponse -> + if (safeResponse.isSuccessful) { + val etag = safeResponse.header("ETag")?.trim() + val lastModified = safeResponse.header("Last-Modified")?.trim() + if (etag != null || lastModified != null) { + saveEpgHttpCacheHeaders(url, etag, lastModified) + } + } val stream = safeResponse.body?.byteStream() ?: throw IllegalStateException("Empty EPG response") val prepared = BufferedInputStream(prepareInputStream(stream, url)) if (!safeResponse.isSuccessful && !looksLikeXmlTv(prepared)) { @@ -5139,7 +5245,7 @@ class IptvRepository @Inject constructor( try { // Re-download val retryResponse = iptvHttpClient.newCall( - epgRequest(url, primaryUserAgent) + epgRequest(url, primaryUserAgent, forceFull = true) ).execute() retryResponse.use { rr -> val retryStream = rr.body?.byteStream() @@ -6951,6 +7057,8 @@ class IptvRepository @Inject constructor( private fun guideKeyCandidates(value: String?): Set { val raw = value?.trim().orEmpty() if (raw.isBlank()) return emptySet() + val cached = guideKeyCandidatesCache[raw] + if (cached != null) return cached val withoutBrackets = raw .replace(BRACKET_CONTENT_REGEX, " ") @@ -6978,7 +7086,7 @@ class IptvRepository @Inject constructor( } } - return rawAliases + val result = rawAliases .asSequence() .map { it.trim() } .filter { it.isNotBlank() } @@ -6991,6 +7099,9 @@ class IptvRepository @Inject constructor( } .filter { it.isNotBlank() } .toSet() + + guideKeyCandidatesCache[raw] = result + return result } private fun stripGuidePrefix(value: String): String { From 005a93bc2e85e0878c44b9c6c3031a6477940c26 Mon Sep 17 00:00:00 2001 From: Bhuvanesh S Date: Thu, 4 Jun 2026 22:18:44 +0530 Subject: [PATCH 2/3] perf: add unit tests for EPG caching and HTTP 304 incremental refresh --- .../IptvRepositoryOptimizationTest.kt | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt b/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt index 04d7d2d9..19309d85 100644 --- a/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt +++ b/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt @@ -133,4 +133,70 @@ class IptvRepositoryOptimizationTest { assertEquals("Should not encounter any ConcurrentModificationException", 0, exceptionCount.get()) } + + @Test + fun testGuideKeyCandidatesCaching() { + val context = io.mockk.mockk(relaxed = true) + val okHttpClient = io.mockk.mockk(relaxed = true) + val profileManager = io.mockk.mockk(relaxed = true) + val invalidationBus = io.mockk.mockk(relaxed = true) + val repository = IptvRepository(context, okHttpClient, profileManager, invalidationBus) + + // Get private field guideKeyCandidatesCache using reflection + val cacheField = IptvRepository::class.java.getDeclaredField("guideKeyCandidatesCache") + cacheField.isAccessible = true + val cache = cacheField.get(repository) as Map<*, *> + + val method = IptvRepository::class.java.getDeclaredMethod("guideKeyCandidates", String::class.java) + method.isAccessible = true + + val initialSize = cache.size + + // First call + val result1 = method.invoke(repository, "NPO 1 FHD [NL]") as Set + val sizeAfterFirst = cache.size + assertEquals(initialSize + 1, sizeAfterFirst) + + // Second call with same value (should hit cache) + val result2 = method.invoke(repository, "NPO 1 FHD [NL]") as Set + assertEquals(sizeAfterFirst, cache.size) + assertEquals(result1, result2) + } + + @Test + fun testFetchAndParseEpgThrows304Exception() { + val context = io.mockk.mockk(relaxed = true) + val okHttpClient = io.mockk.mockk() + val profileManager = io.mockk.mockk(relaxed = true) + val invalidationBus = io.mockk.mockk(relaxed = true) + + val builder = io.mockk.mockk(relaxed = true) + io.mockk.every { okHttpClient.newBuilder() } returns builder + io.mockk.every { builder.connectTimeout(any(), any()) } returns builder + io.mockk.every { builder.readTimeout(any(), any()) } returns builder + io.mockk.every { builder.writeTimeout(any(), any()) } returns builder + io.mockk.every { builder.callTimeout(any(), any()) } returns builder + + val customClient = io.mockk.mockk() + io.mockk.every { builder.build() } returns customClient + + val call = io.mockk.mockk() + val response = io.mockk.mockk() + io.mockk.every { response.code } returns 304 + io.mockk.every { response.close() } returns Unit + io.mockk.every { call.execute() } returns response + io.mockk.every { customClient.newCall(any()) } returns call + + val repository = IptvRepository(context, okHttpClient, profileManager, invalidationBus) + val method = IptvRepository::class.java.getDeclaredMethod("fetchAndParseEpg", String::class.java, List::class.java) + method.isAccessible = true + + try { + method.invoke(repository, "http://example.com/epg.xml", emptyList()) + fail("Expected EpgNotModifiedException to be thrown") + } catch (e: java.lang.reflect.InvocationTargetException) { + val cause = e.cause + assertEquals(IptvRepository.EpgNotModifiedException::class.java, cause?.javaClass) + } + } } From 376d266bb2b87423ecbd91fffdf3f853ed406e58 Mon Sep 17 00:00:00 2001 From: Bhuvanesh S Date: Fri, 5 Jun 2026 19:12:48 +0530 Subject: [PATCH 3/3] style: remove trailing whitespace --- .../tv/data/repository/IptvRepository.kt | 12 +++---- .../IptvRepositoryOptimizationTest.kt | 32 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt b/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt index b92f8b4b..435e107c 100644 --- a/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt +++ b/app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt @@ -6112,15 +6112,15 @@ class IptvRepository @Inject constructor( override fun read(b: ByteArray, off: Int, len: Int): Int { if (len == 0) return 0 - + // Read from the underlying stream val rawRead = super.read(b, off, len) if (rawRead == -1) return -1 - + var writeIdx = off var readIdx = off val endIdx = off + rawRead - + while (readIdx < endIdx) { val current = b[readIdx++].toInt() and 0xFF if (current == '\\'.code) { @@ -6132,7 +6132,7 @@ class IptvRepository @Inject constructor( val n = super.read() if (n == -1) -1 else n } - + if (next == -1) { b[writeIdx++] = '\\'.toByte() } else { @@ -6148,7 +6148,7 @@ class IptvRepository @Inject constructor( 'f' -> 0x0C else -> next } - + val finalChar = if (mapped in 0x00..0x1F && mapped != '\n'.code && mapped != '\r'.code && mapped != '\t'.code) { ' '.code } else { @@ -6165,7 +6165,7 @@ class IptvRepository @Inject constructor( b[writeIdx++] = finalChar.toByte() } } - + return writeIdx - off } } diff --git a/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt b/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt index 19309d85..39d91306 100644 --- a/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt +++ b/app/src/test/kotlin/com/arflix/tv/data/repository/IptvRepositoryOptimizationTest.kt @@ -31,13 +31,13 @@ class IptvRepositoryOptimizationTest { // regular chars not escaped (e.g. \y) -> keep y val inputStr = "Hello\\\"World\\nTest\\yDone\u0001Control" val expectedStr = "Hello\"World\nTestyDone Control" - + val rawStream = ByteArrayInputStream(inputStr.toByteArray(Charsets.UTF_8)) val sanitizingStream = createSanitizingStream(rawStream) - + val buffer = ByteArray(100) val readCount = sanitizingStream.read(buffer, 0, buffer.size) - + val result = String(buffer, 0, readCount, Charsets.UTF_8) assertEquals(expectedStr, result) } @@ -46,17 +46,17 @@ class IptvRepositoryOptimizationTest { fun testBackslashSanitizationByteByByteRead() { val inputStr = "Hello\\\"World\\nTest\\yDone\u0001Control" val expectedStr = "Hello\"World\nTestyDone Control" - + val rawStream = ByteArrayInputStream(inputStr.toByteArray(Charsets.UTF_8)) val sanitizingStream = createSanitizingStream(rawStream) - + val sb = StringBuilder() while (true) { val b = sanitizingStream.read() if (b == -1) break sb.append(b.toChar()) } - + assertEquals(expectedStr, sb.toString()) } @@ -66,7 +66,7 @@ class IptvRepositoryOptimizationTest { val inputStr = "A\\nB" // length 4 val rawStream = ByteArrayInputStream(inputStr.toByteArray(Charsets.UTF_8)) val sanitizingStream = createSanitizingStream(rawStream) - + // Read exactly up to the backslash first (2 bytes: 'A', '\') // Actually, let's read with buffer size 2 val buffer = ByteArray(2) @@ -76,7 +76,7 @@ class IptvRepositoryOptimizationTest { // maps it to '\n' and puts it into buffer[1]. So buffer should have ['A', '\n']. assertEquals('A', buffer[0].toChar()) assertEquals('\n', buffer[1].toChar()) - + // Next read should get the remaining 'B' val read2 = sanitizingStream.read(buffer, 0, 2) assertEquals(1, read2) @@ -88,7 +88,7 @@ class IptvRepositoryOptimizationTest { val inputStr = "A\\" // Traling backslash val rawStream = ByteArrayInputStream(inputStr.toByteArray(Charsets.UTF_8)) val sanitizingStream = createSanitizingStream(rawStream) - + val buffer = ByteArray(5) val read = sanitizingStream.read(buffer, 0, 5) assertEquals(2, read) @@ -101,7 +101,7 @@ class IptvRepositoryOptimizationTest { // Simulate synchronized list modification and copying concurrently val list = Collections.synchronizedList(mutableListOf()) val exceptionCount = AtomicInteger(0) - + // Coroutines writing to the list and reading from it (calling toList()) val writers = (1..10).map { id -> async { @@ -110,7 +110,7 @@ class IptvRepositoryOptimizationTest { } } } - + val readers = (1..10).map { async { repeat(1000) { @@ -127,10 +127,10 @@ class IptvRepositoryOptimizationTest { } } } - + writers.awaitAll() readers.awaitAll() - + assertEquals("Should not encounter any ConcurrentModificationException", 0, exceptionCount.get()) } @@ -151,7 +151,7 @@ class IptvRepositoryOptimizationTest { method.isAccessible = true val initialSize = cache.size - + // First call val result1 = method.invoke(repository, "NPO 1 FHD [NL]") as Set val sizeAfterFirst = cache.size @@ -169,14 +169,14 @@ class IptvRepositoryOptimizationTest { val okHttpClient = io.mockk.mockk() val profileManager = io.mockk.mockk(relaxed = true) val invalidationBus = io.mockk.mockk(relaxed = true) - + val builder = io.mockk.mockk(relaxed = true) io.mockk.every { okHttpClient.newBuilder() } returns builder io.mockk.every { builder.connectTimeout(any(), any()) } returns builder io.mockk.every { builder.readTimeout(any(), any()) } returns builder io.mockk.every { builder.writeTimeout(any(), any()) } returns builder io.mockk.every { builder.callTimeout(any(), any()) } returns builder - + val customClient = io.mockk.mockk() io.mockk.every { builder.build() } returns customClient