@@ -13,7 +13,9 @@ import com.github.libretube.extensions.toID
1313import com.github.libretube.helpers.NewPipeExtractorInstance
1414import com.github.libretube.helpers.PreferenceHelper
1515import com.github.libretube.ui.dialogs.ShareDialog.Companion.YOUTUBE_FRONTEND_URL
16+ import kotlinx.coroutines.Dispatchers
1617import kotlinx.coroutines.delay
18+ import kotlinx.coroutines.withContext
1719import org.schabi.newpipe.extractor.channel.ChannelInfo
1820import org.schabi.newpipe.extractor.channel.tabs.ChannelTabInfo
1921import org.schabi.newpipe.extractor.channel.tabs.ChannelTabs
@@ -33,7 +35,10 @@ class LocalFeedRepository : FeedRepository {
3335 if (filter.isEnabled) tab else null
3436 }.toTypedArray()
3537
36- override suspend fun getFeed (forceRefresh : Boolean ): List <StreamItem > {
38+ override suspend fun getFeed (
39+ forceRefresh : Boolean ,
40+ onProgressUpdate : (FeedProgress ) -> Unit
41+ ): List <StreamItem > {
3742 val nowMillis = Instant .now().toEpochMilli()
3843 val minimumDateMillis = nowMillis - Duration .ofDays(MAX_FEED_AGE_DAYS ).toMillis()
3944
@@ -58,21 +63,31 @@ class LocalFeedRepository : FeedRepository {
5863 }
5964
6065 DatabaseHolder .Database .feedDao().cleanUpOlderThan(minimumDateMillis)
61- refreshFeed(channelIds, minimumDateMillis)
66+ refreshFeed(channelIds, minimumDateMillis, onProgressUpdate )
6267 PreferenceHelper .putLong(PreferenceKeys .LAST_FEED_REFRESH_TIMESTAMP_MILLIS , nowMillis)
6368
6469 return DatabaseHolder .Database .feedDao().getAll().map(SubscriptionsFeedItem ::toStreamItem)
6570 }
6671
67- private suspend fun refreshFeed (channelIds : List <String >, minimumDateMillis : Long ) {
68- val extractionCount = AtomicInteger ()
72+ private suspend fun refreshFeed (
73+ channelIds : List <String >,
74+ minimumDateMillis : Long ,
75+ onProgressUpdate : (FeedProgress ) -> Unit
76+ ) {
77+ if (channelIds.isEmpty()) return
78+
79+ val totalExtractionCount = AtomicInteger ()
80+ val chunkedExtractionCount = AtomicInteger ()
81+ withContext(Dispatchers .Main ) {
82+ onProgressUpdate(FeedProgress (0 , channelIds.size))
83+ }
6984
7085 for (channelIdChunk in channelIds.chunked(CHUNK_SIZE )) {
7186 // add a delay after each BATCH_SIZE amount of visited channels
72- val count = extractionCount .get();
87+ val count = chunkedExtractionCount .get();
7388 if (count >= BATCH_SIZE ) {
7489 delay(BATCH_DELAY .random())
75- extractionCount .set(0 )
90+ chunkedExtractionCount .set(0 )
7691 }
7792
7893 val collectedFeedItems = channelIdChunk.parallelMap { channelId ->
@@ -82,7 +97,12 @@ class LocalFeedRepository : FeedRepository {
8297 Log .e(channelId, e.stackTraceToString())
8398 null
8499 } finally {
85- extractionCount.incrementAndGet();
100+ chunkedExtractionCount.incrementAndGet()
101+ val currentProgress = totalExtractionCount.incrementAndGet()
102+
103+ withContext(Dispatchers .Main ) {
104+ onProgressUpdate(FeedProgress (currentProgress, channelIds.size))
105+ }
86106 }
87107 }.filterNotNull().flatten().map(StreamItem ::toFeedItem)
88108
@@ -133,10 +153,12 @@ class LocalFeedRepository : FeedRepository {
133153
134154 companion object {
135155 private const val CHUNK_SIZE = 2
156+
136157 /* *
137158 * Maximum amount of feeds that should be fetched together, before a delay should be applied.
138159 */
139160 private const val BATCH_SIZE = 50
161+
140162 /* *
141163 * Millisecond delay between two consecutive batches to avoid throttling.
142164 */
0 commit comments