123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #include "AssignSrcDataThread.h"
- #include "ThreadManager.h"
- #include "AudioData.h"
- #include "GlobalInfo.h"
- #include <QReadWriteLock>
- #include <QWriteLocker>
- #include <QReadLocker>
- #include <cstdint>
- #include <qreadwritelock.h>
- AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
- : BaseRecordThread(threadInfo)
- {
- }
- AssignSrcDataThread::~AssignSrcDataThread()
- {
- if(m_pRwLock != nullptr)
- {
- delete m_pRwLock;
- m_pRwLock = nullptr;
- }
- for(auto& audioData : m_listAudioSrcData)
- {
- if(audioData != nullptr)
- {
- delete audioData;
- audioData = nullptr;
- }
- }
- m_listAudioSrcData.clear();
- if(m_dispatchSrcData != nullptr)
- {
- delete m_dispatchSrcData;
- m_dispatchSrcData = nullptr;
- }
- }
- void AssignSrcDataThread::threadTask()
- {
- SPDLOG_LOGGER_INFO(m_logger, "{} 开始分派数据线程", m_logBase);
- /* 初始化数据 */
- if(!initData())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
- return;
- }
- /* 获取需要分派数据的线程 */
- if(!getDispatchThread())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 获取分派线程失败", m_logBase);
- return;
- }
-
- m_isRunning = true;
- while(m_isRunning)
- {
- /* 休眠一段时间 */
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
-
- /*------------------------------------------------------------------------
- * 获取一个数据并处理数据,将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
- *------------------------------------------------------------------------*/
- /* 将发送数据到Rtp线程 */
- sendSrcDataToRtp();
-
- /* 判断数据是否满足1s大小 */
- if(!isFullOneSecondData())
- {
- continue; // 如果不满足1s大小,则继续等待
- }
- /* 处理数据,将其拼接成1s的数据 */
- processData();
- /* 处理数据时间戳,计算开始时间 */
- m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
- /*------------------------------------------------------------------------
- * 分派数据
- *------------------------------------------------------------------------*/
- /* 分派数据给各个线程 */
- for(auto& dispatchThread : m_listDispatchThreads)
- {
- if(dispatchThread != nullptr)
- {
- dispatchThread->setData(*m_dispatchSrcData);
- }
- }
- /* 清空分派数据 */
- m_dispatchSrcData->clear();
- }
- /* 清理数据 */
- clearData();
- SPDLOG_LOGGER_INFO(m_logger, "{} 结束分派数据线程", m_logBase);
- }
- /* 设置数据,这里不用 */
- bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
- {
- return true;
- }
- /**
- * @brief 设置数据,输入小于1秒的数据
- *
- * @param srcData
- * @param dataSize
- * @param endTime
- * @return true
- * @return false
- */
- bool AssignSrcDataThread::setSrcData(const char* srcData, uint32_t dataSize, QDateTime& endTime)
- {
- AudioSrcData* audioData = new AudioSrcData(dataSize);
- if(audioData == nullptr)
- {
- return false;
- }
- audioData->appendData(srcData, dataSize);
- audioData->endTime = endTime;
- /* 获取读写锁 */
- QWriteLocker locker(m_pRwLock);
- m_listAudioSrcData.push_back(audioData);
- m_listDataSize += dataSize; // 更新缓存中的数据大小
- return true;
- }
- /* 初始化一些数据 */
- bool AssignSrcDataThread::initData()
- {
- /* 初始化数据 */
- m_listAudioSrcData.clear(); // 清空数据列表
- m_sampleRate = GInfo.sampleRate(); /* 采样率 */
- m_numChannels = GInfo.numChannels(); /* 声道数 */
- m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
- m_oneSecondSize = GInfo.oneSecondCount(); /* 每秒钟的音频数据大小 */
- m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
- /* 初始化读写锁 */
- m_pRwLock = new QReadWriteLock();
- return true;
- }
- /* 清理数据 */
- void AssignSrcDataThread::clearData()
- {
- if(m_dispatchSrcData != nullptr)
- {
- delete m_dispatchSrcData;
- m_dispatchSrcData = nullptr;
- }
- if(m_pCurrentSrcData != nullptr)
- {
- delete m_pCurrentSrcData;
- m_pCurrentSrcData = nullptr;
- }
-
-
- }
- /**
- * @brief 获取需要分派数据的线程
- * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
- * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
- *
- * @return true
- * @return false
- */
- bool AssignSrcDataThread::getDispatchThread()
- {
- /* 根据生成数据文件的类型顺序获取 */
- /* 先获取生成wav小文件的线程 */
- auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pWavThread != nullptr)
- {
- m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
- }
- /* 获取生成音量和反相数据的线程 */
- auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CheckDBAndPhase, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pDBAndPhaseThread != nullptr)
- {
- m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
- }
- /* 获取生成长文件的线程 */
- auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pLongFileThread != nullptr )
- {
- m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
- }
- m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(m_rtpSenderThread == nullptr)
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
- return false;
- }
- return true;
- }
- /* 判断是否满足1秒的数据 */
- bool AssignSrcDataThread::isFullOneSecondData() const
- {
- QReadLocker locker(m_pRwLock);
- /* 判断缓存中的数据是否满足1秒大小 */
- if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
- {
- return true;
- }
- return false;
- }
- /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
- * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
- bool AssignSrcDataThread::processData()
- {
-
- m_dispatchSrcData->clear();
- while(true)
- {
- if(m_remainingDataSize > 0)
- {
- /* 如果剩余数据还有数据,则继续处理 */
- uint32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
- auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
- /* 更新结束时间 */
- if(copySize == m_remainingDataSize)
- {
- /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
- * 并删除这个缓冲区 */
- m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
- delete m_pCurrentSrcData;
- m_pCurrentSrcData = nullptr;
- }else {
- /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
- m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
- }
- /* 更新剩余数据大小 */
- m_remainingDataSize -= copySize;
- } else
- {
- /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
- m_pRwLock->lockForRead();
- m_pCurrentSrcData = m_listAudioSrcData.front();
- m_listAudioSrcData.pop_front();
- m_pRwLock->unlock();
- if(m_pCurrentSrcData == nullptr)
- {
- break;
- }
- /* 更新队列中剩余的数据大小 */
- m_listDataSize -= m_pCurrentSrcData->dataSize;
- /* 更新剩余数据的大小 */
- m_remainingDataSize = m_pCurrentSrcData->dataSize;
- }
- if(m_dispatchSrcData->isFull())
- {
- break;
- }
- }
- /* 更新数据的开始时间 */
- m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
- return true;
- }
- /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
- void AssignSrcDataThread::sendSrcDataToRtp()
- {
- QReadLocker locker(m_pRwLock);
- if(m_listAudioSrcData.empty())
- {
- return; // 如果没有数据,则直接返回
- }
- /* 如果发送数据和最新的数据不一样,则发送数据 */
- if(m_pSendSrcData == nullptr)
- {
- /* 还未发送过,直接发送所有数据 */
- for(auto& audioData : m_listAudioSrcData)
- {
- if(audioData != nullptr)
- {
- m_rtpSenderThread->setData(*audioData);
- m_pSendSrcData = audioData; // 更新已发送数据
- }
- }
- }else
- {
- /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
- auto it = m_listAudioSrcData.begin();
- for(; it != m_listAudioSrcData.end(); ++it)
- {
- if(*it == m_pSendSrcData)
- {
- break; // 找到已发送数据的位置
- }
- }
- if(it != m_listAudioSrcData.end())
- {
- ++it; // 移动到下一个数据
- for(; it != m_listAudioSrcData.end(); ++it)
- {
- if(*it != nullptr)
- {
- m_rtpSenderThread->setData(**it);
- m_pSendSrcData = *it; // 更新已发送数据
- }
- }
- }else
- {
- /* 没有找到相等的,说明这个队列中是全新的数据,所有的都需要发送 */
- for(auto& audioData : m_listAudioSrcData)
- {
- if(audioData != nullptr)
- {
- m_rtpSenderThread->setData(*audioData);
- m_pSendSrcData = audioData; // 更新已发送数据
- }
- }
- }
- }
- }
|