AssignSrcDataThread.cpp 15 KB


  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include "spdlog.h"
  7. #include <QReadWriteLock>
  8. #include <QWriteLocker>
  9. #include <QReadLocker>
  10. #include <cstdint>
  11. #include "CreateDBThread.h"
  12. #include "CreateWAVThread.h"
  13. #include "CreateRecordFileThread.h"
  14. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  15. : BaseRecordThread(threadInfo)
  16. {
  17. }
  18. AssignSrcDataThread::~AssignSrcDataThread()
  19. {
  20. // if(m_pRwLock != nullptr)
  21. // {
  22. // delete m_pRwLock;
  23. // m_pRwLock = nullptr;
  24. // }
  25. for(auto& audioData : m_listAudioSrcData)
  26. {
  27. if(audioData != nullptr)
  28. {
  29. delete audioData;
  30. audioData = nullptr;
  31. }
  32. }
  33. m_listAudioSrcData.clear();
  34. if(m_dispatchSrcData != nullptr)
  35. {
  36. delete m_dispatchSrcData;
  37. m_dispatchSrcData = nullptr;
  38. }
  39. }
  40. /* 停止线程 */
  41. void AssignSrcDataThread::stopThread()
  42. {
  43. m_isRunning = false;
  44. m_condDataUpdate.notify_all(); // 通知所有等待的线程
  45. }
  46. /* 设置数据,这里不用 */
  47. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  48. {
  49. return true;
  50. }
  51. /**
  52. * @brief 设置数据,输入小于1秒的数据
  53. *
  54. * @param srcData
  55. * @param dataSize
  56. * @param endTime
  57. * @return true
  58. * @return false
  59. */
  60. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  61. {
  62. AudioSrcData* audioData = new AudioSrcData(dataSize);
  63. if(audioData == nullptr)
  64. {
  65. return false;
  66. }
  67. audioData->appendData(srcData, dataSize);
  68. audioData->endTime = endTime;
  69. /* 获取读写锁 */
  70. {
  71. // std::lock_guard<QReadWriteLock> lock(m_pRwLock);
  72. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  73. m_listAudioSrcData.push_back(audioData);
  74. m_listDataSize += dataSize; // 更新缓存中的数据大小
  75. m_isDataUpdate.store(true);
  76. }
  77. m_condDataUpdate.notify_one();
  78. // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString());
  79. return true;
  80. }
  81. void AssignSrcDataThread::task()
  82. {
  83. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
  84. /* 初始化数据 */
  85. if(!initData())
  86. {
  87. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  88. return;
  89. }
  90. /* 获取需要分派数据的线程 */
  91. if(!getDispatchThread())
  92. {
  93. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  94. return;
  95. }
  96. /* 将自身设置到录音线程中 */
  97. m_pThreadRecord->setAssignSrcDataThread(this);
  98. m_isRunning = true;
  99. while(m_isRunning)
  100. {
  101. /* 休眠一段时间 */
  102. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  103. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  104. m_condDataUpdate.wait(lock, [this] {
  105. return (m_isDataUpdate.load() || !m_isRunning.load());
  106. });
  107. m_isDataUpdate.store(false);
  108. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  109. /*------------------------------------------------------------------------
  110. * 分派实时数据
  111. *------------------------------------------------------------------------*/
  112. /* 获取最新的数据,给其添加开始时间戳 */
  113. auto latestData = m_listAudioSrcData.back();
  114. if(latestData == nullptr || latestData->isEmpty())
  115. {
  116. SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase);
  117. continue;
  118. }
  119. latestData->startTime = previTime(latestData->endTime, latestData->dataSize);
  120. sendRealTimeSrcData(*latestData);
  121. /*------------------------------------------------------------------------
  122. * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  123. *------------------------------------------------------------------------*/
  124. /* 判断数据是否满足1s大小 */
  125. if(!isFullOneSecondData())
  126. {
  127. continue;
  128. }
  129. /* 处理数据,将其拼接成1s的数据 */
  130. processData();
  131. // SPDLOG_LOGGER_TRACE(m_logger, "{} 开始分发数据,开始时间: {}, 结束时间: {}, 大小: {}",
  132. // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  133. // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize);
  134. /*------------------------------------------------------------------------
  135. * 分派常规数据
  136. *------------------------------------------------------------------------*/
  137. /* 分派数据给各个线程 */
  138. sendRegularData(*m_dispatchSrcData);
  139. /* 清空分派数据 */
  140. m_dispatchSrcData->clear();
  141. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  142. // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
  143. // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count());
  144. }
  145. /* 清理数据 */
  146. clearData();
  147. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
  148. }
  149. /* 初始化一些数据 */
  150. bool AssignSrcDataThread::initData()
  151. {
  152. /* 初始化数据 */
  153. m_listAudioSrcData.clear(); // 清空数据列表
  154. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  155. m_numChannels = GInfo.numChannels(); /* 声道数 */
  156. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  157. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  158. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  159. m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间
  160. /* 初始化读写锁 */
  161. // m_pRwLock = new QReadWriteLock();
  162. return true;
  163. }
  164. /* 清理数据 */
  165. void AssignSrcDataThread::clearData()
  166. {
  167. if(m_dispatchSrcData != nullptr)
  168. {
  169. delete m_dispatchSrcData;
  170. m_dispatchSrcData = nullptr;
  171. }
  172. // if(m_pCurrentSrcData != nullptr)
  173. // {
  174. // delete m_pCurrentSrcData;
  175. // m_pCurrentSrcData = nullptr;
  176. // }
  177. }
  178. /**
  179. * @brief 获取需要分派数据的线程
  180. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  181. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  182. *
  183. * @return true
  184. * @return false
  185. */
  186. bool AssignSrcDataThread::getDispatchThread()
  187. {
  188. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  189. /* 先获取生成wav小文件的线程 */
  190. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  191. if(pWavThread != nullptr)
  192. {
  193. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  194. m_pThreadCreateWAV = dynamic_cast<CreateWAVThread*>(pWavThread);
  195. }
  196. /* 获取生成音量和反相数据的线程 */
  197. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  198. if(pDBAndPhaseThread != nullptr)
  199. {
  200. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  201. m_pThreadCreateDB = dynamic_cast<CreateDBThread*>(pDBAndPhaseThread);
  202. }
  203. /* 获取生成长文件的线程 */
  204. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  205. if(pLongFileThread != nullptr )
  206. {
  207. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  208. m_pThreadCreateLongFile = dynamic_cast<CreateRecordFileThread*>(pLongFileThread);
  209. }
  210. /* 获取发送RTP数据的线程 */
  211. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  212. if(m_rtpSenderThread == nullptr)
  213. {
  214. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  215. // return false;
  216. }
  217. /* 最后获取录音线程信息,如果获取不到则一直获取 */
  218. while(true)
  219. {
  220. auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  221. if(pThreadBase != nullptr)
  222. {
  223. m_pThreadRecord = dynamic_cast<RecordThread*>(pThreadBase);
  224. break; // 找到录音线程了
  225. }
  226. /* 如果没有找到录音线程,则等待一段时间再继续查找 */
  227. std::this_thread::sleep_for(std::chrono::milliseconds(5));
  228. }
  229. return true;
  230. }
  231. /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */
  232. void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData)
  233. {
  234. /* 发送计算音量的数据 */
  235. if(m_pThreadCreateWAV != nullptr)
  236. {
  237. m_pThreadCreateWAV->setData(srcData);
  238. }
  239. /* 发送生成wav小文件的数据 */
  240. if(m_pThreadCreateDB != nullptr)
  241. {
  242. m_pThreadCreateDB->setData(srcData);
  243. }
  244. /* 发送生成长文件的数据 */
  245. if(m_pThreadCreateLongFile != nullptr)
  246. {
  247. m_pThreadCreateLongFile->setData(srcData);
  248. }
  249. }
  250. /* 判断是否满足1秒的数据 */
  251. bool AssignSrcDataThread::isFullOneSecondData() const
  252. {
  253. // QReadLocker locker(m_pRwLock);
  254. /* 判断缓存中的数据是否满足1秒大小 */
  255. if(m_listDataSize + m_remainingData.size() >= m_oneSecondSize)
  256. {
  257. return true;
  258. }
  259. return false;
  260. }
  261. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  262. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  263. bool AssignSrcDataThread::processData()
  264. {
  265. /* 处理数据 */
  266. m_dispatchSrcData->clear();
  267. while(true)
  268. {
  269. if(m_remainingData.size() > 0)
  270. {
  271. /* 拷贝剩余数据 */
  272. int32_t copySize = m_dispatchSrcData->appendData(m_remainingData.data(), m_remainingData.size());
  273. if(copySize < m_remainingData.size())
  274. {
  275. /* 剩余数据没拷贝完,可能已经满足一次了 */
  276. m_remainingData = m_remainingData.mid(copySize);
  277. m_dispatchSrcData->endTime = previTime(m_lastDataEndTime, m_remainingData.size());
  278. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  279. }else {
  280. /* 剩余数据拷贝完了,清空剩余数据 */
  281. m_remainingData.clear();
  282. m_dispatchSrcData->endTime = m_lastDataEndTime;
  283. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  284. }
  285. if(m_dispatchSrcData->isFull())
  286. {
  287. /* 如果已经满足1秒大小,则退出循环 */
  288. break;
  289. }
  290. }else
  291. {
  292. /* 没有剩余数据了,拷贝全新的数据 */
  293. auto pData = m_listAudioSrcData.front();
  294. m_listAudioSrcData.pop_front();
  295. // SPDLOG_LOGGER_TRACE(m_logger, "{} 一条数据 大小: {}, 时间: {}",
  296. // m_logBase, pData->dataSize, pData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  297. if(pData == nullptr)
  298. {
  299. continue;
  300. }
  301. int copySize = m_dispatchSrcData->appendData(pData->pData, pData->dataSize);
  302. if(m_dispatchSrcData->isFull())
  303. {
  304. /* 判断有无剩余数据 */
  305. if(copySize < pData->dataSize)
  306. {
  307. m_remainingData = QByteArray(pData->pData + copySize, pData->dataSize - copySize);
  308. m_lastDataEndTime = pData->endTime;
  309. } else {
  310. m_remainingData.clear();
  311. m_lastDataEndTime = QDateTime();
  312. }
  313. /* 更新结束时间 */
  314. m_dispatchSrcData->endTime = pData->endTime;
  315. /* 删除数据 */
  316. delete pData;
  317. pData = nullptr;
  318. m_listDataSize -= copySize;
  319. break;
  320. }
  321. delete pData;
  322. pData = nullptr;
  323. m_listDataSize -= copySize;
  324. }
  325. }
  326. /* 更新数据的开始时间 */
  327. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  328. // SPDLOG_LOGGER_TRACE(m_logger, "{} 一秒的数据拼接完成, 开始时间: {}, 结束时间: {}, 大小: {}",
  329. // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  330. // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize);
  331. return true;
  332. }
  333. /* 发送实时音频数据 */
  334. void AssignSrcDataThread::sendRealTimeSrcData(const AudioSrcData& srcData)
  335. {
  336. /* 发送实时数据到RTP中 */
  337. if(m_rtpSenderThread != nullptr)
  338. {
  339. m_rtpSenderThread->setData(srcData);
  340. }
  341. /* 发送实时数据到创建音量包的线程 */
  342. if(m_pThreadCreateDB != nullptr)
  343. {
  344. m_pThreadCreateDB->setRealTimeData(srcData);
  345. }
  346. }
  347. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  348. void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData)
  349. {
  350. // QReadLocker locker(m_pRwLock);
  351. if(m_rtpSenderThread == nullptr)
  352. {
  353. return; // 如果没有RTP发送线程,则直接返回
  354. }
  355. m_rtpSenderThread->setData(srcData);
  356. // if(m_listAudioSrcData.empty())
  357. // {
  358. // return; // 如果没有数据,则直接返回
  359. // }
  360. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}",
  361. // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  362. // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  363. // auto it = m_listAudioSrcData.end();
  364. // if(it != m_listAudioSrcData.begin())
  365. // {
  366. // --it; // 移动到最后一个数据
  367. // }
  368. // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  369. // for(; it != m_listAudioSrcData.begin(); --it)
  370. // {
  371. // if((*it)->startTime == m_lastSendTime)
  372. // {
  373. // break; // 找到已发送数据的位置
  374. // }
  375. // }
  376. // if(it == m_listAudioSrcData.begin())
  377. // {
  378. // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase);
  379. // }
  380. // /* 开始发送数据 */
  381. // for(; it != m_listAudioSrcData.end(); ++it)
  382. // {
  383. // if(*it != nullptr)
  384. // {
  385. // m_rtpSenderThread->setData(**it);
  386. // m_lastSendTime = (*it)->startTime; // 更新已发送数据
  387. // }
  388. // }
  389. }