CreateDBThread.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. #include "CreateDBThread.h"
  2. #include "GlobalVariable.h"
  3. #include "GlobalInfo.h"
  4. #include "CalculateAudio.h"
  5. #include "spdlog.h"
  6. CreateDBThread::CreateDBThread(RecordThreadInfo_t& threadInfo)
  7. : BaseRecordThread(threadInfo)
  8. {
  9. }
  10. CreateDBThread::~CreateDBThread()
  11. {
  12. }
  13. /* 设置数据 */
  14. bool CreateDBThread::setData(const AudioSrcData& srcData)
  15. {
  16. if(srcData.pData == nullptr || srcData.dataSize == 0)
  17. {
  18. SPDLOG_LOGGER_ERROR(m_logger, "{} 设置数据失败,srcData为空或dataSize为0", m_logBase);
  19. return false;
  20. }
  21. /* 创建一个新的AudioSrcData对象 */
  22. AudioSrcData* audioData = new AudioSrcData(srcData);
  23. if(audioData == nullptr)
  24. {
  25. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建AudioSrcData对象失败", m_logBase);
  26. return false;
  27. }
  28. /* 判断环形队列是否满 */
  29. if(m_queueAudioData.isFull())
  30. {
  31. /* 出队一个最早的元素 */
  32. AudioSrcData* oldData = m_queueAudioData.front_pop();
  33. SPDLOG_LOGGER_TRACE(m_logger, "{} 环形队列已满,出队一个元素,时间: {}, 大小: {}",
  34. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  35. if(oldData != nullptr)
  36. {
  37. delete oldData;
  38. oldData = nullptr;
  39. }
  40. }
  41. /* 入队新的数据 */
  42. if(!m_queueAudioData.push_noBlock(audioData))
  43. {
  44. SPDLOG_LOGGER_ERROR(m_logger, "{} 数据加入环形队列失败", m_logBase);
  45. delete audioData; // 入队失败,释放内存
  46. audioData = nullptr;
  47. return false;
  48. }
  49. return true;
  50. }
  51. /* 设置实时数据 */
  52. bool CreateDBThread::setRealTimeData(const AudioSrcData& srcData)
  53. {
  54. auto oldData = m_queueRealTimeData.push_pop(new AudioSrcData(srcData));
  55. if(oldData != nullptr)
  56. {
  57. /* 这里一般不会满,会被实时消耗,如果满了,说明出现问题了 */
  58. SPDLOG_LOGGER_WARN(m_logger, "{} 实时数据环形队列已满,出队一个元素,时间: {}, 大小: {}",
  59. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  60. delete oldData;
  61. oldData = nullptr;
  62. }
  63. return true;
  64. }
  65. /* 获取最新的结果,根据时间进行对比,最新的时间比传入的晚,就是有新的数据了 */
  66. bool CreateDBThread::getLatestResult(OneSecondData& resultData)
  67. {
  68. if(resultData.startTime.isNull())
  69. {
  70. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,传入的时间戳为空", m_logBase);
  71. return false;
  72. }
  73. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  74. auto lastSecondData = m_queueResultData->back();
  75. if(lastSecondData == nullptr)
  76. {
  77. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,结果队列为空", m_logBase);
  78. return false;
  79. }
  80. /* 比较时间戳 */
  81. if(lastSecondData->startTime <= resultData.startTime)
  82. {
  83. return false;
  84. }
  85. resultData = *lastSecondData;
  86. return true;
  87. }
  88. /* 获取最新的结果,让整个环形队列相等 */
  89. bool CreateDBThread::getLatestResult(RingQueueManualMutex<OneSecondData*>& resultQueue)
  90. {
  91. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  92. if(m_queueResultData->isEmpty())
  93. {
  94. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新数据,数据队列为空", m_logBase);
  95. return false;
  96. }
  97. /* 目标队列为空,全部拷贝 */
  98. if(resultQueue.isEmpty())
  99. {
  100. SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,目标队列为空,拷贝整个结果队列", m_logBase);
  101. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  102. {
  103. OneSecondData* data = m_queueResultData->at(i);
  104. if(data != nullptr)
  105. {
  106. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  107. resultQueue.push(newData);
  108. }
  109. }
  110. return true;
  111. }
  112. /* 队列不为空,查找队列中相等的位置,拷贝后面的数据,不拷贝整个队列,减少开销
  113. * 这里直接从最新的数据往前找 */
  114. auto itBack = resultQueue.back();
  115. int index = m_queueResultData->QueueSize() - 1;
  116. while(index >= 0)
  117. {
  118. OneSecondData* data = m_queueResultData->at(index);
  119. if(data == nullptr)
  120. {
  121. --index;
  122. continue;
  123. }
  124. /* 找到相等的位置 */
  125. if(data->startTime == itBack->startTime)
  126. {
  127. break;
  128. }
  129. --index;
  130. }
  131. if(index < 0)
  132. {
  133. SPDLOG_LOGGER_WARN(m_logger, "{} 获取最新数据失败,未找到相等的时间戳,将清空队列,拷贝全部数据", m_logBase);
  134. SPDLOG_LOGGER_WARN(m_logger, "队列大小: {}, 目标队列大小: {}", m_queueResultData->QueueSize(), resultQueue.QueueSize());
  135. /* 清空目标队列,将所有的数据全部拷贝 */
  136. while(!resultQueue.isEmpty())
  137. {
  138. OneSecondData* data = resultQueue.front_pop();
  139. if(data != nullptr)
  140. {
  141. delete data;
  142. data = nullptr;
  143. }
  144. }
  145. /* 拷贝全部数据 */
  146. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  147. {
  148. OneSecondData* data = m_queueResultData->at(i);
  149. if(data != nullptr)
  150. {
  151. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  152. resultQueue.push(newData);
  153. }
  154. }
  155. return true;
  156. }
  157. else if(index == m_queueResultData->QueueSize() - 1)
  158. {
  159. // 已经是最新的数据了,不需要拷贝
  160. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,已经是最新的数据了", m_logBase);
  161. return false;
  162. }
  163. /* 拷贝数据 */
  164. for(; index < m_queueResultData->QueueSize(); ++index)
  165. {
  166. OneSecondData* data = m_queueResultData->at(index);
  167. if(data == nullptr)
  168. {
  169. continue;
  170. }
  171. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  172. auto fornt = resultQueue.push(newData);
  173. if(fornt != nullptr)
  174. {
  175. delete fornt; // 出队的元素是之前的结果,释放内存
  176. fornt = nullptr;
  177. }
  178. }
  179. return true;
  180. }
  181. /**
  182. * @brief 获取最新的音量包结果
  183. *
  184. * @param listData
  185. * @param count 需要拷贝的数目,如果暂时没有这么多数据,则返回false
  186. * @return true 获取成功
  187. * @return false 无数据或者数据不足
  188. */
  189. bool CreateDBThread::getLatestRealTimeResult(std::list<OneDBData>& listData, const int getCount)
  190. {
  191. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  192. if(getCount <= 0)
  193. {
  194. return false;
  195. }
  196. if(m_listRealTimeResult.empty() || static_cast<int>(m_listRealTimeResult.size()) < getCount)
  197. {
  198. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  199. return false;
  200. }
  201. auto it = m_listRealTimeResult.end();
  202. if(listData.empty())
  203. {
  204. SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据,列表为空,拷贝最新的{}个数据", m_logBase, getCount);
  205. for(int i = 0; i < getCount; ++i)
  206. {
  207. it--;
  208. }
  209. }else
  210. {
  211. /* 先向前移动getCount个数据,判断新数据是否满足 */
  212. auto oldData = listData.back();
  213. for(int i = 0; i < getCount; ++i)
  214. {
  215. it--;
  216. }
  217. if(it->startTime <= oldData.startTime)
  218. {
  219. return false;
  220. }
  221. /* 继续向前查找已发送的时间点 */
  222. /* 找到需要开始拷贝的元素位置 */
  223. while(it != m_listRealTimeResult.begin())
  224. {
  225. if(it->startTime <= oldData.startTime)
  226. {
  227. break; // 找到需要开始拷贝的元素位置
  228. }
  229. it--;
  230. }
  231. /* 上面的循环无法判断begin(),这里单独判断一下 */
  232. if(it == m_listRealTimeResult.begin())
  233. {
  234. if(it->startTime <= oldData.startTime)
  235. {
  236. /* begin()是最新的,指向下一个可以拷贝的数据 */
  237. it++;
  238. }else
  239. {
  240. /* 列表中所有的数据都比已发送的数据新,只拷贝最新的getCount个数据 */
  241. it = m_listRealTimeResult.end();
  242. for(int i = 0; i < getCount; ++i)
  243. {
  244. it--;
  245. }
  246. }
  247. }else
  248. {
  249. /* 移动到下一个可以拷贝的新数据位置 */
  250. it++;
  251. }
  252. }
  253. /* 拷贝数据 */
  254. listData.clear();
  255. for(int i = 0; it != m_listRealTimeResult.end() && i < getCount; ++it, ++i)
  256. {
  257. listData.push_back(*it);
  258. }
  259. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据成功,数量: {}", m_logBase, listData.size());
  260. return true;
  261. }
  262. /* 获取一个最新音量值 */
  263. bool CreateDBThread::getLatestRealTimeResult(OneDBData& resultData)
  264. {
  265. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  266. if(m_listRealTimeResult.empty())
  267. {
  268. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  269. return false;
  270. }
  271. resultData = m_listRealTimeResult.front();
  272. m_listRealTimeResult.pop_front();
  273. return true;
  274. }
  275. /* 计算音量和反相的线程函数 */
  276. void CreateDBThread::task()
  277. {
  278. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始计算音量线程", m_logBase);
  279. /* 初始化一些数据 */
  280. if(!initData())
  281. {
  282. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  283. return;
  284. }
  285. while(m_isRunning)
  286. {
  287. /*--------------------------------------------------------------
  288. * 计算实时音量值(这个数据量很小,大概只有33ms,所以阻塞在这里)
  289. *--------------------------------------------------------------*/
  290. /* 判断实时数据列表是否有数据,没有数据则阻塞,直到有数据 */
  291. auto rtData = m_queueRealTimeData.front();
  292. if(rtData == nullptr)
  293. {
  294. SPDLOG_LOGGER_DEBUG(m_logger, "{} 实时数据队列为空,等待数据", m_logBase);
  295. continue;
  296. }
  297. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  298. /* 计算实时音量值 */
  299. calculateRealTimeVolume();
  300. // std::chrono::steady_clock::time_point rtEndTime = std::chrono::steady_clock::now();
  301. // std::chrono::microseconds rtDuration = std::chrono::duration_cast<std::chrono::microseconds>(rtEndTime - startTime);
  302. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算实时音量包完成,耗时: {} 微秒", m_logBase, rtDuration.count());
  303. /*--------------------------------------------------------------
  304. * 计算常规音量值,1秒计算一次
  305. *--------------------------------------------------------------*/
  306. calculateVolumeData();
  307. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  308. // std::chrono::microseconds duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - rtEndTime);
  309. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算1秒的音量包完成,耗时: {} 微秒", m_logBase, duration.count());
  310. }
  311. /* 清理数据 */
  312. clearData();
  313. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 计算音量线程结束 ", m_logBase);
  314. }
  315. /* 初始化一些数据 */
  316. bool CreateDBThread::initData()
  317. {
  318. int queueSize = GInfo.queueElementCount();
  319. /* 初始化一些数据 */
  320. m_queueAudioData.clearQueue();
  321. m_queueAudioData.setQueueCapacity(queueSize);
  322. m_queueRealTimeData.setQueueCapacity(queueSize);
  323. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  324. m_numChannels = GInfo.numChannels(); /* 声道数 */
  325. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  326. /* 一秒钟数据大小,单位:字节 */
  327. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8);
  328. /* 计算每个音量值需要的数据大小,单位数据单位: short,一个通道的大小 */
  329. m_oneDBLengthOfSrcData = m_sampleRate / VOLUME_INFO_NUM;
  330. m_singleDataLength = m_oneSecondSize / m_numChannels;
  331. /* 计算音量和反相的环形队列元素数目,默认是180个,即180秒 */
  332. // m_queueAudioDataCapacity = queueSize;
  333. m_queueResultData = new RingQueueManualMutex<OneSecondData*>(queueSize);
  334. m_queueResultData->mutex.lock();
  335. m_queueResultData->setDefaultValue(nullptr);
  336. m_queueResultData->mutex.unlock();
  337. m_numMaxResultData = 60;
  338. return true;
  339. }
  340. /* 清理数据 */
  341. void CreateDBThread::clearData()
  342. {
  343. /* 清理环形队列 */
  344. while(!m_queueAudioData.isEmpty())
  345. {
  346. AudioSrcData* data = m_queueAudioData.front_pop();
  347. if(data != nullptr)
  348. {
  349. delete data;
  350. data = nullptr;
  351. }
  352. }
  353. // if(m_remainData != nullptr)
  354. // {
  355. // delete m_remainData;
  356. // m_remainData = nullptr;
  357. // }
  358. }
  359. /* 计算音量和反相 */
  360. bool CreateDBThread::CreateDBPhase(AudioSrcData* audioData)
  361. {
  362. short* pWaveVu = reinterpret_cast<short*>(audioData->pData);
  363. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  364. const int oneDBSize = m_oneDBLengthOfSrcData;
  365. StAudioNum audioInfo;
  366. audioInfo.cardRoadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  367. audioInfo.nTotal = m_singleDataLength;
  368. int iCurPos = 0;
  369. /* 存储结果 */
  370. m_result = new OneSecondData();
  371. /* 计算音量和反相的计算类 */
  372. CAudio2ChanCorrelator audioCor;
  373. /* 音量值计算,好多个字节计算出一个最大音量值 */
  374. for(int i = 0; i < VOLUME_INFO_NUM; ++i)
  375. {
  376. // 采样点最大值
  377. short sMaxA, sMaxB, sRMSA, sRMSB;
  378. audioCor.CorrelateChunks(pWaveVu + iCurPos, (pWaveVu + iCurPos + 1), oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  379. /* 这里乘2是要增加两个通道的数据大小 */
  380. iCurPos += (oneDBSize * 2);
  381. m_result->aryLeftDB[i] = calculateDB(sMaxA);
  382. m_result->aryRightDB[i] = calculateDB(sMaxB);
  383. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  384. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  385. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  386. int iReversed = audioCor.GetCorrelationLevel();
  387. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  388. {
  389. iReversed = 0;
  390. }
  391. // 和反相阈值比较,来判定是否为反相
  392. float dReversed = iReversed / 100.00;
  393. m_result->aryPhase[i] = dReversed;
  394. }
  395. /* 设置时间戳 */
  396. m_result->startTime = audioData->startTime;
  397. m_result->endTime = audioData->endTime;
  398. return true;
  399. }
  400. /* 计算音量数据 */
  401. bool CreateDBThread::calculateVolumeData()
  402. {
  403. /* 判断是否够一秒的数据 */
  404. if(m_queueAudioData.isEmpty())
  405. {
  406. return false;;
  407. }
  408. auto audioData = m_queueAudioData.front_pop();
  409. if(audioData == nullptr)
  410. {
  411. SPDLOG_LOGGER_WARN(m_logger, "{} 从环形队列中取出数据失败,可能是队列为空", m_logBase);
  412. // std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待一段时间再尝试
  413. return false;;
  414. }
  415. /* 计算音量和反相,结果存储在m_result */
  416. if(!CreateDBPhase(audioData))
  417. {
  418. SPDLOG_LOGGER_ERROR(m_logger, "{} 计算音量和反相失败", m_logBase);
  419. }
  420. /* 将结果放入结果队列 */
  421. if(m_result != nullptr)
  422. {
  423. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  424. auto result = m_queueResultData->push(m_result);
  425. if(result != nullptr)
  426. {
  427. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 队列已满,出队一个元素", m_logBase);
  428. delete result; // 出队的元素是之前的结果,释放内存
  429. result = nullptr;
  430. }
  431. m_result = nullptr; // 清空结果指针,等待下一次使用
  432. }
  433. delete audioData; // 释放内存
  434. audioData = nullptr;
  435. return true;
  436. }
  437. /* 计算实时音量值 */
  438. bool CreateDBThread::calculateRealTimeVolume()
  439. {
  440. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  441. const int oneDBSize = m_oneDBLengthOfSrcData;
  442. while(!m_queueRealTimeData.isEmpty())
  443. {
  444. /* 取出一个数据,没有数据则阻塞住 */
  445. AudioSrcData* audioData = m_queueRealTimeData.front_pop_noBlock();
  446. if(audioData == nullptr)
  447. {
  448. continue;
  449. }
  450. /* 计算音量和反相的计算类 */
  451. CAudio2ChanCorrelator audioCor;
  452. StAudioNum audioInfo;
  453. audioInfo.cardRoadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  454. audioInfo.nTotal = m_singleDataLength;
  455. short* pData = reinterpret_cast<short*>(audioData->pData);
  456. // 采样点最大值
  457. short sMaxA, sMaxB, sRMSA, sRMSB;
  458. audioCor.CorrelateChunks(pData, pData + 1, oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  459. /* 这里乘2是要增加两个通道的数据大小 */
  460. int leftDB = calculateDB(sMaxA);
  461. int rightDB = calculateDB(sMaxB);
  462. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  463. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  464. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  465. int iReversed = audioCor.GetCorrelationLevel();
  466. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  467. {
  468. iReversed = 0;
  469. }
  470. // 和反相阈值比较,来判定是否为反相
  471. float dReversed = iReversed / 100.00;
  472. OneDBData oneDBData;
  473. oneDBData.leftDB = leftDB;
  474. oneDBData.rightDB = rightDB;
  475. oneDBData.phase = dReversed;
  476. oneDBData.startTime = audioData->startTime;
  477. oneDBData.endTime = audioData->endTime;
  478. /* 保存结果 */
  479. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  480. m_listRealTimeResult.push_back(oneDBData);
  481. while(static_cast<int>(m_listRealTimeResult.size()) > m_numMaxResultData)
  482. {
  483. /* 超过最大数量,删除最早的一个 */
  484. m_listRealTimeResult.pop_front();
  485. }
  486. /* 删除元素内存 */
  487. delete audioData;
  488. audioData = nullptr;
  489. }
  490. return true;
  491. }