AssignSrcDataThread.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include <QReadWriteLock>
  6. #include <QWriteLocker>
  7. #include <QReadLocker>
  8. #include <cstdint>
  9. #include <qreadwritelock.h>
  10. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  11. : BaseRecordThread(threadInfo)
  12. {
  13. }
  14. AssignSrcDataThread::~AssignSrcDataThread()
  15. {
  16. if(m_pRwLock != nullptr)
  17. {
  18. delete m_pRwLock;
  19. m_pRwLock = nullptr;
  20. }
  21. for(auto& audioData : m_listAudioSrcData)
  22. {
  23. if(audioData != nullptr)
  24. {
  25. delete audioData;
  26. audioData = nullptr;
  27. }
  28. }
  29. m_listAudioSrcData.clear();
  30. if(m_dispatchSrcData != nullptr)
  31. {
  32. delete m_dispatchSrcData;
  33. m_dispatchSrcData = nullptr;
  34. }
  35. }
  36. void AssignSrcDataThread::threadTask()
  37. {
  38. SPDLOG_LOGGER_INFO(m_logger, "{} 开始分派数据线程", m_logBase);
  39. /* 初始化数据 */
  40. if(!initData())
  41. {
  42. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  43. return;
  44. }
  45. /* 获取需要分派数据的线程 */
  46. if(!getDispatchThread())
  47. {
  48. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取分派线程失败", m_logBase);
  49. return;
  50. }
  51. m_isRunning = true;
  52. while(m_isRunning)
  53. {
  54. /* 休眠一段时间 */
  55. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  56. /*------------------------------------------------------------------------
  57. * 获取一个数据并处理数据,将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  58. *------------------------------------------------------------------------*/
  59. /* 将发送数据到Rtp线程 */
  60. sendSrcDataToRtp();
  61. /* 判断数据是否满足1s大小 */
  62. if(!isFullOneSecondData())
  63. {
  64. continue; // 如果不满足1s大小,则继续等待
  65. }
  66. /* 处理数据,将其拼接成1s的数据 */
  67. processData();
  68. /* 处理数据时间戳,计算开始时间 */
  69. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  70. /*------------------------------------------------------------------------
  71. * 分派数据
  72. *------------------------------------------------------------------------*/
  73. /* 分派数据给各个线程 */
  74. for(auto& dispatchThread : m_listDispatchThreads)
  75. {
  76. if(dispatchThread != nullptr)
  77. {
  78. dispatchThread->setData(*m_dispatchSrcData);
  79. }
  80. }
  81. /* 清空分派数据 */
  82. m_dispatchSrcData->clear();
  83. }
  84. /* 清理数据 */
  85. clearData();
  86. SPDLOG_LOGGER_INFO(m_logger, "{} 结束分派数据线程", m_logBase);
  87. }
  88. /* 设置数据,这里不用 */
  89. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  90. {
  91. return true;
  92. }
  93. /**
  94. * @brief 设置数据,输入小于1秒的数据
  95. *
  96. * @param srcData
  97. * @param dataSize
  98. * @param endTime
  99. * @return true
  100. * @return false
  101. */
  102. bool AssignSrcDataThread::setSrcData(const char* srcData, uint32_t dataSize, QDateTime& endTime)
  103. {
  104. AudioSrcData* audioData = new AudioSrcData(dataSize);
  105. if(audioData == nullptr)
  106. {
  107. return false;
  108. }
  109. audioData->appendData(srcData, dataSize);
  110. audioData->endTime = endTime;
  111. /* 获取读写锁 */
  112. QWriteLocker locker(m_pRwLock);
  113. m_listAudioSrcData.push_back(audioData);
  114. m_listDataSize += dataSize; // 更新缓存中的数据大小
  115. return true;
  116. }
  117. /* 初始化一些数据 */
  118. bool AssignSrcDataThread::initData()
  119. {
  120. /* 初始化数据 */
  121. m_listAudioSrcData.clear(); // 清空数据列表
  122. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  123. m_numChannels = GInfo.numChannels(); /* 声道数 */
  124. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  125. m_oneSecondSize = GInfo.oneSecondCount(); /* 每秒钟的音频数据大小 */
  126. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  127. /* 初始化读写锁 */
  128. m_pRwLock = new QReadWriteLock();
  129. return true;
  130. }
  131. /* 清理数据 */
  132. void AssignSrcDataThread::clearData()
  133. {
  134. if(m_dispatchSrcData != nullptr)
  135. {
  136. delete m_dispatchSrcData;
  137. m_dispatchSrcData = nullptr;
  138. }
  139. if(m_pCurrentSrcData != nullptr)
  140. {
  141. delete m_pCurrentSrcData;
  142. m_pCurrentSrcData = nullptr;
  143. }
  144. }
  145. /**
  146. * @brief 获取需要分派数据的线程
  147. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  148. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  149. *
  150. * @return true
  151. * @return false
  152. */
  153. bool AssignSrcDataThread::getDispatchThread()
  154. {
  155. /* 根据生成数据文件的类型顺序获取 */
  156. /* 先获取生成wav小文件的线程 */
  157. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  158. if(pWavThread != nullptr)
  159. {
  160. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  161. }
  162. /* 获取生成音量和反相数据的线程 */
  163. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CheckDBAndPhase, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  164. if(pDBAndPhaseThread != nullptr)
  165. {
  166. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  167. }
  168. /* 获取生成长文件的线程 */
  169. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  170. if(pLongFileThread != nullptr )
  171. {
  172. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  173. }
  174. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  175. if(m_rtpSenderThread == nullptr)
  176. {
  177. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  178. return false;
  179. }
  180. return true;
  181. }
  182. /* 判断是否满足1秒的数据 */
  183. bool AssignSrcDataThread::isFullOneSecondData() const
  184. {
  185. QReadLocker locker(m_pRwLock);
  186. /* 判断缓存中的数据是否满足1秒大小 */
  187. if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
  188. {
  189. return true;
  190. }
  191. return false;
  192. }
  193. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  194. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  195. bool AssignSrcDataThread::processData()
  196. {
  197. m_dispatchSrcData->clear();
  198. while(true)
  199. {
  200. if(m_remainingDataSize > 0)
  201. {
  202. /* 如果剩余数据还有数据,则继续处理 */
  203. uint32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
  204. auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
  205. /* 更新结束时间 */
  206. if(copySize == m_remainingDataSize)
  207. {
  208. /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
  209. * 并删除这个缓冲区 */
  210. m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
  211. delete m_pCurrentSrcData;
  212. m_pCurrentSrcData = nullptr;
  213. }else {
  214. /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
  215. m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
  216. }
  217. /* 更新剩余数据大小 */
  218. m_remainingDataSize -= copySize;
  219. } else
  220. {
  221. /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
  222. m_pRwLock->lockForRead();
  223. m_pCurrentSrcData = m_listAudioSrcData.front();
  224. m_listAudioSrcData.pop_front();
  225. m_pRwLock->unlock();
  226. if(m_pCurrentSrcData == nullptr)
  227. {
  228. break;
  229. }
  230. /* 更新队列中剩余的数据大小 */
  231. m_listDataSize -= m_pCurrentSrcData->dataSize;
  232. /* 更新剩余数据的大小 */
  233. m_remainingDataSize = m_pCurrentSrcData->dataSize;
  234. }
  235. if(m_dispatchSrcData->isFull())
  236. {
  237. break;
  238. }
  239. }
  240. /* 更新数据的开始时间 */
  241. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  242. return true;
  243. }
  244. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  245. void AssignSrcDataThread::sendSrcDataToRtp()
  246. {
  247. QReadLocker locker(m_pRwLock);
  248. if(m_listAudioSrcData.empty())
  249. {
  250. return; // 如果没有数据,则直接返回
  251. }
  252. /* 如果发送数据和最新的数据不一样,则发送数据 */
  253. if(m_pSendSrcData == nullptr)
  254. {
  255. /* 还未发送过,直接发送所有数据 */
  256. for(auto& audioData : m_listAudioSrcData)
  257. {
  258. if(audioData != nullptr)
  259. {
  260. m_rtpSenderThread->setData(*audioData);
  261. m_pSendSrcData = audioData; // 更新已发送数据
  262. }
  263. }
  264. }else
  265. {
  266. /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  267. auto it = m_listAudioSrcData.begin();
  268. for(; it != m_listAudioSrcData.end(); ++it)
  269. {
  270. if(*it == m_pSendSrcData)
  271. {
  272. break; // 找到已发送数据的位置
  273. }
  274. }
  275. if(it != m_listAudioSrcData.end())
  276. {
  277. ++it; // 移动到下一个数据
  278. for(; it != m_listAudioSrcData.end(); ++it)
  279. {
  280. if(*it != nullptr)
  281. {
  282. m_rtpSenderThread->setData(**it);
  283. m_pSendSrcData = *it; // 更新已发送数据
  284. }
  285. }
  286. }else
  287. {
  288. /* 没有找到相等的,说明这个队列中是全新的数据,所有的都需要发送 */
  289. for(auto& audioData : m_listAudioSrcData)
  290. {
  291. if(audioData != nullptr)
  292. {
  293. m_rtpSenderThread->setData(*audioData);
  294. m_pSendSrcData = audioData; // 更新已发送数据
  295. }
  296. }
  297. }
  298. }
  299. }