package com.mogo.functions.test

import androidx.test.core.app.ActivityScenario
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.filters.LargeTest
import com.mogo.eagle.core.function.main.MainLauncherActivity
import io.reactivex.Observable
import io.reactivex.ObservableOnSubscribe
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger

/**
 * Instrumented tests that run small RxJava pipelines on `Schedulers.io()`,
 * `Schedulers.computation()` and the default scheduler of `intervalRange`
 * while [MainLauncherActivity] is launched.
 *
 * NOTE(review): the expected thread-name prefixes ("io-pool-" / "cpu-pool-")
 * are not RxJava's stock thread names; presumably the application installs
 * its own scheduler hooks during start-up — confirm against the app setup.
 */
@RunWith(AndroidJUnit4::class)
@LargeTest
class RxJavaSchedulersTest {

    lateinit var launch: ActivityScenario<MainLauncherActivity>

    /** Launches [MainLauncherActivity] before every test. */
    @Before
    fun before() {
        launch = ActivityScenario.launch(MainLauncherActivity::class.java)
    }

    /** Closes the scenario launched in [before] so the activity does not leak into the next test. */
    @After
    fun after() {
        // Guard against a failed launch in before(): closing an unset lateinit would mask the real error.
        if (::launch.isInitialized) {
            launch.close()
        }
    }

    /**
     * Sums 1..10 on the io scheduler and checks both the result and the
     * name of the thread the items were emitted on.
     */
    @Test
    fun testRxJavaIoSchedulers(): Unit = runBlocking {
        val numbers: List<Int> = (1..10).toList()

        val result = Observable.fromIterable(numbers)
            // Placed upstream of subscribeOn, so this runs on the io scheduler's thread.
            .doOnNext { assertThreadNamePrefix(IO_THREAD_PREFIX) }
            .subscribeOn(Schedulers.io())
            .reduce(0) { acc, next -> acc + next }
            .blockingGet()

        assertEquals(EXPECTED_SUM, result)

        // NOTE(review): this pause does not influence the assertions above;
        // presumably it keeps the activity alive for observation — confirm before removing.
        delay(SECONDS.toMillis(WAIT_SECONDS))
    }

    /**
     * Emits 1..10 from a hand-written source subscribed on the computation
     * scheduler and checks both the sum and the emitting thread's name.
     */
    @Test
    fun testRxJavaCpuSchedulers(): Unit = runBlocking {
        val source = Observable.create(ObservableOnSubscribe<Int> { emitter ->
            for (i in 1..10) {
                emitter.onNext(i)
            }
            // A failure here is thrown inside the source and surfaces through blockingGet().
            assertThreadNamePrefix(CPU_THREAD_PREFIX)
            emitter.onComplete()
        })

        val result = source
            .reduce(0) { acc, next -> acc + next }
            .subscribeOn(Schedulers.computation())
            .blockingGet()

        assertEquals(EXPECTED_SUM, result)

        // NOTE(review): this pause does not influence the assertions above;
        // presumably it keeps the activity alive for observation — confirm before removing.
        delay(SECONDS.toMillis(WAIT_SECONDS))
    }

    /**
     * Checks that `intervalRange(0, 10, 1, 1, SECONDS)` delivers exactly ten
     * items and completes within [WAIT_SECONDS] seconds.
     */
    @Test
    fun testRxJavaIntervalSchedulers() {
        // Incremented on the interval's scheduler thread and read on the test thread,
        // so it must be an atomic rather than a plain local var.
        val counter = AtomicInteger()

        // Block until the stream completes instead of sleeping for a fixed time;
        // blockingAwait returns false if the timeout elapses first.
        val completedInTime = Observable.intervalRange(0, 10, 1, 1, SECONDS)
            .doOnNext { counter.incrementAndGet() }
            .ignoreElements()
            .blockingAwait(WAIT_SECONDS, SECONDS)

        assertTrue("intervalRange did not complete within $WAIT_SECONDS seconds", completedInTime)
        assertEquals(EXPECTED_TICKS, counter.get())
    }

    /** Fails the calling test unless the current thread's name starts with [prefix]. */
    private fun assertThreadNamePrefix(prefix: String) {
        val threadName = Thread.currentThread().name
        assertTrue(
            "expected thread name to start with \"$prefix\" but was \"$threadName\"",
            threadName.startsWith(prefix),
        )
    }

    private companion object {
        const val IO_THREAD_PREFIX = "io-pool-"
        const val CPU_THREAD_PREFIX = "cpu-pool-"

        /** Sum of the integers 1..10. */
        const val EXPECTED_SUM = 55

        /** Number of items requested from `intervalRange`. */
        const val EXPECTED_TICKS = 10

        /** Length of the trailing pauses and upper bound for the interval test, in seconds. */
        const val WAIT_SECONDS = 20L
    }
}