#include "AssignSrcDataThread.h" #include "ThreadManager.h" #include "AudioData.h" #include "GlobalInfo.h" #include "RecordThread.h" #include "spdlog.h" #include #include #include #include #include "CreateDBThread.h" #include "CreateWAVThread.h" #include "CreateRecordFileThread.h" AssignSrcDataThread::AssignSrcDataThread(RecordThreadInfo_t& threadInfo) : BaseRecordThread(threadInfo) { } AssignSrcDataThread::~AssignSrcDataThread() { // if(m_pRwLock != nullptr) // { // delete m_pRwLock; // m_pRwLock = nullptr; // } for(auto& audioData : m_listAudioSrcData) { if(audioData != nullptr) { delete audioData; audioData = nullptr; } } m_listAudioSrcData.clear(); if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } } /* 停止线程 */ void AssignSrcDataThread::thread_stop() { m_isRunning = false; m_condDataUpdate.notify_all(); // 通知所有等待的线程 } /* 停止线程 */ void AssignSrcDataThread::thread_stop_block() { thread_stop(); while(m_isStoped.load() == false) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } /* 设置数据,这里不用 */ bool AssignSrcDataThread::setData(const AudioSrcData& srcData) { return true; } /** * @brief 设置数据,输入小于1秒的数据 * * @param srcData * @param dataSize * @param endTime * @return true * @return false */ bool AssignSrcDataThread::setSrcData(const char* srcData, int32_t dataSize, QDateTime& endTime) { AudioSrcData* audioData = new AudioSrcData(dataSize); if(audioData == nullptr) { return false; } audioData->appendData(srcData, dataSize); audioData->endTime = endTime; /* 获取读写锁 */ { // std::lock_guard lock(m_pRwLock); std::unique_lock lock(m_mutexDataUpdate); m_listAudioSrcData.push_back(audioData); m_listDataSize += dataSize; // 更新缓存中的数据大小 m_isDataUpdate.store(true); } m_condDataUpdate.notify_one(); // SPDLOG_LOGGER_WARN(m_logger, "{} 收到音频数据: 大小: {}, 时间: {}", m_logBase, dataSize, endTime.toString().toStdString()); return true; } void AssignSrcDataThread::task() { SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开始分派数据线程 ", m_logBase); /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } /* 获取需要分派数据的线程 */ if(!getDispatchThread()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取音频处理线程失败", m_logBase); return; } m_isRunning = true; while(m_isRunning) { /* 休眠一段时间 */ // std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::unique_lock lock(m_mutexDataUpdate); m_condDataUpdate.wait(lock, [this] { return (m_isDataUpdate.load() || !m_isRunning.load()); }); m_isDataUpdate.store(false); // std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(); /*------------------------------------------------------------------------ * 分派实时数据 *------------------------------------------------------------------------*/ /* 获取最新的数据,给其添加开始时间戳 */ if(m_listAudioSrcData.size() == 0) { /* 在线程退出的时候会唤醒上面的条件变量,此时队列可能是空的 */ continue; } auto latestData = m_listAudioSrcData.back(); if(latestData == nullptr || latestData->isEmpty()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 分派数据线程获取到空数据", m_logBase); continue; } latestData->startTime = previTime(latestData->endTime, latestData->dataSize); sendRealTimeSrcData(*latestData); /*------------------------------------------------------------------------ * 将每一个数据都设置成1s大小,将时间设置为这个数据的开始时间 *------------------------------------------------------------------------*/ /* 判断数据是否满足1s大小 */ if(!isFullOneSecondData()) { continue; } /* 处理数据,将其拼接成1s的数据 */ processData(); // SPDLOG_LOGGER_TRACE(m_logger, "{} 开始分发数据,开始时间: {}, 结束时间: {}, 大小: {}", // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize); /*------------------------------------------------------------------------ * 分派常规数据 *------------------------------------------------------------------------*/ /* 分派数据给各个线程 */ sendRegularData(*m_dispatchSrcData); /* 清空分派数据 */ m_dispatchSrcData->clear(); // std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now(); // auto duration = std::chrono::duration_cast(endTime - startTime); // SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据耗时: {}us", m_logBase, duration.count()); } /* 清理数据 */ clearData(); SPDLOG_LOGGER_WARN(m_logger, "➢ {} 结束分派数据线程", m_logBase); } /* 初始化一些数据 */ bool AssignSrcDataThread::initData() { /* 初始化数据 */ m_listAudioSrcData.clear(); // 清空数据列表 m_sampleRate = GInfo.sampleRate(); /* 采样率 */ m_numChannels = GInfo.numChannels(); /* 声道数 */ m_bitsPerSample = GInfo.bitsPerSample(); /* 每个采样点的位数 */ m_oneSecondSize = m_sampleRate * m_numChannels * (m_bitsPerSample / 8); /* 1秒数据大小 */ m_dispatchSrcData = new AudioSrcData(m_oneSecondSize); m_lastSendTime = QDateTime::currentDateTime(); // 初始化最后发送时间 /* 初始化读写锁 */ // m_pRwLock = new QReadWriteLock(); return true; } /* 清理数据 */ void AssignSrcDataThread::clearData() { std::unique_lock lock(m_mutexDataUpdate); if(m_dispatchSrcData != nullptr) { delete m_dispatchSrcData; m_dispatchSrcData = nullptr; } for(auto& it : m_listAudioSrcData) { if(it != nullptr) { delete it; it = nullptr; } } m_listAudioSrcData.clear(); } /** * @brief 获取需要分派数据的线程 * 1、获取的时候按照线程实时性、重要性等因素进行排序获取 * 2、获取该线程指针前,可以先获取是否需要这个线程(目前还没做) * * @return true * @return false */ bool AssignSrcDataThread::getDispatchThread() { /* 根据生成数据文件的类型顺序获取,循环获取,直到所有的线程都获取到 */ /* 先获取生成wav小文件的线程 */ auto pWavThread = ThreadMan.findRecordThread(EThreadType::Type_CreateWAV, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName); if(pWavThread != nullptr) { m_pThreadCreateWAV = dynamic_cast(pWavThread); } /* 获取生成音量数据的线程 */ auto pDBAndPhaseThread = ThreadMan.findRecordThread(EThreadType::Type_CreateDB, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName); if(pDBAndPhaseThread != nullptr) { m_pThreadCreateDB = dynamic_cast(pDBAndPhaseThread); } /* 获取生成长文件的线程 */ auto pLongFileThread = ThreadMan.findRecordThread(EThreadType::Type_CreateLongWAV, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName); if(pLongFileThread != nullptr ) { m_pThreadCreateLongFile = dynamic_cast(pLongFileThread); } /* 获取发送RTP数据的线程 */ m_rtpSenderThread = ThreadMan.findRecordThread(EThreadType::Type_RtpSend, m_threadInfo.cardRoadInfo.pcmInfo.strPCMName); if(m_rtpSenderThread == nullptr) { // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取RTP发送线程失败", m_logBase); // return false; } return true; } /* 发送常规数据,对实时性要求不高的数据,数据时长1秒 */ void AssignSrcDataThread::sendRegularData(const AudioSrcData& srcData) { /* 发送计算音量的数据 */ if(m_pThreadCreateWAV != nullptr) { m_pThreadCreateWAV->setData(srcData); } /* 发送生成wav小文件的数据 */ if(m_pThreadCreateDB != nullptr) { m_pThreadCreateDB->setData(srcData); } /* 发送生成长文件的数据 */ if(m_pThreadCreateLongFile != nullptr) { m_pThreadCreateLongFile->setData(srcData); } } /* 判断是否满足1秒的数据 */ bool AssignSrcDataThread::isFullOneSecondData() const { // QReadLocker locker(m_pRwLock); /* 判断缓存中的数据是否满足1秒大小 */ if(m_listDataSize + m_remainingData.size() >= m_oneSecondSize) { return true; } return false; } /* 新的处理算法,假设传入来的源数据是不定长度的,可能大于1秒,可能小于1秒 * 将其拆分成1秒的数据,有一秒的数据后就退出循环,分派给其他线程 */ bool AssignSrcDataThread::processData() { /* 处理数据 */ m_dispatchSrcData->clear(); while(true) { if(m_remainingData.size() > 0) { /* 拷贝剩余数据 */ int32_t copySize = m_dispatchSrcData->appendData(m_remainingData.data(), m_remainingData.size()); if(copySize < m_remainingData.size()) { /* 剩余数据没拷贝完,可能已经满足一次了 */ m_remainingData = m_remainingData.mid(copySize, m_remainingData.size() - copySize); m_dispatchSrcData->endTime = previTime(m_lastDataEndTime, m_remainingData.size()); m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); }else { /* 剩余数据拷贝完了,清空剩余数据 */ m_remainingData.clear(); m_dispatchSrcData->endTime = m_lastDataEndTime; m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); } if(m_dispatchSrcData->isFull()) { /* 如果已经满足1秒大小,则退出循环 */ break; } }else { /* 没有剩余数据了,拷贝全新的数据 */ auto pData = m_listAudioSrcData.front(); m_listAudioSrcData.pop_front(); // SPDLOG_LOGGER_TRACE(m_logger, "{} 一条数据 大小: {}, 时间: {}", // m_logBase, pData->dataSize, pData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString()); if(pData == nullptr) { continue; } int copySize = m_dispatchSrcData->appendData(pData->pData, pData->dataSize); if(m_dispatchSrcData->isFull()) { /* 判断有无剩余数据 */ if(copySize < pData->dataSize) { m_remainingData = QByteArray(pData->pData + copySize, pData->dataSize - copySize); m_lastDataEndTime = pData->endTime; } else { m_remainingData.clear(); m_lastDataEndTime = QDateTime(); } /* 更新结束时间 */ m_dispatchSrcData->endTime = pData->endTime; /* 删除数据 */ delete pData; pData = nullptr; m_listDataSize -= copySize; break; } delete pData; pData = nullptr; m_listDataSize -= copySize; } } /* 更新数据的开始时间 */ m_dispatchSrcData->startTime = previTime(m_dispatchSrcData->endTime, m_dispatchSrcData->dataSize); // SPDLOG_LOGGER_TRACE(m_logger, "{} 一秒的数据拼接完成, 开始时间: {}, 结束时间: {}, 大小: {}", // m_logBase, m_dispatchSrcData->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), // m_dispatchSrcData->endTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), m_dispatchSrcData->dataSize); return true; } /* 发送实时音频数据 */ void AssignSrcDataThread::sendRealTimeSrcData(const AudioSrcData& srcData) { /* 发送实时数据到RTP中 */ if(m_rtpSenderThread != nullptr) { m_rtpSenderThread->setData(srcData); } /* 发送实时数据到创建音量包的线程 */ if(m_pThreadCreateDB != nullptr) { m_pThreadCreateDB->setRealTimeData(srcData); } } /* 发送原始数据到Rtp中,实时发送,有新的就发送 */ void AssignSrcDataThread::sendSrcDataToRtp(const AudioSrcData& srcData) { // QReadLocker locker(m_pRwLock); if(m_rtpSenderThread == nullptr) { return; // 如果没有RTP发送线程,则直接返回 } m_rtpSenderThread->setData(srcData); // if(m_listAudioSrcData.empty()) // { // return; // 如果没有数据,则直接返回 // } // SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送RTP数据,队列大小: {}, 开始时间: {}, 当前时间:{}", // m_logBase, m_listAudioSrcData.count(), m_listAudioSrcData.back()->startTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString(), // m_lastSendTime.toString("yyyy-MM-dd hh:mm:ss.zzz").toStdString()); // auto it = m_listAudioSrcData.end(); // if(it != m_listAudioSrcData.begin()) // { // --it; // 移动到最后一个数据 // } // /* 已发送过数据,获取这个数据在队列中的位置,然后发送剩余的数据 */ // for(; it != m_listAudioSrcData.begin(); --it) // { // if((*it)->startTime == m_lastSendTime) // { // break; // 找到已发送数据的位置 // } // } // if(it == m_listAudioSrcData.begin()) // { // SPDLOG_LOGGER_TRACE(m_logger, "{} RTP线程需要拷贝全部数据", m_logBase); // } // /* 开始发送数据 */ // for(; it != m_listAudioSrcData.end(); ++it) // { // if(*it != nullptr) // { // m_rtpSenderThread->setData(**it); // m_lastSendTime = (*it)->startTime; // 更新已发送数据 // } // } }