CreateDBThread.cpp 18 KB


  1. #include "CreateDBThread.h"
  2. #include "GlobalVariable.h"
  3. #include "GlobalInfo.h"
  4. #include "CalculateAudio.h"
  5. #include "spdlog.h"
  6. CreateDBThread::CreateDBThread(RecordThreadInfo_t& threadInfo)
  7. : BaseRecordThread(threadInfo)
  8. {
  9. }
  10. CreateDBThread::~CreateDBThread()
  11. {
  12. }
  13. /* 设置数据 */
  14. bool CreateDBThread::setData(const AudioSrcData& srcData)
  15. {
  16. if(srcData.pData == nullptr || srcData.dataSize == 0)
  17. {
  18. SPDLOG_LOGGER_ERROR(m_logger, "{} 设置数据失败,srcData为空或dataSize为0", m_logBase);
  19. return false;
  20. }
  21. /* 创建一个新的AudioSrcData对象 */
  22. AudioSrcData* audioData = new AudioSrcData(srcData);
  23. if(audioData == nullptr)
  24. {
  25. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建AudioSrcData对象失败", m_logBase);
  26. return false;
  27. }
  28. /* 判断环形队列是否满 */
  29. if(m_queueAudioData.isFull())
  30. {
  31. /* 出队一个最早的元素 */
  32. AudioSrcData* oldData = m_queueAudioData.front_pop();
  33. SPDLOG_LOGGER_WARN(m_logger, "{} 环形队列已满,出队一个元素,时间: {}, 大小: {}",
  34. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  35. if(oldData != nullptr){
  36. delete oldData;
  37. oldData = nullptr;
  38. }
  39. }
  40. /* 入队新的数据 */
  41. if(!m_queueAudioData.push_noBlock(audioData))
  42. {
  43. SPDLOG_LOGGER_ERROR(m_logger, "{} 数据加入环形队列失败", m_logBase);
  44. delete audioData; // 入队失败,释放内存
  45. audioData = nullptr;
  46. return false;
  47. }
  48. return true;
  49. }
  50. /* 设置实时数据 */
  51. bool CreateDBThread::setRealTimeData(const AudioSrcData& srcData)
  52. {
  53. auto oldData = m_queueRealTimeData.push_pop(new AudioSrcData(srcData));
  54. if(oldData != nullptr)
  55. {
  56. /* 这里一般不会满,会被实时消耗,如果满了,说明出现问题了 */
  57. SPDLOG_LOGGER_WARN(m_logger, "{} 实时数据环形队列已满,出队一个元素,时间: {}, 大小: {}",
  58. m_logBase, oldData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), oldData->dataSize);
  59. delete oldData;
  60. oldData = nullptr;
  61. }
  62. return true;
  63. }
  64. /* 获取最新的结果,根据时间进行对比,最新的时间比传入的晚,就是有新的数据了 */
  65. bool CreateDBThread::getLatestResult(OneSecondData& resultData)
  66. {
  67. if(resultData.startTime.isNull())
  68. {
  69. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,传入的时间戳为空", m_logBase);
  70. return false;
  71. }
  72. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  73. auto lastSecondData = m_queueResultData->back();
  74. if(lastSecondData == nullptr)
  75. {
  76. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取最新结果失败,结果队列为空", m_logBase);
  77. return false;
  78. }
  79. /* 比较时间戳 */
  80. if(lastSecondData->startTime <= resultData.startTime)
  81. {
  82. return false;
  83. }
  84. resultData = *lastSecondData;
  85. return true;
  86. }
  87. /* 获取最新的结果,让整个环形队列相等 */
  88. bool CreateDBThread::getLatestResult(RingQueueManualMutex<OneSecondData*>& resultQueue)
  89. {
  90. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  91. if(m_queueResultData->isEmpty())
  92. {
  93. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新数据,数据队列为空", m_logBase);
  94. return false;
  95. }
  96. /* 目标队列为空,全部拷贝 */
  97. if(resultQueue.isEmpty())
  98. {
  99. SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,目标队列为空,拷贝整个结果队列", m_logBase);
  100. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  101. {
  102. OneSecondData* data = m_queueResultData->at(i);
  103. if(data != nullptr)
  104. {
  105. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  106. resultQueue.push(newData);
  107. }
  108. }
  109. return true;
  110. }
  111. /* 队列不为空,查找队列中相等的位置,拷贝后面的数据,不拷贝整个队列,减少开销
  112. * 这里直接从最新的数据往前找 */
  113. auto itBack = resultQueue.back();
  114. int index = m_queueResultData->QueueSize() - 1;
  115. while(index >= 0)
  116. {
  117. OneSecondData* data = m_queueResultData->at(index);
  118. if(data == nullptr)
  119. {
  120. --index;
  121. continue;
  122. }
  123. /* 找到相等的位置 */
  124. if(data->startTime == itBack->startTime)
  125. {
  126. break;
  127. }
  128. --index;
  129. }
  130. if(index < 0)
  131. {
  132. SPDLOG_LOGGER_WARN(m_logger, "{} 获取最新数据失败,未找到相等的时间戳,将清空队列,拷贝全部数据", m_logBase);
  133. SPDLOG_LOGGER_WARN(m_logger, "队列大小: {}, 目标队列大小: {}", m_queueResultData->QueueSize(), resultQueue.QueueSize());
  134. /* 清空目标队列,将所有的数据全部拷贝 */
  135. while(!resultQueue.isEmpty())
  136. {
  137. OneSecondData* data = resultQueue.front_pop();
  138. if(data != nullptr)
  139. {
  140. delete data;
  141. data = nullptr;
  142. }
  143. }
  144. /* 拷贝全部数据 */
  145. for(int i = 0; i < m_queueResultData->QueueSize(); ++i)
  146. {
  147. OneSecondData* data = m_queueResultData->at(i);
  148. if(data != nullptr)
  149. {
  150. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  151. resultQueue.push(newData);
  152. }
  153. }
  154. return true;
  155. }
  156. else if(index == m_queueResultData->QueueSize() - 1)
  157. {
  158. // 已经是最新的数据了,不需要拷贝
  159. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 获取最新数据,已经是最新的数据了", m_logBase);
  160. return false;
  161. }
  162. /* 拷贝数据 */
  163. for(; index < m_queueResultData->QueueSize(); ++index)
  164. {
  165. OneSecondData* data = m_queueResultData->at(index);
  166. if(data == nullptr)
  167. {
  168. continue;
  169. }
  170. OneSecondData* newData = new OneSecondData(*data); // 深拷贝
  171. auto fornt = resultQueue.push(newData);
  172. if(fornt != nullptr)
  173. {
  174. delete fornt; // 出队的元素是之前的结果,释放内存
  175. fornt = nullptr;
  176. }
  177. }
  178. return true;
  179. }
  180. /**
  181. * @brief 获取最新的音量包结果
  182. *
  183. * @param listData
  184. * @param count 需要拷贝的数目,如果暂时没有这么多数据,则返回false
  185. * @return true 获取成功
  186. * @return false 无数据或者数据不足
  187. */
  188. bool CreateDBThread::getLatestRealTimeResult(std::list<OneDBData>& listData, const int getCount)
  189. {
  190. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  191. if(getCount <= 0)
  192. {
  193. return false;
  194. }
  195. if(m_listRealTimeResult.empty() || static_cast<int>(m_listRealTimeResult.size()) < getCount)
  196. {
  197. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  198. return false;
  199. }
  200. auto it = m_listRealTimeResult.end();
  201. if(listData.empty())
  202. {
  203. SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据,列表为空,拷贝最新的{}个数据", m_logBase, getCount);
  204. for(int i = 0; i < getCount; ++i)
  205. {
  206. it--;
  207. }
  208. }else
  209. {
  210. /* 先向前移动getCount个数据,判断新数据是否满足 */
  211. auto oldData = listData.back();
  212. for(int i = 0; i < getCount; ++i)
  213. {
  214. it--;
  215. }
  216. if(it->startTime <= oldData.startTime)
  217. {
  218. return false;
  219. }
  220. /* 继续向前查找已发送的时间点 */
  221. /* 找到需要开始拷贝的元素位置 */
  222. while(it != m_listRealTimeResult.begin())
  223. {
  224. if(it->startTime <= oldData.startTime)
  225. {
  226. break; // 找到需要开始拷贝的元素位置
  227. }
  228. it--;
  229. }
  230. /* 上面的循环无法判断begin(),这里单独判断一下 */
  231. if(it == m_listRealTimeResult.begin())
  232. {
  233. if(it->startTime <= oldData.startTime)
  234. {
  235. /* begin()是最新的,指向下一个可以拷贝的数据 */
  236. it++;
  237. }else
  238. {
  239. /* 列表中所有的数据都比已发送的数据新,只拷贝最新的getCount个数据 */
  240. it = m_listRealTimeResult.end();
  241. for(int i = 0; i < getCount; ++i)
  242. {
  243. it--;
  244. }
  245. }
  246. }else
  247. {
  248. /* 移动到下一个可以拷贝的新数据位置 */
  249. it++;
  250. }
  251. }
  252. /* 拷贝数据 */
  253. listData.clear();
  254. for(int i = 0; it != m_listRealTimeResult.end() && i < getCount; ++it, ++i)
  255. {
  256. listData.push_back(*it);
  257. }
  258. // SPDLOG_LOGGER_TRACE(m_logger, "{} 获取最新实时音量数据成功,数量: {}", m_logBase, listData.size());
  259. return true;
  260. }
  261. /* 获取一个最新音量值 */
  262. bool CreateDBThread::getLatestRealTimeResult(OneDBData& resultData)
  263. {
  264. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  265. if(m_listRealTimeResult.empty())
  266. {
  267. // SPDLOG_LOGGER_TRACE(m_logger, "{} 最新实时音量数据为空", m_logBase);
  268. return false;
  269. }
  270. resultData = m_listRealTimeResult.front();
  271. m_listRealTimeResult.pop_front();
  272. return true;
  273. }
  274. /* 计算音量和反相的线程函数 */
  275. void CreateDBThread::task()
  276. {
  277. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始计算音量线程", m_logBase);
  278. /* 初始化一些数据 */
  279. if(!initData())
  280. {
  281. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  282. return;
  283. }
  284. while(m_isRunning)
  285. {
  286. /*--------------------------------------------------------------
  287. * 计算实时音量值(这个数据量很小,大概只有33ms,所以阻塞在这里)
  288. *--------------------------------------------------------------*/
  289. /* 判断实时数据列表是否有数据,没有数据则阻塞,直到有数据 */
  290. auto rtData = m_queueRealTimeData.front();
  291. if(rtData == nullptr)
  292. {
  293. SPDLOG_LOGGER_DEBUG(m_logger, "{} 实时数据队列为空,等待数据", m_logBase);
  294. continue;
  295. }
  296. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  297. /* 计算实时音量值 */
  298. calculateRealTimeVolume();
  299. // std::chrono::steady_clock::time_point rtEndTime = std::chrono::steady_clock::now();
  300. // std::chrono::microseconds rtDuration = std::chrono::duration_cast<std::chrono::microseconds>(rtEndTime - startTime);
  301. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算实时音量包完成,耗时: {} 微秒", m_logBase, rtDuration.count());
  302. /*--------------------------------------------------------------
  303. * 计算常规音量值,1秒计算一次
  304. *--------------------------------------------------------------*/
  305. /* 判断是否够一秒的数据 */
  306. if(m_queueAudioData.isEmpty())
  307. {
  308. continue;
  309. }
  310. auto audioData = m_queueAudioData.front_pop();
  311. if(audioData == nullptr)
  312. {
  313. SPDLOG_LOGGER_WARN(m_logger, "{} 从环形队列中取出数据失败,可能是队列为空", m_logBase);
  314. // std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待一段时间再尝试
  315. continue;
  316. }
  317. /* 计算音量和反相,结果存储在m_result */
  318. if(!CreateDBPhase(audioData))
  319. {
  320. SPDLOG_LOGGER_ERROR(m_logger, "{} 计算音量和反相失败", m_logBase);
  321. }
  322. /* 将结果放入结果队列 */
  323. if(m_result != nullptr)
  324. {
  325. std::lock_guard<std::mutex> lock(m_queueResultData->mutex);
  326. auto result = m_queueResultData->push(m_result);
  327. if(result != nullptr)
  328. {
  329. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 队列已满,出队一个元素", m_logBase);
  330. delete result; // 出队的元素是之前的结果,释放内存
  331. result = nullptr;
  332. }
  333. m_result = nullptr; // 清空结果指针,等待下一次使用
  334. }
  335. delete audioData; // 释放内存
  336. audioData = nullptr;
  337. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  338. // std::chrono::microseconds duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - rtEndTime);
  339. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 计算1秒的音量包完成,耗时: {} 微秒", m_logBase, duration.count());
  340. }
  341. /* 清理数据 */
  342. clearData();
  343. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 计算音量线程结束 ", m_logBase);
  344. }
  345. /* 初始化一些数据 */
  346. bool CreateDBThread::initData()
  347. {
  348. int queueSize = GInfo.queueElementCount();
  349. /* 初始化一些数据 */
  350. m_queueAudioData.clearQueue();
  351. m_queueAudioData.setQueueCapacity(queueSize);
  352. m_queueRealTimeData.setQueueCapacity(queueSize);
  353. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  354. m_numChannels = GInfo.numChannels(); /* 声道数 */
  355. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  356. /* 一秒钟数据大小,单位:字节 */
  357. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8);
  358. /* 计算每个音量值需要的数据大小,单位数据单位: short,一个通道的大小 */
  359. m_oneDBLengthOfSrcData = m_sampleRate / VOLUME_INFO_NUM;
  360. m_singleDataLength = m_oneSecondSize / m_numChannels;
  361. /* 计算音量和反相的环形队列元素数目,默认是180个,即180秒 */
  362. // m_queueAudioDataCapacity = queueSize;
  363. m_queueResultData = new RingQueueManualMutex<OneSecondData*>(queueSize);
  364. m_queueResultData->mutex.lock();
  365. m_queueResultData->setDefaultValue(nullptr);
  366. m_queueResultData->mutex.unlock();
  367. m_numMaxResultData = 60;
  368. return true;
  369. }
  370. /* 清理数据 */
  371. void CreateDBThread::clearData()
  372. {
  373. /* 清理环形队列 */
  374. while(!m_queueAudioData.isEmpty())
  375. {
  376. AudioSrcData* data = m_queueAudioData.front_pop();
  377. if(data != nullptr)
  378. {
  379. delete data;
  380. data = nullptr;
  381. }
  382. }
  383. // if(m_remainData != nullptr)
  384. // {
  385. // delete m_remainData;
  386. // m_remainData = nullptr;
  387. // }
  388. }
  389. /* 计算音量和反相 */
  390. bool CreateDBThread::CreateDBPhase(AudioSrcData* audioData)
  391. {
  392. short* pWaveVu = reinterpret_cast<short*>(audioData->pData);
  393. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  394. const int oneDBSize = m_oneDBLengthOfSrcData;
  395. StAudioNum audioInfo;
  396. audioInfo.roadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  397. audioInfo.nTotal = m_singleDataLength;
  398. int iCurPos = 0;
  399. /* 存储结果 */
  400. m_result = new OneSecondData();
  401. /* 计算音量和反相的计算类 */
  402. CAudio2ChanCorrelator audioCor;
  403. /* 音量值计算,好多个字节计算出一个最大音量值 */
  404. for(int i = 0; i < VOLUME_INFO_NUM; ++i)
  405. {
  406. // 采样点最大值
  407. short sMaxA, sMaxB, sRMSA, sRMSB;
  408. audioCor.CorrelateChunks(pWaveVu + iCurPos, (pWaveVu + iCurPos + 1), oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  409. /* 这里乘2是要增加两个通道的数据大小 */
  410. iCurPos += (oneDBSize * 2);
  411. m_result->aryLeftDB[i] = calculateDB(sMaxA);
  412. m_result->aryRightDB[i] = calculateDB(sMaxB);
  413. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  414. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  415. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  416. int iReversed = audioCor.GetCorrelationLevel();
  417. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  418. {
  419. iReversed = 0;
  420. }
  421. // 和反相阈值比较,来判定是否为反相
  422. float dReversed = iReversed / 100.00;
  423. m_result->aryPhase[i] = dReversed;
  424. }
  425. /* 设置时间戳 */
  426. m_result->startTime = audioData->startTime;
  427. m_result->endTime = audioData->endTime;
  428. return true;
  429. }
  430. /* 计算实时音量值 */
  431. bool CreateDBThread::calculateRealTimeVolume()
  432. {
  433. /* 一秒钟平分30个音量值,每个音量值占有的长度 */
  434. const int oneDBSize = m_oneDBLengthOfSrcData;
  435. while(!m_queueRealTimeData.isEmpty())
  436. {
  437. /* 取出一个数据,没有数据则阻塞住 */
  438. AudioSrcData* audioData = m_queueRealTimeData.front_pop_noBlock();
  439. if(audioData == nullptr)
  440. {
  441. continue;
  442. }
  443. /* 计算音量和反相的计算类 */
  444. CAudio2ChanCorrelator audioCor;
  445. StAudioNum audioInfo;
  446. audioInfo.roadInfo = m_threadInfo.cardRoadInfo; // 设置通道信息
  447. audioInfo.nTotal = m_singleDataLength;
  448. short* pData = reinterpret_cast<short*>(audioData->pData);
  449. // 采样点最大值
  450. short sMaxA, sMaxB, sRMSA, sRMSB;
  451. audioCor.CorrelateChunks(pData, pData + 1, oneDBSize, &sMaxA, &sMaxB, &sRMSA, &sRMSB, audioInfo);
  452. /* 这里乘2是要增加两个通道的数据大小 */
  453. int leftDB = calculateDB(sMaxA);
  454. int rightDB = calculateDB(sMaxB);
  455. // fmt::print("音量计算: sMaxA: {}, sMaxB: {}, sRMSA: {}, sRMSB: {}, LDB:{}, RDB:{}\n", sMaxA, sMaxB, sRMSA, sRMSB,
  456. // m_result->aryLeftDB[i], m_result->aryRightDB[i]);
  457. // 获取反相值,-100 到 100;反相是左右声道比对得到的值,所以只有一个
  458. int iReversed = audioCor.GetCorrelationLevel();
  459. if(iReversed > REVERSED_MAX_VALUE || iReversed < REVERSED_MIN_VALUE)
  460. {
  461. iReversed = 0;
  462. }
  463. // 和反相阈值比较,来判定是否为反相
  464. float dReversed = iReversed / 100.00;
  465. OneDBData oneDBData;
  466. oneDBData.leftDB = leftDB;
  467. oneDBData.rightDB = rightDB;
  468. oneDBData.phase = dReversed;
  469. oneDBData.startTime = audioData->startTime;
  470. oneDBData.endTime = audioData->endTime;
  471. /* 保存结果 */
  472. std::lock_guard<std::mutex> lock(m_mutexRealTimeResult);
  473. m_listRealTimeResult.push_back(oneDBData);
  474. while(static_cast<int>(m_listRealTimeResult.size()) > m_numMaxResultData)
  475. {
  476. /* 超过最大数量,删除最早的一个 */
  477. m_listRealTimeResult.pop_front();
  478. }
  479. }
  480. return true;
  481. }