X-Git-Url: http://git.uio.no/git/?a=blobdiff_plain;f=ANALYSIS%2FAliAnalysisDataContainer.cxx;h=4e3636774956dc9eacf37ae7b173dd634ebb77e8;hb=2757a40b12d7c5b662077abf055bacef343ccdbd;hp=7d06ac3841b72f1d14e1ab160a481b8889f693a4;hpb=11026a8010b381cdfc500e6bc3f6666a2a9e7342;p=u%2Fmrichter%2FAliRoot.git diff --git a/ANALYSIS/AliAnalysisDataContainer.cxx b/ANALYSIS/AliAnalysisDataContainer.cxx index 7d06ac3841b..4e363677495 100644 --- a/ANALYSIS/AliAnalysisDataContainer.cxx +++ b/ANALYSIS/AliAnalysisDataContainer.cxx @@ -42,44 +42,58 @@ // //============================================================================== -#include "Riostream.h" +#include +#include -#include "TClass.h" -#include "TTree.h" -#include "TFile.h" -//#include "AliLog.h" +#include +#include +#include +#include +#include +#include +#include "AliAnalysisManager.h" #include "AliAnalysisDataContainer.h" #include "AliAnalysisDataSlot.h" #include "AliAnalysisTask.h" +using std::endl; +using std::cout; +using std::ios; +using std::setiosflags; +using std::setprecision; ClassImp(AliAnalysisDataContainer) //______________________________________________________________________________ AliAnalysisDataContainer::AliAnalysisDataContainer() : TNamed(), fDataReady(kFALSE), fOwnedData(kFALSE), + fFileName(), + fFolderName(), fFile(NULL), fData(NULL), fType(NULL), fProducer(NULL), fConsumers(NULL) { -// Default ctor. +// Dummy ctor. } //______________________________________________________________________________ AliAnalysisDataContainer::AliAnalysisDataContainer(const char *name, TClass *type) :TNamed(name,""), fDataReady(kFALSE), - fOwnedData(kTRUE), + fOwnedData(kFALSE), + fFileName(), + fFolderName(), fFile(NULL), fData(NULL), fType(type), fProducer(NULL), fConsumers(NULL) { -// Normal constructor. +// Default constructor. + SetTitle(fType->GetName()); } //______________________________________________________________________________ @@ -87,13 +101,16 @@ AliAnalysisDataContainer::AliAnalysisDataContainer(const AliAnalysisDataContaine :TNamed(cont), fDataReady(cont.fDataReady), fOwnedData(kFALSE), - fFile(cont.fFile), + fFileName(cont.fFileName), + fFolderName(cont.fFolderName), + fFile(NULL), fData(cont.fData), - fType(cont.fType), + fType(NULL), fProducer(cont.fProducer), fConsumers(NULL) { // Copy ctor. + GetType(); if (cont.fConsumers) { fConsumers = new TObjArray(2); Int_t ncons = cont.fConsumers->GetEntriesFast(); @@ -107,10 +124,6 @@ AliAnalysisDataContainer::~AliAnalysisDataContainer() // Destructor. Deletes data ! (What happens if data is a container ???) if (fData && fOwnedData) delete fData; if (fConsumers) delete fConsumers; - if (fFile) { - fFile->Close(); - delete fFile; - } } //______________________________________________________________________________ @@ -120,10 +133,12 @@ AliAnalysisDataContainer &AliAnalysisDataContainer::operator=(const AliAnalysisD if (&cont != this) { TNamed::operator=(cont); fDataReady = cont.fDataReady; - fOwnedData = kFALSE; // !!! Data owned by cont. - fFile = cont.fFile; + fOwnedData = kFALSE; + fFileName = cont.fFileName; + fFolderName = cont.fFolderName; + fFile = NULL; fData = cont.fData; - fType = cont.fType; + GetType(); fProducer = cont.fProducer; if (cont.fConsumers) { fConsumers = new TObjArray(2); @@ -134,6 +149,158 @@ AliAnalysisDataContainer &AliAnalysisDataContainer::operator=(const AliAnalysisD return *this; } +//______________________________________________________________________________ +void AliAnalysisDataContainer::AddConsumer(AliAnalysisTask *consumer, Int_t islot) +{ +// Add a consumer for contained data; + AliAnalysisDataSlot *slot = consumer->GetInputSlot(islot); + if (!slot || !slot->GetType()) { + cout<<"Consumer task "<< consumer->GetName()<<" does not have an input/type #"<GetName(),islot)); + return; + } + if (!slot->GetType()->InheritsFrom(GetType())) { + cout<<"Data type "<GetTitle()<<" for input slot "<GetName()<<" does not match container type "<GetType()->GetName(),islot,consumer->GetName(),fType->GetName())); + return; + } + + if (!fConsumers) fConsumers = new TObjArray(2); + fConsumers->Add(consumer); + // Add the consumer task to the list of task of the producer + if (fProducer && !fProducer->GetListOfTasks()->FindObject(consumer)) + fProducer->Add(consumer); +} + +//______________________________________________________________________________ +Bool_t AliAnalysisDataContainer::ClientsExecuted() const +{ +// Check if all client tasks have executed. + TIter next(fConsumers); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)next())) { + if (!task->HasExecuted()) return kFALSE; + } + return kTRUE; +} + +//______________________________________________________________________________ +void AliAnalysisDataContainer::DeleteData() +{ +// Delete data if not needed anymore. + if (!fDataReady || !ClientsExecuted()) { + cout<<"Data not ready or not all clients of container "<SetType(gROOT->GetClass(fTitle.Data())); + if (!fType) printf("AliAnalysisDataContainer: Unknown class: %s\n", GetTitle()); + return fType; +} + +//______________________________________________________________________________ +void AliAnalysisDataContainer::GetEntry(Long64_t ientry) +{ +// If data is ready and derives from TTree or from TBranch, this will get the +// requested entry in memory if not already loaded. + if (!fDataReady || !GetType()) return; + Bool_t istree = fType->InheritsFrom(TTree::Class()); + if (istree) { + TTree *tree = (TTree*)fData; + if (tree->GetReadEntry() != ientry) tree->GetEntry(ientry); + return; + } + Bool_t isbranch = fType->InheritsFrom(TBranch::Class()); + if (isbranch) { + TBranch *branch = (TBranch*)fData; + if (branch->GetReadEntry() != ientry) branch->GetEntry(ientry); + return; + } +} + +//______________________________________________________________________________ +Long64_t AliAnalysisDataContainer::Merge(TCollection *list) +{ +// Merge a list of containers with this one. Containers in the list must have +// data of the same type. + if (!list || !fData) return 0; + printf("Merging %d containers %s\n", list->GetSize()+1, GetName()); + TMethodCall callEnv; + if (fData->IsA()) + callEnv.InitWithPrototype(fData->IsA(), "Merge", "TCollection*"); + if (!callEnv.IsValid() && !list->IsEmpty()) { + cout << "No merge interface for data stored by " << GetName() << ". Merging not possible !" << endl; + return 1; + } + + if (list->IsEmpty()) return 1; + + TIter next(list); + AliAnalysisDataContainer *cont; + // Make a list where to temporary store the data to be merged. + TList *collectionData = new TList(); + Int_t count = 0; // object counter + while ((cont=(AliAnalysisDataContainer*)next())) { + TObject *data = cont->GetData(); + if (!data) continue; + if (strcmp(cont->GetName(), GetName())) { + cout << "Not merging containers with different names !" << endl; + continue; + } + printf(" ... merging object %s\n", data->GetName()); + collectionData->Add(data); + count++; + } + callEnv.SetParam((Long_t) collectionData); + callEnv.Execute(fData); + delete collectionData; + + return count+1; +} + +//______________________________________________________________________________ +void AliAnalysisDataContainer::PrintContainer(Option_t *option, Int_t indent) const +{ +// Print info about this container. + TString ind; + for (Int_t i=0; iGetName()); + else + printf("%s= No data producer",ind.Data()); + printf("%s = Consumer tasks: ", ind.Data()); + if (!fConsumers || !fConsumers->GetEntriesFast()) printf("-none-\n"); + else printf("\n"); + } + if (fFolderName.Length()) + printf("Filename: %s folder: %s\n", fFileName.Data(), fFolderName.Data()); + else + printf("Filename: %s\n", fFileName.Data()); + TIter next(fConsumers); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)next())) task->PrintTask(option, indent+3); +} + //______________________________________________________________________________ Bool_t AliAnalysisDataContainer::SetData(TObject *data, Option_t *) { @@ -154,7 +321,7 @@ Bool_t AliAnalysisDataContainer::SetData(TObject *data, Option_t *) } } return kTRUE; - } + } // Check if it is the producer who published the data if (fProducer->GetPublishedData()==data) { fData = data; @@ -168,62 +335,34 @@ Bool_t AliAnalysisDataContainer::SetData(TObject *data, Option_t *) } return kTRUE; } else { - cout<<"Data for container "<GetName()<GetName())); - return kFALSE; + // Ignore data posting from other than the producer +// cout<<"Data for container "<GetName()<GetName())); + return kFALSE; } } //______________________________________________________________________________ -void AliAnalysisDataContainer::OpenFile(const char *name, Option_t *option) +void AliAnalysisDataContainer::SetFileName(const char *filename) { -// Data will be written to this file at the end of processing. -// Option represent the way the file is accessed: NEW, APPEND, ... - if (fFile) { - fFile->Close(); - delete fFile; - } - fFile = new TFile(name, option); - if (fFile->IsZombie()) { - cout<<"Cannot open file "<cd(); - fData->Write(); -// fFile->Write(); - if (cursav) cursav->cd(); - } +// The filename field can be actually composed by the actual file name followed +// by :dirname (optional): +// filename = file_name[:dirname] +// No slashes (/) allowed + fFileName = filename; + fFolderName = ""; + Int_t index = fFileName.Index(":"); + // Fill the folder name + if (index >= 0) { + fFolderName = fFileName(index+1, fFileName.Length()-index); + fFileName.Remove(index); + } + if (!fFileName.Length()) + Fatal("SetFileName", "Empty file name"); + if (fFileName.Index("/")>=0) + Fatal("SetFileName", "No slashes (/) allowed in the file name"); } -//______________________________________________________________________________ -void AliAnalysisDataContainer::GetEntry(Long64_t ientry) -{ -// If data is ready and derives from TTree or from TBranch, this will get the -// requested entry in memory if not already loaded. - if (!fDataReady) return; - Bool_t istree = fType->InheritsFrom(TTree::Class()); - if (istree) { - TTree *tree = (TTree*)fData; - if (tree->GetReadEntry() != ientry) tree->GetEntry(ientry); - return; - } - Bool_t isbranch = fType->InheritsFrom(TBranch::Class()); - if (isbranch) { - TBranch *branch = (TBranch*)fData; - if (branch->GetReadEntry() != ientry) branch->GetEntry(ientry); - return; - } -} - //______________________________________________________________________________ void AliAnalysisDataContainer::SetProducer(AliAnalysisTask *prod, Int_t islot) { @@ -243,8 +382,8 @@ void AliAnalysisDataContainer::SetProducer(AliAnalysisTask *prod, Int_t islot) //AliError(Form("Producer task %s does not have an output #%i", prod->GetName(),islot)); return; } - if (!slot->GetType()->InheritsFrom(fType)) { - cout<<"Data type "<GetType()->GetName()<<"for output slot "<GetName()<<" does not match container type "<GetName()<GetType()->InheritsFrom(GetType())) { + cout<<"Data type "<GetTitle()<<"for output slot "<GetName()<<" does not match container type "<GetType()->GetName(),islot,prod->GetName(),fType->GetName())); return; } @@ -259,79 +398,241 @@ void AliAnalysisDataContainer::SetProducer(AliAnalysisTask *prod, Int_t islot) } //______________________________________________________________________________ -void AliAnalysisDataContainer::AddConsumer(AliAnalysisTask *consumer, Int_t islot) +AliAnalysisDataWrapper *AliAnalysisDataContainer::ExportData() const { -// Add a consumer for contained data; - AliAnalysisDataSlot *slot = consumer->GetInputSlot(islot); - if (!slot) { - cout<<"Consumer task "<< consumer->GetName()<<" does not have an input #"<GetName(),islot)); - return; - } - if (!slot->GetType()->InheritsFrom(fType)) { - cout<<"Data type "<GetType()->GetName()<<" for input slot "<GetName()<<" does not match container type "<GetName()<GetType()->GetName(),islot,consumer->GetName(),fType->GetName())); - return; - } +// Wraps data for sending it through the net. + AliAnalysisDataWrapper *pack = 0; + if (!fData) { + Error("ExportData", "Container %s - No data to be wrapped !", GetName()); + return pack; + } + AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager(); + if (mgr && mgr->GetDebugLevel() > 1) printf(" ExportData: Wrapping data %s for container %s\n", fData->GetName(),GetName()); + pack = new AliAnalysisDataWrapper(fData); + pack->SetName(fName.Data()); + return pack; +} - if (!fConsumers) fConsumers = new TObjArray(2); - fConsumers->Add(consumer); - // Add the consumer task to the list of task of the producer - if (fProducer && !fProducer->GetListOfTasks()->FindObject(consumer)) - fProducer->Add(consumer); +//______________________________________________________________________________ +void AliAnalysisDataContainer::ImportData(AliAnalysisDataWrapper *pack) +{ +// Unwraps data from a data wrapper. + if (pack) { + fData = pack->Data(); + if (!fData) { + Error("ImportData", "No data was wrapped for container %s", GetName()); + return; + } + AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager(); + if (mgr && mgr->GetDebugLevel() > 1) printf(" ImportData: Unwrapping data %s for container %s\n", fData->GetName(),GetName()); + fDataReady = kTRUE; + // Imported wrappers do not own data anymore (AG 13-11-07) + pack->SetDeleteData(kFALSE); + } } + +ClassImp (AliAnalysisDataWrapper) //______________________________________________________________________________ -Bool_t AliAnalysisDataContainer::ClientsExecuted() const +AliAnalysisDataWrapper::AliAnalysisDataWrapper(TObject *data) + :TNamed(), + fData(data) { -// Check if all client tasks have executed. - TIter next(fConsumers); - AliAnalysisTask *task; - while ((task=(AliAnalysisTask*)next())) { - if (!task->HasExecuted()) return kFALSE; - } - return kTRUE; +// Ctor. + if (data) SetName(data->GetName()); +} + +//______________________________________________________________________________ +AliAnalysisDataWrapper::~AliAnalysisDataWrapper() +{ +// Dtor. + if (fData && TObject::TestBit(kDeleteData)) delete fData; } //______________________________________________________________________________ -void AliAnalysisDataContainer::DeleteData() +AliAnalysisDataWrapper &AliAnalysisDataWrapper::operator=(const AliAnalysisDataWrapper &other) { -// Delete data if not needed anymore. - if (!fDataReady || !ClientsExecuted()) { - cout<<"Data not ready or not all clients of container "<IsEmpty()) return 1; + + SetDeleteData(); + + TMethodCall callEnv; + if (fData->IsA()) + callEnv.InitWithPrototype(fData->IsA(), "Merge", "TCollection*"); + if (!callEnv.IsValid()) { + cout << "No merge interface for data stored by " << GetName() << ". Merging not possible !" << endl; + return 1; } - if (!fOwnedData) { - cout<<"Data not owned by container "<ClassName()); + while ((cont=(AliAnalysisDataWrapper*)next1())) { + cont->SetDeleteData(); + TObject *data = cont->Data(); + if (!data) continue; + // printf(" - %s 0x%lx (data=%s)\n", cont->GetName(), (ULong_t)cont, data->ClassName()); + collectionData->Add(data); + count++; } - delete fData; - fData = 0; - fDataReady = kFALSE; -} - + callEnv.SetParam((Long_t) collectionData); + callEnv.Execute(fData); + delete collectionData; + + return count+1; +} + +ClassImp(AliAnalysisFileDescriptor) + //______________________________________________________________________________ -void AliAnalysisDataContainer::PrintContainer(Option_t *option, Int_t indent) const +AliAnalysisFileDescriptor::AliAnalysisFileDescriptor() + :TObject(), fLfn(), fGUID(), fUrl(), fPfn(), fSE(), + fIsArchive(kFALSE), fImage(0), fNreplicas(0), + fStartBytes(0), fReadBytes(0), fSize(0), fOpenedAt(0), + fOpenTime(0.), fProcessingTime(0.), fThroughput(0.), fTimer() { -// Print info about this container. - TString ind; - for (Int_t i=0; iGetName())); - if (fProducer) - printf("%s\n", Form("%s = Data producer: task %s",ind.Data(),fProducer->GetName())); - else - printf("%s\n", Form("%s= No data producer")); - printf("%s", Form("%s = Consumer tasks: ")); - if (!fConsumers || !fConsumers->GetEntriesFast()) printf("-none-\n"); - else printf("\n"); - } - TIter next(fConsumers); - AliAnalysisTask *task; - while ((task=(AliAnalysisTask*)next())) task->PrintTask(option, indent+3); +// I/O constructor +} + +//______________________________________________________________________________ +AliAnalysisFileDescriptor::AliAnalysisFileDescriptor(const TFile *file) + :TObject(), fLfn(), fGUID(), fUrl(), fPfn(), fSE(), + fIsArchive(kFALSE), fImage(0), fNreplicas(0), + fStartBytes(0), fReadBytes(0), fSize(0), fOpenedAt(0), + fOpenTime(0.), fProcessingTime(0.), fThroughput(0.), fTimer() +{ +// Normal constructor + if (file->InheritsFrom("TAlienFile")) { + fLfn =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetLfn();", file)); + fGUID =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetGUID();", file)); + fUrl =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetUrl();", file)); + fPfn =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetPfn();", file)); + fSE = (const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetSE();", file)); + fImage = (Int_t)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetImage();", file)); + fNreplicas = (Int_t)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetNreplicas();", file)); + fOpenedAt = gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetOpenTime();", file)); + gROOT->ProcessLine(Form("((AliAnalysisFileDescriptor*)%p)->SetOpenTime(((TAlienFile*)%p)->GetElapsed());", this, file)); + } else { + fLfn = file->GetName(); + fPfn = file->GetName(); + fUrl = file->GetName(); + fSE = "local"; + if (!fPfn.BeginsWith("/")) fPfn.Prepend(Form("%s/",gSystem->WorkingDirectory())); + fOpenedAt = time(0); + } + fStartBytes = TFile::GetFileBytesRead(); + fIsArchive = file->IsArchive(); + fSize = file->GetSize(); +} + +//______________________________________________________________________________ +AliAnalysisFileDescriptor::AliAnalysisFileDescriptor(const AliAnalysisFileDescriptor &other) + :TObject(other), fLfn(other.fLfn), fGUID(other.fGUID), + fUrl(other.fUrl), fPfn(other.fPfn), fSE(other.fSE), + fIsArchive(other.fIsArchive), fImage(other.fImage), + fNreplicas(other.fNreplicas), fStartBytes(other.fStartBytes), fReadBytes(other.fReadBytes), + fSize(other.fSize), fOpenedAt(other.fOpenedAt), fOpenTime(other.fOpenTime), + fProcessingTime(other.fProcessingTime), fThroughput(other.fThroughput), fTimer() +{ +// CC +} + +//______________________________________________________________________________ +AliAnalysisFileDescriptor &AliAnalysisFileDescriptor::operator=(const AliAnalysisFileDescriptor &other) +{ +// Assignment. + if (&other == this) return *this; + TObject::operator=(other); + fLfn = other.fLfn; + fGUID = other.fGUID; + fUrl = other.fUrl; + fPfn = other.fPfn; + fSE = other.fSE; + fIsArchive = other.fIsArchive; + fImage = other.fImage; + fNreplicas = other.fNreplicas; + fStartBytes = other.fStartBytes;; + fReadBytes = other.fReadBytes; + fSize = other.fSize; + fOpenedAt = other.fOpenedAt; + fOpenTime = other.fOpenTime; + fProcessingTime = other.fProcessingTime; + fThroughput = other.fThroughput; + return *this; +} + +//______________________________________________________________________________ +AliAnalysisFileDescriptor::~AliAnalysisFileDescriptor() +{ +// Destructor +} + +//______________________________________________________________________________ +void AliAnalysisFileDescriptor::Done() +{ +// Must be called at the end of processing, providing file->GetBytesRead() as argument. + fTimer.Stop(); + const Double_t megabyte = 1048576.; +// Long64_t stampnow = time(0); + fReadBytes = TFile::GetFileBytesRead()-fStartBytes; +// fProcessingTime = stampnow-fOpenedAt; + fProcessingTime = fTimer.RealTime(); + Double_t readsize = fReadBytes/megabyte; + fThroughput = readsize/fProcessingTime; } + +//______________________________________________________________________________ +void AliAnalysisFileDescriptor::Print(Option_t*) const +{ +// Print info about the file descriptor + const Double_t megabyte = 1048576.; + printf("===== Logical file name: %s =====\n", fLfn.Data()); + printf(" Pfn: %s\n", fPfn.Data()); + printf(" url: %s\n", fUrl.Data()); + printf(" access time: %lld from SE: %s image %d/%d\n", fOpenedAt, fSE.Data(), fImage, fNreplicas); + printf(" open time: %g [sec]\n", fOpenTime); + printf(" file size: %g [MB], read size: %g [MB]\n", fSize/megabyte, fReadBytes/megabyte); + printf(" processing time [sec]: %g\n", fProcessingTime); + printf(" average throughput: %g [MB/sec]\n", fThroughput); +} + +//______________________________________________________________________________ +void AliAnalysisFileDescriptor::SavePrimitive(std::ostream &out, Option_t *) +{ +// Stream info to file + const Double_t megabyte = 1048576.; + out << "#################################################################" << endl; + out << "pfn " << fPfn.Data() << endl; + out << "url " << fUrl.Data() << endl; + out << "se " << fSE.Data() << endl; + out << "image " << fImage << endl; + out << "nreplicas " << fNreplicas << endl; + out << "openstamp " << fOpenedAt << endl; + std::ios_base::fmtflags original_flags = out.flags(); + out << setiosflags(std::ios::fixed) << std::setprecision(3); + out << "opentime " << fOpenTime << endl; + out << "runtime " << fProcessingTime << endl; + out << "filesize " << fSize/megabyte << endl; + out << "readsize " << fReadBytes/megabyte << endl; + out << "throughput " << fThroughput << endl; + out.flags(original_flags); +} +