#include "AssignSrcDataThread.h" #include "ThreadManager.h" #include "AudioData.h" #include "GlobalInfo.h" #include #include #include #include #include AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo) : BaseRecordThread(threadInfo) { } AssignSrcDataThread::~AssignSrcDataThread() { if(m_pRwLock != nullptr) { delete m_pRwLock; m_pRwLock = nullptr; } for(auto& audioData : m_listAudioSrcData) { if(audioData != nullptr) { delete audioData; audioData = nullptr; } } m_listAudioSrcData.clear(); if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } } void AssignSrcDataThread::threadTask() { SPDLOG_LOGGER_INFO(m_logger, "{} 开始分派数据线程", m_logBase); /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } /* 获取需要分派数据的线程 */ if(!getDispatchThread()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取分派线程失败", m_logBase); return; } m_isRunning = true; while(m_isRunning) { /* 休眠一段时间 */ std::this_thread::sleep_for(std::chrono::milliseconds(10)); /*------------------------------------------------------------------------ * 获取一个数据并处理数据,将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间 *------------------------------------------------------------------------*/ /* 将发送数据到Rtp线程 */ sendSrcDataToRtp(); /* 判断数据是否满足1s大小 */ if(!isFullOneSecondData()) { continue; // 如果不满足1s大小,则继续等待 } /* 处理数据,将其拼接成1s的数据 */ processData(); /* 处理数据时间戳,计算开始时间 */ m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); /*------------------------------------------------------------------------ * 分派数据 *------------------------------------------------------------------------*/ /* 分派数据给各个线程 */ for(auto& dispatchThread : m_listDispatchThreads) { if(dispatchThread != nullptr) { dispatchThread->setData(*m_dispatchSrcData); } } /* 清空分派数据 */ m_dispatchSrcData->clear(); } /* 清理数据 */ clearData(); SPDLOG_LOGGER_INFO(m_logger, "{} 结束分派数据线程", m_logBase); } /* 设置数据,这里不用 */ bool AssignSrcDataThread::setData(const AudioSrcData& srcData) { return true; } /** * @brief 设置数据,输入小于1秒的数据 * * @param srcData * @param dataSize * @param endTime * @return true * @return false */ bool AssignSrcDataThread::setSrcData(const char* srcData, uint32_t dataSize, QDateTime& endTime) { AudioSrcData* audioData = new AudioSrcData(dataSize); if(audioData == nullptr) { return false; } audioData->appendData(srcData, dataSize); audioData->endTime = endTime; /* 获取读写锁 */ QWriteLocker locker(m_pRwLock); m_listAudioSrcData.push_back(audioData); m_listDataSize += dataSize; // 更新缓存中的数据大小 return true; } /* 初始化一些数据 */ bool AssignSrcDataThread::initData() { /* 初始化数据 */ m_listAudioSrcData.clear(); // 清空数据列表 m_sampleRate = GInfo.sampleRate(); /* 采样率 */ m_numChannels = GInfo.numChannels(); /* 声道数 */ m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */ m_oneSecondSize = GInfo.oneSecondCount(); /* 每秒钟的音频数据大小 */ m_dispatchSrcData = new AudioSrcData(m_oneSecondSize); /* 初始化读写锁 */ m_pRwLock = new QReadWriteLock(); return true; } /* 清理数据 */ void AssignSrcDataThread::clearData() { if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } if(m_pCurrentSrcData != nullptr) { delete m_pCurrentSrcData; m_pCurrentSrcData = nullptr; } } /** * @brief 获取需要分派数据的线程 * 1、获取的时候按照线程实时性、重要性等因素进行排序获取 * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做) * * @return true * @return false */ bool AssignSrcDataThread::getDispatchThread() { /* 根据生成数据文件的类型顺序获取 */ /* 先获取生成wav小文件的线程 */ auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pWavThread != nullptr) { m_listDispatchThreads.push_back(dynamic_cast(pWavThread)); } /* 获取生成音量和反相数据的线程 */ auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CheckDBAndPhase, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pDBAndPhaseThread != nullptr) { m_listDispatchThreads.push_back(dynamic_cast(pDBAndPhaseThread)); } /* 获取生成长文件的线程 */ auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pLongFileThread != nullptr ) { m_listDispatchThreads.push_back(dynamic_cast(pLongFileThread)); } m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(m_rtpSenderThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase); return false; } return true; } /* 判断是否满足1秒的数据 */ bool AssignSrcDataThread::isFullOneSecondData() const { QReadLocker locker(m_pRwLock); /* 判断缓存中的数据是否满足1秒大小 */ if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize) { return true; } return false; } /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒 * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */ bool AssignSrcDataThread::processData() { m_dispatchSrcData->clear(); while(true) { if(m_remainingDataSize > 0) { /* 如果剩余数据还有数据,则继续处理 */ uint32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置 auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize); /* 更新结束时间 */ if(copySize == m_remainingDataSize) { /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间 * 并删除这个缓冲区 */ m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime; delete m_pCurrentSrcData; m_pCurrentSrcData = nullptr; }else { /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */ m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize); } /* 更新剩余数据大小 */ m_remainingDataSize -= copySize; } else { /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */ m_pRwLock->lockForRead(); m_pCurrentSrcData = m_listAudioSrcData.front(); m_listAudioSrcData.pop_front(); m_pRwLock->unlock(); if(m_pCurrentSrcData == nullptr) { break; } /* 更新队列中剩余的数据大小 */ m_listDataSize -= m_pCurrentSrcData->dataSize; /* 更新剩余数据的大小 */ m_remainingDataSize = m_pCurrentSrcData->dataSize; } if(m_dispatchSrcData->isFull()) { break; } } /* 更新数据的开始时间 */ m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); return true; } /* 发送原始数据到Rtp中,实时发送,有新的就发送 */ void AssignSrcDataThread::sendSrcDataToRtp() { QReadLocker locker(m_pRwLock); if(m_listAudioSrcData.empty()) { return; // 如果没有数据,则直接返回 } /* 如果发送数据和最新的数据不一样,则发送数据 */ if(m_pSendSrcData == nullptr) { /* 还未发送过,直接发送所有数据 */ for(auto& audioData : m_listAudioSrcData) { if(audioData != nullptr) { m_rtpSenderThread->setData(*audioData); m_pSendSrcData = audioData; // 更新已发送数据 } } }else { /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */ auto it = m_listAudioSrcData.begin(); for(; it != m_listAudioSrcData.end(); ++it) { if(*it == m_pSendSrcData) { break; // 找到已发送数据的位置 } } if(it != m_listAudioSrcData.end()) { ++it; // 移动到下一个数据 for(; it != m_listAudioSrcData.end(); ++it) { if(*it != nullptr) { m_rtpSenderThread->setData(**it); m_pSendSrcData = *it; // 更新已发送数据 } } }else { /* 没有找到相等的,说明这个队列中是全新的数据,所有的都需要发送 */ for(auto& audioData : m_listAudioSrcData) { if(audioData != nullptr) { m_rtpSenderThread->setData(*audioData); m_pSendSrcData = audioData; // 更新已发送数据 } } } } }