#include "AssignSrcDataThread.h" #include "ThreadManager.h" #include "AudioData.h" #include "GlobalInfo.h" #include "RecordThread.h" #include "spdlog.h" #include #include #include #include #include "CreateDBThread.h" #include "CreateWAVThread.h" #include "CreateLongFileThread.h" AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo) : BaseRecordThread(threadInfo) { } AssignSrcDataThread::~AssignSrcDataThread() { // if(m_pRwLock != nullptr) // { // delete m_pRwLock; // m_pRwLock = nullptr; // } for(auto& audioData : m_listAudioSrcData) { if(audioData != nullptr) { delete audioData; audioData = nullptr; } } m_listAudioSrcData.clear(); if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } } /* 停止线程 */ void AssignSrcDataThread::stopThread() { m_isRunning = false; m_condDataUpdate.notify_all(); // 通知所有等待的线程 } /* 设置数据,这里不用 */ bool AssignSrcDataThread::setData(const AudioSrcData& srcData) { return true; } /** * @brief 设置数据,输入小于1秒的数据 * * @param srcData * @param dataSize * @param endTime * @return true * @return false */ bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime) { AudioSrcData* audioData = new AudioSrcData(dataSize); if(audioData == nullptr) { return false; } audioData->appendData(srcData, dataSize); audioData->endTime = endTime; /* 获取读写锁 */ { // std::lock_guard lock(m_pRwLock); std::unique_lock lock(m_mutexDataUpdate); m_listAudioSrcData.push_back(audioData); m_listDataSize += dataSize; // 更新缓存中的数据大小 m_isDataUpdate.store(true); } m_condDataUpdate.notify_one(); // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString()); return true; } void AssignSrcDataThread::task() { SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase); /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } /* 获取需要分派数据的线程 */ if(!getDispatchThread()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase); return; } /* 将自身设置到录音线程中 */ m_pThreadRecord->setAssignSrcDataThread(this); m_isRunning = true; while(m_isRunning) { /* 休眠一段时间 */ // std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::unique_lock lock(m_mutexDataUpdate); m_condDataUpdate.wait(lock, [this] { return (m_isDataUpdate.load() || !m_isRunning.load()); }); m_isDataUpdate.store(false); // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(); /*------------------------------------------------------------------------ * 分派实时数据 *------------------------------------------------------------------------*/ /* 获取最新的数据,给其添加开始时间戳 */ auto latestData = m_listAudioSrcData.back(); if(latestData == nullptr || latestData->isEmpty()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase); continue; } latestData->startTime = previTime(latestData->endTime, latestData->dataSize); sendRealTimeSrcData(*latestData); /*------------------------------------------------------------------------ * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间 *------------------------------------------------------------------------*/ /* 判断数据是否满足1s大小 */ if(!isFullOneSecondData()) { continue; // 如果不满足1s大小,则继续等待 } /* 处理数据,将其拼接成1s的数据 */ processData(); /* 处理数据时间戳,计算开始时间 */ m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); /*------------------------------------------------------------------------ * 分派常规数据 *------------------------------------------------------------------------*/ /* 分派数据给各个线程 */ sendRegularData(*m_dispatchSrcData); /* 清空分派数据 */ m_dispatchSrcData->clear(); // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now(); // auto duration = std::chrono::duration_cast(endTime - startTime); // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count()); } /* 清理数据 */ clearData(); SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase); } /* 初始化一些数据 */ bool AssignSrcDataThread::initData() { /* 初始化数据 */ m_listAudioSrcData.clear(); // 清空数据列表 m_sampleRate = GInfo.sampleRate(); /* 采样率 */ m_numChannels = GInfo.numChannels(); /* 声道数 */ m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */ m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */ m_dispatchSrcData = new AudioSrcData(m_oneSecondSize); m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间 /* 初始化读写锁 */ // m_pRwLock = new QReadWriteLock(); return true; } /* 清理数据 */ void AssignSrcDataThread::clearData() { if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } if(m_pCurrentSrcData != nullptr) { delete m_pCurrentSrcData; m_pCurrentSrcData = nullptr; } } /** * @brief 获取需要分派数据的线程 * 1、获取的时候按照线程实时性、重要性等因素进行排序获取 * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做) * * @return true * @return false */ bool AssignSrcDataThread::getDispatchThread() { /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */ /* 先获取生成wav小文件的线程 */ auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pWavThread != nullptr) { // m_listDispatchThreads.push_back(dynamic_cast(pWavThread)); m_pThreadCreateWAV = dynamic_cast(pWavThread); } /* 获取生成音量和反相数据的线程 */ auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pDBAndPhaseThread != nullptr) { // m_listDispatchThreads.push_back(dynamic_cast(pDBAndPhaseThread)); m_pThreadCreateDB = dynamic_cast(pDBAndPhaseThread); } /* 获取生成长文件的线程 */ auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pLongFileThread != nullptr ) { // m_listDispatchThreads.push_back(dynamic_cast(pLongFileThread)); m_pThreadCreateLongFile = dynamic_cast(pLongFileThread); } /* 获取发送RTP数据的线程 */ m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(m_rtpSenderThread == nullptr) { // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase); // return false; } /* 最后获取录音线程信息,如果获取不到则一直获取 */ while(true) { auto pThreadBase = ThreadMan.findRecordThread(EThreadType::Type_RecordSrc, m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); if(pThreadBase != nullptr) { m_pThreadRecord = dynamic_cast(pThreadBase); break; // 找到录音线程了 } /* 如果没有找到录音线程,则等待一段时间再继续查找 */ std::this_thread::sleep_for(std::chrono::milliseconds(5)); } return true; } /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */ void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData) { /* 发送计算音量的数据 */ if(m_pThreadCreateWAV != nullptr) { m_pThreadCreateWAV->setData(srcData); } /* 发送生成wav小文件的数据 */ if(m_pThreadCreateDB != nullptr) { m_pThreadCreateDB->setData(srcData); } /* 发送生成长文件的数据 */ if(m_pThreadCreateLongFile != nullptr) { m_pThreadCreateLongFile->setData(srcData); } } /* 判断是否满足1秒的数据 */ bool AssignSrcDataThread::isFullOneSecondData() const { // QReadLocker locker(m_pRwLock); /* 判断缓存中的数据是否满足1秒大小 */ if(m_listDataSize + m_remainingDataSize >= m_oneSecondSize) { return true; } return false; } /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒 * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */ bool AssignSrcDataThread::processData() { m_dispatchSrcData->clear(); while(true) { if(m_remainingDataSize > 0) { /* 如果剩余数据还有数据,则继续处理 */ int32_t nowIndex = m_pCurrentSrcData->dataSize - m_remainingDataSize; // 当前剩余数据的起始位置 auto copySize = m_dispatchSrcData->appendData(m_pCurrentSrcData->pData + nowIndex, m_remainingDataSize); /* 更新结束时间 */ if(copySize == m_remainingDataSize) { /* 如果拷贝的大小等于剩余数据大小,说明当前的数据缓冲区数据已经用完了,则直接使用当前数据的结束时间 * 并删除这个缓冲区 */ m_dispatchSrcData->endTime = m_pCurrentSrcData->endTime; delete m_pCurrentSrcData; m_pCurrentSrcData = nullptr; }else { /* 如果拷贝的大小小于剩余数据大小,则计算结束时间 */ m_dispatchSrcData->endTime = previTime(m_pCurrentSrcData->endTime, m_remainingDataSize - copySize); } /* 更新剩余数据大小 */ m_remainingDataSize -= copySize; } else { /* 取出最新的一个数据,这里只取出数据,不进行数据拷贝,数据处理进入下一个循环后再处理 */ m_pCurrentSrcData = m_listAudioSrcData.front(); m_listAudioSrcData.pop_front(); if(m_pCurrentSrcData == nullptr) { break; } /* 更新队列中剩余的数据大小 */ m_listDataSize -= m_pCurrentSrcData->dataSize; /* 更新剩余数据的大小 */ m_remainingDataSize = m_pCurrentSrcData->dataSize; } if(m_dispatchSrcData->isFull()) { break; } } /* 更新数据的开始时间 */ m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); return true; } /* 发送实时音频数据 */ void AssignSrcDataThread::sendRealTimeSrcData(const AudioSrcData& srcData) { /* 发送实时数据到RTP中 */ if(m_rtpSenderThread != nullptr) { m_rtpSenderThread->setData(srcData); } /* 发送实时数据到创建音量包的线程 */ if(m_pThreadCreateDB != nullptr) { m_pThreadCreateDB->setRealTimeData(srcData); } } /* 发送原始数据到Rtp中,实时发送,有新的就发送 */ void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData) { // QReadLocker locker(m_pRwLock); if(m_rtpSenderThread == nullptr) { return; // 如果没有RTP发送线程,则直接返回 } m_rtpSenderThread->setData(srcData); // if(m_listAudioSrcData.empty()) // { // return; // 如果没有数据,则直接返回 // } // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}", // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString()); // auto it = m_listAudioSrcData.end(); // if(it != m_listAudioSrcData.begin()) // { // --it; // 移动到最后一个数据 // } // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */ // for(; it != m_listAudioSrcData.begin(); --it) // { // if((*it)->startTime == m_lastSendTime) // { // break; // 找到已发送数据的位置 // } // } // if(it == m_listAudioSrcData.begin()) // { // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase); // } // /* 开始发送数据 */ // for(; it != m_listAudioSrcData.end(); ++it) // { // if(*it != nullptr) // { // m_rtpSenderThread->setData(**it); // m_lastSendTime = (*it)->startTime; // 更新已发送数据 // } // } }