AssignSrcDataThread.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include "spdlog.h"
  7. #include <QReadWriteLock>
  8. #include <QWriteLocker>
  9. #include <QReadLocker>
  10. #include <cstdint>
  11. #include "CreateDBThread.h"
  12. #include "CreateWAVThread.h"
  13. #include "CreateLongFileThread.h"
  14. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  15. : BaseRecordThread(threadInfo)
  16. {
  17. }
  18. AssignSrcDataThread::~AssignSrcDataThread()
  19. {
  20. // if(m_pRwLock != nullptr)
  21. // {
  22. // delete m_pRwLock;
  23. // m_pRwLock = nullptr;
  24. // }
  25. for(auto& audioData : m_listAudioSrcData)
  26. {
  27. if(audioData != nullptr)
  28. {
  29. delete audioData;
  30. audioData = nullptr;
  31. }
  32. }
  33. m_listAudioSrcData.clear();
  34. if(m_dispatchSrcData != nullptr)
  35. {
  36. delete m_dispatchSrcData;
  37. m_dispatchSrcData = nullptr;
  38. }
  39. }
  40. /* 停止线程 */
  41. void AssignSrcDataThread::stopThread()
  42. {
  43. m_isRunning = false;
  44. m_condDataUpdate.notify_all(); // 通知所有等待的线程
  45. }
  46. /* 设置数据,这里不用 */
  47. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  48. {
  49. return true;
  50. }
  51. /**
  52. * @brief 设置数据,输入小于1秒的数据
  53. *
  54. * @param srcData
  55. * @param dataSize
  56. * @param endTime
  57. * @return true
  58. * @return false
  59. */
  60. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  61. {
  62. AudioSrcData* audioData = new AudioSrcData(dataSize);
  63. if(audioData == nullptr)
  64. {
  65. return false;
  66. }
  67. audioData->appendData(srcData, dataSize);
  68. audioData->endTime = endTime;
  69. /* 获取读写锁 */
  70. {
  71. // std::lock_guard<QReadWriteLock> lock(m_pRwLock);
  72. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  73. m_listAudioSrcData.push_back(audioData);
  74. m_listDataSize += dataSize; // 更新缓存中的数据大小
  75. m_isDataUpdate.store(true);
  76. }
  77. m_condDataUpdate.notify_one();
  78. // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString());
  79. return true;
  80. }
  81. void AssignSrcDataThread::task()
  82. {
  83. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
  84. /* 初始化数据 */
  85. if(!initData())
  86. {
  87. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  88. return;
  89. }
  90. /* 获取需要分派数据的线程 */
  91. if(!getDispatchThread())
  92. {
  93. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  94. return;
  95. }
  96. /* 将自身设置到录音线程中 */
  97. m_pThreadRecord->setAssignSrcDataThread(this);
  98. m_isRunning = true;
  99. while(m_isRunning)
  100. {
  101. /* 休眠一段时间 */
  102. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  103. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  104. m_condDataUpdate.wait(lock, [this] {
  105. return (m_isDataUpdate.load() || !m_isRunning.load());
  106. });
  107. m_isDataUpdate.store(false);
  108. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  109. /*------------------------------------------------------------------------
  110. * 分派实时数据
  111. *------------------------------------------------------------------------*/
  112. /* 获取最新的数据,给其添加开始时间戳 */
  113. auto latestData = m_listAudioSrcData.back();
  114. if(latestData == nullptr || latestData->isEmpty())
  115. {
  116. SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase);
  117. continue;
  118. }
  119. latestData->startTime = previTime(latestData->endTime, latestData->dataSize);
  120. sendRealTimeSrcData(*latestData);
  121. /*------------------------------------------------------------------------
  122. * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  123. *------------------------------------------------------------------------*/
  124. /* 判断数据是否满足1s大小 */
  125. if(!isFullOneSecondData())
  126. {
  127. continue; // 如果不满足1s大小,则继续等待
  128. }
  129. /* 处理数据,将其拼接成1s的数据 */
  130. processData();
  131. /* 处理数据时间戳,计算开始时间 */
  132. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  133. /*------------------------------------------------------------------------
  134. * 分派常规数据
  135. *------------------------------------------------------------------------*/
  136. /* 分派数据给各个线程 */
  137. sendRegularData(*m_dispatchSrcData);
  138. /* 清空分派数据 */
  139. m_dispatchSrcData->clear();
  140. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  141. // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
  142. // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count());
  143. }
  144. /* 清理数据 */
  145. clearData();
  146. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
  147. }
  148. /* 初始化一些数据 */
  149. bool AssignSrcDataThread::initData()
  150. {
  151. /* 初始化数据 */
  152. m_listAudioSrcData.clear(); // 清空数据列表
  153. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  154. m_numChannels = GInfo.numChannels(); /* 声道数 */
  155. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  156. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  157. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  158. m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间
  159. /* 初始化读写锁 */
  160. // m_pRwLock = new QReadWriteLock();
  161. return true;
  162. }
  163. /* 清理数据 */
  164. void AssignSrcDataThread::clearData()
  165. {
  166. if(m_dispatchSrcData != nullptr)
  167. {
  168. delete m_dispatchSrcData;
  169. m_dispatchSrcData = nullptr;
  170. }
  171. if(m_pCurrentSrcData != nullptr)
  172. {
  173. delete m_pCurrentSrcData;
  174. m_pCurrentSrcData = nullptr;
  175. }
  176. }
  177. /**
  178. * @brief 获取需要分派数据的线程
  179. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  180. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  181. *
  182. * @return true
  183. * @return false
  184. */
  185. bool AssignSrcDataThread::getDispatchThread()
  186. {
  187. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  188. /* 先获取生成wav小文件的线程 */
  189. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  190. if(pWavThread != nullptr)
  191. {
  192. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  193. m_pThreadCreateWAV = dynamic_cast<CreateWAVThread*>(pWavThread);
  194. }
  195. /* 获取生成音量和反相数据的线程 */
  196. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  197. if(pDBAndPhaseThread != nullptr)
  198. {
  199. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  200. m_pThreadCreateDB = dynamic_cast<CreateDBThread*>(pDBAndPhaseThread);
  201. }
  202. /* 获取生成长文件的线程 */
  203. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  204. if(pLongFileThread != nullptr )
  205. {
  206. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  207. m_pThreadCreateLongFile = dynamic_cast<CreateLongFileThread*>(pLongFileThread);
  208. }
  209. /* 获取发送RTP数据的线程 */
  210. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  211. if(m_rtpSenderThread == nullptr)
  212. {
  213. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  214. // return false;
  215. }
  216. /* 最后获取录音线程信息,如果获取不到则一直获取 */
  217. while(true)
  218. {
  219. auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  220. if(pThreadBase != nullptr)
  221. {
  222. m_pThreadRecord = dynamic_cast<RecordThread*>(pThreadBase);
  223. break; // 找到录音线程了
  224. }
  225. /* 如果没有找到录音线程,则等待一段时间再继续查找 */
  226. std::this_thread::sleep_for(std::chrono::milliseconds(5));
  227. }
  228. return true;
  229. }
  230. /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */
  231. void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData)
  232. {
  233. /* 发送计算音量的数据 */
  234. if(m_pThreadCreateWAV != nullptr)
  235. {
  236. m_pThreadCreateWAV->setData(srcData);
  237. }
  238. /* 发送生成wav小文件的数据 */
  239. if(m_pThreadCreateDB != nullptr)
  240. {
  241. m_pThreadCreateDB->setData(srcData);
  242. }
  243. /* 发送生成长文件的数据 */
  244. if(m_pThreadCreateLongFile != nullptr)
  245. {
  246. m_pThreadCreateLongFile->setData(srcData);
  247. }
  248. }
  249. /* 判断是否满足1秒的数据 */
  250. bool AssignSrcDataThread::isFullOneSecondData() const
  251. {
  252. // QReadLocker locker(m_pRwLock);
  253. /* 判断缓存中的数据是否满足1秒大小 */
  254. if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
  255. {
  256. return true;
  257. }
  258. return false;
  259. }
  260. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  261. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  262. bool AssignSrcDataThread::processData()
  263. {
  264. m_dispatchSrcData->clear();
  265. while(true)
  266. {
  267. if(m_remainingDataSize > 0)
  268. {
  269. /* 如果剩余数据还有数据,则继续处理 */
  270. int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
  271. auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
  272. /* 更新结束时间 */
  273. if(copySize == m_remainingDataSize)
  274. {
  275. /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
  276. * 并删除这个缓冲区 */
  277. m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
  278. delete m_pCurrentSrcData;
  279. m_pCurrentSrcData = nullptr;
  280. }else {
  281. /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
  282. m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
  283. }
  284. /* 更新剩余数据大小 */
  285. m_remainingDataSize -= copySize;
  286. } else
  287. {
  288. /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
  289. m_pCurrentSrcData = m_listAudioSrcData.front();
  290. m_listAudioSrcData.pop_front();
  291. if(m_pCurrentSrcData == nullptr)
  292. {
  293. break;
  294. }
  295. /* 更新队列中剩余的数据大小 */
  296. m_listDataSize -= m_pCurrentSrcData->dataSize;
  297. /* 更新剩余数据的大小 */
  298. m_remainingDataSize = m_pCurrentSrcData->dataSize;
  299. }
  300. if(m_dispatchSrcData->isFull())
  301. {
  302. break;
  303. }
  304. }
  305. /* 更新数据的开始时间 */
  306. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  307. return true;
  308. }
  309. /* 发送实时音频数据 */
  310. void AssignSrcDataThread::sendRealTimeSrcData(const AudioSrcData& srcData)
  311. {
  312. /* 发送实时数据到RTP中 */
  313. if(m_rtpSenderThread != nullptr)
  314. {
  315. m_rtpSenderThread->setData(srcData);
  316. }
  317. /* 发送实时数据到创建音量包的线程 */
  318. if(m_pThreadCreateDB != nullptr)
  319. {
  320. m_pThreadCreateDB->setRealTimeData(srcData);
  321. }
  322. }
  323. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  324. void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData)
  325. {
  326. // QReadLocker locker(m_pRwLock);
  327. if(m_rtpSenderThread == nullptr)
  328. {
  329. return; // 如果没有RTP发送线程,则直接返回
  330. }
  331. m_rtpSenderThread->setData(srcData);
  332. // if(m_listAudioSrcData.empty())
  333. // {
  334. // return; // 如果没有数据,则直接返回
  335. // }
  336. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}",
  337. // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  338. // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  339. // auto it = m_listAudioSrcData.end();
  340. // if(it != m_listAudioSrcData.begin())
  341. // {
  342. // --it; // 移动到最后一个数据
  343. // }
  344. // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  345. // for(; it != m_listAudioSrcData.begin(); --it)
  346. // {
  347. // if((*it)->startTime == m_lastSendTime)
  348. // {
  349. // break; // 找到已发送数据的位置
  350. // }
  351. // }
  352. // if(it == m_listAudioSrcData.begin())
  353. // {
  354. // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase);
  355. // }
  356. // /* 开始发送数据 */
  357. // for(; it != m_listAudioSrcData.end(); ++it)
  358. // {
  359. // if(*it != nullptr)
  360. // {
  361. // m_rtpSenderThread->setData(**it);
  362. // m_lastSendTime = (*it)->startTime; // 更新已发送数据
  363. // }
  364. // }
  365. }