Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 186 additions & 45 deletions app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,32 @@ class IptvRepository @Inject constructor(
private val emptyShortEpgCooldownUntil = ConcurrentHashMap<String, Long>()
private val visibleXmlEpgCooldownUntil = ConcurrentHashMap<String, Long>()

private val guideKeyCandidatesCache = java.util.Collections.synchronizedMap(
object : java.util.LinkedHashMap<String, Set<String>>(512, 0.75f, true) {
override fun removeEldestEntry(eldest: Map.Entry<String, Set<String>>?): Boolean = size > 16384
}
)

internal class EpgNotModifiedException : Exception("EPG content has not modified (HTTP 304)")

private fun getEpgHttpCachePrefs() = context.getSharedPreferences("arvio_epg_http_cache", Context.MODE_PRIVATE)

private fun getEpgCachedEtag(url: String): String? {
return getEpgHttpCachePrefs().getString("${url}_etag", null)
}

private fun getEpgCachedLastModified(url: String): String? {
return getEpgHttpCachePrefs().getString("${url}_last_modified", null)
}

private fun saveEpgHttpCacheHeaders(url: String, etag: String?, lastModified: String?) {
getEpgHttpCachePrefs().edit().apply {
if (etag != null) putString("${url}_etag", etag) else remove("${url}_etag")
if (lastModified != null) putString("${url}_last_modified", lastModified) else remove("${url}_last_modified")
apply()
}
}

@Volatile
private var cachedPlaylistAt: Long = 0L

Expand Down Expand Up @@ -1417,7 +1443,8 @@ class IptvRepository @Inject constructor(
}
if (playlistChannels.isNotEmpty()) {
aggregatedChannels += playlistChannels
runCatching { onChannelsReady(aggregatedChannels.toList()) }
val currentList = synchronized(aggregatedChannels) { aggregatedChannels.toList() }
runCatching { onChannelsReady(currentList) }
}
playlistChannels
}
Expand Down Expand Up @@ -1544,6 +1571,7 @@ class IptvRepository @Inject constructor(
val epgCandidatesToTry = epgCandidates
var bestCoverage = epgCoverageRatio(channels, resolvedNowNext)
val mergedXmlNowNext = ConcurrentHashMap(resolvedNowNext)
var xmltvChanged = false
for ((index, candidate) in epgCandidatesToTry.withIndex()) {
val epgUrl = candidate.url
val candidateChannels = channelsForScopedEpgCandidate(candidate, channels)
Expand All @@ -1561,6 +1589,7 @@ class IptvRepository @Inject constructor(
val parsed = attempt.getOrDefault(emptyMap())
val parsedHasPrograms = hasAnyProgramData(parsed)
if (parsedHasPrograms) {
xmltvChanged = true
parsed.forEach { (channelId, nowNext) ->
val current = mergedXmlNowNext[channelId]
if (!hasProgramData(current) && hasProgramData(nowNext)) {
Expand All @@ -1581,16 +1610,40 @@ class IptvRepository @Inject constructor(
}
}
} else {
epgFailureMessage = attempt.exceptionOrNull()?.message
System.err.println("[EPG] XMLTV attempt ${index + 1} failed: ${epgFailureMessage}")
val exception = attempt.exceptionOrNull()
if (exception is EpgNotModifiedException) {
System.err.println("[EPG] XMLTV candidate ${index + 1} is unchanged (HTTP 304). Loading existing index...")
val existing = runCatching {
epgIndex.loadNowNext(
sourceKey = currentEpgIndexKey(config),
channelIds = candidateChannels.map { it.id }.toSet()
)
}.getOrDefault(emptyMap())
existing.forEach { (channelId, nowNext) ->
val current = mergedXmlNowNext[channelId]
if (!hasProgramData(current) && hasProgramData(nowNext)) {
mergedXmlNowNext[channelId] = nowNext
} else if (current == null) {
mergedXmlNowNext[channelId] = nowNext
}
}
resolved = true
} else {
epgFailureMessage = exception?.message
System.err.println("[EPG] XMLTV attempt ${index + 1} failed: ${epgFailureMessage}")
}
}
}
if (resolved) {
shortEpgResult?.let { mergedXmlNowNext.putAll(it) } // Short EPG wins for channels it covers
resolvedNowNext = mergedXmlNowNext
cachedNowNext = ConcurrentHashMap(mergedXmlNowNext)
cachedEpgAt = System.currentTimeMillis()
persistEpgIndexAll(config, mergedXmlNowNext, cachedEpgAt)
if (xmltvChanged) {
persistEpgIndexAll(config, mergedXmlNowNext, cachedEpgAt)
} else {
System.err.println("[EPG] Skipping persistEpgIndexAll because XMLTV index is unchanged")
}
epgUpdated = true
System.err.println("[EPG] Final merged EPG coverage=${(epgCoverageRatio(channels, mergedXmlNowNext) * 100).toInt()}% for ${channels.size} channels")
}
Expand Down Expand Up @@ -2140,8 +2193,12 @@ class IptvRepository @Inject constructor(
!hasProgramData(mergedNowNext[channel.id])
}
val hasXtreamRequestedChannels = channelsByCredentials.isNotEmpty()
var xmlFallback: Map<String, IptvNowNext> = emptyMap()
var isXmlCached = false
if (missingXmlChannels.isNotEmpty() && !preferFullCatchupHistory && !hasXtreamRequestedChannels) {
val xmlFallback = fetchVisibleXmlEpgForChannels(config, missingXmlChannels)
val result = fetchVisibleXmlEpgForChannels(config, missingXmlChannels)
xmlFallback = result.first
isXmlCached = result.second
if (xmlFallback.isNotEmpty()) {
mergedNowNext.putAll(xmlFallback)
System.err.println(
Expand All @@ -2164,7 +2221,15 @@ class IptvRepository @Inject constructor(
}
cachedNowNext.putAll(mergedForCache)
cachedEpgAt = System.currentTimeMillis()
persistEpgIndexChannels(config, mergedForCache, cachedEpgAt)

val toPersist = if (isXmlCached && xmlFallback.isNotEmpty()) {
mergedForCache.filterKeys { it !in xmlFallback.keys }
} else {
mergedForCache
}
if (toPersist.isNotEmpty()) {
persistEpgIndexChannels(config, toPersist, cachedEpgAt)
}

System.err.println(
"[EPG-Refresh] Updated ${mergedForCache.size} channels in cache " +
Expand All @@ -2190,10 +2255,10 @@ class IptvRepository @Inject constructor(
private suspend fun fetchVisibleXmlEpgForChannels(
config: IptvConfig,
channels: List<IptvChannel>
): Map<String, IptvNowNext> {
if (channels.isEmpty()) return emptyMap()
): Pair<Map<String, IptvNowNext>, Boolean> {
if (channels.isEmpty()) return Pair(emptyMap(), false)
val candidates = resolveScopedEpgCandidates(config)
if (candidates.isEmpty()) return emptyMap()
if (candidates.isEmpty()) return Pair(emptyMap(), false)

val playlistKey = channels
.asSequence()
Expand All @@ -2216,7 +2281,7 @@ class IptvRepository @Inject constructor(
System.err.println(
"[EPG-Refresh] Skipping XMLTV visible fallback for ${(cooldownUntil - nowMs) / 1000}s"
)
return emptyMap()
return Pair(emptyMap(), false)
}
visibleXmlEpgCooldownUntil[cooldownKey] = nowMs + 90_000L

Expand All @@ -2228,22 +2293,36 @@ class IptvRepository @Inject constructor(
"[EPG-Refresh] XMLTV visible fallback ${index + 1}/${visibleCandidates.size} " +
"for ${candidateChannels.size} channels"
)
var isCached = false
val parsed = runCatching {
withTimeoutOrNull(12_000L) {
fetchAndParseEpg(candidate.url, candidateChannels)
} ?: emptyMap()
}.recover { error ->
if (error is EpgNotModifiedException) {
System.err.println("[EPG-Refresh] XMLTV visible fallback candidate is unchanged (HTTP 304). Loading existing index...")
isCached = true
runCatching {
epgIndex.loadNowNext(
sourceKey = currentEpgIndexKey(config),
channelIds = candidateChannels.map { it.id }.toSet()
)
}.getOrDefault(emptyMap())
} else {
throw error
}
}.onFailure { error ->
System.err.println("[EPG-Refresh] XMLTV visible fallback failed: ${error.message}")
}.getOrDefault(emptyMap())

if (parsed.isNotEmpty() && hasAnyProgramData(parsed)) {
visibleXmlEpgCooldownUntil[cooldownKey] = System.currentTimeMillis() + 30_000L
return parsed
return Pair(parsed, isCached)
}
}

visibleXmlEpgCooldownUntil[cooldownKey] = System.currentTimeMillis() + 5 * 60_000L
return emptyMap()
return Pair(emptyMap(), false)
}

private fun persistCurrentCacheSnapshot(config: IptvConfig, loadedAtMs: Long = System.currentTimeMillis()) {
Expand Down Expand Up @@ -5096,27 +5175,55 @@ class IptvRepository @Inject constructor(
}

private fun fetchAndParseEpg(url: String, channels: List<IptvChannel>): Map<String, IptvNowNext> {
fun epgRequest(targetUrl: String, userAgent: String): Request {
return Request.Builder()
val hasDbEntries = currentEpgIndexKey.isNotBlank() && runCatching {
epgIndex.countPrograms(currentEpgIndexKey)
}.getOrDefault(0) > 0

fun epgRequest(targetUrl: String, userAgent: String, forceFull: Boolean = false): Request {
val builder = Request.Builder()
.url(targetUrl)
.header("User-Agent", userAgent)
.header("Accept", "*/*")
.header("Accept-Language", "en-US,en;q=0.9")
.header("Cache-Control", "no-cache")
.get()
.build()

if (hasDbEntries && !forceFull) {
getEpgCachedEtag(targetUrl)?.let { etag ->
builder.header("If-None-Match", etag)
}
getEpgCachedLastModified(targetUrl)?.let { lm ->
builder.header("If-Modified-Since", lm)
}
}

return builder.get().build()
}

val primaryUserAgent = OkHttpProvider.userAgentOr(IPTV_USER_AGENT)
val fallbackUserAgent = OkHttpProvider.userAgentOr(BROWSER_USER_AGENT)
var response = iptvHttpClient.newCall(epgRequest(url, primaryUserAgent)).execute()
if (response.code == 304) {
response.close()
throw EpgNotModifiedException()
}
if (!response.isSuccessful && response.code in setOf(511, 403, 401)) {
response.close()
response = iptvHttpClient.newCall(
epgRequest(url, fallbackUserAgent)
).execute()
if (response.code == 304) {
response.close()
throw EpgNotModifiedException()
}
}
response.use { safeResponse ->
if (safeResponse.isSuccessful) {
val etag = safeResponse.header("ETag")?.trim()
val lastModified = safeResponse.header("Last-Modified")?.trim()
if (etag != null || lastModified != null) {
saveEpgHttpCacheHeaders(url, etag, lastModified)
}
}
val stream = safeResponse.body?.byteStream() ?: throw IllegalStateException("Empty EPG response")
val prepared = BufferedInputStream(prepareInputStream(stream, url))
if (!safeResponse.isSuccessful && !looksLikeXmlTv(prepared)) {
Expand All @@ -5138,7 +5245,7 @@ class IptvRepository @Inject constructor(
try {
// Re-download
val retryResponse = iptvHttpClient.newCall(
epgRequest(url, primaryUserAgent)
epgRequest(url, primaryUserAgent, forceFull = true)
).execute()
retryResponse.use { rr ->
val retryStream = rr.body?.byteStream()
Expand Down Expand Up @@ -5998,39 +6105,68 @@ class IptvRepository @Inject constructor(
input: InputStream
) : FilterInputStream(input) {
override fun read(): Int {
val current = super.read()
if (current == -1) return -1
val buf = ByteArray(1)
val read = read(buf, 0, 1)
return if (read <= 0) -1 else buf[0].toInt() and 0xFF
}

val mapped = if (current == '\\'.code) {
val next = super.read()
if (next == -1) {
current
} else {
when (next.toChar()) {
'\\' -> '\\'.code
'"' -> '"'.code
'\'' -> '\''.code
'/' -> '/'.code
'n' -> '\n'.code
'r' -> '\r'.code
't' -> '\t'.code
'b' -> '\b'.code
'f' -> 0x0C
else -> {
// Unknown escape (for example \y): drop the slash and keep the char.
next
override fun read(b: ByteArray, off: Int, len: Int): Int {
if (len == 0) return 0

// Read from the underlying stream
val rawRead = super.read(b, off, len)
if (rawRead == -1) return -1

var writeIdx = off
var readIdx = off
val endIdx = off + rawRead

while (readIdx < endIdx) {
val current = b[readIdx++].toInt() and 0xFF
if (current == '\\'.code) {
val next = if (readIdx < endIdx) {
b[readIdx++].toInt() and 0xFF
} else {
// The backslash is at the very end of the read chunk.
// Fetch the next byte from the underlying stream.
val n = super.read()
if (n == -1) -1 else n
}

if (next == -1) {
b[writeIdx++] = '\\'.toByte()
} else {
val mapped = when (next.toChar()) {
'\\' -> '\\'.code
'"' -> '"'.code
'\'' -> '\''.code
'/' -> '/'.code
'n' -> '\n'.code
'r' -> '\r'.code
't' -> '\t'.code
'b' -> '\b'.code
'f' -> 0x0C
else -> next
}

val finalChar = if (mapped in 0x00..0x1F && mapped != '\n'.code && mapped != '\r'.code && mapped != '\t'.code) {
' '.code
} else {
mapped
}
b[writeIdx++] = finalChar.toByte()
}
} else {
val finalChar = if (current in 0x00..0x1F && current != '\n'.code && current != '\r'.code && current != '\t'.code) {
' '.code
} else {
current
}
b[writeIdx++] = finalChar.toByte()
}
} else {
current
}

// XML 1.0 forbids most control chars; normalize them to space.
if (mapped in 0x00..0x1F && mapped != '\n'.code && mapped != '\r'.code && mapped != '\t'.code) {
return ' '.code
}
return mapped
return writeIdx - off
}
}

Expand Down Expand Up @@ -6921,6 +7057,8 @@ class IptvRepository @Inject constructor(
private fun guideKeyCandidates(value: String?): Set<String> {
val raw = value?.trim().orEmpty()
if (raw.isBlank()) return emptySet()
val cached = guideKeyCandidatesCache[raw]
if (cached != null) return cached

val withoutBrackets = raw
.replace(BRACKET_CONTENT_REGEX, " ")
Expand Down Expand Up @@ -6948,7 +7086,7 @@ class IptvRepository @Inject constructor(
}
}

return rawAliases
val result = rawAliases
.asSequence()
.map { it.trim() }
.filter { it.isNotBlank() }
Expand All @@ -6961,6 +7099,9 @@ class IptvRepository @Inject constructor(
}
.filter { it.isNotBlank() }
.toSet()

guideKeyCandidatesCache[raw] = result
return result
}

private fun stripGuidePrefix(value: String): String {
Expand Down
Loading