AssignSrcDataThread.cpp 12 KB


  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include "spdlog.h"
  7. #include <QReadWriteLock>
  8. #include <QWriteLocker>
  9. #include <QReadLocker>
  10. #include <cstdint>
  11. #include <qreadwritelock.h>
  12. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  13. : BaseRecordThread(threadInfo)
  14. {
  15. }
  16. AssignSrcDataThread::~AssignSrcDataThread()
  17. {
  18. if(m_pRwLock != nullptr)
  19. {
  20. delete m_pRwLock;
  21. m_pRwLock = nullptr;
  22. }
  23. for(auto& audioData : m_listAudioSrcData)
  24. {
  25. if(audioData != nullptr)
  26. {
  27. delete audioData;
  28. audioData = nullptr;
  29. }
  30. }
  31. m_listAudioSrcData.clear();
  32. if(m_dispatchSrcData != nullptr)
  33. {
  34. delete m_dispatchSrcData;
  35. m_dispatchSrcData = nullptr;
  36. }
  37. }
  38. /* 设置数据,这里不用 */
  39. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  40. {
  41. return true;
  42. }
  43. /**
  44. * @brief 设置数据,输入小于1秒的数据
  45. *
  46. * @param srcData
  47. * @param dataSize
  48. * @param endTime
  49. * @return true
  50. * @return false
  51. */
  52. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  53. {
  54. AudioSrcData* audioData = new AudioSrcData(dataSize);
  55. if(audioData == nullptr)
  56. {
  57. return false;
  58. }
  59. audioData->appendData(srcData, dataSize);
  60. audioData->endTime = endTime;
  61. /* 获取读写锁 */
  62. QWriteLocker locker(m_pRwLock);
  63. m_listAudioSrcData.push_back(audioData);
  64. m_listDataSize += dataSize; // 更新缓存中的数据大小
  65. return true;
  66. }
  67. void AssignSrcDataThread::task()
  68. {
  69. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
  70. /* 初始化数据 */
  71. if(!initData())
  72. {
  73. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  74. return;
  75. }
  76. /* 获取需要分派数据的线程 */
  77. if(!getDispatchThread())
  78. {
  79. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  80. return;
  81. }
  82. /* 将自身设置到录音线程中 */
  83. m_pRecordThread->setAssignSrcDataThread(this);
  84. m_isRunning = true;
  85. while(m_isRunning)
  86. {
  87. /* 休眠一段时间 */
  88. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  89. /*------------------------------------------------------------------------
  90. * 获取一个数据并处理数据,将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  91. *------------------------------------------------------------------------*/
  92. /* 将发送数据到Rtp线程 */
  93. sendSrcDataToRtp();
  94. /* 判断数据是否满足1s大小 */
  95. if(!isFullOneSecondData())
  96. {
  97. continue; // 如果不满足1s大小,则继续等待
  98. }
  99. /* 处理数据,将其拼接成1s的数据 */
  100. processData();
  101. /* 处理数据时间戳,计算开始时间 */
  102. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  103. /*------------------------------------------------------------------------
  104. * 分派数据
  105. *------------------------------------------------------------------------*/
  106. /* 分派数据给各个线程 */
  107. for(auto& dispatchThread : m_listDispatchThreads)
  108. {
  109. if(dispatchThread != nullptr)
  110. {
  111. dispatchThread->setData(*m_dispatchSrcData);
  112. }
  113. }
  114. /* 清空分派数据 */
  115. m_dispatchSrcData->clear();
  116. }
  117. /* 清理数据 */
  118. clearData();
  119. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
  120. }
  121. /* 初始化一些数据 */
  122. bool AssignSrcDataThread::initData()
  123. {
  124. /* 初始化数据 */
  125. m_listAudioSrcData.clear(); // 清空数据列表
  126. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  127. m_numChannels = GInfo.numChannels(); /* 声道数 */
  128. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  129. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  130. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  131. /* 初始化读写锁 */
  132. m_pRwLock = new QReadWriteLock();
  133. return true;
  134. }
  135. /* 清理数据 */
  136. void AssignSrcDataThread::clearData()
  137. {
  138. if(m_dispatchSrcData != nullptr)
  139. {
  140. delete m_dispatchSrcData;
  141. m_dispatchSrcData = nullptr;
  142. }
  143. if(m_pCurrentSrcData != nullptr)
  144. {
  145. delete m_pCurrentSrcData;
  146. m_pCurrentSrcData = nullptr;
  147. }
  148. }
  149. /**
  150. * @brief 获取需要分派数据的线程
  151. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  152. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  153. *
  154. * @return true
  155. * @return false
  156. */
  157. bool AssignSrcDataThread::getDispatchThread()
  158. {
  159. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  160. /* 先获取生成wav小文件的线程 */
  161. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  162. if(pWavThread != nullptr)
  163. {
  164. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
  165. }
  166. /* 获取生成音量和反相数据的线程 */
  167. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  168. if(pDBAndPhaseThread != nullptr)
  169. {
  170. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
  171. }
  172. /* 获取生成长文件的线程 */
  173. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  174. if(pLongFileThread != nullptr )
  175. {
  176. m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
  177. }
  178. /* 获取发送RTP数据的线程 */
  179. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  180. if(m_rtpSenderThread == nullptr)
  181. {
  182. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  183. // return false;
  184. }
  185. /* 最后获取录音线程信息,如果获取不到则一直获取 */
  186. while(true)
  187. {
  188. auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  189. if(pThreadBase != nullptr)
  190. {
  191. m_pRecordThread = dynamic_cast<RecordThread*>(pThreadBase);
  192. break; // 找到录音线程了
  193. }
  194. /* 如果没有找到录音线程,则等待一段时间再继续查找 */
  195. std::this_thread::sleep_for(std::chrono::milliseconds(5));
  196. }
  197. return true;
  198. }
  199. /* 判断是否满足1秒的数据 */
  200. bool AssignSrcDataThread::isFullOneSecondData() const
  201. {
  202. QReadLocker locker(m_pRwLock);
  203. /* 判断缓存中的数据是否满足1秒大小 */
  204. if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
  205. {
  206. return true;
  207. }
  208. return false;
  209. }
  210. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  211. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  212. bool AssignSrcDataThread::processData()
  213. {
  214. m_dispatchSrcData->clear();
  215. while(true)
  216. {
  217. if(m_remainingDataSize > 0)
  218. {
  219. /* 如果剩余数据还有数据,则继续处理 */
  220. int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
  221. auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
  222. /* 更新结束时间 */
  223. if(copySize == m_remainingDataSize)
  224. {
  225. /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
  226. * 并删除这个缓冲区 */
  227. m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
  228. delete m_pCurrentSrcData;
  229. m_pCurrentSrcData = nullptr;
  230. }else {
  231. /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
  232. m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
  233. }
  234. /* 更新剩余数据大小 */
  235. m_remainingDataSize -= copySize;
  236. } else
  237. {
  238. /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
  239. m_pRwLock->lockForRead();
  240. m_pCurrentSrcData = m_listAudioSrcData.front();
  241. m_listAudioSrcData.pop_front();
  242. m_pRwLock->unlock();
  243. if(m_pCurrentSrcData == nullptr)
  244. {
  245. break;
  246. }
  247. /* 更新队列中剩余的数据大小 */
  248. m_listDataSize -= m_pCurrentSrcData->dataSize;
  249. /* 更新剩余数据的大小 */
  250. m_remainingDataSize = m_pCurrentSrcData->dataSize;
  251. }
  252. if(m_dispatchSrcData->isFull())
  253. {
  254. break;
  255. }
  256. }
  257. /* 更新数据的开始时间 */
  258. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  259. return true;
  260. }
  261. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  262. void AssignSrcDataThread::sendSrcDataToRtp()
  263. {
  264. QReadLocker locker(m_pRwLock);
  265. if(m_rtpSenderThread == nullptr)
  266. {
  267. return; // 如果没有RTP发送线程,则直接返回
  268. }
  269. if(m_listAudioSrcData.empty())
  270. {
  271. return; // 如果没有数据,则直接返回
  272. }
  273. /* 如果发送数据和最新的数据不一样,则发送数据 */
  274. if(m_pSendSrcData == nullptr)
  275. {
  276. /* 还未发送过,直接发送所有数据 */
  277. for(auto& audioData : m_listAudioSrcData)
  278. {
  279. if(audioData != nullptr)
  280. {
  281. m_rtpSenderThread->setData(*audioData);
  282. m_pSendSrcData = audioData; // 更新已发送数据
  283. }
  284. }
  285. }else
  286. {
  287. /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  288. auto it = m_listAudioSrcData.begin();
  289. for(; it != m_listAudioSrcData.end(); ++it)
  290. {
  291. if(*it == m_pSendSrcData)
  292. {
  293. break; // 找到已发送数据的位置
  294. }
  295. }
  296. if(it != m_listAudioSrcData.end())
  297. {
  298. ++it; // 移动到下一个数据
  299. for(; it != m_listAudioSrcData.end(); ++it)
  300. {
  301. if(*it != nullptr)
  302. {
  303. m_rtpSenderThread->setData(**it);
  304. m_pSendSrcData = *it; // 更新已发送数据
  305. }
  306. }
  307. }else
  308. {
  309. /* 没有找到相等的,说明这个队列中是全新的数据,所有的都需要发送 */
  310. for(auto& audioData : m_listAudioSrcData)
  311. {
  312. if(audioData != nullptr)
  313. {
  314. m_rtpSenderThread->setData(*audioData);
  315. m_pSendSrcData = audioData; // 更新已发送数据
  316. }
  317. }
  318. }
  319. }
  320. }