AssignSrcDataThread.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. #include "AssignSrcDataThread.h"
  2. #include "ThreadManager.h"
  3. #include "AudioData.h"
  4. #include "GlobalInfo.h"
  5. #include "RecordThread.h"
  6. #include "spdlog.h"
  7. #include <QReadWriteLock>
  8. #include <QWriteLocker>
  9. #include <QReadLocker>
  10. #include <cstdint>
  11. #include "CreateDBThread.h"
  12. #include "CreateWAVThread.h"
  13. #include "CreateRecordFileThread.h"
  14. AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
  15. : BaseRecordThread(threadInfo)
  16. {
  17. }
  18. AssignSrcDataThread::~AssignSrcDataThread()
  19. {
  20. // if(m_pRwLock != nullptr)
  21. // {
  22. // delete m_pRwLock;
  23. // m_pRwLock = nullptr;
  24. // }
  25. for(auto& audioData : m_listAudioSrcData)
  26. {
  27. if(audioData != nullptr)
  28. {
  29. delete audioData;
  30. audioData = nullptr;
  31. }
  32. }
  33. m_listAudioSrcData.clear();
  34. if(m_dispatchSrcData != nullptr)
  35. {
  36. delete m_dispatchSrcData;
  37. m_dispatchSrcData = nullptr;
  38. }
  39. }
  40. /* 停止线程 */
  41. void AssignSrcDataThread::thread_stop()
  42. {
  43. m_isRunning = false;
  44. m_condDataUpdate.notify_all(); // 通知所有等待的线程
  45. }
  46. /* 停止线程 */
  47. void AssignSrcDataThread::thread_stop_block()
  48. {
  49. thread_stop();
  50. while(m_isStoped.load() == false)
  51. {
  52. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  53. }
  54. }
  55. /* 设置数据,这里不用 */
  56. bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
  57. {
  58. return true;
  59. }
  60. /**
  61. * @brief 设置数据,输入小于1秒的数据
  62. *
  63. * @param srcData
  64. * @param dataSize
  65. * @param endTime
  66. * @return true
  67. * @return false
  68. */
  69. bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
  70. {
  71. AudioSrcData* audioData = new AudioSrcData(dataSize);
  72. if(audioData == nullptr)
  73. {
  74. return false;
  75. }
  76. audioData->appendData(srcData, dataSize);
  77. audioData->endTime = endTime;
  78. /* 获取读写锁 */
  79. {
  80. // std::lock_guard<QReadWriteLock> lock(m_pRwLock);
  81. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  82. m_listAudioSrcData.push_back(audioData);
  83. m_listDataSize += dataSize; // 更新缓存中的数据大小
  84. m_isDataUpdate.store(true);
  85. }
  86. m_condDataUpdate.notify_one();
  87. // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString());
  88. return true;
  89. }
  90. void AssignSrcDataThread::task()
  91. {
  92. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
  93. /* 初始化数据 */
  94. if(!initData())
  95. {
  96. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  97. return;
  98. }
  99. /* 获取需要分派数据的线程 */
  100. if(!getDispatchThread())
  101. {
  102. SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
  103. return;
  104. }
  105. m_isRunning = true;
  106. while(m_isRunning)
  107. {
  108. /* 休眠一段时间 */
  109. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  110. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  111. m_condDataUpdate.wait(lock, [this] {
  112. return (m_isDataUpdate.load() || !m_isRunning.load());
  113. });
  114. m_isDataUpdate.store(false);
  115. // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
  116. /*------------------------------------------------------------------------
  117. * 分派实时数据
  118. *------------------------------------------------------------------------*/
  119. /* 获取最新的数据,给其添加开始时间戳 */
  120. if(m_listAudioSrcData.size() == 0)
  121. {
  122. /* 在线程退出的时候会唤醒上面的条件变量,此时队列可能是空的 */
  123. continue;
  124. }
  125. auto latestData = m_listAudioSrcData.back();
  126. if(latestData == nullptr || latestData->isEmpty())
  127. {
  128. SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase);
  129. continue;
  130. }
  131. latestData->startTime = previTime(latestData->endTime, latestData->dataSize);
  132. sendRealTimeSrcData(*latestData);
  133. /*------------------------------------------------------------------------
  134. * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
  135. *------------------------------------------------------------------------*/
  136. /* 判断数据是否满足1s大小 */
  137. if(!isFullOneSecondData())
  138. {
  139. continue;
  140. }
  141. /* 处理数据,将其拼接成1s的数据 */
  142. processData();
  143. // SPDLOG_LOGGER_TRACE(m_logger, "{} 开始分发数据,开始时间: {}, 结束时间: {}, 大小: {}",
  144. // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  145. // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize);
  146. /*------------------------------------------------------------------------
  147. * 分派常规数据
  148. *------------------------------------------------------------------------*/
  149. /* 分派数据给各个线程 */
  150. sendRegularData(*m_dispatchSrcData);
  151. /* 清空分派数据 */
  152. m_dispatchSrcData->clear();
  153. // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
  154. // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
  155. // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count());
  156. }
  157. /* 清理数据 */
  158. clearData();
  159. SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
  160. }
  161. /* 初始化一些数据 */
  162. bool AssignSrcDataThread::initData()
  163. {
  164. /* 初始化数据 */
  165. m_listAudioSrcData.clear(); // 清空数据列表
  166. m_sampleRate = GInfo.sampleRate(); /* 采样率 */
  167. m_numChannels = GInfo.numChannels(); /* 声道数 */
  168. m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
  169. m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
  170. m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
  171. m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间
  172. /* 初始化读写锁 */
  173. // m_pRwLock = new QReadWriteLock();
  174. return true;
  175. }
  176. /* 清理数据 */
  177. void AssignSrcDataThread::clearData()
  178. {
  179. std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
  180. if(m_dispatchSrcData != nullptr)
  181. {
  182. delete m_dispatchSrcData;
  183. m_dispatchSrcData = nullptr;
  184. }
  185. for(auto& it : m_listAudioSrcData)
  186. {
  187. if(it != nullptr)
  188. {
  189. delete it;
  190. it = nullptr;
  191. }
  192. }
  193. m_listAudioSrcData.clear();
  194. }
  195. /**
  196. * @brief 获取需要分派数据的线程
  197. * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
  198. * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
  199. *
  200. * @return true
  201. * @return false
  202. */
  203. bool AssignSrcDataThread::getDispatchThread()
  204. {
  205. /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
  206. /* 先获取生成wav小文件的线程 */
  207. auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  208. if(pWavThread != nullptr)
  209. {
  210. m_pThreadCreateWAV = dynamic_cast<CreateWAVThread*>(pWavThread);
  211. }
  212. /* 获取生成音量数据的线程 */
  213. auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  214. if(pDBAndPhaseThread != nullptr)
  215. {
  216. m_pThreadCreateDB = dynamic_cast<CreateDBThread*>(pDBAndPhaseThread);
  217. }
  218. /* 获取生成长文件的线程 */
  219. auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  220. if(pLongFileThread != nullptr )
  221. {
  222. m_pThreadCreateLongFile = dynamic_cast<CreateRecordFileThread*>(pLongFileThread);
  223. }
  224. /* 获取发送RTP数据的线程 */
  225. m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  226. if(m_rtpSenderThread == nullptr)
  227. {
  228. // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
  229. // return false;
  230. }
  231. return true;
  232. }
  233. /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */
  234. void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData)
  235. {
  236. /* 发送计算音量的数据 */
  237. if(m_pThreadCreateWAV != nullptr)
  238. {
  239. m_pThreadCreateWAV->setData(srcData);
  240. }
  241. /* 发送生成wav小文件的数据 */
  242. if(m_pThreadCreateDB != nullptr)
  243. {
  244. m_pThreadCreateDB->setData(srcData);
  245. }
  246. /* 发送生成长文件的数据 */
  247. if(m_pThreadCreateLongFile != nullptr)
  248. {
  249. m_pThreadCreateLongFile->setData(srcData);
  250. }
  251. }
  252. /* 判断是否满足1秒的数据 */
  253. bool AssignSrcDataThread::isFullOneSecondData() const
  254. {
  255. // QReadLocker locker(m_pRwLock);
  256. /* 判断缓存中的数据是否满足1秒大小 */
  257. if(m_listDataSize + m_remainingData.size() >= m_oneSecondSize)
  258. {
  259. return true;
  260. }
  261. return false;
  262. }
  263. /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
  264. * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
  265. bool AssignSrcDataThread::processData()
  266. {
  267. /* 处理数据 */
  268. m_dispatchSrcData->clear();
  269. while(true)
  270. {
  271. if(m_remainingData.size() > 0)
  272. {
  273. /* 拷贝剩余数据 */
  274. int32_t copySize = m_dispatchSrcData->appendData(m_remainingData.data(), m_remainingData.size());
  275. if(copySize < m_remainingData.size())
  276. {
  277. /* 剩余数据没拷贝完,可能已经满足一次了 */
  278. m_remainingData = m_remainingData.mid(copySize, m_remainingData.size() - copySize);
  279. m_dispatchSrcData->endTime = previTime(m_lastDataEndTime, m_remainingData.size());
  280. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  281. }else {
  282. /* 剩余数据拷贝完了,清空剩余数据 */
  283. m_remainingData.clear();
  284. m_dispatchSrcData->endTime = m_lastDataEndTime;
  285. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  286. }
  287. if(m_dispatchSrcData->isFull())
  288. {
  289. /* 如果已经满足1秒大小,则退出循环 */
  290. break;
  291. }
  292. }else
  293. {
  294. /* 没有剩余数据了,拷贝全新的数据 */
  295. auto pData = m_listAudioSrcData.front();
  296. m_listAudioSrcData.pop_front();
  297. // SPDLOG_LOGGER_TRACE(m_logger, "{} 一条数据 大小: {}, 时间: {}",
  298. // m_logBase, pData->dataSize, pData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  299. if(pData == nullptr)
  300. {
  301. continue;
  302. }
  303. int copySize = m_dispatchSrcData->appendData(pData->pData, pData->dataSize);
  304. if(m_dispatchSrcData->isFull())
  305. {
  306. /* 判断有无剩余数据 */
  307. if(copySize < pData->dataSize)
  308. {
  309. m_remainingData = QByteArray(pData->pData + copySize, pData->dataSize - copySize);
  310. m_lastDataEndTime = pData->endTime;
  311. } else {
  312. m_remainingData.clear();
  313. m_lastDataEndTime = QDateTime();
  314. }
  315. /* 更新结束时间 */
  316. m_dispatchSrcData->endTime = pData->endTime;
  317. /* 删除数据 */
  318. delete pData;
  319. pData = nullptr;
  320. m_listDataSize -= copySize;
  321. break;
  322. }
  323. delete pData;
  324. pData = nullptr;
  325. m_listDataSize -= copySize;
  326. }
  327. }
  328. /* 更新数据的开始时间 */
  329. m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
  330. // SPDLOG_LOGGER_TRACE(m_logger, "{} 一秒的数据拼接完成, 开始时间: {}, 结束时间: {}, 大小: {}",
  331. // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  332. // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize);
  333. return true;
  334. }
  335. /* 发送实时音频数据 */
  336. void AssignSrcDataThread::sendRealTimeSrcData(const AudioSrcData& srcData)
  337. {
  338. /* 发送实时数据到RTP中 */
  339. if(m_rtpSenderThread != nullptr)
  340. {
  341. m_rtpSenderThread->setData(srcData);
  342. }
  343. /* 发送实时数据到创建音量包的线程 */
  344. if(m_pThreadCreateDB != nullptr)
  345. {
  346. m_pThreadCreateDB->setRealTimeData(srcData);
  347. }
  348. }
  349. /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
  350. void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData)
  351. {
  352. // QReadLocker locker(m_pRwLock);
  353. if(m_rtpSenderThread == nullptr)
  354. {
  355. return; // 如果没有RTP发送线程,则直接返回
  356. }
  357. m_rtpSenderThread->setData(srcData);
  358. // if(m_listAudioSrcData.empty())
  359. // {
  360. // return; // 如果没有数据,则直接返回
  361. // }
  362. // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}",
  363. // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
  364. // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
  365. // auto it = m_listAudioSrcData.end();
  366. // if(it != m_listAudioSrcData.begin())
  367. // {
  368. // --it; // 移动到最后一个数据
  369. // }
  370. // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
  371. // for(; it != m_listAudioSrcData.begin(); --it)
  372. // {
  373. // if((*it)->startTime == m_lastSendTime)
  374. // {
  375. // break; // 找到已发送数据的位置
  376. // }
  377. // }
  378. // if(it == m_listAudioSrcData.begin())
  379. // {
  380. // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase);
  381. // }
  382. // /* 开始发送数据 */
  383. // for(; it != m_listAudioSrcData.end(); ++it)
  384. // {
  385. // if(*it != nullptr)
  386. // {
  387. // m_rtpSenderThread->setData(**it);
  388. // m_lastSendTime = (*it)->startTime; // 更新已发送数据
  389. // }
  390. // }
  391. }