123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- #include "AssignSrcDataThread.h"
- #include "ThreadManager.h"
- #include "AudioData.h"
- #include "GlobalInfo.h"
- #include "RecordThread.h"
- #include "spdlog.h"
- #include <QReadWriteLock>
- #include <QWriteLocker>
- #include <QReadLocker>
- #include <cstdint>
- #include "CreateDBThread.h"
- #include "CreateWAVThread.h"
- #include "CreateLongFileThread.h"
- AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo)
- : BaseRecordThread(threadInfo)
- {
- }
- AssignSrcDataThread::~AssignSrcDataThread()
- {
- // if(m_pRwLock != nullptr)
- // {
- // delete m_pRwLock;
- // m_pRwLock = nullptr;
- // }
- for(auto& audioData : m_listAudioSrcData)
- {
- if(audioData != nullptr)
- {
- delete audioData;
- audioData = nullptr;
- }
- }
- m_listAudioSrcData.clear();
- if(m_dispatchSrcData != nullptr)
- {
- delete m_dispatchSrcData;
- m_dispatchSrcData = nullptr;
- }
- }
- /* 停止线程 */
- void AssignSrcDataThread::stopThread()
- {
- m_isRunning = false;
- m_condDataUpdate.notify_all(); // 通知所有等待的线程
- }
- /* 设置数据,这里不用 */
- bool AssignSrcDataThread::setData(const AudioSrcData& srcData)
- {
- return true;
- }
- /**
- * @brief 设置数据,输入小于1秒的数据
- *
- * @param srcData
- * @param dataSize
- * @param endTime
- * @return true
- * @return false
- */
- bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime)
- {
- AudioSrcData* audioData = new AudioSrcData(dataSize);
- if(audioData == nullptr)
- {
- return false;
- }
- audioData->appendData(srcData, dataSize);
- audioData->endTime = endTime;
- /* 获取读写锁 */
- {
- // std::lock_guard<QReadWriteLock> lock(m_pRwLock);
- std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
- m_listAudioSrcData.push_back(audioData);
- m_listDataSize += dataSize; // 更新缓存中的数据大小
- m_isDataUpdate.store(true);
- }
- m_condDataUpdate.notify_one();
- // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString());
- return true;
- }
- void AssignSrcDataThread::task()
- {
- SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase);
- /* 初始化数据 */
- if(!initData())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
- return;
- }
- /* 获取需要分派数据的线程 */
- if(!getDispatchThread())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase);
- return;
- }
- /* 将自身设置到录音线程中 */
- m_pThreadRecord->setAssignSrcDataThread(this);
-
- m_isRunning = true;
- while(m_isRunning)
- {
- /* 休眠一段时间 */
- // std::this_thread::sleep_for(std::chrono::milliseconds(10));
- std::unique_lock<std::mutex> lock(m_mutexDataUpdate);
- m_condDataUpdate.wait(lock, [this] {
- return (m_isDataUpdate.load() || !m_isRunning.load());
- });
- m_isDataUpdate.store(false);
- // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
- /*------------------------------------------------------------------------
- * 分派实时数据
- *------------------------------------------------------------------------*/
- /* 获取最新的数据,给其添加开始时间戳 */
- auto latestData = m_listAudioSrcData.back();
- if(latestData == nullptr || latestData->isEmpty())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase);
- continue;
- }
- latestData->startTime = previTime(latestData->endTime, latestData->dataSize);
- /* 将发送数据到Rtp线程 */
- sendSrcDataToRtp(*latestData);
- /*------------------------------------------------------------------------
- * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间
- *------------------------------------------------------------------------*/
- /* 判断数据是否满足1s大小 */
- if(!isFullOneSecondData())
- {
- continue; // 如果不满足1s大小,则继续等待
- }
- /* 处理数据,将其拼接成1s的数据 */
- processData();
- /* 处理数据时间戳,计算开始时间 */
- m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
- /*------------------------------------------------------------------------
- * 分派常规数据
- *------------------------------------------------------------------------*/
- /* 分派数据给各个线程 */
- sendRegularData(*m_dispatchSrcData);
- /* 清空分派数据 */
- m_dispatchSrcData->clear();
- // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now();
- // auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
- // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count());
- }
- /* 清理数据 */
- clearData();
- SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase);
- }
- /* 初始化一些数据 */
- bool AssignSrcDataThread::initData()
- {
- /* 初始化数据 */
- m_listAudioSrcData.clear(); // 清空数据列表
- m_sampleRate = GInfo.sampleRate(); /* 采样率 */
- m_numChannels = GInfo.numChannels(); /* 声道数 */
- m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */
- m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */
- m_dispatchSrcData = new AudioSrcData(m_oneSecondSize);
- m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间
- /* 初始化读写锁 */
- // m_pRwLock = new QReadWriteLock();
- return true;
- }
- /* 清理数据 */
- void AssignSrcDataThread::clearData()
- {
- if(m_dispatchSrcData != nullptr)
- {
- delete m_dispatchSrcData;
- m_dispatchSrcData = nullptr;
- }
- if(m_pCurrentSrcData != nullptr)
- {
- delete m_pCurrentSrcData;
- m_pCurrentSrcData = nullptr;
- }
-
-
- }
- /**
- * @brief 获取需要分派数据的线程
- * 1、获取的时候按照线程实时性、重要性等因素进行排序获取
- * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做)
- *
- * @return true
- * @return false
- */
- bool AssignSrcDataThread::getDispatchThread()
- {
- /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */
- /* 先获取生成wav小文件的线程 */
- auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pWavThread != nullptr)
- {
- // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pWavThread));
- m_pThreadCreateWAV = dynamic_cast<CreateWAVThread*>(pWavThread);
- }
- /* 获取生成音量和反相数据的线程 */
- auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pDBAndPhaseThread != nullptr)
- {
- // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pDBAndPhaseThread));
- m_pThreadCreateDB = dynamic_cast<CreateDBThread*>(pDBAndPhaseThread);
- }
- /* 获取生成长文件的线程 */
- auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pLongFileThread != nullptr )
- {
- // m_listDispatchThreads.push_back(dynamic_cast<BaseRecordThread*>(pLongFileThread));
- m_pThreadCreateLongFile = dynamic_cast<CreateLongFileThread*>(pLongFileThread);
- }
- /* 获取发送RTP数据的线程 */
- m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(m_rtpSenderThread == nullptr)
- {
- // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase);
- // return false;
- }
- /* 最后获取录音线程信息,如果获取不到则一直获取 */
- while(true)
- {
- auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
- if(pThreadBase != nullptr)
- {
- m_pThreadRecord = dynamic_cast<RecordThread*>(pThreadBase);
- break; // 找到录音线程了
- }
- /* 如果没有找到录音线程,则等待一段时间再继续查找 */
- std::this_thread::sleep_for(std::chrono::milliseconds(5));
- }
- return true;
- }
- /* 判断是否满足1秒的数据 */
- bool AssignSrcDataThread::isFullOneSecondData() const
- {
- // QReadLocker locker(m_pRwLock);
- /* 判断缓存中的数据是否满足1秒大小 */
- if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize)
- {
- return true;
- }
- return false;
- }
- /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒
- * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */
- bool AssignSrcDataThread::processData()
- {
-
- m_dispatchSrcData->clear();
- while(true)
- {
- if(m_remainingDataSize > 0)
- {
- /* 如果剩余数据还有数据,则继续处理 */
- int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置
- auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize);
- /* 更新结束时间 */
- if(copySize == m_remainingDataSize)
- {
- /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间
- * 并删除这个缓冲区 */
- m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime;
- delete m_pCurrentSrcData;
- m_pCurrentSrcData = nullptr;
- }else {
- /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */
- m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize);
- }
- /* 更新剩余数据大小 */
- m_remainingDataSize -= copySize;
- } else
- {
- /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */
- m_pCurrentSrcData = m_listAudioSrcData.front();
- m_listAudioSrcData.pop_front();
- if(m_pCurrentSrcData == nullptr)
- {
- break;
- }
- /* 更新队列中剩余的数据大小 */
- m_listDataSize -= m_pCurrentSrcData->dataSize;
- /* 更新剩余数据的大小 */
- m_remainingDataSize = m_pCurrentSrcData->dataSize;
- }
- if(m_dispatchSrcData->isFull())
- {
- break;
- }
- }
- /* 更新数据的开始时间 */
- m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize);
- return true;
- }
- /* 发送原始数据到Rtp中,实时发送,有新的就发送 */
- void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData)
- {
- // QReadLocker locker(m_pRwLock);
- if(m_rtpSenderThread == nullptr)
- {
- return; // 如果没有RTP发送线程,则直接返回
- }
- m_rtpSenderThread->setData(srcData);
-
- // if(m_listAudioSrcData.empty())
- // {
- // return; // 如果没有数据,则直接返回
- // }
- // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}",
- // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(),
- // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString());
- // auto it = m_listAudioSrcData.end();
- // if(it != m_listAudioSrcData.begin())
- // {
- // --it; // 移动到最后一个数据
- // }
- // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */
- // for(; it != m_listAudioSrcData.begin(); --it)
- // {
- // if((*it)->startTime == m_lastSendTime)
- // {
- // break; // 找到已发送数据的位置
- // }
- // }
-
- // if(it == m_listAudioSrcData.begin())
- // {
- // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase);
- // }
- // /* 开始发送数据 */
- // for(; it != m_listAudioSrcData.end(); ++it)
- // {
- // if(*it != nullptr)
- // {
- // m_rtpSenderThread->setData(**it);
- // m_lastSendTime = (*it)->startTime; // 更新已发送数据
- // }
- // }
- }
- /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */
- void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData)
- {
- /* 发送计算音量的数据 */
- if(m_pThreadCreateWAV != nullptr)
- {
- m_pThreadCreateWAV->setData(srcData);
- }
- /* 发送生成wav小文件的数据 */
- if(m_pThreadCreateDB != nullptr)
- {
- m_pThreadCreateDB->setData(srcData);
- }
- /* 发送生成长文件的数据 */
- if(m_pThreadCreateLongFile != nullptr)
- {
- m_pThreadCreateLongFile->setData(srcData);
- }
- }
|