diff --git a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h new file mode 100644 index 0000000000000..c01e92b34495b --- /dev/null +++ b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h @@ -0,0 +1,123 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMV.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Common mode values data format definition + +/// The data is sent by the CRU as 256+16 bit words. The CMV data layout is as follows: +/// - 256-bit Header: [version:8][packetID:8][errorCode:8][magicWord:8][heartbeatOrbit:32][heartbeatBC:16][padding:176] +/// - 16-bit CMV value: [CMV:16] + +#ifndef ALICEO2_DATAFORMATSTPC_CMV_H +#define ALICEO2_DATAFORMATSTPC_CMV_H + +#include + +namespace o2::tpc::cmv +{ + +static constexpr uint32_t NTimeBinsPerPacket = 3564; ///< number of time bins (covering 8 heartbeats) +static constexpr uint32_t NPacketsPerTFPerCRU = 4; ///< 4 packets per timeframe +static constexpr uint32_t NTimeBinsPerTF = NTimeBinsPerPacket * NPacketsPerTFPerCRU; ///< maximum number of timebins per timeframe (14256) +static constexpr uint32_t SignificantBits = 2; ///< number of bits used for floating point precision +static constexpr float FloatConversion = 1.f / float(1 << SignificantBits); ///< conversion factor from integer representation to float + +/// Header definition of the CMVs +struct Header { + static constexpr uint8_t MagicWord = 0xDC; + union { + uint32_t word0 = 0; ///< bits 0 - 31 + struct { + uint8_t version : 8; ///< version + uint8_t packetID : 8; ///< packet id + uint8_t errorCode : 8; ///< errors + uint8_t magicWord : 8; ///< magic word + }; + }; + union { + uint32_t word1 = 0; ///< bits 32 - 63 + struct { + uint32_t heartbeatOrbit : 32; ///< first heart beat timing of the package + }; + }; + union { + uint16_t word2 = 0; ///< bits 64 - 79 + struct { + uint16_t heartbeatBC : 16; ///< first BC id of the package + }; + }; + union { + uint16_t word3 = 0; ///< bits 80 - 95 + struct { + uint16_t unused1 : 16; ///< reserved + }; + }; + union { + uint32_t word4 = 0; ///< bits 96 - 127 + struct { + uint32_t unused2 : 32; ///< reserved + }; + }; + union { + uint64_t word5 = 0; ///< bits 128 - 191 + struct { + uint64_t unused3 : 64; ///< reserved + }; + }; + union { + uint64_t word6 = 0; ///< bits 192 - 255 + struct { + uint64_t unused4 : 64; ///< reserved + }; + }; +}; + +/// CMV single data container +struct Data { + uint16_t CMV{0}; ///< 16bit ADC value + + // Raw integer accessors + uint16_t getCMV() const { return CMV; } + void setCMV(uint16_t value) { CMV = value; } + + // Float helpers using SignificantBits for fixed-point conversion + float getCMVFloat() const { return static_cast(CMV) * FloatConversion; } + void setCMVFloat(float value) + { + // round to nearest representable fixed-point value + setCMV(uint32_t((value + 0.5f * FloatConversion) / FloatConversion)); + } +}; + +/// CMV full data container: one packet carries NTimeBinsPerPacket +struct Container { + Header header; ///< CMV data header + Data data[NTimeBinsPerPacket]; ///< data values + + // Header and data accessors + const Header& getHeader() const { return header; } + Header& getHeader() { return header; } + + const Data* getData() const { return data; } + Data* getData() { return data; } + + // Per-time-bin CMV accessors + uint16_t getCMV(uint32_t timeBin) const { return data[timeBin].getCMV(); } + void setCMV(uint32_t timeBin, uint16_t value) { data[timeBin].setCMV(value); } + + float getCMVFloat(uint32_t timeBin) const { return data[timeBin].getCMVFloat(); } + void setCMVFloat(uint32_t timeBin, float value) { data[timeBin].setCMVFloat(value); } +}; + +} // namespace o2::tpc::cmv + +#endif \ No newline at end of file diff --git a/Detectors/TPC/base/include/TPCBase/RDHUtils.h b/Detectors/TPC/base/include/TPCBase/RDHUtils.h index adfd94cf6b703..71b5d16b85702 100644 --- a/Detectors/TPC/base/include/TPCBase/RDHUtils.h +++ b/Detectors/TPC/base/include/TPCBase/RDHUtils.h @@ -13,7 +13,7 @@ #define AliceO2_TPC_RDHUtils_H #include "DetectorsRaw/RDHUtils.h" -//#include "Headers/RAWDataHeader.h" +// #include "Headers/RAWDataHeader.h" namespace o2 { @@ -28,6 +28,7 @@ static constexpr FEEIDType UserLogicLinkID = 15; ///< virtual link ID for ZS dat static constexpr FEEIDType IDCLinkID = 20; ///< Identifier for integrated digital currents static constexpr FEEIDType ILBZSLinkID = 21; ///< Identifier for improved link-based ZS static constexpr FEEIDType DLBZSLinkID = 22; ///< Identifier for dense link-based ZS +static constexpr FEEIDType CMVLinkID = 23; ///< Identifier for common mode values static constexpr FEEIDType SACLinkID = 25; ///< Identifier for sampled analog currents /// compose feeid from cru, endpoint and link diff --git a/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h b/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h new file mode 100644 index 0000000000000..80d73341faed3 --- /dev/null +++ b/Detectors/TPC/calibration/include/TPCCalibration/CMVContainer.h @@ -0,0 +1,174 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVContainer.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Struct for storing CMVs to the CCDB + +#ifndef ALICEO2_TPC_CMVCONTAINER_H_ +#define ALICEO2_TPC_CMVCONTAINER_H_ + +#include +#include +#include +#include + +#include + +#include "TTree.h" +#include "TFile.h" + +#include "DataFormatsTPC/CMV.h" + +namespace o2::tpc +{ + +/// CMVContainer: accumulator for one aggregation interval +struct CMVContainer { + + uint32_t nTFs{0}; ///< number of TFs accumulated + uint32_t nCRUs{0}; ///< number of contributing CRUs + long firstTF{0}; ///< first TF counter in this aggregation interval + + std::vector cmvValues{}; ///< CMV float values + std::vector cru{}; ///< CRU indices + std::vector timebin{}; ///< absolute timebins within the TF + std::vector tf{}; ///< TF counters + + /// Pre-allocate storage for the expected number of entries: expectedTFs × expectedCRUs × NTimeBinsPerTF + void reserve(uint32_t expectedTFs, uint32_t expectedCRUs) + { + const std::size_t n = static_cast(expectedTFs) * expectedCRUs * o2::tpc::cmv::NTimeBinsPerTF; + cmvValues.reserve(n); + cru.reserve(n); + timebin.reserve(n); + tf.reserve(n); + } + + /// Append one (cmv, cru, timebin, tf) tuple + void addEntry(float cmvVal, uint32_t cruID, uint32_t tb, uint32_t tfCounter) + { + cmvValues.push_back(cmvVal); + cru.push_back(cruID); + timebin.push_back(tb); + tf.push_back(tfCounter); + } + + /// Append one full CRU packet (NTimeBinsPerPacket consecutive timebins) + /// \param packet pointer to NTimeBinsPerPacket floats + /// \param cruID CRU index + /// \param tbOffset absolute timebin of the first sample in this packet + /// \param tfCounter TF counter + void addPacket(const float* packet, uint32_t cruID, uint32_t tbOffset, uint32_t tfCounter) + { + for (uint32_t tb = 0; tb < o2::tpc::cmv::NTimeBinsPerPacket; ++tb) { + addEntry(packet[tb], cruID, tbOffset + tb, tfCounter); + } + } + + std::size_t size() const { return cmvValues.size(); } + bool empty() const { return cmvValues.empty(); } + + /// Clear all data and reset counters + void clear() + { + cmvValues.clear(); + cru.clear(); + timebin.clear(); + tf.clear(); + nTFs = 0; + nCRUs = 0; + firstTF = 0; + } + + std::string summary() const + { + return fmt::format("CMVContainer: {} entries, {} TFs, {} CRUs, firstTF={}", + size(), nTFs, nCRUs, firstTF); + } + + /// Build an in-memory TTree with one branch per field and one entry per tuple + std::unique_ptr toTTree() const + { + const std::size_t n = size(); + if (n == 0) { + throw std::runtime_error("CMVContainer::toTTree() called on empty container"); + } + + auto tree = std::make_unique("CMVTree", "TPC common mode values"); + tree->SetAutoSave(0); + + // Point branches directly at the vector data — single Fill() call writes all rows + float* pCmv = const_cast(cmvValues.data()); + uint32_t* pCru = const_cast(cru.data()); + uint32_t* pTimebin = const_cast(timebin.data()); + uint32_t* pTf = const_cast(tf.data()); + + tree->Branch("cmv", pCmv, fmt::format("cmv[{}]/F", n).c_str()); + tree->Branch("cru", pCru, fmt::format("cru[{}]/i", n).c_str()); + tree->Branch("timebin", pTimebin, fmt::format("timebin[{}]/i", n).c_str()); + tree->Branch("tf", pTf, fmt::format("tf[{}]/i", n).c_str()); + + tree->Fill(); + return tree; + } + + /// Write the container as a TTree inside a TFile on disk + /// \param filename path to the output ROOT file + void writeToFile(const std::string& filename) const + { + TFile f(filename.c_str(), "RECREATE"); + if (f.IsZombie()) { + throw std::runtime_error(fmt::format("CMVContainer::writeToFile: cannot open '{}'", filename)); + } + auto tree = toTTree(); + tree->Write(); + f.Close(); + } + + /// Restore a CMVContainer from a TTree previously written by toTTree() + static CMVContainer fromTTree(TTree* tree) + { + if (!tree) { + throw std::runtime_error("CMVContainer::fromTTree: null TTree pointer"); + } + + CMVContainer c; + const Long64_t nEntries = tree->GetEntries(); + if (nEntries <= 0) { + return c; + } + + // Read the array branches back into vectors in one GetEntry() call + std::vector bCmv(nEntries); + std::vector bCru(nEntries), bTimebin(nEntries), bTf(nEntries); + + tree->SetBranchAddress("cmv", bCmv.data()); + tree->SetBranchAddress("cru", bCru.data()); + tree->SetBranchAddress("timebin", bTimebin.data()); + tree->SetBranchAddress("tf", bTf.data()); + + tree->GetEntry(0); + + c.cmvValues = std::move(bCmv); + c.cru = std::move(bCru); + c.timebin = std::move(bTimebin); + c.tf = std::move(bTf); + + return c; + } + + ClassDefNV(CMVContainer, 1) +}; + +} // namespace o2::tpc + +#endif // ALICEO2_TPC_CMVCONTAINER_H_ \ No newline at end of file diff --git a/Detectors/TPC/workflow/CMakeLists.txt b/Detectors/TPC/workflow/CMakeLists.txt index 6930f332bfbf1..d52a5d5b3f732 100644 --- a/Detectors/TPC/workflow/CMakeLists.txt +++ b/Detectors/TPC/workflow/CMakeLists.txt @@ -25,6 +25,7 @@ o2_add_library(TPCWorkflow src/KryptonRawFilterSpec.cxx src/OccupancyFilterSpec.cxx src/SACProcessorSpec.cxx + src/CMVToVectorSpec.cxx src/IDCToVectorSpec.cxx src/CalibdEdxSpec.cxx src/CalibratordEdxSpec.cxx @@ -288,4 +289,24 @@ o2_add_executable(pressure-temperature SOURCES src/tpc-pressure-temperature.cxx PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) -add_subdirectory(readers) +o2_add_executable(cmv-to-vector + COMPONENT_NAME tpc + SOURCES src/tpc-cmv-to-vector.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +o2_add_executable(cmv-flp + COMPONENT_NAME tpc + SOURCES src/tpc-flp-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +o2_add_executable(cmv-distribute + COMPONENT_NAME tpc + SOURCES src/tpc-distribute-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +o2_add_executable(cmv-factorize + COMPONENT_NAME tpc + SOURCES src/tpc-factorize-cmv.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +add_subdirectory(readers) \ No newline at end of file diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h new file mode 100644 index 0000000000000..fb27dedabd9c4 --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h @@ -0,0 +1,31 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#ifndef TPC_CMVToVectorSpec_H_ +#define TPC_CMVToVectorSpec_H_ + +#include "Framework/DataProcessorSpec.h" +#include + +namespace o2::tpc +{ + +/// create a processor spec +/// convert CMV raw values to a vector in a CRU +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus); + +} // end namespace o2::tpc + +#endif // TPC_CMVToVectorSpec_H_ diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h new file mode 100644 index 0000000000000..57c3f551ae220 --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h @@ -0,0 +1,412 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCDistributeCMVSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC aggregation of grouped CMVs + +#ifndef O2_TPCDISTRIBUTECMVSPEC_H +#define O2_TPCDISTRIBUTECMVSPEC_H + +#include +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Headers/DataHeader.h" +#include "Framework/ConfigParamRegistry.h" +#include "TPCWorkflow/TPCFLPCMVSpec.h" +#include "TPCBase/CRU.h" +#include "MemoryResources/MemoryResources.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "DetectorsBase/GRPGeomHelper.h" +#include "CommonDataFormat/Pair.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; +using namespace o2::tpc; + +namespace o2::tpc +{ + +inline std::vector getCMVSides(const std::vector& crus) +{ + bool hasA = false, hasC = false; + for (const auto cru : crus) { + const Side side = CRU(cru).side(); + if (side == Side::A) { + hasA = true; + } else { + hasC = true; + } + } + std::vector sides; + if (hasA) { + sides.emplace_back(Side::A); + } + if (hasC) { + sides.emplace_back(Side::C); + } + return sides; +} + +class TPCDistributeCMVSpec : public o2::framework::Task +{ + public: + TPCDistributeCMVSpec(const std::vector& crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr req) + : mCRUs{crus}, mTimeFrames{timeframes}, mNTFsBuffer{nTFsBuffer}, mOutLanes{outlanes}, mProcessedCRU{{std::vector(timeframes), std::vector(timeframes)}}, mTFStart{{firstTF, firstTF + timeframes}}, mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}}, mCCDBRequest(req), mSendCCDBOutputOrbitReset(outlanes), mSendCCDBOutputGRPECS(outlanes) + { + // pre calculate data description for output + mDataDescrOut.reserve(mOutLanes); + for (unsigned int i = 0; i < mOutLanes; ++i) { + mDataDescrOut.emplace_back(getDataDescriptionCMV(i)); + } + + // sort vector for binary_search + std::sort(mCRUs.begin(), mCRUs.end()); + + for (auto& processedCRUbuffer : mProcessedCRUs) { + processedCRUbuffer.resize(mTimeFrames); + for (auto& crusMap : processedCRUbuffer) { + crusMap.reserve(mCRUs.size()); + for (const auto cruID : mCRUs) { + crusMap.emplace(cruID, false); + } + } + } + + const auto sides = getCMVSides(mCRUs); + for (auto side : sides) { + const std::string name = (side == Side::A) ? "cmvsgroupa" : "cmvsgroupc"; + mFilter.emplace_back(InputSpec{name.data(), ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(side)}, Lifetime::Sporadic}); + } + }; + + void init(o2::framework::InitContext& ic) final + { + o2::base::GRPGeomHelper::instance().setRequest(mCCDBRequest); + mNFactorTFs = ic.options().get("nFactorTFs"); + mNTFsDataDrop = ic.options().get("drop-data-after-nTFs"); + mCheckEveryNData = ic.options().get("check-data-every-n"); + if (mCheckEveryNData == 0) { + mCheckEveryNData = mTimeFrames / 2; + if (mCheckEveryNData == 0) { + mCheckEveryNData = 1; + } + mNTFsDataDrop = mCheckEveryNData; + } + } + + void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final + { + o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); + if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) { + LOGP(info, "Updating ORBITRESET"); + std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true); + } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) { + // check if received object is valid + if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) { + LOGP(info, "Updating GRPECS"); + std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true); + } else { + LOGP(info, "Detected default GRPECS object"); + } + } + } + + void run(o2::framework::ProcessingContext& pc) final + { + // send orbit reset and orbits per TF only once + if (mCCDBRequest->askTime) { + const bool grpecsValid = pc.inputs().isValid("grpecs"); + const bool orbitResetValid = pc.inputs().isValid("orbitReset"); + if (grpecsValid) { + pc.inputs().get("grpecs"); + } + if (orbitResetValid) { + pc.inputs().get*>("orbitReset"); + } + if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) { + return; + } + } + + const auto tf = processing_helpers::getCurrentTF(pc); + + // automatically detect firstTF in case firstTF was not specified + if (mTFStart.front() <= -1) { + const auto firstTF = tf; + const long offsetTF = std::abs(mTFStart.front() + 1); + const auto nTotTFs = getNRealTFs(); + mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs}; + mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs}; + LOGP(info, "Setting {} as first TF", mTFStart[0]); + LOGP(info, "Using offset of {} TFs for setting the first TF", offsetTF); + } + + // check which buffer to use for current incoming data + const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer; + if (mTFStart[currentBuffer] > tf) { + LOGP(info, "All CRUs for current TF {} already received. Skipping this TF", tf); + return; + } + + const unsigned int currentOutLane = getOutLane(tf); + const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer; + LOGP(info, "Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}", tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]); + + if (relTF >= mProcessedCRU[currentBuffer].size()) { + LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size()); + + // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received + mProcessedTotalData = mCheckEveryNData; + checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf); + return; + } + + if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) { + return; + } + + // send start info only once + if (mSendOutputStartInfo[currentBuffer]) { + mSendOutputStartInfo[currentBuffer] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]); + } + + if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) { + mSendCCDBOutputOrbitReset[currentOutLane] = false; + mSendCCDBOutputGRPECS[currentOutLane] = false; + pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); + } + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const unsigned int cru = tpcCRUHeader->subSpecification >> 7; + + // check if cru is specified in input cru list + if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { + LOGP(info, "Received data from CRU: {} which was not specified as input. Skipping", cru); + continue; + } + + if (mProcessedCRUs[currentBuffer][relTF][cru]) { + continue; + } else { + // count total number of processed CRUs for given TF + ++mProcessedCRU[currentBuffer][relTF]; + + // to keep track of processed CRUs + mProcessedCRUs[currentBuffer][relTF][cru] = true; + } + + // sending CMVs + sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); + } + + LOGP(info, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf); + + // check for missing data if specified + if (mNTFsDataDrop > 0) { + checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf); + } + + if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) { + ++mProcessedTFs[currentBuffer]; + } + + if (mProcessedTFs[currentBuffer] == mTimeFrames) { + finishInterval(pc, currentOutLane, currentBuffer, tf); + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get().readyToQuit(QuitRequest::Me); } + + /// return data description for aggregated CMVs for given lane + static header::DataDescription getDataDescriptionCMV(const unsigned int lane) + { + const std::string name = fmt::format("CMVAGG{}", lane).data(); + header::DataDescription description; + description.runtimeInit(name.substr(0, 16).c_str()); + return description; + } + + static constexpr header::DataDescription getDataDescriptionCMVFirstTF() { return header::DataDescription{"CMVFIRSTTF"}; } + static constexpr header::DataDescription getDataDescriptionCMVOrbitReset() { return header::DataDescription{"CMVORBITRESET"}; } + + private: + std::vector mCRUs{}; ///< CRUs to process in this instance + const unsigned int mTimeFrames{}; ///< number of TFs per aggregation interval + const int mNTFsBuffer{1}; ///< number of TFs for which the CMVs will be buffered + const unsigned int mOutLanes{}; ///< number of output lanes + std::array mProcessedTFs{{0, 0}}; ///< number of processed time frames to keep track of when the writing to CCDB will be done + std::array, 2> mProcessedCRU{}; ///< counter of received data from CRUs per TF to merge incoming data from FLPs. Buffer used in case one FLP delivers the TF after the last TF for the current aggregation interval faster then the other FLPs the last TF. + std::array>, 2> mProcessedCRUs{}; ///< to keep track of the already processed CRUs ([buffer][relTF][CRU]) + std::array mTFStart{}; ///< storing of first TF for buffer interval + std::array mTFEnd{}; ///< storing of last TF for buffer interval + std::array mSendOutputStartInfo{true, true}; ///< flag for sending the info for the start of the aggregation interval + std::shared_ptr mCCDBRequest; ///< info for CCDB request + std::vector mSendCCDBOutputOrbitReset{}; ///< flag for received orbit reset time from CCDB + std::vector mSendCCDBOutputGRPECS{}; ///< flag for received orbit GRPECS from CCDB + unsigned int mCurrentOutLane{0}; ///< index for keeping track of the current output lane + bool mBuffer{false}; ///< buffer index + int mNFactorTFs{0}; ///< Number of TFs to skip for sending oldest TF + int mNTFsDataDrop{0}; ///< delay for the check if TFs are missing in TF units + std::array mStartNTFsDataDrop{0}; ///< first relative TF to check + long mProcessedTotalData{0}; ///< used to check for dropeed TF data + int mCheckEveryNData{1}; ///< factor after which to check for missing data (in case data missing -> send dummy data) + std::vector mFilter{}; ///< filter for looping over input data + std::vector mDataDescrOut{}; + + void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector cmvs) + { + pc.outputs().adoptContainer(Output{gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(cmvs)); + } + + /// returns the output lane to which the data will be send + unsigned int getOutLane(const uint32_t tf) const { return (tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; } + + /// returns real number of TFs taking buffer size into account + unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; } + + void clearBuffer(const bool currentBuffer) + { + // resetting received CRUs + for (auto& crusMap : mProcessedCRUs[currentBuffer]) { + for (auto& it : crusMap) { + it.second = false; + } + } + + mProcessedTFs[currentBuffer] = 0; // reset processed TFs for next aggregation interval + std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0); + + // set integration range for next integration interval + mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1; + mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1; + + // switch buffer + mBuffer = !mBuffer; + + // set output lane + mCurrentOutLane = ++mCurrentOutLane % mOutLanes; + } + + void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const unsigned int currentOutLane, const uint32_t tf) + { + if (!(mProcessedTotalData++ % mCheckEveryNData)) { + LOGP(info, "Checking for dropped packages..."); + + // if last buffer has smaller time range check the whole last buffer + if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) { + LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size()); + const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1); + checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size(), lastLane); + LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf); + finishInterval(pc, lastLane, !currentBuffer, tf); + } + + const int tfEndCheck = std::clamp(static_cast(relTF) - mNTFsDataDrop, 0, static_cast(mProcessedCRU[currentBuffer].size())); + LOGP(info, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck); + checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane); + mStartNTFsDataDrop[currentBuffer] = tfEndCheck; + } + } + + void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF, const unsigned int outLane) + { + for (int iTF = startTF; iTF < endTF; ++iTF) { + if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) { + LOGP(warning, "CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size()); + ++mProcessedTFs[currentBuffer]; + mProcessedCRU[currentBuffer][iTF] = mCRUs.size(); + + // find missing CRUs + for (auto& it : mProcessedCRUs[currentBuffer][iTF]) { + if (!it.second) { + it.second = true; + sendOutput(pc, outLane, it.first, pmr::vector()); + } + } + } + } + } + + void finishInterval(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const bool buffer, const uint32_t tf) + { + if (mNFactorTFs > 0) { + mNFactorTFs = 0; + // ToDo: Find better fix + for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) { + auto& deviceProxy = pc.services().get(); + auto& state = deviceProxy.getOutputChannelState({static_cast(ilane)}); + size_t oldest = std::numeric_limits::max() - 1; // just set to really large value + state.oldestForChannel = {oldest}; + } + } + + LOGP(info, "All TFs {} for current buffer received. Clearing buffer", tf); + clearBuffer(buffer); + mStartNTFsDataDrop[buffer] = 0; + mSendOutputStartInfo[buffer] = true; + } +}; + +DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1) +{ + std::vector inputSpecs; + const auto sides = getCMVSides(crus); + for (auto side : sides) { + const std::string name = (side == Side::A) ? "cmvsgroupa" : "cmvsgroupc"; + inputSpecs.emplace_back(InputSpec{name.data(), ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(side)}, Lifetime::Sporadic}); + } + + std::vector outputSpecs; + outputSpecs.reserve(outlanes); + for (unsigned int lane = 0; lane < outlanes; ++lane) { + outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic); + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + } + + bool fetchCCDB = false; + if (sendPrecisetimeStamp && (ilane == 0)) { + fetchCCDB = true; + for (unsigned int lane = 0; lane < outlanes; ++lane) { + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + } + } + + auto ccdbRequest = std::make_shared(fetchCCDB, // orbitResetTime + fetchCCDB, // GRPECS=true + false, // GRPLHCIF + false, // GRPMagField + false, // askMatLUT + o2::base::GRPGeomRequest::None, // geometry + inputSpecs); + + const std::string type = "cmv"; + const auto id = fmt::format("tpc-distribute-{}-{:02}", type, ilane); + DataProcessorSpec spec{ + id.data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)}, + Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data."}}, + {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}}, + {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}}; // end DataProcessorSpec + spec.rank = ilane; + return spec; +} + +} // namespace o2::tpc + +#endif diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h new file mode 100644 index 0000000000000..2a7561d610d55 --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h @@ -0,0 +1,137 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCFLPIDCSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC device for processing CMVs on FLPs + +#ifndef O2_TPCFLPIDCSPEC_H +#define O2_TPCFLPIDCSPEC_H + +#include +#include +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/ConfigParamRegistry.h" +#include "Headers/DataHeader.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "TPCBase/CRU.h" +#include "TFile.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; +using namespace o2::tpc; + +namespace o2::tpc +{ + +class TPCFLPCMVDevice : public o2::framework::Task +{ + public: + TPCFLPCMVDevice(const int lane, const std::vector& crus, const int nTFsBuffer) + : mLane{lane}, mCRUs{crus}, mNTFsBuffer{nTFsBuffer} {} + + void init(o2::framework::InitContext& ic) final + { + mDumpCMVs = ic.options().get("dump-cmvs-flp"); + } + + void run(o2::framework::ProcessingContext& pc) final + { + LOGP(info, "Processing CMVs for TF {} for CRUs {} to {}", processing_helpers::getCurrentTF(pc), mCRUs.front(), mCRUs.back()); + + ++mCountTFsForBuffer; + + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const int cru = tpcCRUHeader->subSpecification >> 7; + auto vecCMVs = pc.inputs().get>(ref); + mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end()); + } + + if (mCountTFsForBuffer >= mNTFsBuffer) { + mCountTFsForBuffer = 0; + for (const auto cru : mCRUs) { + LOGP(info, "Sending CMVs of size {} for TF {}", mCMVs[cru].size(), processing_helpers::getCurrentTF(pc)); + sendOutput(pc.outputs(), cru); + } + } + + if (mDumpCMVs) { + TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE"); + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); + const int cru = tpcCRUHeader->subSpecification >> 7; + auto vec = pc.inputs().get>(ref); + fOut.WriteObject(&vec, fmt::format("CRU_{}", cru).data()); + } + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + if (mCountTFsForBuffer > 0) { + LOGP(info, "Flushing remaining {} buffered TFs at end of stream", mCountTFsForBuffer); + for (const auto cru : mCRUs) { + sendOutput(ec.outputs(), cru); + } + } + ec.services().get().readyToQuit(QuitRequest::Me); + } + + static constexpr header::DataDescription getDataDescriptionCMVGroup(const Side side) { return (side == Side::A) ? getDataDescriptionCMVGroupA() : getDataDescriptionCMVGroupC(); } + static constexpr header::DataDescription getDataDescriptionCMVGroupA() { return header::DataDescription{"CMVGROUPA"}; } + static constexpr header::DataDescription getDataDescriptionCMVGroupC() { return header::DataDescription{"CMVGROUPC"}; } + + private: + const int mLane{}; ///< lane number of processor + const std::vector mCRUs{}; ///< CRUs to process in this instance + int mNTFsBuffer{1}; ///< number of TFs to buffer before sending + bool mDumpCMVs{}; ///< dump CMVs to file for debugging + int mCountTFsForBuffer{0}; ///< counts TFs to track when to send output + std::unordered_map> mCMVs{}; ///< buffered CMV vectors per CRU + const std::vector mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}}; + + void sendOutput(DataAllocator& output, const uint32_t cru) + { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(CRU(cru).side()), subSpec}, std::move(mCMVs[cru])); + } +}; + +DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector& crus, const int nTFsBuffer = 1) +{ + std::vector outputSpecs; + std::vector inputSpecs; + outputSpecs.reserve(crus.size()); + inputSpecs.reserve(crus.size()); + + for (const auto& cru : crus) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe}); + const Side side = CRU(cru).side(); + outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(side), subSpec}, Lifetime::Sporadic); + } + + const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane); + return DataProcessorSpec{ + id.data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(ilane, crus, nTFsBuffer)}, + Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}}; +} + +} // namespace o2::tpc +#endif \ No newline at end of file diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCFactorizeCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFactorizeCMVSpec.h new file mode 100644 index 0000000000000..70f5d30c91f6f --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFactorizeCMVSpec.h @@ -0,0 +1,346 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TPCFactorizeCMVSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief TPC device that collects per CRU CMV vectors over an aggregation interval and writes them as a TTree (via CMVContainer) to the CCDB + +#ifndef O2_TPCFACTORIZECMVSPEC_H +#define O2_TPCFACTORIZECMVSPEC_H + +#include +#include +#include + +#include "TMemFile.h" + +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataRefUtils.h" +#include "Framework/DataTakingContext.h" +#include "Headers/DataHeader.h" +#include "CommonDataFormat/Pair.h" +#include "CCDB/CcdbApi.h" +#include "CCDB/CcdbObjectInfo.h" +#include "DetectorsCalibration/Utils.h" +#include "TPCWorkflow/TPCDistributeCMVSpec.h" +#include "TPCWorkflow/ProcessingHelpers.h" +#include "TPCCalibration/CMVContainer.h" +#include "DataFormatsTPC/CMV.h" +#include "MemoryResources/MemoryResources.h" + +using namespace o2::framework; +using o2::header::gDataOriginTPC; + +namespace o2::tpc +{ + +class TPCFactorizeCMVDevice : public o2::framework::Task +{ + public: + TPCFactorizeCMVDevice(const int lane, + const std::vector& crus, + const unsigned int timeframes, + const bool sendCCDB, + const bool usePreciseTimestamp, + const int nTFsBuffer) + : mLaneId{lane}, + mCRUs{crus}, + mTimeFrames{timeframes}, + mSendCCDB{sendCCDB}, + mUsePreciseTimestamp{usePreciseTimestamp}, + mNTFsBuffer{nTFsBuffer} + { + mContainer.reserve(mTimeFrames, static_cast(mCRUs.size())); + } + + void init(o2::framework::InitContext& ic) final + { + mNOrbitsCMV = ic.options().get("orbits-CMVs"); + mDumpCMVs = ic.options().get("dump-cmvs"); + mOffsetCCDB = ic.options().get("add-offset-for-CCDB-timestamp"); + } + + void run(o2::framework::ProcessingContext& pc) final + { + // Capture orbit-reset info once for precise CCDB timestamp calculation + if (mUsePreciseTimestamp && pc.inputs().isValid("orbitreset")) { + mTFInfo = pc.inputs().get>("orbitreset"); + if (pc.inputs().countValidInputs() == 1) { + return; // only the orbit-reset message arrived this round + } + } + + // Record the absolute first TF of this aggregation interval + const auto currTF = processing_helpers::getCurrentTF(pc); + if (mTFFirst == -1 && pc.inputs().isValid("firstTF")) { + mTFFirst = pc.inputs().get("firstTF"); + mContainer.firstTF = mTFFirst; + } + if (mTFFirst == -1) { + mTFFirst = currTF; + mContainer.firstTF = mTFFirst; + LOGP(warning, "firstTF not Found! Found valid inputs {}. Setting {} as first TF", pc.inputs().countValidInputs(), mTFFirst); + } + + // Set data taking context only once + if (mSetDataTakingCont) { + mDataTakingContext = pc.services().get(); + mSetDataTakingCont = false; + } + + // Set the run number only once + if (!mRun) { + mRun = processing_helpers::getRunNumber(pc); + } + + const long relTF = (currTF - mTFFirst) / mNTFsBuffer; + + // Set CCDB start timestamp once at the start of each aggregation interval + if (mTimestampStart == 0) { + setTimestampCCDB(relTF, pc); + } + + if (relTF < 0 || relTF >= static_cast(mTimeFrames)) { + LOGP(warning, "relTF={} out of range [0, {}), skipping TF {}", relTF, mTimeFrames, currTF); + return; + } + + // Consume all incoming CMV vectors for this TF + for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = DataRefUtils::getHeader(ref); + const uint32_t cru = hdr->subSpecification; + + auto cmvVec = pc.inputs().get>(ref); + if (cmvVec.empty()) { + LOGP(warning, "Received empty CMV vector for CRU {}, skipping", cru); + ++mProcessedCRUs; // count it to avoid stalling the completion check + continue; + } + + // Each TF carries 4 packets × NTimeBinsPerPacket floats = NTimeBinsPerTF floats + if (cmvVec.size() != cmv::NTimeBinsPerTF) { + LOGP(warning, "CRU {}: got {} CMV values, expected {} (4 packets × {})", + cru, cmvVec.size(), cmv::NTimeBinsPerTF, cmv::NTimeBinsPerPacket); + } + + // Store one entry per timebin into the container + const uint32_t tfCounter = static_cast(currTF); + for (uint32_t tb = 0; tb < static_cast(cmvVec.size()); ++tb) { + mContainer.addEntry(cmvVec[tb], cru, tb, tfCounter); + } + ++mProcessedCRUs; + } + + // Once all CRUs × all TFs have been received, write out + if (mProcessedCRUs == mCRUs.size() * mTimeFrames) { + mContainer.nTFs = static_cast(mTimeFrames); + mContainer.nCRUs = static_cast(mCRUs.size()); + LOGP(info, "ProcessedTFs: {} currTF: {} relTF: {} OrbitResetTime: {} orbits per TF: {}", + mProcessedCRUs / mCRUs.size(), currTF, relTF, mTFInfo.first, mTFInfo.second); + sendOutput(pc.outputs()); + } + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + LOGP(info, "End of stream – flushing CMV container ({} entries, lane {})", mContainer.size(), mLaneId); + mContainer.nTFs = static_cast(mTimeFrames); + mContainer.nCRUs = static_cast(mCRUs.size()); + sendOutput(ec.outputs()); + ec.services().get().readyToQuit(QuitRequest::Me); + } + + static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; } + + private: + const int mLaneId{0}; + const std::vector mCRUs{}; + const unsigned int mTimeFrames{}; + const bool mSendCCDB{false}; + const bool mUsePreciseTimestamp{false}; + const int mNTFsBuffer{1}; + int mNOrbitsCMV{12}; ///< orbits per CMV integration window (for CCDB timestamp range) + bool mDumpCMVs{false}; ///< write a local ROOT debug file + bool mOffsetCCDB{false}; + long mTFFirst{-1}; + long mTimestampStart{0}; + unsigned int mProcessedCRUs{0}; ///< total CRU entries received in this interval + uint64_t mRun{0}; + dataformats::Pair mTFInfo{}; + CMVContainer mContainer{}; + o2::framework::DataTakingContext mDataTakingContext{}; + bool mSetDataTakingCont{true}; + const std::vector mFilter{ + {"cmvagg", + ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)}, + Lifetime::Sporadic}}; + + /// Determine the CCDB start timestamp from orbit-reset time or framework creation time (depending on mUsePreciseTimestamp). + void setTimestampCCDB(const long relTF, o2::framework::ProcessingContext& pc) + { + if (mUsePreciseTimestamp && !mTFInfo.second) { + return; + } + const auto& tinfo = pc.services().get(); + const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * mTFInfo.second; + mTimestampStart = mUsePreciseTimestamp + ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) + : tinfo.creation; + LOGP(info, "setting time stamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}", + mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, mTFInfo.second, relTF, nOrbitsOffset); + } + + /// Serialise mContainer into a TMemFile and push to CCDB, then reset state. + void sendOutput(DataAllocator& output) + { + using timer = std::chrono::high_resolution_clock; + + if (mContainer.empty()) { + LOGP(warning, "CMV container is empty at sendOutput (lane {}), skipping", mLaneId); + reset(); + return; + } + + LOGP(info, "{}", mContainer.summary()); + + // Compute CCDB validity window + const long offsetCCDB = mOffsetCCDB ? o2::ccdb::CcdbObjectInfo::HOUR : 0; + const long timeStampEnd = offsetCCDB + mTimestampStart + + mNOrbitsCMV * mTimeFrames * mNTFsBuffer * o2::constants::lhc::LHCOrbitMUS * 0.001; + LOGP(info, "Setting time stamp range from {} to {} for writing to CCDB with an offset of {}", + mTimestampStart, timeStampEnd, offsetCCDB); + + // Write local ROOT file for debugging + if (mDumpCMVs) { + const std::string fname = fmt::format("CMV_lane{:02}_TS{}.root", mLaneId, mTimestampStart); + try { + mContainer.writeToFile(fname); + LOGP(info, "CMV debug file written to {}", fname); + } catch (const std::exception& e) { + LOGP(error, "Failed to write CMV debug file: {}", e.what()); + } + } + + if (mSendCCDB && timeStampEnd > mTimestampStart) { + auto start = timer::now(); + + // Build a TMemFile containing the TTree, hand it to CcdbApi for serialisation + TMemFile memFile("CMV.root", "RECREATE"); + try { + auto tree = mContainer.toTTree(); + memFile.cd(); + tree->Write(); + } catch (const std::exception& e) { + LOGP(error, "CMVContainer::toTTree() failed: {}", e.what()); + reset(); + return; + } + memFile.Write(); + + o2::ccdb::CcdbObjectInfo ccdbInfo( + "TPC/Calib/CMV", + "TTree", + "CMV.root", + /*metadata=*/{}, + mTimestampStart, + timeStampEnd); + + auto image = o2::ccdb::CcdbApi::createObjectImage(&memFile, &ccdbInfo); + LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", + ccdbInfo.getPath(), ccdbInfo.getFileName(), image->size(), + ccdbInfo.getStartValidityTimestamp(), ccdbInfo.getEndValidityTimestamp()); + + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); + output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfo); + + auto stop = timer::now(); + std::chrono::duration elapsed = stop - start; + LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count()); + } + + reset(); + } + + /// Reset all per-interval state + void reset() + { + mContainer.clear(); + mContainer.reserve(mTimeFrames, static_cast(mCRUs.size())); + mTimestampStart = 0; + mTFFirst = -1; + mProcessedCRUs = 0; + mSetDataTakingCont = true; + LOGP(info, "Everything cleared. Waiting for new data to arrive."); + } +}; + +inline DataProcessorSpec getTPCFactorizeCMVSpec( + const int lane, + const std::vector& crus, + const unsigned int timeframes, + const bool sendCCDB, + const bool usePreciseTimestamp, + const int nTFsBuffer = 1) +{ + std::vector outputSpecs; + if (sendCCDB) { + outputSpecs.emplace_back( + ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, + TPCFactorizeCMVDevice::getDataDescriptionCCDBCMV()}, + Lifetime::Sporadic); + outputSpecs.emplace_back( + ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, + TPCFactorizeCMVDevice::getDataDescriptionCCDBCMV()}, + Lifetime::Sporadic); + } + + std::vector inputSpecs; + inputSpecs.emplace_back(InputSpec{ + "cmvagg", + ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, + Lifetime::Sporadic}); + inputSpecs.emplace_back(InputSpec{ + "firstTF", + gDataOriginTPC, + TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), + header::DataHeader::SubSpecificationType{static_cast(lane)}, + Lifetime::Sporadic}); + if (usePreciseTimestamp) { + inputSpecs.emplace_back(InputSpec{ + "orbitreset", + gDataOriginTPC, + TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), + header::DataHeader::SubSpecificationType{static_cast(lane)}, + Lifetime::Sporadic}); + } + + const std::string type = "cmv"; + DataProcessorSpec spec{ + fmt::format("tpc-factorize-{}-{:02}", type, lane).data(), + inputSpecs, + outputSpecs, + AlgorithmSpec{adaptFromTask(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer)}, + Options{ + {"orbits-CMVs", VariantType::Int, 12, {"Number of orbits over which the CMVs are integrated"}}, + {"dump-cmvs", VariantType::Bool, false, {"Dump CMVs to a local ROOT file for debugging"}}, + {"add-offset-for-CCDB-timestamp", VariantType::Bool, false, {"Add an offset of 1 hour for the validity range of the CCDB objects"}}}}; + + spec.rank = lane; + return spec; +} + +} // namespace o2::tpc + +#endif // O2_TPCFACTORIZECMVSPEC_H \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx new file mode 100644 index 0000000000000..49d05683151eb --- /dev/null +++ b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx @@ -0,0 +1,445 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.cxx +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "TFile.h" +#include "DetectorsRaw/RDHUtils.h" +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataRefUtils.h" +#include "DPLUtils/RawParser.h" +#include "Headers/DataHeader.h" +#include "Headers/DataHeaderHelpers.h" +#include "CommonUtils/TreeStreamRedirector.h" +#include "CommonUtils/NameConf.h" +#include "DataFormatsTPC/Constants.h" +#include "CommonConstants/LHCConstants.h" +#include "CCDB/BasicCCDBManager.h" + +#include "DataFormatsTPC/Defs.h" +#include "DataFormatsTPC/CMV.h" +#include "DataFormatsTPC/RawDataTypes.h" +#include "TPCBase/Utils.h" +#include "TPCBase/RDHUtils.h" +#include "TPCBase/Mapper.h" +#include "TPCWorkflow/ProcessingHelpers.h" + +using namespace o2::framework; +using o2::constants::lhc::LHCMaxBunches; +using o2::header::gDataOriginTPC; +using o2::tpc::constants::LHCBCPERTIMEBIN; +using RDHUtils = o2::raw::RDHUtils; +using RawDataType = o2::tpc::raw_data_types::Type; + +namespace o2::tpc +{ + +class CMVToVectorDevice : public o2::framework::Task +{ + public: + using FEEIDType = rdh_utils::FEEIDType; + CMVToVectorDevice(const std::vector& crus) : mCRUs(crus) {} + + void init(o2::framework::InitContext& ic) final + { + // set up ADC value filling + mWriteDebug = ic.options().get("write-debug"); + mWriteDebugOnError = ic.options().get("write-debug-on-error"); + mWriteRawDataOnError = ic.options().get("write-raw-data-on-error"); + mRawDataType = ic.options().get("raw-data-type"); + o2::framework::RawParser<>::setCheckIncompleteHBF(ic.options().get("check-incomplete-hbf")); + + mDebugStreamFileName = ic.options().get("debug-file-name").data(); + mRawOutputFileName = ic.options().get("raw-file-name").data(); + + initCMV(); + } + + void run(o2::framework::ProcessingContext& pc) final + { + const auto runNumber = processing_helpers::getRunNumber(pc); + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + const auto& mapper = Mapper::instance(); + + // open files if necessary + if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) { + const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber)); + LOGP(info, "Creating debug stream {}", debugFileName); + mDebugStream = std::make_unique(debugFileName.data(), "recreate"); + } + + if (mWriteRawDataOnError && !mRawOutputFile.is_open()) { + std::string_view rawType = (mRawDataType < 2) ? "tf" : "raw"; + if (mRawDataType == 5) { + rawType = "cmv.raw"; + } + const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType)); + LOGP(info, "Creating raw debug file {}", rawFileName); + mRawOutputFile.open(rawFileName, std::ios::binary); + } + + uint32_t heartbeatOrbit = 0; + uint16_t heartbeatBC = 0; + uint32_t tfCounter = 0; + bool first = true; + bool hasErrors = false; + + for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) { + const auto* dh = DataRefUtils::getHeader(ref); + tfCounter = dh->tfCounter; + const auto subSpecification = dh->subSpecification; + auto payloadSize = DataRefUtils::getPayloadSize(ref); + LOGP(info, "Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize); + + // ---| data loop |--- + const gsl::span raw = pc.inputs().get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + size_t lastErrorCount = 0; + + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + + if (parser.getNErrors() > lastErrorCount) { + lastErrorCount = parser.getNErrors(); + hasErrors = true; + } + + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const uint32_t cruID = rdh_utils::getCRU(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + LOGP(info, "Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId); + + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + LOGP(debug, "Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID); + continue; + } + + LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link); + + if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) { + LOGP(error, "CMV CRU {:3} not configured in CRUs, skipping", cruID); + continue; + } + + auto& cmvVec = mCMVvectors[cruID]; + auto& infoVec = mCMVInfos[cruID]; + + if (size != sizeof(cmv::Container)) { + LOGP(error, "CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.", size, sizeof(cmv::Container)); + return; + } + auto data = it.data(); + auto& cmvs = *((cmv::Container*)(data)); + const uint32_t orbit = cmvs.header.heartbeatOrbit; + const uint16_t bc = cmvs.header.heartbeatBC; + + // record packet meta and append its CMV vector (3564 TB) + infoVec.emplace_back(orbit, bc); + cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket); + for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) { + cmvVec.push_back(cmvs.getCMVFloat(tb)); + // LOGP(debug, "Appended CMV {} for timebin {}, CRU {}, orbit {}, bc {}", cmvs.getCMVFloat(tb), tb, cruID, orbit, bc); + } + } + } catch (const std::exception& e) { + // error message throtteling + using namespace std::literals::chrono_literals; + static std::unordered_map nErrorPerSubspec; + static std::chrono::time_point lastReport = std::chrono::steady_clock::now(); + const auto now = std::chrono::steady_clock::now(); + static size_t reportedErrors = 0; + const size_t MAXERRORS = 10; + const auto sleepTime = 10min; + ++nErrorPerSubspec[subSpecification]; + + if ((now - lastReport) < sleepTime) { + if (reportedErrors < MAXERRORS) { + ++reportedErrors; + std::string sleepInfo; + if (reportedErrors == MAXERRORS) { + sleepInfo = fmt::format(", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS, sleepTime); + } + LOGP(alarm, "EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts, + dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo); + lastReport = now; + } + } else { + lastReport = now; + reportedErrors = 0; + } + continue; + } + } + + hasErrors |= snapshotCMVs(pc.outputs(), tfCounter); + + if (mWriteDebug || (mWriteDebugOnError && hasErrors)) { + writeDebugOutput(tfCounter); + } + + if (mWriteRawDataOnError && hasErrors) { + writeRawData(pc.inputs()); + } + + // clear output + initCMV(); + } + + void closeFiles() + { + LOGP(info, "closeFiles"); + + if (mDebugStream) { + // set some default aliases + auto& stream = (*mDebugStream) << "cmvs"; + auto& tree = stream.getTree(); + tree.SetAlias("sector", "int(cru/10)"); + mDebugStream->Close(); + mDebugStream.reset(nullptr); + mRawOutputFile.close(); + } + } + + void stop() final + { + LOGP(info, "stop"); + closeFiles(); + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + LOGP(info, "endOfStream"); + // ec.services().get().readyToQuit(QuitRequest::Me); + closeFiles(); + } + + private: + /// CMV information for each cru + struct CMVInfo { + CMVInfo() = default; + CMVInfo(const CMVInfo&) = default; + CMVInfo(uint32_t orbit, uint16_t bc) : heartbeatOrbit(orbit), heartbeatBC(bc) {} + + uint32_t heartbeatOrbit{0}; + uint16_t heartbeatBC{0}; + + bool operator==(const uint32_t orbit) const { return (heartbeatOrbit == orbit); } + bool operator==(const CMVInfo& inf) const { return (inf.heartbeatOrbit == heartbeatOrbit) && (inf.heartbeatBC == heartbeatBC); } + bool matches(uint32_t orbit, int16_t bc) const { return ((heartbeatOrbit == orbit) && (heartbeatBC == bc)); } + }; + + int mRawDataType{0}; ///< type of raw data to dump in case of errors + bool mWriteDebug{false}; ///< write a debug output + bool mWriteDebugOnError{false}; ///< write a debug output in case of errors + bool mWriteRawDataOnError{false}; ///< write raw data in case of errors + std::vector mCRUs; ///< CRUs expected for this device + std::unordered_map> mCMVvectors; ///< decoded CMVs per cru over all CMV packets in the TF + std::unordered_map> mCMVInfos; ///< CMV packet information within the TF + std::string mDebugStreamFileName; ///< name of the debug stream output file + std::unique_ptr mDebugStream; ///< debug output streamer + std::ofstream mRawOutputFile; ///< raw output file + std::string mRawOutputFileName; ///< name of the raw output file + + //____________________________________________________________________________ + bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter) + { + bool hasErrors = false; + + // send data per CRU with its own orbit/BC vector + for (auto& [cru, cmvVec] : mCMVvectors) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + const auto& infVec = mCMVInfos[cru]; + + if (infVec.size() != 4) { + LOGP(warning, "CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size()); + hasErrors = true; + } + if (cmvVec.size() != cmv::NTimeBinsPerPacket * infVec.size()) { + LOGP(warning, "CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBinsPerPacket * infVec.size()); + hasErrors = true; + } + + std::vector orbitBCInfo; + orbitBCInfo.reserve(infVec.size()); + for (const auto& inf : infVec) { + orbitBCInfo.emplace_back((uint64_t(inf.heartbeatOrbit) << 32) + uint64_t(inf.heartbeatBC)); + } + + LOGP(debug, "Sending CMVs for CRU {} of size {} ({} packets)", cru, cmvVec.size(), infVec.size()); + output.snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec); + output.snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCInfo); + } + + return hasErrors; + } + + //____________________________________________________________________________ + void initCMV() + { + for (const auto cruID : mCRUs) { + auto& cmvVec = mCMVvectors[cruID]; + cmvVec.clear(); + + auto& infosCRU = mCMVInfos[cruID]; + infosCRU.clear(); + } + } + + //____________________________________________________________________________ + void writeDebugOutput(uint32_t tfCounter) + { + const auto& mapper = Mapper::instance(); + + mDebugStream->GetFile()->cd(); + auto& stream = (*mDebugStream) << "cmvs"; + uint32_t seen = 0; + static uint32_t firstOrbit = std::numeric_limits::max(); + + for (auto cru : mCRUs) { + if (mCMVInfos.find(cru) == mCMVInfos.end()) { + continue; + } + + auto& infos = mCMVInfos[cru]; + auto& cmvVec = mCMVvectors[cru]; + + stream << "cru=" << cru + << "tfCounter=" << tfCounter + << "nCMVs=" << cmvVec.size() + << "cmvs=" << cmvVec + << "\n"; + } + } + + void writeRawData(InputRecord& inputs) + { + if (!mRawOutputFile.is_open()) { + return; + } + + using DataHeader = o2::header::DataHeader; + + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + for (auto const& ref : InputRecordWalker(inputs, filter)) { + auto dh = DataRefUtils::getHeader(ref); + // LOGP(info, "write header: {}/{}/{}, payload size: {} / {}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize, ref.payloadSize); + if (((mRawDataType == 1) || (mRawDataType == 3)) && (dh->payloadSize == 2 * sizeof(o2::header::RAWDataHeader))) { + continue; + } + + if (mRawDataType < 2) { + mRawOutputFile.write(ref.header, sizeof(DataHeader)); + } + if (mRawDataType < 5) { + mRawOutputFile.write(ref.payload, ref.payloadSize); + } + + if (mRawDataType == 5) { + const gsl::span raw = inputs.get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + // only select CMVs + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + continue; + } + + // write out raw data + mRawOutputFile.write((const char*)it.raw(), RDHUtils::getMemorySize(rdhPtr)); + } + } catch (...) { + } + } + } + } +}; + +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus) +{ + using device = o2::tpc::CMVToVectorDevice; + + std::vector outputs; + for (const uint32_t cru : crus) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + outputs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe); + outputs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe); + } + + return DataProcessorSpec{ + fmt::format("tpc-cmv-to-vector"), + select(inputSpec.data()), + outputs, + AlgorithmSpec{adaptFromTask(crus)}, + Options{ + {"write-debug", VariantType::Bool, false, {"write a debug output tree"}}, + {"write-debug-on-error", VariantType::Bool, false, {"write a debug output tree in case errors occurred"}}, + {"debug-file-name", VariantType::String, "/tmp/cmv_vector_debug.{run}.root", {"name of the debug output file"}}, + {"write-raw-data-on-error", VariantType::Bool, false, {"dump raw data in case errors occurred"}}, + {"raw-file-name", VariantType::String, "/tmp/cmv_debug.{run}.{raw_type}", {"name of the raw output file"}}, + {"raw-data-type", VariantType::Int, 0, {"Which raw data to dump: 0-full TPC with DH, 1-full TPC with DH skip empty, 2-full TPC no DH, 3-full TPC no DH skip empty, 4-IDC raw only 5-CMV raw only"}}, + {"check-incomplete-hbf", VariantType::Bool, false, {"false: don't chck; true: check and report"}}, + } // end Options + }; // end DataProcessorSpec +} +} // namespace o2::tpc \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx new file mode 100644 index 0000000000000..80ab0ce2f96b8 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx @@ -0,0 +1,72 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include + +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCBase/CRU.h" +#include "TPCWorkflow/CMVToVectorSpec.h" + +using namespace o2::framework; +using namespace o2::tpc; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cmv-to-vector", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + std::string crusDefault = "0-" + std::to_string(CRU::MaxCRU - 1); + + std::vector options{ + {"input-spec", VariantType::String, "A:TPC/RAWDATA", {"selection string input specs"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}}, + {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}}, + {"crus", VariantType::String, crusDefault.c_str(), {"List of TPC crus, comma separated ranges, e.g. 0-3,7,9-15"}}, + }; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpccmv_configuration.ini"); + + const std::string inputSpec = config.options().get("input-spec"); + + const auto crus = o2::RangeTokenizer::tokenize(config.options().get("crus")); + + WorkflowSpec workflow; + + workflow.emplace_back(getCMVToVectorSpec(inputSpec, crus)); + + return workflow; +} diff --git a/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx new file mode 100644 index 0000000000000..05b9567f5cf16 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-distribute-cmv.cxx @@ -0,0 +1,84 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCWorkflow/TPCDistributeCMVSpec.h" +#include "Framework/CompletionPolicyHelpers.h" + +using namespace o2::framework; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-distribute-*.*", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + + std::vector options{ + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}}, + {"timeframes", VariantType::Int, 2000, {"Number of TFs which will be aggregated per aggregation interval."}}, + {"firstTF", VariantType::Int, -1, {"First time frame index. (if set to -1 the first TF will be automatically detected. Values < -1 are setting an offset for skipping the first TFs)"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}, + {"lanes", VariantType::Int, 1, {"Number of lanes of this device (CRUs are split per lane)"}}, + {"send-precise-timestamp", VariantType::Bool, false, {"Send precise timestamp which can be used for writing to CCDB"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer which was defined in the TPCFLPCMVSpec."}}, + {"output-lanes", VariantType::Int, 2, {"Number of parallel pipelines which will be used in the factorization device."}}}; + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpcdistributecmv_configuration.ini"); + + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + const auto nCRUs = tpcCRUs.size(); + auto timeframes = static_cast(config.options().get("timeframes")); + const auto outlanes = static_cast(config.options().get("output-lanes")); + const auto nLanes = static_cast(config.options().get("lanes")); + const auto firstTF = static_cast(config.options().get("firstTF")); + const bool sendPrecisetimeStamp = config.options().get("send-precise-timestamp"); + int nTFsBuffer = config.options().get("n-TFs-buffer"); + if (nTFsBuffer <= 0) { + nTFsBuffer = 1; + } + assert(timeframes >= nTFsBuffer); + timeframes /= nTFsBuffer; + LOGP(info, "Using {} timeframes as each TF contains {} CMVs", timeframes, nTFsBuffer); + const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0); + WorkflowSpec workflow; + for (int ilane = 0; ilane < nLanes; ++ilane) { + const auto first = tpcCRUs.begin() + ilane * crusPerLane; + if (first >= tpcCRUs.end()) { + break; + } + const auto last = std::min(tpcCRUs.end(), first + crusPerLane); + const std::vector rangeCRUs(first, last); + workflow.emplace_back(getTPCDistributeCMVSpec(ilane, rangeCRUs, timeframes, outlanes, firstTF, sendPrecisetimeStamp, nTFsBuffer)); + } + + return workflow; +} diff --git a/Detectors/TPC/workflow/src/tpc-factorize-cmv.cxx b/Detectors/TPC/workflow/src/tpc-factorize-cmv.cxx new file mode 100644 index 0000000000000..48c9c9f449278 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-factorize-cmv.cxx @@ -0,0 +1,86 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCWorkflow/TPCFactorizeCMVSpec.h" +#include "Framework/CompletionPolicyHelpers.h" + +using namespace o2::framework; + +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName( + "tpc-factorize-cmv-*.*", CompletionPolicy::CompletionOp::Consume)); +} + +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + + std::vector options{ + {"configFile", VariantType::String, "", {"Configuration file for configurable parameters"}}, + {"timeframes", VariantType::Int, 2000, {"Number of TFs aggregated per calibration interval"}}, + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}}, + {"input-lanes", VariantType::Int, 2, {"Number of parallel pipelines set in the TPCDistributeCMVSpec device"}}, + {"use-precise-timestamp", VariantType::Bool, false, {"Use precise timestamp from distributor when writing to CCDB"}}, + {"enable-CCDB-output", VariantType::Bool, false, {"Send output to the CCDB populator"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer size that was set in TPCFLPCMVSpec"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon-separated key=value strings"}}}; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpcfactorizecmv_configuration.ini"); + + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + + auto timeframes = static_cast(config.options().get("timeframes")); + const auto nLanes = static_cast(config.options().get("input-lanes")); + const bool usePreciseTimestamp = config.options().get("use-precise-timestamp"); + const bool sendCCDB = config.options().get("enable-CCDB-output"); + + int nTFsBuffer = config.options().get("n-TFs-buffer"); + if (nTFsBuffer <= 0) { + nTFsBuffer = 1; + } + + assert(timeframes >= static_cast(nTFsBuffer)); + timeframes /= static_cast(nTFsBuffer); + + const std::vector rangeCRUs(tpcCRUs.begin(), tpcCRUs.end()); + + WorkflowSpec workflow; + workflow.reserve(nLanes); + for (int ilane = 0; ilane < static_cast(nLanes); ++ilane) { + workflow.emplace_back(getTPCFactorizeCMVSpec( + ilane, + rangeCRUs, + timeframes, + sendCCDB, + usePreciseTimestamp, + nTFsBuffer)); + } + return workflow; +} \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx b/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx new file mode 100644 index 0000000000000..01b15504b8558 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-flp-cmv.cxx @@ -0,0 +1,72 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include +#include "CommonUtils/ConfigurableParam.h" +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "TPCWorkflow/TPCFLPCMVSpec.h" +#include "TPCBase/CRU.h" + +using namespace o2::framework; + +void customize(std::vector& workflowOptions) +{ + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); + const int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2); + + std::vector options{ + {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}}, + {"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes (crus are split per device)."}}, + {"time-lanes", VariantType::Int, 1, {"Number of parallel processing lanes (timeframes are split per device)."}}, + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}}, + {"n-TFs-buffer", VariantType::Int, 1, {"Buffer n-TFs before sending output."}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}}; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + using namespace o2::tpc; + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + const auto tpcCRUs = o2::RangeTokenizer::tokenize(config.options().get("crus")); + const auto nCRUs = tpcCRUs.size(); + const auto nLanes = std::min(static_cast(config.options().get("lanes")), nCRUs); + const auto time_lanes = static_cast(config.options().get("time-lanes")); + const auto crusPerLane = nCRUs / nLanes + ((nCRUs % nLanes) != 0); + const int nTFsBuffer = config.options().get("n-TFs-buffer"); + + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::writeINI("o2tpcflp_configuration.ini"); + + WorkflowSpec workflow; + if (nLanes <= 0) { + return workflow; + } + + for (int ilane = 0; ilane < nLanes; ++ilane) { + const auto first = tpcCRUs.begin() + ilane * crusPerLane; + if (first >= tpcCRUs.end()) { + break; + } + const auto last = std::min(tpcCRUs.end(), first + crusPerLane); + const std::vector rangeCRUs(first, last); + workflow.emplace_back(timePipeline(getTPCFLPCMVSpec(ilane, rangeCRUs, nTFsBuffer), time_lanes)); + } + + return workflow; +} \ No newline at end of file