CreateDBThread.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. #include "CreateDBThread.h"
  2. #include "GlobalVariable.h"
  3. #include "GlobalInfo.h"
  4. #include "CalculateAudio.h"
  5. #include "spdlog.h"
  6. CreateDBThread::CreateDBThread(RecordThreadInfo_t& threadInfo)
  7. : BaseRecordThread(threadInfo)
  8. {
  9. }
  10. CreateDBThread::~CreateDBThread()
  11. {
  12. }
  13. /* 停止线程 */
  14. void CreateDBThread::thread_stop()
  15. {
  16. if(m_isRunning.load())
  17. {
  18. m_isRunning.store(false);
  19. }
  20. /* 环形阻塞的线程 */
  21. m_queueRealTimeData.exit();
  22. }
  23. void CreateDBThread::thread_stop_block()
  24. {
  25. thread_stop();
  26. while(true)
  27. {
  28. if(m_isStoped.load())
  29. {
  30. break;
  31. }
  32. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  33. }
  34. }
  35. /* 设置数据 */
  36. bool CreateDBThread::setData(const AudioSrcData& srcData)
  37. {
  38. if(srcData.pData == nullptr || srcData.dataSize == 0)
  39. {
  40. SPDLOG_LOGGER_ERROR(m_logger, "{} 设置数据失败,srcData为空或dataSize为0", m_logBase);
  41. return false;
  42. }
  43. /* 创建一个新的AudioSrcData对象 */
  44. AudioSrcData* audioData = new AudioSrcData(srcData);
  45. if(audioData == nullptr)
  46. {
  47. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建AudioSrcData对象失败", m_logBase);
  48. return false;
  49. }
  50. /* 判断环形队列是否满 */
  51. if(m_queueAudioData.isFull())
  52. {
  53. /* 出队一个最早的元素 */
  54. AudioSrcData* oldData = m_queueAudioData.front_pop();
  55. SPDLOG_LOGGER_TRACE(m_logger, "{} 环形队列已满,出队一个元素,时间: {}, 大小: {}",
  56. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  57. if(oldData != nullptr)
  58. {
  59. delete oldData;
  60. oldData = nullptr;
  61. }
  62. }
  63. /* 入队新的数据 */
  64. if(!m_queueAudioData.push_noBlock(audioData))
  65. {
  66. SPDLOG_LOGGER_ERROR(m_logger, "{} 数据加入环形队列失败", m_logBase);
  67. delete audioData; // 入队失败,释放内存
  68. audioData = nullptr;
  69. return false;
  70. }
  71. return true;
  72. }
  73. /* 设置实时数据 */
  74. bool CreateDBThread::setRealTimeData(const AudioSrcData& srcData)
  75. {
  76. auto oldData = m_queueRealTimeData.push_pop(new AudioSrcData(srcData));
  77. if(oldData != nullptr)
  78. {
  79. /* 这里一般不会满,会被实时消耗,如果满了,说明出现问题了 */
  80. SPDLOG_LOGGER_WARN(m_logger, "{} 实时数据环形队列已满,出队一个元素,时间: {}, 大小: {}",
  81. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  82. delete oldData;
  83. oldData = nullptr;
  84. }
  85. return true;
  86. }
  87. /* 获取最新的结果,根据时间进行对比,最新的时间比传入的晚,就是有新的数据了 */
  88. bool CreateDBThread::getLatestResult(OneSecondData& resultData)
  89. {
  90. if(resultData.startTime.isNull())
  91. {
  92. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,传入的时间戳为空", m_logBase);
  93. return false;
  94. }
  95. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  96. auto lastSecondData = m_queueResultData->back();
  97. if(lastSecondData == nullptr)
  98. {
  99. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,结果队列为空", m_logBase);
  100. return false;
  101. }
  102. /* 比较时间戳 */
  103. if(lastSecondData->startTime <= resultData.startTime)
  104. {
  105. return false;
  106. }
  107. resultData = *lastSecondData;
  108. return true;
  109. }
  110. /* 获取最新的结果,让整个环形队列相等 */
  111. bool CreateDBThread::getLatestResult(RingQueueManualMutex<OneSecondData*>& resultQueue)
  112. {
  113. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  114. if(m_queueResultData->isEmpty())
  115. {
  116. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新数据,数据队列为空", m_logBase);
  117. return false;
  118. }
  119. /* 目标队列为空,全部拷贝 */
  120. if(resultQueue.isEmpty())
  121. {
  122. SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,目标队列为空,拷贝整个结果队列", m_logBase);
  123. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  124. {
  125. OneSecondData* data = m_queueResultData->at(i);
  126. if(data != nullptr)
  127. {
  128. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  129. resultQueue.push(newData);
  130. }
  131. }
  132. return true;
  133. }
  134. /* 队列不为空,查找队列中相等的位置,拷贝后面的数据,不拷贝整个队列,减少开销
  135. * 这里直接从最新的数据往前找 */
  136. auto itBack = resultQueue.back();
  137. int index = m_queueResultData->QueueSize() - 1;
  138. while(index >= 0)
  139. {
  140. OneSecondData* data = m_queueResultData->at(index);
  141. if(data == nullptr)
  142. {
  143. --index;
  144. continue;
  145. }
  146. /* 找到相等的位置 */
  147. if(data->startTime == itBack->startTime)
  148. {
  149. break;
  150. }
  151. --index;
  152. }
  153. if(index < 0)
  154. {
  155. SPDLOG_LOGGER_WARN(m_logger, "{} 获取最新数据失败,未找到相等的时间戳,将清空队列,拷贝全部数据", m_logBase);
  156. SPDLOG_LOGGER_WARN(m_logger, "队列大小: {}, 目标队列大小: {}", m_queueResultData->QueueSize(), resultQueue.QueueSize());
  157. /* 清空目标队列,将所有的数据全部拷贝 */
  158. while(!resultQueue.isEmpty())
  159. {
  160. OneSecondData* data = resultQueue.front_pop();
  161. if(data != nullptr)
  162. {
  163. delete data;
  164. data = nullptr;
  165. }
  166. }
  167. /* 拷贝全部数据 */
  168. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  169. {
  170. OneSecondData* data = m_queueResultData->at(i);
  171. if(data != nullptr)
  172. {
  173. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  174. resultQueue.push(newData);
  175. }
  176. }
  177. return true;
  178. }
  179. else if(index == m_queueResultData->QueueSize() - 1)
  180. {
  181. // 已经是最新的数据了,不需要拷贝
  182. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,已经是最新的数据了", m_logBase);
  183. return false;
  184. }
  185. /* 拷贝数据 */
  186. for(; index < m_queueResultData->QueueSize(); ++index)
  187. {
  188. OneSecondData* data = m_queueResultData->at(index);
  189. if(data == nullptr)
  190. {
  191. continue;
  192. }
  193. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  194. auto fornt = resultQueue.push(newData);
  195. if(fornt != nullptr)
  196. {
  197. delete fornt; // 出队的元素是之前的结果,释放内存
  198. fornt = nullptr;
  199. }
  200. }
  201. return true;
  202. }
  203. /**
  204. * @brief 获取最新的音量包结果
  205. *
  206. * @param listData
  207. * @param count 需要拷贝的数目,如果暂时没有这么多数据,则返回false
  208. * @return true 获取成功
  209. * @return false 无数据或者数据不足
  210. */
  211. bool CreateDBThread::getLatestRealTimeResult(std::list<OneDBData>& listData, const int getCount)
  212. {
  213. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  214. if(getCount <= 0)
  215. {
  216. return false;
  217. }
  218. if(m_listRealTimeResult.empty() || static_cast<int>(m_listRealTimeResult.size()) < getCount)
  219. {
  220. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  221. return false;
  222. }
  223. auto it = m_listRealTimeResult.end();
  224. if(listData.empty())
  225. {
  226. SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据,列表为空,拷贝最新的{}个数据", m_logBase, getCount);
  227. for(int i = 0; i < getCount; ++i)
  228. {
  229. it--;
  230. }
  231. }else
  232. {
  233. /* 先向前移动getCount个数据,判断新数据是否满足 */
  234. auto oldData = listData.back();
  235. for(int i = 0; i < getCount; ++i)
  236. {
  237. it--;
  238. }
  239. if(it->startTime <= oldData.startTime)
  240. {
  241. return false;
  242. }
  243. /* 继续向前查找已发送的时间点 */
  244. /* 找到需要开始拷贝的元素位置 */
  245. while(it != m_listRealTimeResult.begin())
  246. {
  247. if(it->startTime <= oldData.startTime)
  248. {
  249. break; // 找到需要开始拷贝的元素位置
  250. }
  251. it--;
  252. }
  253. /* 上面的循环无法判断begin(),这里单独判断一下 */
  254. if(it == m_listRealTimeResult.begin())
  255. {
  256. if(it->startTime <= oldData.startTime)
  257. {
  258. /* begin()是最新的,指向下一个可以拷贝的数据 */
  259. it++;
  260. }else
  261. {
  262. /* 列表中所有的数据都比已发送的数据新,只拷贝最新的getCount个数据 */
  263. it = m_listRealTimeResult.end();
  264. for(int i = 0; i < getCount; ++i)
  265. {
  266. it--;
  267. }
  268. }
  269. }else
  270. {
  271. /* 移动到下一个可以拷贝的新数据位置 */
  272. it++;
  273. }
  274. }
  275. /* 拷贝数据 */
  276. listData.clear();
  277. for(int i = 0; it != m_listRealTimeResult.end() && i < getCount; ++it, ++i)
  278. {
  279. listData.push_back(*it);
  280. }
  281. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据成功,数量: {}", m_logBase, listData.size());
  282. return true;
  283. }
  284. /* 获取一个最新音量值 */
  285. bool CreateDBThread::getLatestRealTimeResult(OneDBData& resultData)
  286. {
  287. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  288. if(m_listRealTimeResult.empty())
  289. {
  290. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  291. return false;
  292. }
  293. resultData = m_listRealTimeResult.front();
  294. m_listRealTimeResult.pop_front();
  295. return true;
  296. }
  297. /* 计算音量和反相的线程函数 */
  298. void CreateDBThread::task()
  299. {
  300. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始计算音量线程", m_logBase);
  301. /* 初始化一些数据 */
  302. if(!initData())
  303. {
  304. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  305. return;
  306. }
  307. while(m_isRunning)
  308. {
  309. /*--------------------------------------------------------------
  310. * 计算实时音量值(这个数据量很小,大概只有33ms,所以阻塞在这里)
  311. *--------------------------------------------------------------*/
  312. /* 判断实时数据列表是否有数据,没有数据则阻塞,直到有数据 */
  313. auto rtData = m_queueRealTimeData.front();
  314. if(rtData == nullptr)
  315. {
  316. SPDLOG_LOGGER_DEBUG(m_logger, "{} 实时数据队列为空,等待数据", m_logBase);
  317. continue;
  318. }
  319. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  320. /* 计算实时音量值 */
  321. calculateRealTimeVolume();
  322. // std::chrono::steady_clock::time_point rtEndTime = std::chrono::steady_clock::now();
  323. // std::chrono::microseconds rtDuration = std::chrono::duration_cast<std::chrono::microseconds>(rtEndTime - startTime);
  324. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算实时音量包完成,耗时: {} 微秒", m_logBase, rtDuration.count());
  325. /*--------------------------------------------------------------
  326. * 计算常规音量值,1秒计算一次
  327. *--------------------------------------------------------------*/
  328. calculateVolumeData();
  329. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  330. // std::chrono::microseconds duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - rtEndTime);
  331. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算1秒的音量包完成,耗时: {} 微秒", m_logBase, duration.count());
  332. }
  333. /* 清理数据 */
  334. clearData();
  335. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 计算音量线程结束 ", m_logBase);
  336. }
  337. /* 初始化一些数据 */
  338. bool CreateDBThread::initData()
  339. {
  340. int queueSize = GInfo.queueElementCount();
  341. /* 初始化一些数据 */
  342. m_queueAudioData.clearQueue();
  343. m_queueAudioData.setQueueCapacity(queueSize);
  344. m_queueRealTimeData.setQueueCapacity(queueSize);
  345. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  346. m_numChannels = GInfo.numChannels(); /* 声道数 */
  347. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  348. /* 一秒钟数据大小,单位:字节 */
  349. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8);
  350. /* 计算每个音量值需要的数据大小,单位数据单位: short,一个通道的大小 */
  351. m_oneDBLengthOfSrcData = m_sampleRate / VOLUME_INFO_NUM;
  352. m_singleDataLength = m_oneSecondSize / m_numChannels;
  353. /* 计算音量和反相的环形队列元素数目,默认是180个,即180秒 */
  354. // m_queueAudioDataCapacity = queueSize;
  355. m_queueResultData = new RingQueueManualMutex<OneSecondData*>(queueSize);
  356. m_queueResultData->mutex.lock();
  357. m_queueResultData->setDefaultValue(nullptr);
  358. m_queueResultData->mutex.unlock();
  359. m_numMaxResultData = 60;
  360. return true;
  361. }
  362. /* 清理数据 */
  363. void CreateDBThread::clearData()
  364. {
  365. /* 清理环形队列 */
  366. while(!m_queueAudioData.isEmpty())
  367. {
  368. AudioSrcData* data = m_queueAudioData.front_pop();
  369. if(data != nullptr)
  370. {
  371. delete data;
  372. data = nullptr;
  373. }
  374. }
  375. // if(m_remainData != nullptr)
  376. // {
  377. // delete m_remainData;
  378. // m_remainData = nullptr;
  379. // }
  380. }
  381. /* 计算音量和反相 */
  382. bool CreateDBThread::CreateDBPhase(AudioSrcData* audioData)
  383. {
  384. short* pWaveVu = reinterpret_cast<short*>(audioData->pData);
  385. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  386. const int oneDBSize = m_oneDBLengthOfSrcData;
  387. StAudioNum audioInfo;
  388. audioInfo.cardRoadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  389. audioInfo.nTotal = m_singleDataLength;
  390. int iCurPos = 0;
  391. /* 存储结果 */
  392. m_result = new OneSecondData();
  393. /* 计算音量和反相的计算类 */
  394. CAudio2ChanCorrelator audioCor;
  395. /* 音量值计算,好多个字节计算出一个最大音量值 */
  396. for(int i = 0; i < VOLUME_INFO_NUM; ++i)
  397. {
  398. // 采样点最大值
  399. short sMaxA, sMaxB, sRMSA, sRMSB;
  400. audioCor.CorrelateChunks(pWaveVu + iCurPos, (pWaveVu + iCurPos + 1), oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  401. /* 这里乘2是要增加两个通道的数据大小 */
  402. iCurPos += (oneDBSize * 2);
  403. m_result->aryLeftDB[i] = calculateDB(sMaxA);
  404. m_result->aryRightDB[i] = calculateDB(sMaxB);
  405. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  406. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  407. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  408. int iReversed = audioCor.GetCorrelationLevel();
  409. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  410. {
  411. iReversed = 0;
  412. }
  413. // 和反相阈值比较,来判定是否为反相
  414. float dReversed = iReversed / 100.00;
  415. m_result->aryPhase[i] = dReversed;
  416. }
  417. /* 设置时间戳 */
  418. m_result->startTime = audioData->startTime;
  419. m_result->endTime = audioData->endTime;
  420. return true;
  421. }
  422. /* 计算音量数据 */
  423. bool CreateDBThread::calculateVolumeData()
  424. {
  425. /* 判断是否够一秒的数据 */
  426. if(m_queueAudioData.isEmpty())
  427. {
  428. return false;;
  429. }
  430. auto audioData = m_queueAudioData.front_pop();
  431. if(audioData == nullptr)
  432. {
  433. SPDLOG_LOGGER_WARN(m_logger, "{} 从环形队列中取出数据失败,可能是队列为空", m_logBase);
  434. // std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待一段时间再尝试
  435. return false;;
  436. }
  437. /* 计算音量和反相,结果存储在m_result */
  438. if(!CreateDBPhase(audioData))
  439. {
  440. SPDLOG_LOGGER_ERROR(m_logger, "{} 计算音量和反相失败", m_logBase);
  441. }
  442. /* 将结果放入结果队列 */
  443. if(m_result != nullptr)
  444. {
  445. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  446. auto result = m_queueResultData->push(m_result);
  447. if(result != nullptr)
  448. {
  449. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 队列已满,出队一个元素", m_logBase);
  450. delete result; // 出队的元素是之前的结果,释放内存
  451. result = nullptr;
  452. }
  453. m_result = nullptr; // 清空结果指针,等待下一次使用
  454. }
  455. delete audioData; // 释放内存
  456. audioData = nullptr;
  457. return true;
  458. }
  459. /* 计算实时音量值 */
  460. bool CreateDBThread::calculateRealTimeVolume()
  461. {
  462. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  463. const int oneDBSize = m_oneDBLengthOfSrcData;
  464. while(!m_queueRealTimeData.isEmpty())
  465. {
  466. /* 取出一个数据,没有数据则阻塞住 */
  467. AudioSrcData* audioData = m_queueRealTimeData.front_pop_noBlock();
  468. if(audioData == nullptr)
  469. {
  470. continue;
  471. }
  472. /* 计算音量和反相的计算类 */
  473. CAudio2ChanCorrelator audioCor;
  474. StAudioNum audioInfo;
  475. audioInfo.cardRoadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  476. audioInfo.nTotal = m_singleDataLength;
  477. short* pData = reinterpret_cast<short*>(audioData->pData);
  478. // 采样点最大值
  479. short sMaxA, sMaxB, sRMSA, sRMSB;
  480. audioCor.CorrelateChunks(pData, pData + 1, oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  481. /* 这里乘2是要增加两个通道的数据大小 */
  482. int leftDB = calculateDB(sMaxA);
  483. int rightDB = calculateDB(sMaxB);
  484. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  485. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  486. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  487. int iReversed = audioCor.GetCorrelationLevel();
  488. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  489. {
  490. iReversed = 0;
  491. }
  492. // 和反相阈值比较,来判定是否为反相
  493. float dReversed = iReversed / 100.00;
  494. OneDBData oneDBData;
  495. oneDBData.leftDB = leftDB;
  496. oneDBData.rightDB = rightDB;
  497. oneDBData.phase = dReversed;
  498. oneDBData.startTime = audioData->startTime;
  499. oneDBData.endTime = audioData->endTime;
  500. /* 保存结果 */
  501. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  502. m_listRealTimeResult.push_back(oneDBData);
  503. while(static_cast<int>(m_listRealTimeResult.size()) > m_numMaxResultData)
  504. {
  505. /* 超过最大数量,删除最早的一个 */
  506. m_listRealTimeResult.pop_front();
  507. }
  508. /* 删除元素内存 */
  509. delete audioData;
  510. audioData = nullptr;
  511. }
  512. return true;
  513. }