Skip to content

Commit 144d7f1

Browse files
Convert SubscriptionManager.upsertAll() to a suspend function
1 parent abaf16e commit 144d7f1

17 files changed

Lines changed: 178 additions & 252 deletions

File tree

app/src/androidTest/java/org/schabi/newpipe/database/FeedDAOTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import android.content.Context
44
import androidx.room.Room
55
import androidx.test.core.app.ApplicationProvider
66
import io.reactivex.rxjava3.core.Single
7+
import kotlinx.coroutines.Dispatchers
8+
import kotlinx.coroutines.runBlocking
79
import org.junit.After
810
import org.junit.Assert.assertEquals
911
import org.junit.Assert.assertNotNull
@@ -22,7 +24,6 @@ import org.schabi.newpipe.extractor.channel.ChannelInfo
2224
import org.schabi.newpipe.extractor.stream.StreamType
2325
import java.io.IOException
2426
import java.time.OffsetDateTime
25-
import kotlin.streams.toList
2627

2728
class FeedDAOTest {
2829
private lateinit var db: AppDatabase
@@ -104,7 +105,7 @@ class FeedDAOTest {
104105
}.blockingSubscribe()
105106
}
106107

107-
private fun clearAndFillTables() {
108+
private fun clearAndFillTables() = runBlocking(Dispatchers.IO) {
108109
db.clearAllTables()
109110
streamDAO.insertAll(allStreams)
110111
subscriptionDAO.insertAll(

app/src/androidTest/java/org/schabi/newpipe/local/playlist/LocalPlaylistManagerTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.schabi.newpipe.local.playlist
22

3+
import kotlinx.coroutines.runBlocking
34
import org.junit.After
45
import org.junit.Before
56
import org.junit.Rule
@@ -62,7 +63,7 @@ class LocalPlaylistManagerTest {
6263
streamType = StreamType.VIDEO_STREAM, duration = 1, uploader = "uploader",
6364
uploaderUrl = "https://newpipe.net/"
6465
)
65-
database.streamDAO().insert(stream)
66+
runBlocking { database.streamDAO().insert(stream) }
6667
val upserted = StreamEntity(
6768
serviceId = 1, url = "https://newpipe.net/2", title = "title2",
6869
streamType = StreamType.VIDEO_STREAM, duration = 1, uploader = "uploader",
@@ -72,6 +73,6 @@ class LocalPlaylistManagerTest {
7273
val result = manager.createPlaylist("name", listOf(stream, upserted))
7374

7475
result.test().await().assertComplete()
75-
database.streamDAO().all.test().awaitCount(1).assertValue(listOf(stream, upserted))
76+
database.streamDAO().getAll().test().awaitCount(1).assertValue(listOf(stream, upserted))
7677
}
7778
}

app/src/main/java/org/schabi/newpipe/database/feed/dao/FeedDAO.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,22 +137,22 @@ abstract class FeedDAO {
137137
)
138138
"""
139139
)
140-
abstract fun unlinkOldLivestreams(subscriptionId: Long)
140+
abstract suspend fun unlinkOldLivestreams(subscriptionId: Long)
141141

142142
@Insert(onConflict = OnConflictStrategy.IGNORE)
143143
abstract fun insert(feedEntity: FeedEntity)
144144

145145
@Insert(onConflict = OnConflictStrategy.IGNORE)
146-
abstract fun insertAll(entities: List<FeedEntity>): List<Long>
146+
abstract suspend fun insertAll(entities: List<FeedEntity>): List<Long>
147147

148148
@Insert(onConflict = OnConflictStrategy.IGNORE)
149-
internal abstract fun insertLastUpdated(lastUpdatedEntity: FeedLastUpdatedEntity): Long
149+
internal abstract suspend fun insertLastUpdated(lastUpdatedEntity: FeedLastUpdatedEntity): Long
150150

151151
@Update(onConflict = OnConflictStrategy.IGNORE)
152-
internal abstract fun updateLastUpdated(lastUpdatedEntity: FeedLastUpdatedEntity)
152+
internal abstract suspend fun updateLastUpdated(lastUpdatedEntity: FeedLastUpdatedEntity)
153153

154154
@Transaction
155-
open fun setLastUpdatedForSubscription(lastUpdatedEntity: FeedLastUpdatedEntity) {
155+
open suspend fun setLastUpdatedForSubscription(lastUpdatedEntity: FeedLastUpdatedEntity) {
156156
val id = insertLastUpdated(lastUpdatedEntity)
157157

158158
if (id == -1L) {
Lines changed: 32 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,40 @@
11
package org.schabi.newpipe.database.stream.dao
22

3-
import androidx.room.ColumnInfo
43
import androidx.room.Dao
54
import androidx.room.Insert
65
import androidx.room.OnConflictStrategy
76
import androidx.room.Query
87
import androidx.room.Transaction
9-
import io.reactivex.rxjava3.core.Completable
8+
import androidx.room.Update
109
import io.reactivex.rxjava3.core.Flowable
11-
import io.reactivex.rxjava3.core.Maybe
12-
import org.schabi.newpipe.database.BasicDAO
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.runBlocking
12+
import kotlinx.coroutines.rx3.rxMaybe
1313
import org.schabi.newpipe.database.stream.model.StreamEntity
14-
import org.schabi.newpipe.database.stream.model.StreamEntity.Companion.STREAM_ID
15-
import org.schabi.newpipe.extractor.stream.StreamType
1614
import org.schabi.newpipe.util.StreamTypeUtil
17-
import java.time.OffsetDateTime
1815

1916
@Dao
20-
abstract class StreamDAO : BasicDAO<StreamEntity> {
21-
@Query("SELECT * FROM streams")
22-
abstract override fun getAll(): Flowable<List<StreamEntity>>
23-
24-
@Query("DELETE FROM streams")
25-
abstract override fun deleteAll(): Int
26-
27-
@Query("SELECT * FROM streams WHERE service_id = :serviceId")
28-
abstract override fun listByService(serviceId: Int): Flowable<List<StreamEntity>>
29-
30-
@Query("SELECT * FROM streams WHERE url = :url AND service_id = :serviceId")
31-
abstract fun getStream(serviceId: Long, url: String): Maybe<StreamEntity>
17+
interface StreamDAO {
18+
@Insert(onConflict = OnConflictStrategy.IGNORE)
19+
suspend fun insert(entity: StreamEntity): Long
3220

33-
@Query("UPDATE streams SET uploader_url = :uploaderUrl WHERE url = :url AND service_id = :serviceId")
34-
abstract fun setUploaderUrl(serviceId: Long, url: String, uploaderUrl: String): Completable
21+
@Insert
22+
suspend fun insertAll(entities: List<StreamEntity>)
3523

36-
@Insert(onConflict = OnConflictStrategy.IGNORE)
37-
internal abstract fun silentInsertInternal(stream: StreamEntity): Long
24+
@Update
25+
suspend fun update(entity: StreamEntity)
3826

39-
@Insert(onConflict = OnConflictStrategy.IGNORE)
40-
internal abstract fun silentInsertAllInternal(streams: List<StreamEntity>): List<Long>
27+
@Query("SELECT * FROM streams")
28+
fun getAll(): Flowable<List<StreamEntity>>
4129

42-
@Query("SELECT COUNT(*) != 0 FROM streams WHERE url = :url AND service_id = :serviceId")
43-
internal abstract fun exists(serviceId: Int, url: String): Boolean
30+
@Query("SELECT * FROM streams WHERE url = :url AND service_id = :serviceId")
31+
suspend fun getStream(serviceId: Int, url: String): StreamEntity?
4432

45-
@Query(
46-
"""
47-
SELECT uid, stream_type, textual_upload_date, upload_date, is_upload_date_approximation, duration
48-
FROM streams WHERE url = :url AND service_id = :serviceId
49-
"""
50-
)
51-
internal abstract fun getMinimalStreamForCompare(serviceId: Int, url: String): StreamCompareFeed?
33+
fun getStreamAsMaybe(serviceId: Int, url: String) = rxMaybe(Dispatchers.IO) { getStream(serviceId, url) }
5234

5335
@Transaction
54-
open fun upsert(newerStream: StreamEntity): Long {
55-
val uid = silentInsertInternal(newerStream)
36+
suspend fun upsert(newerStream: StreamEntity): Long {
37+
val uid = insert(newerStream)
5638

5739
if (uid != -1L) {
5840
newerStream.uid = uid
@@ -65,46 +47,31 @@ abstract class StreamDAO : BasicDAO<StreamEntity> {
6547
return newerStream.uid
6648
}
6749

50+
fun upsertBlocking(entity: StreamEntity) = runBlocking(Dispatchers.IO) { upsert(entity) }
51+
6852
@Transaction
69-
open fun upsertAll(streams: List<StreamEntity>): List<Long> {
70-
val insertUidList = silentInsertAllInternal(streams)
71-
72-
val streamIds = ArrayList<Long>(streams.size)
73-
for ((index, uid) in insertUidList.withIndex()) {
74-
val newerStream = streams[index]
75-
if (uid != -1L) {
76-
streamIds.add(uid)
77-
newerStream.uid = uid
78-
continue
79-
}
53+
suspend fun upsertAll(streams: List<StreamEntity>) = streams.map { upsert(it) }
8054

81-
compareAndUpdateStream(newerStream)
82-
streamIds.add(newerStream.uid)
83-
}
55+
fun upsertAllBlocking(streams: List<StreamEntity>) = runBlocking(Dispatchers.IO) { upsertAll(streams) }
8456

85-
update(streams)
86-
return streamIds
87-
}
88-
89-
private fun compareAndUpdateStream(newerStream: StreamEntity) {
90-
val existentMinimalStream = getMinimalStreamForCompare(newerStream.serviceId, newerStream.url)
57+
private suspend fun compareAndUpdateStream(newerStream: StreamEntity) {
58+
val existingStream = getStream(newerStream.serviceId, newerStream.url)
9159
?: throw IllegalStateException("Stream cannot be null just after insertion.")
92-
newerStream.uid = existentMinimalStream.uid
60+
newerStream.uid = existingStream.uid
9361

9462
if (!StreamTypeUtil.isLiveStream(newerStream.streamType)) {
95-
9663
// Use the existent upload date if the newer stream does not have a better precision
9764
// (i.e. is an approximation). This is done to prevent unnecessary changes.
9865
val hasBetterPrecision =
9966
newerStream.uploadDate != null && newerStream.isUploadDateApproximation != true
100-
if (existentMinimalStream.uploadDate != null && !hasBetterPrecision) {
101-
newerStream.uploadDate = existentMinimalStream.uploadDate
102-
newerStream.textualUploadDate = existentMinimalStream.textualUploadDate
103-
newerStream.isUploadDateApproximation = existentMinimalStream.isUploadDateApproximation
67+
if (existingStream.uploadDate != null && !hasBetterPrecision) {
68+
newerStream.uploadDate = existingStream.uploadDate
69+
newerStream.textualUploadDate = existingStream.textualUploadDate
70+
newerStream.isUploadDateApproximation = existingStream.isUploadDateApproximation
10471
}
10572

106-
if (existentMinimalStream.duration > 0 && newerStream.duration < 0) {
107-
newerStream.duration = existentMinimalStream.duration
73+
if (existingStream.duration > 0 && newerStream.duration < 0) {
74+
newerStream.duration = existingStream.duration
10875
}
10976
}
11077
}
@@ -123,28 +90,5 @@ abstract class StreamDAO : BasicDAO<StreamEntity> {
12390
WHERE f.stream_id = streams.uid)
12491
"""
12592
)
126-
abstract fun deleteOrphans(): Int
127-
128-
/**
129-
* Minimal entry class used when comparing/updating an existent stream.
130-
*/
131-
internal data class StreamCompareFeed(
132-
@ColumnInfo(name = STREAM_ID)
133-
var uid: Long = 0,
134-
135-
@ColumnInfo(name = StreamEntity.STREAM_TYPE)
136-
var streamType: StreamType,
137-
138-
@ColumnInfo(name = StreamEntity.STREAM_TEXTUAL_UPLOAD_DATE)
139-
var textualUploadDate: String? = null,
140-
141-
@ColumnInfo(name = StreamEntity.STREAM_UPLOAD_DATE)
142-
var uploadDate: OffsetDateTime? = null,
143-
144-
@ColumnInfo(name = StreamEntity.STREAM_IS_UPLOAD_DATE_APPROXIMATION)
145-
var isUploadDateApproximation: Boolean? = null,
146-
147-
@ColumnInfo(name = StreamEntity.STREAM_DURATION)
148-
var duration: Long
149-
)
93+
fun deleteOrphans(): Int
15094
}

app/src/main/java/org/schabi/newpipe/database/subscription/SubscriptionDAO.kt

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,32 @@
11
package org.schabi.newpipe.database.subscription
22

33
import androidx.room.Dao
4+
import androidx.room.Delete
45
import androidx.room.Insert
56
import androidx.room.OnConflictStrategy
67
import androidx.room.Query
78
import androidx.room.RewriteQueriesToDropUnusedColumns
89
import androidx.room.Transaction
10+
import androidx.room.Update
911
import io.reactivex.rxjava3.core.Flowable
10-
import io.reactivex.rxjava3.core.Maybe
11-
import org.schabi.newpipe.database.BasicDAO
12+
import kotlinx.coroutines.flow.Flow
1213

1314
@Dao
14-
abstract class SubscriptionDAO : BasicDAO<SubscriptionEntity> {
15-
@Query("SELECT COUNT(*) FROM subscriptions")
16-
abstract fun rowCount(): Flowable<Long>
15+
interface SubscriptionDAO {
16+
@Insert
17+
fun insert(entity: SubscriptionEntity)
1718

18-
@Query("SELECT * FROM subscriptions WHERE service_id = :serviceId")
19-
abstract override fun listByService(serviceId: Int): Flowable<List<SubscriptionEntity>>
19+
@Insert(onConflict = OnConflictStrategy.IGNORE)
20+
suspend fun insertAll(entities: List<SubscriptionEntity>): List<Long>
21+
22+
@Update
23+
suspend fun update(entity: SubscriptionEntity)
24+
25+
@Delete
26+
fun delete(entity: SubscriptionEntity)
2027

2128
@Query("SELECT * FROM subscriptions ORDER BY name COLLATE NOCASE ASC")
22-
abstract override fun getAll(): Flowable<List<SubscriptionEntity>>
29+
fun getAll(): Flow<List<SubscriptionEntity>>
2330

2431
@Query(
2532
"""
@@ -30,7 +37,7 @@ abstract class SubscriptionDAO : BasicDAO<SubscriptionEntity> {
3037
ORDER BY name COLLATE NOCASE ASC
3138
"""
3239
)
33-
abstract fun getSubscriptionsFiltered(filter: String): Flowable<List<SubscriptionEntity>>
40+
fun getSubscriptionsFiltered(filter: String): Flow<List<SubscriptionEntity>>
3441

3542
@RewriteQueriesToDropUnusedColumns
3643
@Query(
@@ -45,9 +52,7 @@ abstract class SubscriptionDAO : BasicDAO<SubscriptionEntity> {
4552
ORDER BY name COLLATE NOCASE ASC
4653
"""
4754
)
48-
abstract fun getSubscriptionsOnlyUngrouped(
49-
currentGroupId: Long
50-
): Flowable<List<SubscriptionEntity>>
55+
fun getSubscriptionsOnlyUngrouped(currentGroupId: Long): Flow<List<SubscriptionEntity>>
5156

5257
@RewriteQueriesToDropUnusedColumns
5358
@Query(
@@ -63,45 +68,36 @@ abstract class SubscriptionDAO : BasicDAO<SubscriptionEntity> {
6368
ORDER BY name COLLATE NOCASE ASC
6469
"""
6570
)
66-
abstract fun getSubscriptionsOnlyUngroupedFiltered(
71+
fun getSubscriptionsOnlyUngroupedFiltered(
6772
currentGroupId: Long,
6873
filter: String
69-
): Flowable<List<SubscriptionEntity>>
74+
): Flow<List<SubscriptionEntity>>
7075

7176
@Query("SELECT * FROM subscriptions WHERE url LIKE :url AND service_id = :serviceId")
72-
abstract fun getSubscriptionFlowable(serviceId: Int, url: String): Flowable<List<SubscriptionEntity>>
77+
fun getSubscriptionFlowable(serviceId: Int, url: String): Flowable<List<SubscriptionEntity>>
7378

7479
@Query("SELECT * FROM subscriptions WHERE url LIKE :url AND service_id = :serviceId")
75-
abstract fun getSubscription(serviceId: Int, url: String): Maybe<SubscriptionEntity>
80+
suspend fun getSubscription(serviceId: Int, url: String): SubscriptionEntity?
7681

7782
@Query("SELECT * FROM subscriptions WHERE uid = :subscriptionId")
78-
abstract fun getSubscription(subscriptionId: Long): SubscriptionEntity
79-
80-
@Query("DELETE FROM subscriptions")
81-
abstract override fun deleteAll(): Int
83+
suspend fun getSubscription(subscriptionId: Long): SubscriptionEntity
8284

8385
@Query("DELETE FROM subscriptions WHERE url LIKE :url AND service_id = :serviceId")
84-
abstract fun deleteSubscription(serviceId: Int, url: String): Int
85-
86-
@Query("SELECT uid FROM subscriptions WHERE url LIKE :url AND service_id = :serviceId")
87-
internal abstract fun getSubscriptionIdInternal(serviceId: Int, url: String): Long?
88-
89-
@Insert(onConflict = OnConflictStrategy.IGNORE)
90-
internal abstract fun silentInsertAllInternal(entities: List<SubscriptionEntity>): List<Long>
86+
fun deleteSubscription(serviceId: Int, url: String): Int
9187

9288
@Transaction
93-
open fun upsertAll(entities: List<SubscriptionEntity>) {
94-
val insertUidList = silentInsertAllInternal(entities)
89+
suspend fun upsertAll(entities: List<SubscriptionEntity>) {
90+
val insertUidList = insertAll(entities)
9591

9692
insertUidList.forEachIndexed { index: Int, uidFromInsert: Long ->
9793
val entity = entities[index]
9894

9995
if (uidFromInsert != -1L) {
10096
entity.uid = uidFromInsert
10197
} else {
102-
val subscriptionIdFromDb = getSubscriptionIdInternal(entity.serviceId, entity.url)
98+
val subscriptionIdFromDb = getSubscription(entity.serviceId, entity.url)
10399
?: throw IllegalStateException("Subscription cannot be null just after insertion.")
104-
entity.uid = subscriptionIdFromDb
100+
entity.uid = subscriptionIdFromDb.uid
105101

106102
update(entity)
107103
}

app/src/main/java/org/schabi/newpipe/fragments/list/channel/ChannelFragment.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,6 @@ private void updateSubscription(final ChannelInfo info) {
318318
"Updating subscription for " + info.getUrl(), info));
319319

320320
disposables.add(subscriptionManager.updateChannelInfo(info)
321-
.subscribeOn(Schedulers.io())
322321
.observeOn(AndroidSchedulers.mainThread())
323322
.subscribe(onComplete, onError));
324323
}
@@ -444,8 +443,6 @@ private void setNotify(final boolean isEnabled) {
444443
currentInfo.getServiceId(),
445444
currentInfo.getUrl(),
446445
isEnabled ? NotificationMode.ENABLED : NotificationMode.DISABLED)
447-
.subscribeOn(Schedulers.io())
448-
.observeOn(AndroidSchedulers.mainThread())
449446
.subscribe()
450447
);
451448
}

app/src/main/java/org/schabi/newpipe/local/feed/FeedDatabaseManager.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,13 @@ class FeedDatabaseManager(context: Context) {
7373
outdatedThreshold: OffsetDateTime
7474
) = feedTable.getAllOutdatedForGroup(groupId, outdatedThreshold)
7575

76-
fun markAsOutdated(subscriptionId: Long) = feedTable
77-
.setLastUpdatedForSubscription(FeedLastUpdatedEntity(subscriptionId, null))
76+
suspend fun markAsOutdated(subscriptionId: Long) =
77+
feedTable.setLastUpdatedForSubscription(FeedLastUpdatedEntity(subscriptionId))
7878

79-
fun doesStreamExist(stream: StreamInfoItem): Boolean {
80-
return streamTable.exists(stream.serviceId, stream.url)
81-
}
79+
suspend fun doesStreamExist(stream: StreamInfoItem) =
80+
streamTable.getStream(stream.serviceId, stream.url) != null
8281

83-
fun upsertAll(
82+
suspend fun upsertAll(
8483
subscriptionId: Long,
8584
items: List<StreamInfoItem>,
8685
oldestAllowedDate: OffsetDateTime = FEED_OLDEST_ALLOWED_DATE

0 commit comments

Comments
 (0)