用线程池的方案替换了rxjava

This commit is contained in:
tongchenfei
2020-07-09 16:20:59 +08:00
parent cc07e79efa
commit ecf6109cb6
4 changed files with 91 additions and 72 deletions

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.ThreadFactory;
public class ThreadPoolService {
private static final ExecutorService SERVICE = Executors.newFixedThreadPool( 3, new ThreadFactoryImpl() );
private static final ExecutorService SINGLE_THREAD_SERVICE = Executors.newSingleThreadExecutor(new SingleThreadFactoryImpl());
private static class ThreadFactoryImpl implements ThreadFactory {
private static int mCounter = 1;
@@ -18,10 +18,26 @@ public class ThreadPoolService {
}
}
/**
* 单线程队列执行的ThreadFactory实现应该只会new一个Thread
*/
private static class SingleThreadFactoryImpl implements ThreadFactory{
private static int counter = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "SingleThread - " + counter++);
}
}
private ThreadPoolService() {
}
public static void execute( Runnable task ) {
SERVICE.execute( task );
}
public static void singleExecute(Runnable task) {
SINGLE_THREAD_SERVICE.execute(task);
}
}

View File

@@ -2,7 +2,12 @@ package com.zhidao.mogo.module.event.panel.dao
import androidx.room.*
import com.zhidao.mogo.module.event.panel.bean.TripRecord
import io.reactivex.Single
/**
* 出行动态dao,全部使用同步方法,在线程中执行
*
* @author tongchenfei
*/
@Dao
interface TripRecordDao {
/**
@@ -10,7 +15,7 @@ interface TripRecordDao {
* @param limitTime 当日0点的时间戳
*/
@Query(value = "SELECT * FROM TripRecord WHERE recordTime > :limitTime ORDER BY recordTime DESC")
fun queryAllTripRecord(limitTime:Long):Single<List<TripRecord>>
fun queryAllTripRecord(limitTime: Long): List<TripRecord>
/**
* 根据事件id获取出行动态
@@ -18,16 +23,19 @@ interface TripRecordDao {
* @param eventId 事件id [com.mogo.module.common.entity.V2XRoadEventEntity.noveltyInfo.infoId]
*/
@Query(value = "SELECT * FROM TripRecord WHERE eventId = :eventId")
fun queryTripRecordByEventId(eventId:String):Single<TripRecord>
fun queryTripRecordByEventId(eventId: String): TripRecord?
@Query(value = "SELECT * FROM TripRecord WHERE id = :id")
fun queryTripRecordById(id: Int): TripRecord?
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insert(vararg tripRecord: TripRecord)
@Update
fun update(vararg tripRecord: TripRecord)
fun update(vararg tripRecord: TripRecord)
@Delete
fun delete(vararg tripRecord: TripRecord)
fun delete(vararg tripRecord: TripRecord)
/**
* 删除超时的数据即当天0时以前的数据

View File

@@ -8,6 +8,7 @@ import com.zhidao.mogo.module.event.panel.EventPanelConstants.MODULE_NAME
import com.zhidao.mogo.module.event.panel.R
import com.zhidao.mogo.module.event.panel.bean.TripRecord
import com.zhidao.mogo.module.event.panel.presenter.TripRecordPresenter
import com.zhidao.mogo.module.event.panel.util.TripRecordDataManager
import kotlinx.android.synthetic.main.module_event_panel_fragment_trip_record.*
import kotlin.random.Random
@@ -19,6 +20,12 @@ class TripRecordFragment : MvpFragment<TripRecordFragment, TripRecordPresenter>(
override fun getLayoutId(): Int = R.layout.module_event_panel_fragment_trip_record
override fun initViews() {
btnInsert.setOnClickListener {
TripRecordDataManager.insertTripRecord(TripRecord(id = 456, eventType = 456,eventId = "456"))
}
btnUpdate.setOnClickListener {
TripRecordDataManager.syncRoadEventModifyState(eventId = "456", modifyType = "2")
}
btnQuery.setOnClickListener {
Log.d(MODULE_NAME, "local list: $tripRecordList")
mPresenter.queryAllTripRecord()

View File

@@ -4,8 +4,7 @@ import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.util.Log
import androidx.lifecycle.LifecycleOwner
import android.os.Handler
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import com.mogo.module.common.entity.V2XMessageEntity
import com.mogo.module.common.entity.V2XMessageEntity.V2XTypeEnum.*
@@ -18,10 +17,6 @@ import com.zhidao.mogo.module.event.panel.bean.TripRecord
import com.zhidao.mogo.module.event.panel.dao.TripRecordDao
import com.zhidao.mogo.module.event.panel.dao.TripRecordDatabase
import com.zhidao.mogo.module.event.panel.listener.ITripRecordCallback
import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers
import java.util.*
import kotlin.collections.ArrayList
@@ -37,9 +32,9 @@ private const val BROADCAST_SCENE_MODIFY_EVENT_UPDATE_TYPE_KEY = "updateType"
* 出行动态的数据管理类由于TripRecordFragment初始化时机较晚所以封装一个单例类提早初始化
*/
object TripRecordDataManager {
lateinit var context: Context
private lateinit var context: Context
private val sceneEventReceiver = SceneEventReceiver()
private lateinit var tripRecordDao:TripRecordDao
private lateinit var tripRecordDao: TripRecordDao
private val tripRecordCallbackList = ArrayList<ITripRecordCallback>()
@@ -58,51 +53,49 @@ object TripRecordDataManager {
tripRecordCallbackList.remove(callback)
}
private val compositeDisposable = CompositeDisposable()
private val handler = Handler()
fun queryAllTripRecord() {
val queryDisposable = tripRecordDao.queryAllTripRecord(countLimitTime()).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).map {
Logger.d(MODULE_NAME, "delete over time record when query thread is ${Thread.currentThread().name}")
tripRecordDao.deleteOvertimeTripRecord(countLimitTime())
it
}.observeOn(AndroidSchedulers.mainThread()).subscribe { it ->
Log.d(MODULE_NAME, "db query: $it, thread is ${Thread.currentThread().name}")
tripRecordCallbackList.forEach {callback->
callback.queryTripRecordListSuccess(it)
ThreadPoolService.singleExecute {
val limitTime = countLimitTime()
// 查询所有的出行动态
val tripRecordList = tripRecordDao.queryAllTripRecord(limitTime)
Logger.d(MODULE_NAME, "查询所有出行动态,limitTime: $limitTime, list: $tripRecordList")
// 删除超时的出行动态
tripRecordDao.deleteOvertimeTripRecord(limitTime)
// 切换线程回调
handler.post {
tripRecordCallbackList.forEach { callback ->
callback.queryTripRecordListSuccess(tripRecordList)
}
}
}
compositeDisposable.add(queryDisposable)
}
private fun insertTripRecord(record: TripRecord) {
val disposable = Single.create<TripRecord> {
Logger.d(MODULE_NAME, "准备将数据插入数据库: $record")
tripRecordDao.insert(record)
it.onSuccess(record)
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).map {
fun insertTripRecord(record: TripRecord) {
ThreadPoolService.singleExecute {
// 先查一下此数据是否存在
Logger.d(MODULE_NAME, "出行动态入库===$record")
var check = tripRecordDao.queryTripRecordById(record.id)
if (check == null) {
Logger.d(MODULE_NAME, "数据库中不存在此数据,直接插入")
check = record
tripRecordDao.insert(record)
} else {
Logger.d(MODULE_NAME, "数据库中存在此数据更新recordTime即可其他参数不变")
check.recordTime = System.currentTimeMillis()
tripRecordDao.update(check)
}
// 为了防止只进不出,所以在插入新数据时,进行过期数据删除操作
Logger.d(MODULE_NAME, "delete over time record when insert: $it")
tripRecordDao.deleteOvertimeTripRecord(countLimitTime())
record
}.observeOn(AndroidSchedulers.mainThread()).subscribe{ it->
Logger.d(MODULE_NAME, "插入+删除操作完成,做界面展示===$it")
tripRecordCallbackList.forEach { callback->
callback.insertOrUpdateTripRecordSuccess(it)
// 切换线程回调
handler.post {
tripRecordCallbackList.forEach { callback ->
callback.insertOrUpdateTripRecordSuccess(check)
}
}
}
compositeDisposable.add(disposable)
}
fun updateTripRecords(vararg records: TripRecord) {
ThreadPoolService.execute {
tripRecordDao.update(*records)
}
}
fun deleteTripRecords(vararg records: TripRecord) {
ThreadPoolService.execute {
tripRecordDao.delete(*records)
}
}
/**
@@ -117,22 +110,16 @@ object TripRecordDataManager {
return calendar.timeInMillis
}
fun release(owner: LifecycleOwner) {
if(!compositeDisposable.isDisposed) {
compositeDisposable.dispose()
}
}
/**
* 处理道路事件推送,保存到本地数据库
* 目前只处理道路事件,违章提醒,他车求助,其他事件暂不处理
*/
private fun dealSceneMessage(type: Int, content: Any) {
if (type in arrayOf(ALERT_ROAD_WARNING, ALERT_ILLEGAL_PARK_WARNING, ALERT_SEEK_WARNING)) {
val eventId = if(type == V2XMessageEntity.V2XTypeEnum.ALERT_ROAD_WARNING){
val eventId = if (type == ALERT_ROAD_WARNING) {
val event = content as V2XRoadEventEntity
event.noveltyInfo.infoId
}else{
} else {
""
}
Logger.d(MODULE_NAME, "处理场景事件,准备插入数据库===eventId: $eventId")
@@ -143,23 +130,24 @@ object TripRecordDataManager {
/**
* 本地数据库同步v2x传过来的纠错信息
*/
private fun syncRoadEventModifyState(eventId: String, modifyType: String) {
fun syncRoadEventModifyState(eventId: String, modifyType: String) {
Logger.d(MODULE_NAME, "准备同步纠错信息: $eventId, $modifyType")
val disposable = tripRecordDao.queryTripRecordByEventId(eventId).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).map {
Logger.d(MODULE_NAME, "查询将纠错数据: $it")
it.usefulStatus = modifyType
it
}.map {
Logger.d(MODULE_NAME, "准备更新纠错数据: $it")
tripRecordDao.update(it)
it
}.observeOn(AndroidSchedulers.mainThread()).subscribe { it->
Logger.d(MODULE_NAME, "数据纠错更新完成,准备刷新界面==$it")
tripRecordCallbackList.forEach { callback->
callback.insertOrUpdateTripRecordSuccess(it)
ThreadPoolService.singleExecute {
// 找出需要纠错数据
val willModify = tripRecordDao.queryTripRecordByEventId(eventId)
if (willModify != null) {
// 查到数据进行修改,如果查不到数据,就不做操作了
Logger.d(MODULE_NAME, "准备修改纠错状态:$willModify")
willModify.usefulStatus = modifyType
tripRecordDao.update(willModify)
// 切线程回调
handler.post {
tripRecordCallbackList.forEach { callback ->
callback.insertOrUpdateTripRecordSuccess(willModify)
}
}
}
}
compositeDisposable.add(disposable)
}
class SceneEventReceiver : BroadcastReceiver() {