AssignSrcDataThread.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include <QReadWriteLock>
  7. #include <QWriteLocker>
  8. #include <QReadLocker>
  9. #include <cstdint>
  10. #include <qreadwritelock.h>
  11. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  12. : BaseRecordThread(threadInfo)
  13. {
  14. }
  15. AssignSrcDataThread::~AssignSrcDataThread()
  16. {
  17. if(m_pRwLock != nullptr)
  18. {
  19. delete m_pRwLock;
  20. m_pRwLock = nullptr;
  21. }
  22. for(auto& audioData : m_listAudioSrcData)
  23. {
  24. if(audioData != nullptr)
  25. {
  26. delete audioData;
  27. audioData = nullptr;
  28. }
  29. }
  30. m_listAudioSrcData.clear();
  31. if(m_dispatchSrcData != nullptr)
  32. {
  33. delete m_dispatchSrcData;
  34. m_dispatchSrcData = nullptr;
  35. }
  36. }
  37. /* 设置数据,这里不用 */
  38. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  39. {
  40. return true;
  41. }
  42. /**
  43. * @brief 设置数据,输入小于1秒的数据
  44. *
  45. * @param srcData
  46. * @param dataSize
  47. * @param endTime
  48. * @return true
  49. * @return false
  50. */
  51. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  52. {
  53. AudioSrcData* audioData = new AudioSrcData(dataSize);
  54. if(audioData == nullptr)
  55. {
  56. return false;
  57. }
  58. audioData->appendData(srcData, dataSize);
  59. audioData->endTime = endTime;
  60. /* 获取读写锁 */
  61. QWriteLocker locker(m_pRwLock);
  62. m_listAudioSrcData.push_back(audioData);
  63. m_listDataSize += dataSize; // 更新缓存中的数据大小
  64. return true;
  65. }
  66. void AssignSrcDataThread::task()
  67. {
  68. SPDLOG_LOGGER_INFO(m_logger, "------------- {} 开始分派数据线程 -------------", m_logBase);
  69. /* 初始化数据 */
  70. if(!initData())
  71. {
  72. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  73. return;
  74. }
  75. /* 获取需要分派数据的线程 */
  76. if(!getDispatchThread())
  77. {
  78. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  79. return;
  80. }
  81. /* 将自身设置到录音线程中 */
  82. m_pRecordThread->setAssignSrcDataThread(this);
  83. m_isRunning = true;
  84. while(m_isRunning)
  85. {
  86. /* 休眠一段时间 */
  87. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  88. /*------------------------------------------------------------------------
  89. * 获取一个数据并处理数据,将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  90. *------------------------------------------------------------------------*/
  91. /* 将发送数据到Rtp线程 */
  92. sendSrcDataToRtp();
  93. /* 判断数据是否满足1s大小 */
  94. if(!isFullOneSecondData())
  95. {
  96. continue; // 如果不满足1s大小,则继续等待
  97. }
  98. /* 处理数据,将其拼接成1s的数据 */
  99. processData();
  100. /* 处理数据时间戳,计算开始时间 */
  101. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  102. /*------------------------------------------------------------------------
  103. * 分派数据
  104. *------------------------------------------------------------------------*/
  105. /* 分派数据给各个线程 */
  106. for(auto& dispatchThread : m_listDispatchThreads)
  107. {
  108. if(dispatchThread != nullptr)
  109. {
  110. dispatchThread->setData(*m_dispatchSrcData);
  111. }
  112. }
  113. /* 清空分派数据 */
  114. m_dispatchSrcData->clear();
  115. }
  116. /* 清理数据 */
  117. clearData();
  118. SPDLOG_LOGGER_INFO(m_logger, "------------- {} 结束分派数据线程 -------------", m_logBase);
  119. }
  120. /* 初始化一些数据 */
  121. bool AssignSrcDataThread::initData()
  122. {
  123. /* 初始化数据 */
  124. m_listAudioSrcData.clear(); // 清空数据列表
  125. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  126. m_numChannels = GInfo.numChannels(); /* 声道数 */
  127. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  128. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  129. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  130. /* 初始化读写锁 */
  131. m_pRwLock = new QReadWriteLock();
  132. return true;
  133. }
  134. /* 清理数据 */
  135. void AssignSrcDataThread::clearData()
  136. {
  137. if(m_dispatchSrcData != nullptr)
  138. {
  139. delete m_dispatchSrcData;
  140. m_dispatchSrcData = nullptr;
  141. }
  142. if(m_pCurrentSrcData != nullptr)
  143. {
  144. delete m_pCurrentSrcData;
  145. m_pCurrentSrcData = nullptr;
  146. }
  147. }
  148. /**
  149. * @brief 获取需要分派数据的线程
  150. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  151. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  152. *
  153. * @return true
  154. * @return false
  155. */
  156. bool AssignSrcDataThread::getDispatchThread()
  157. {
  158. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  159. /* 先获取生成wav小文件的线程 */
  160. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  161. if(pWavThread != nullptr)
  162. {
  163. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  164. }
  165. /* 获取生成音量和反相数据的线程 */
  166. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  167. if(pDBAndPhaseThread != nullptr)
  168. {
  169. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  170. }
  171. /* 获取生成长文件的线程 */
  172. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  173. if(pLongFileThread != nullptr )
  174. {
  175. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  176. }
  177. /* 获取发送RTP数据的线程 */
  178. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  179. if(m_rtpSenderThread == nullptr)
  180. {
  181. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  182. // return false;
  183. }
  184. /* 最后获取录音线程信息,如果获取不到则一直获取 */
  185. while(true)
  186. {
  187. auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  188. if(pThreadBase != nullptr)
  189. {
  190. m_pRecordThread = dynamic_cast<RecordThread*>(pThreadBase);
  191. break; // 找到录音线程了
  192. }
  193. /* 如果没有找到录音线程,则等待一段时间再继续查找 */
  194. std::this_thread::sleep_for(std::chrono::milliseconds(5));
  195. }
  196. return true;
  197. }
  198. /* 判断是否满足1秒的数据 */
  199. bool AssignSrcDataThread::isFullOneSecondData() const
  200. {
  201. QReadLocker locker(m_pRwLock);
  202. /* 判断缓存中的数据是否满足1秒大小 */
  203. if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
  204. {
  205. return true;
  206. }
  207. return false;
  208. }
  209. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  210. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  211. bool AssignSrcDataThread::processData()
  212. {
  213. m_dispatchSrcData->clear();
  214. while(true)
  215. {
  216. if(m_remainingDataSize > 0)
  217. {
  218. /* 如果剩余数据还有数据,则继续处理 */
  219. int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
  220. auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
  221. /* 更新结束时间 */
  222. if(copySize == m_remainingDataSize)
  223. {
  224. /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
  225. * 并删除这个缓冲区 */
  226. m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
  227. delete m_pCurrentSrcData;
  228. m_pCurrentSrcData = nullptr;
  229. }else {
  230. /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
  231. m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
  232. }
  233. /* 更新剩余数据大小 */
  234. m_remainingDataSize -= copySize;
  235. } else
  236. {
  237. /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
  238. m_pRwLock->lockForRead();
  239. m_pCurrentSrcData = m_listAudioSrcData.front();
  240. m_listAudioSrcData.pop_front();
  241. m_pRwLock->unlock();
  242. if(m_pCurrentSrcData == nullptr)
  243. {
  244. break;
  245. }
  246. /* 更新队列中剩余的数据大小 */
  247. m_listDataSize -= m_pCurrentSrcData->dataSize;
  248. /* 更新剩余数据的大小 */
  249. m_remainingDataSize = m_pCurrentSrcData->dataSize;
  250. }
  251. if(m_dispatchSrcData->isFull())
  252. {
  253. break;
  254. }
  255. }
  256. /* 更新数据的开始时间 */
  257. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  258. return true;
  259. }
  260. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  261. void AssignSrcDataThread::sendSrcDataToRtp()
  262. {
  263. QReadLocker locker(m_pRwLock);
  264. if(m_rtpSenderThread == nullptr)
  265. {
  266. return; // 如果没有RTP发送线程,则直接返回
  267. }
  268. if(m_listAudioSrcData.empty())
  269. {
  270. return; // 如果没有数据,则直接返回
  271. }
  272. /* 如果发送数据和最新的数据不一样,则发送数据 */
  273. if(m_pSendSrcData == nullptr)
  274. {
  275. /* 还未发送过,直接发送所有数据 */
  276. for(auto& audioData : m_listAudioSrcData)
  277. {
  278. if(audioData != nullptr)
  279. {
  280. m_rtpSenderThread->setData(*audioData);
  281. m_pSendSrcData = audioData; // 更新已发送数据
  282. }
  283. }
  284. }else
  285. {
  286. /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  287. auto it = m_listAudioSrcData.begin();
  288. for(; it != m_listAudioSrcData.end(); ++it)
  289. {
  290. if(*it == m_pSendSrcData)
  291. {
  292. break; // 找到已发送数据的位置
  293. }
  294. }
  295. if(it != m_listAudioSrcData.end())
  296. {
  297. ++it; // 移动到下一个数据
  298. for(; it != m_listAudioSrcData.end(); ++it)
  299. {
  300. if(*it != nullptr)
  301. {
  302. m_rtpSenderThread->setData(**it);
  303. m_pSendSrcData = *it; // 更新已发送数据
  304. }
  305. }
  306. }else
  307. {
  308. /* 没有找到相等的,说明这个队列中是全新的数据,所有的都需要发送 */
  309. for(auto& audioData : m_listAudioSrcData)
  310. {
  311. if(audioData != nullptr)
  312. {
  313. m_rtpSenderThread->setData(*audioData);
  314. m_pSendSrcData = audioData; // 更新已发送数据
  315. }
  316. }
  317. }
  318. }
  319. }