AssignSrcDataThread.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include "spdlog.h"
  7. #include <QReadWriteLock>
  8. #include <QWriteLocker>
  9. #include <QReadLocker>
  10. #include <cstdint>
  11. #include "CreateDBThread.h"
  12. #include "CreateWAVThread.h"
  13. #include "CreateLongFileThread.h"
  14. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  15. : BaseRecordThread(threadInfo)
  16. {
  17. }
  18. AssignSrcDataThread::~AssignSrcDataThread()
  19. {
  20. // if(m_pRwLock != nullptr)
  21. // {
  22. // delete m_pRwLock;
  23. // m_pRwLock = nullptr;
  24. // }
  25. for(auto& audioData : m_listAudioSrcData)
  26. {
  27. if(audioData != nullptr)
  28. {
  29. delete audioData;
  30. audioData = nullptr;
  31. }
  32. }
  33. m_listAudioSrcData.clear();
  34. if(m_dispatchSrcData != nullptr)
  35. {
  36. delete m_dispatchSrcData;
  37. m_dispatchSrcData = nullptr;
  38. }
  39. }
  40. /* 停止线程 */
  41. void AssignSrcDataThread::stopThread()
  42. {
  43. m_isRunning = false;
  44. m_condDataUpdate.notify_all(); // 通知所有等待的线程
  45. }
  46. /* 设置数据,这里不用 */
  47. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  48. {
  49. return true;
  50. }
  51. /**
  52. * @brief 设置数据,输入小于1秒的数据
  53. *
  54. * @param srcData
  55. * @param dataSize
  56. * @param endTime
  57. * @return true
  58. * @return false
  59. */
  60. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  61. {
  62. AudioSrcData* audioData = new AudioSrcData(dataSize);
  63. if(audioData == nullptr)
  64. {
  65. return false;
  66. }
  67. audioData->appendData(srcData, dataSize);
  68. audioData->endTime = endTime;
  69. /* 获取读写锁 */
  70. {
  71. // std::lock_guard<QReadWriteLock> lock(m_pRwLock);
  72. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  73. m_listAudioSrcData.push_back(audioData);
  74. m_listDataSize += dataSize; // 更新缓存中的数据大小
  75. m_isDataUpdate.store(true);
  76. }
  77. m_condDataUpdate.notify_one();
  78. // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString());
  79. return true;
  80. }
  81. void AssignSrcDataThread::task()
  82. {
  83. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
  84. /* 初始化数据 */
  85. if(!initData())
  86. {
  87. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  88. return;
  89. }
  90. /* 获取需要分派数据的线程 */
  91. if(!getDispatchThread())
  92. {
  93. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  94. return;
  95. }
  96. /* 将自身设置到录音线程中 */
  97. m_pThreadRecord->setAssignSrcDataThread(this);
  98. m_isRunning = true;
  99. while(m_isRunning)
  100. {
  101. /* 休眠一段时间 */
  102. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  103. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  104. m_condDataUpdate.wait(lock, [this] {
  105. return (m_isDataUpdate.load() || !m_isRunning.load());
  106. });
  107. m_isDataUpdate.store(false);
  108. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  109. /*------------------------------------------------------------------------
  110. * 分派实时数据
  111. *------------------------------------------------------------------------*/
  112. /* 获取最新的数据,给其添加开始时间戳 */
  113. auto latestData = m_listAudioSrcData.back();
  114. if(latestData == nullptr || latestData->isEmpty())
  115. {
  116. SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase);
  117. continue;
  118. }
  119. latestData->startTime = previTime(latestData->endTime, latestData->dataSize);
  120. /* 将发送数据到Rtp线程 */
  121. sendSrcDataToRtp(*latestData);
  122. /*------------------------------------------------------------------------
  123. * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  124. *------------------------------------------------------------------------*/
  125. /* 判断数据是否满足1s大小 */
  126. if(!isFullOneSecondData())
  127. {
  128. continue; // 如果不满足1s大小,则继续等待
  129. }
  130. /* 处理数据,将其拼接成1s的数据 */
  131. processData();
  132. /* 处理数据时间戳,计算开始时间 */
  133. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  134. /*------------------------------------------------------------------------
  135. * 分派常规数据
  136. *------------------------------------------------------------------------*/
  137. /* 分派数据给各个线程 */
  138. sendRegularData(*m_dispatchSrcData);
  139. /* 清空分派数据 */
  140. m_dispatchSrcData->clear();
  141. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  142. // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
  143. // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count());
  144. }
  145. /* 清理数据 */
  146. clearData();
  147. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
  148. }
  149. /* 初始化一些数据 */
  150. bool AssignSrcDataThread::initData()
  151. {
  152. /* 初始化数据 */
  153. m_listAudioSrcData.clear(); // 清空数据列表
  154. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  155. m_numChannels = GInfo.numChannels(); /* 声道数 */
  156. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  157. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  158. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  159. m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间
  160. /* 初始化读写锁 */
  161. // m_pRwLock = new QReadWriteLock();
  162. return true;
  163. }
  164. /* 清理数据 */
  165. void AssignSrcDataThread::clearData()
  166. {
  167. if(m_dispatchSrcData != nullptr)
  168. {
  169. delete m_dispatchSrcData;
  170. m_dispatchSrcData = nullptr;
  171. }
  172. if(m_pCurrentSrcData != nullptr)
  173. {
  174. delete m_pCurrentSrcData;
  175. m_pCurrentSrcData = nullptr;
  176. }
  177. }
  178. /**
  179. * @brief 获取需要分派数据的线程
  180. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  181. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  182. *
  183. * @return true
  184. * @return false
  185. */
  186. bool AssignSrcDataThread::getDispatchThread()
  187. {
  188. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  189. /* 先获取生成wav小文件的线程 */
  190. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  191. if(pWavThread != nullptr)
  192. {
  193. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  194. m_pThreadCreateWAV = dynamic_cast<CreateWAVThread*>(pWavThread);
  195. }
  196. /* 获取生成音量和反相数据的线程 */
  197. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  198. if(pDBAndPhaseThread != nullptr)
  199. {
  200. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  201. m_pThreadCreateDB = dynamic_cast<CreateDBThread*>(pDBAndPhaseThread);
  202. }
  203. /* 获取生成长文件的线程 */
  204. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  205. if(pLongFileThread != nullptr )
  206. {
  207. // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  208. m_pThreadCreateLongFile = dynamic_cast<CreateLongFileThread*>(pLongFileThread);
  209. }
  210. /* 获取发送RTP数据的线程 */
  211. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  212. if(m_rtpSenderThread == nullptr)
  213. {
  214. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  215. // return false;
  216. }
  217. /* 最后获取录音线程信息,如果获取不到则一直获取 */
  218. while(true)
  219. {
  220. auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  221. if(pThreadBase != nullptr)
  222. {
  223. m_pThreadRecord = dynamic_cast<RecordThread*>(pThreadBase);
  224. break; // 找到录音线程了
  225. }
  226. /* 如果没有找到录音线程,则等待一段时间再继续查找 */
  227. std::this_thread::sleep_for(std::chrono::milliseconds(5));
  228. }
  229. return true;
  230. }
  231. /* 判断是否满足1秒的数据 */
  232. bool AssignSrcDataThread::isFullOneSecondData() const
  233. {
  234. // QReadLocker locker(m_pRwLock);
  235. /* 判断缓存中的数据是否满足1秒大小 */
  236. if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
  237. {
  238. return true;
  239. }
  240. return false;
  241. }
  242. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  243. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  244. bool AssignSrcDataThread::processData()
  245. {
  246. m_dispatchSrcData->clear();
  247. while(true)
  248. {
  249. if(m_remainingDataSize > 0)
  250. {
  251. /* 如果剩余数据还有数据,则继续处理 */
  252. int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
  253. auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
  254. /* 更新结束时间 */
  255. if(copySize == m_remainingDataSize)
  256. {
  257. /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
  258. * 并删除这个缓冲区 */
  259. m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
  260. delete m_pCurrentSrcData;
  261. m_pCurrentSrcData = nullptr;
  262. }else {
  263. /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
  264. m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
  265. }
  266. /* 更新剩余数据大小 */
  267. m_remainingDataSize -= copySize;
  268. } else
  269. {
  270. /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
  271. m_pCurrentSrcData = m_listAudioSrcData.front();
  272. m_listAudioSrcData.pop_front();
  273. if(m_pCurrentSrcData == nullptr)
  274. {
  275. break;
  276. }
  277. /* 更新队列中剩余的数据大小 */
  278. m_listDataSize -= m_pCurrentSrcData->dataSize;
  279. /* 更新剩余数据的大小 */
  280. m_remainingDataSize = m_pCurrentSrcData->dataSize;
  281. }
  282. if(m_dispatchSrcData->isFull())
  283. {
  284. break;
  285. }
  286. }
  287. /* 更新数据的开始时间 */
  288. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  289. return true;
  290. }
  291. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  292. void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData)
  293. {
  294. // QReadLocker locker(m_pRwLock);
  295. if(m_rtpSenderThread == nullptr)
  296. {
  297. return; // 如果没有RTP发送线程,则直接返回
  298. }
  299. m_rtpSenderThread->setData(srcData);
  300. // if(m_listAudioSrcData.empty())
  301. // {
  302. // return; // 如果没有数据,则直接返回
  303. // }
  304. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}",
  305. // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  306. // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  307. // auto it = m_listAudioSrcData.end();
  308. // if(it != m_listAudioSrcData.begin())
  309. // {
  310. // --it; // 移动到最后一个数据
  311. // }
  312. // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  313. // for(; it != m_listAudioSrcData.begin(); --it)
  314. // {
  315. // if((*it)->startTime == m_lastSendTime)
  316. // {
  317. // break; // 找到已发送数据的位置
  318. // }
  319. // }
  320. // if(it == m_listAudioSrcData.begin())
  321. // {
  322. // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase);
  323. // }
  324. // /* 开始发送数据 */
  325. // for(; it != m_listAudioSrcData.end(); ++it)
  326. // {
  327. // if(*it != nullptr)
  328. // {
  329. // m_rtpSenderThread->setData(**it);
  330. // m_lastSendTime = (*it)->startTime; // 更新已发送数据
  331. // }
  332. // }
  333. }
  334. /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */
  335. void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData)
  336. {
  337. /* 发送计算音量的数据 */
  338. if(m_pThreadCreateWAV != nullptr)
  339. {
  340. m_pThreadCreateWAV->setData(srcData);
  341. }
  342. /* 发送生成wav小文件的数据 */
  343. if(m_pThreadCreateDB != nullptr)
  344. {
  345. m_pThreadCreateDB->setData(srcData);
  346. }
  347. /* 发送生成长文件的数据 */
  348. if(m_pThreadCreateLongFile != nullptr)
  349. {
  350. m_pThreadCreateLongFile->setData(srcData);
  351. }
  352. }