1 /**************************************************************************
2 * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
17 // Author: Andrei Gheata, 31/05/2006
19 //==============================================================================
20 // AliAnalysysDataContainer - Container of data of arbitrary type deriving
21 // from TObject used for analysis. A container must be connected to the
22 // output data slot of a single analysis task (producer) , but also as
23 // input slot for possibly several other tasks (consumers). The connected
24 // slots must enforce the same data type as the container (or a derived type).
25 // A container becomes the owner of the contained data once this was produced.
27 // Containers should be defined by the analysis module using:
29 // AliAnalysisModule::AddContainer(const char *name, TClass *type);
31 // A container should be connected to a producer:
33 // AliAnalysisModule::ConnectOutput(AliAnalysisTask *task,
34 // AliAnalysisDataContainer *cont)
35 // and to its consumers:
37 // AliAnalysisModule::ConnectInput(AliAnalysisTask *task, Int_t islot,
38 // AliAnalysisDataContainer *cont)
40 // The container will create an implicit connection between the producer task
41 // and all consumers, which will become sub-tasks of the producer.
43 //==============================================================================
45 #include <Riostream.h>
46 #include <TMethodCall.h>
55 #include "AliAnalysisManager.h"
56 #include "AliAnalysisDataContainer.h"
57 #include "AliAnalysisDataSlot.h"
58 #include "AliAnalysisTask.h"
63 using std::setiosflags;
64 using std::setprecision;
65 ClassImp(AliAnalysisDataContainer)
67 //______________________________________________________________________________
68 AliAnalysisDataContainer::AliAnalysisDataContainer() : TNamed(),
82 //______________________________________________________________________________
83 AliAnalysisDataContainer::AliAnalysisDataContainer(const char *name, TClass *type)
95 // Default constructor.
96 SetTitle(fType->GetName());
99 //______________________________________________________________________________
100 AliAnalysisDataContainer::AliAnalysisDataContainer(const AliAnalysisDataContainer &cont)
102 fDataReady(cont.fDataReady),
104 fFileName(cont.fFileName),
105 fFolderName(cont.fFolderName),
109 fProducer(cont.fProducer),
114 if (cont.fConsumers) {
115 fConsumers = new TObjArray(2);
116 Int_t ncons = cont.fConsumers->GetEntriesFast();
117 for (Int_t i=0; i<ncons; i++) fConsumers->Add(cont.fConsumers->At(i));
121 //______________________________________________________________________________
122 AliAnalysisDataContainer::~AliAnalysisDataContainer()
124 // Destructor. Deletes data ! (What happens if data is a container ???)
125 if (fData && fOwnedData) delete fData;
126 if (fConsumers) delete fConsumers;
129 //______________________________________________________________________________
130 AliAnalysisDataContainer &AliAnalysisDataContainer::operator=(const AliAnalysisDataContainer &cont)
134 TNamed::operator=(cont);
135 fDataReady = cont.fDataReady;
137 fFileName = cont.fFileName;
138 fFolderName = cont.fFolderName;
142 fProducer = cont.fProducer;
143 if (cont.fConsumers) {
144 fConsumers = new TObjArray(2);
145 Int_t ncons = cont.fConsumers->GetEntriesFast();
146 for (Int_t i=0; i<ncons; i++) fConsumers->Add(cont.fConsumers->At(i));
152 //______________________________________________________________________________
153 void AliAnalysisDataContainer::AddConsumer(AliAnalysisTask *consumer, Int_t islot)
155 // Add a consumer for contained data;
156 AliAnalysisDataSlot *slot = consumer->GetInputSlot(islot);
157 if (!slot || !slot->GetType()) {
158 cout<<"Consumer task "<< consumer->GetName()<<" does not have an input/type #"<<islot<<endl;
159 //AliError(Form("Consumer task %s does not have an input #%i", consumer->GetName(),islot));
162 if (!slot->GetType()->InheritsFrom(GetType())) {
163 cout<<"Data type "<<slot->GetTitle()<<" for input slot "<<islot<<" of task "<<consumer->GetName()<<" does not match container type "<<GetTitle()<<endl;
164 //AliError(Form("Data type %s for input slot %i of task %s does not match container type %s", slot->GetType()->GetName(),islot,consumer->GetName(),fType->GetName()));
168 if (!fConsumers) fConsumers = new TObjArray(2);
169 fConsumers->Add(consumer);
170 // Add the consumer task to the list of task of the producer
171 if (fProducer && !fProducer->GetListOfTasks()->FindObject(consumer))
172 fProducer->Add(consumer);
175 //______________________________________________________________________________
176 Bool_t AliAnalysisDataContainer::ClientsExecuted() const
178 // Check if all client tasks have executed.
179 TIter next(fConsumers);
180 AliAnalysisTask *task;
181 while ((task=(AliAnalysisTask*)next())) {
182 if (!task->HasExecuted()) return kFALSE;
187 //______________________________________________________________________________
188 void AliAnalysisDataContainer::DeleteData()
190 // Delete data if not needed anymore.
191 if (!fDataReady || !ClientsExecuted()) {
192 cout<<"Data not ready or not all clients of container "<<GetName()<<" executed. Data not deleted."<<endl;
193 //AliWarning(Form("Data not ready or not all clients of container %s executed. Data not deleted.", GetName()));
197 cout<<"Data not owned by container "<<GetName()<<". Not deleted."<<endl;
198 //AliWarning(Form("Data not owned by container %s. Not deleted.", GetName()));
206 //______________________________________________________________________________
207 TClass *AliAnalysisDataContainer::GetType() const
209 // Get class type for this slot.
210 AliAnalysisDataContainer *cont = (AliAnalysisDataContainer*)this;
211 if (!fType) cont->SetType(gROOT->GetClass(fTitle.Data()));
212 if (!fType) printf("AliAnalysisDataContainer: Unknown class: %s\n", GetTitle());
216 //______________________________________________________________________________
217 void AliAnalysisDataContainer::GetEntry(Long64_t ientry)
219 // If data is ready and derives from TTree or from TBranch, this will get the
220 // requested entry in memory if not already loaded.
221 if (!fDataReady || !GetType()) return;
222 Bool_t istree = fType->InheritsFrom(TTree::Class());
224 TTree *tree = (TTree*)fData;
225 if (tree->GetReadEntry() != ientry) tree->GetEntry(ientry);
228 Bool_t isbranch = fType->InheritsFrom(TBranch::Class());
230 TBranch *branch = (TBranch*)fData;
231 if (branch->GetReadEntry() != ientry) branch->GetEntry(ientry);
236 //______________________________________________________________________________
237 Long64_t AliAnalysisDataContainer::Merge(TCollection *list)
239 // Merge a list of containers with this one. Containers in the list must have
240 // data of the same type.
241 if (!list || !fData) return 0;
242 printf("Merging %d containers %s\n", list->GetSize()+1, GetName());
245 callEnv.InitWithPrototype(fData->IsA(), "Merge", "TCollection*");
246 if (!callEnv.IsValid() && !list->IsEmpty()) {
247 cout << "No merge interface for data stored by " << GetName() << ". Merging not possible !" << endl;
251 if (list->IsEmpty()) return 1;
254 AliAnalysisDataContainer *cont;
255 // Make a list where to temporary store the data to be merged.
256 TList *collectionData = new TList();
257 Int_t count = 0; // object counter
258 while ((cont=(AliAnalysisDataContainer*)next())) {
259 TObject *data = cont->GetData();
261 if (strcmp(cont->GetName(), GetName())) {
262 cout << "Not merging containers with different names !" << endl;
265 printf(" ... merging object %s\n", data->GetName());
266 collectionData->Add(data);
269 callEnv.SetParam((Long_t) collectionData);
270 callEnv.Execute(fData);
271 delete collectionData;
276 //______________________________________________________________________________
277 void AliAnalysisDataContainer::PrintContainer(Option_t *option, Int_t indent) const
279 // Print info about this container.
281 for (Int_t i=0; i<indent; i++) ind += " ";
284 TString ctype = "Container";
285 if (IsExchange()) ctype = "Exchange container";
286 Bool_t dep = (opt.Contains("dep"))?kTRUE:kFALSE;
288 if (IsPostEventLoop()) printf("%s%s: %s DATA TYPE: %s POST_LOOP task\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
289 else printf("%s%s: %s DATA TYPE: %s\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
291 // printf("%s = Data producer: task %s\n",ind.Data(),fProducer->GetName());
293 printf("%s= Not connected to a data producer\n",ind.Data());
294 if (fConsumers && fConsumers->GetEntriesFast())
295 printf("%s = Client tasks indented below:\n", ind.Data());
298 if (!fFolderName.IsNull())
299 printf("%s = Filename: %s folder: %s\n", ind.Data(),fFileName.Data(), fFolderName.Data());
301 if (!fFileName.IsNull()) printf("%s = Filename: %s\n", ind.Data(),fFileName.Data());
303 ((AliAnalysisDataContainer*)this)->SetTouched(kTRUE);
304 TIter next(fConsumers);
305 AliAnalysisTask *task;
306 while ((task=(AliAnalysisTask*)next())) task->PrintTask(option, indent+3);
309 //______________________________________________________________________________
310 Bool_t AliAnalysisDataContainer::SetData(TObject *data, Option_t *)
312 // Set the data as READY only if it was published by the producer.
313 // If there is no producer declared, this is a top level container.
314 AliAnalysisTask *task;
315 Bool_t init = kFALSE;
318 if (data != fData) init = kTRUE;
322 nc = fConsumers->GetEntriesFast();
323 for (i=0; i<nc; i++) {
324 task = (AliAnalysisTask*)fConsumers->At(i);
325 task->CheckNotify(init);
330 // Check if it is the producer who published the data
331 if (fProducer->GetPublishedData()==data) {
335 nc = fConsumers->GetEntriesFast();
336 for (i=0; i<nc; i++) {
337 task = (AliAnalysisTask*)fConsumers->At(i);
343 // Ignore data posting from other than the producer
344 // cout<<"Data for container "<<GetName()<<" can be published only by producer task "<<fProducer->GetName()<<endl;
345 //AliWarning(Form("Data for container %s can be published only by producer task %s", GetName(), fProducer->GetName()));
350 //______________________________________________________________________________
351 void AliAnalysisDataContainer::SetFileName(const char *filename)
353 // The filename field can be actually composed by the actual file name followed
354 // by :dirname (optional):
355 // filename = file_name[:dirname]
356 // No slashes (/) allowed
357 fFileName = filename;
359 Int_t index = fFileName.Index(":");
360 // Fill the folder name
362 fFolderName = fFileName(index+1, fFileName.Length()-index);
363 fFileName.Remove(index);
365 if (!fFileName.Length())
366 Fatal("SetFileName", "Empty file name");
367 if (fFileName.Index("/")>=0)
368 Fatal("SetFileName", "No slashes (/) allowed in the file name");
371 //______________________________________________________________________________
372 void AliAnalysisDataContainer::SetProducer(AliAnalysisTask *prod, Int_t islot)
374 // Set the producer of data. The slot number is required for data type checking.
376 cout<<"Data container "<<GetName()<<" already has a producer: "<<fProducer->GetName()<<endl;
377 //AliWarning(Form("Data container %s already has a producer: %s",GetName(),fProducer->GetName()));
380 cout<<GetName()<<" container contains data - cannot change producer!"<<endl;
381 //AliError(Form("%s container contains data - cannot change producer!", GetName()));
384 AliAnalysisDataSlot *slot = prod->GetOutputSlot(islot);
386 cout<<"Producer task "<<prod->GetName()<<" does not have an output #"<<islot<<endl;
387 //AliError(Form("Producer task %s does not have an output #%i", prod->GetName(),islot));
390 if (!slot->GetType()->InheritsFrom(GetType())) {
391 cout<<"Data type "<<slot->GetTitle()<<"for output slot "<<islot<<" of task "<<prod->GetName()<<" does not match container type "<<GetTitle()<<endl;
392 //AliError(Form("Data type %s for output slot %i of task %s does not match container type %s", slot->GetType()->GetName(),islot,prod->GetName(),fType->GetName()));
397 // Add all consumers as daughter tasks
398 TIter next(fConsumers);
399 AliAnalysisTask *cons;
400 while ((cons=(AliAnalysisTask*)next())) {
401 if (!prod->GetListOfTasks()->FindObject(cons)) prod->Add(cons);
405 //______________________________________________________________________________
406 AliAnalysisDataWrapper *AliAnalysisDataContainer::ExportData() const
408 // Wraps data for sending it through the net.
409 AliAnalysisDataWrapper *pack = 0;
411 Error("ExportData", "Container %s - No data to be wrapped !", GetName());
414 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
415 if (mgr && mgr->GetDebugLevel() > 1) printf(" ExportData: Wrapping data %s for container %s\n", fData->GetName(),GetName());
416 pack = new AliAnalysisDataWrapper(fData);
417 pack->SetName(fName.Data());
421 //______________________________________________________________________________
422 void AliAnalysisDataContainer::ImportData(AliAnalysisDataWrapper *pack)
424 // Unwraps data from a data wrapper.
426 fData = pack->Data();
428 Error("ImportData", "No data was wrapped for container %s", GetName());
431 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
432 if (mgr && mgr->GetDebugLevel() > 1) printf(" ImportData: Unwrapping data %s for container %s\n", fData->GetName(),GetName());
434 // Imported wrappers do not own data anymore (AG 13-11-07)
435 pack->SetDeleteData(kFALSE);
439 ClassImp (AliAnalysisDataWrapper)
441 //______________________________________________________________________________
442 AliAnalysisDataWrapper::AliAnalysisDataWrapper(TObject *data)
447 if (data) SetName(data->GetName());
450 //______________________________________________________________________________
451 AliAnalysisDataWrapper::~AliAnalysisDataWrapper()
454 if (fData && TObject::TestBit(kDeleteData)) delete fData;
457 //______________________________________________________________________________
458 AliAnalysisDataWrapper &AliAnalysisDataWrapper::operator=(const AliAnalysisDataWrapper &other)
461 if (&other != this) {
462 TNamed::operator=(other);
468 //______________________________________________________________________________
469 Long64_t AliAnalysisDataWrapper::Merge(TCollection *list)
471 // Merge a list of containers with this one. Containers in the list must have
472 // data of the same type.
473 if (TH1::AddDirectoryStatus()) TH1::AddDirectory(kFALSE);
474 if (!fData) return 0;
475 if (!list || list->IsEmpty()) return 1;
481 callEnv.InitWithPrototype(fData->IsA(), "Merge", "TCollection*");
482 if (!callEnv.IsValid()) {
483 cout << "No merge interface for data stored by " << GetName() << ". Merging not possible !" << endl;
488 AliAnalysisDataWrapper *cont;
489 // Make a list where to temporary store the data to be merged.
490 TList *collectionData = new TList();
491 Int_t count = 0; // object counter
492 // printf("Wrapper %s 0x%lx (data=%s) merged with:\n", GetName(), (ULong_t)this, fData->ClassName());
493 while ((cont=(AliAnalysisDataWrapper*)next1())) {
494 cont->SetDeleteData();
495 TObject *data = cont->Data();
497 // printf(" - %s 0x%lx (data=%s)\n", cont->GetName(), (ULong_t)cont, data->ClassName());
498 collectionData->Add(data);
501 callEnv.SetParam((Long_t) collectionData);
502 callEnv.Execute(fData);
503 delete collectionData;
508 ClassImp(AliAnalysisFileDescriptor)
510 //______________________________________________________________________________
511 AliAnalysisFileDescriptor::AliAnalysisFileDescriptor()
512 :TObject(), fLfn(), fGUID(), fUrl(), fPfn(), fSE(),
513 fIsArchive(kFALSE), fImage(0), fNreplicas(0),
514 fStartBytes(0), fReadBytes(0), fSize(0), fOpenedAt(0),
515 fOpenTime(0.), fProcessingTime(0.), fThroughput(0.), fTimer()
520 //______________________________________________________________________________
521 AliAnalysisFileDescriptor::AliAnalysisFileDescriptor(const TFile *file)
522 :TObject(), fLfn(), fGUID(), fUrl(), fPfn(), fSE(),
523 fIsArchive(kFALSE), fImage(0), fNreplicas(0),
524 fStartBytes(0), fReadBytes(0), fSize(0), fOpenedAt(0),
525 fOpenTime(0.), fProcessingTime(0.), fThroughput(0.), fTimer()
527 // Normal constructor
528 if (file->InheritsFrom("TAlienFile")) {
529 fLfn =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetLfn();", file));
530 fGUID =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetGUID();", file));
531 fUrl =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetUrl();", file));
532 fPfn =(const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetPfn();", file));
533 fSE = (const char*)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetSE();", file));
534 fImage = (Int_t)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetImage();", file));
535 fNreplicas = (Int_t)gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetNreplicas();", file));
536 fOpenedAt = gROOT->ProcessLine(Form("((TAlienFile*)%p)->GetOpenTime();", file));
537 gROOT->ProcessLine(Form("((AliAnalysisFileDescriptor*)%p)->SetOpenTime(((TAlienFile*)%p)->GetElapsed());", this, file));
539 fLfn = file->GetName();
540 fPfn = file->GetName();
541 fUrl = file->GetName();
543 if (!fPfn.BeginsWith("/")) fPfn.Prepend(Form("%s/",gSystem->WorkingDirectory()));
546 fStartBytes = TFile::GetFileBytesRead();
547 fIsArchive = file->IsArchive();
548 fSize = file->GetSize();
551 //______________________________________________________________________________
552 AliAnalysisFileDescriptor::AliAnalysisFileDescriptor(const AliAnalysisFileDescriptor &other)
553 :TObject(other), fLfn(other.fLfn), fGUID(other.fGUID),
554 fUrl(other.fUrl), fPfn(other.fPfn), fSE(other.fSE),
555 fIsArchive(other.fIsArchive), fImage(other.fImage),
556 fNreplicas(other.fNreplicas), fStartBytes(other.fStartBytes), fReadBytes(other.fReadBytes),
557 fSize(other.fSize), fOpenedAt(other.fOpenedAt), fOpenTime(other.fOpenTime),
558 fProcessingTime(other.fProcessingTime), fThroughput(other.fThroughput), fTimer()
563 //______________________________________________________________________________
564 AliAnalysisFileDescriptor &AliAnalysisFileDescriptor::operator=(const AliAnalysisFileDescriptor &other)
567 if (&other == this) return *this;
568 TObject::operator=(other);
574 fIsArchive = other.fIsArchive;
575 fImage = other.fImage;
576 fNreplicas = other.fNreplicas;
577 fStartBytes = other.fStartBytes;;
578 fReadBytes = other.fReadBytes;
580 fOpenedAt = other.fOpenedAt;
581 fOpenTime = other.fOpenTime;
582 fProcessingTime = other.fProcessingTime;
583 fThroughput = other.fThroughput;
587 //______________________________________________________________________________
588 AliAnalysisFileDescriptor::~AliAnalysisFileDescriptor()
593 //______________________________________________________________________________
594 void AliAnalysisFileDescriptor::Done()
596 // Must be called at the end of processing, providing file->GetBytesRead() as argument.
598 const Double_t megabyte = 1048576.;
599 // Long64_t stampnow = time(0);
600 fReadBytes = TFile::GetFileBytesRead()-fStartBytes;
601 // fProcessingTime = stampnow-fOpenedAt;
602 fProcessingTime = fTimer.RealTime();
603 Double_t readsize = fReadBytes/megabyte;
604 fThroughput = readsize/fProcessingTime;
607 //______________________________________________________________________________
608 void AliAnalysisFileDescriptor::Print(Option_t*) const
610 // Print info about the file descriptor
611 const Double_t megabyte = 1048576.;
612 printf("===== Logical file name: %s =====\n", fLfn.Data());
613 printf(" Pfn: %s\n", fPfn.Data());
614 printf(" url: %s\n", fUrl.Data());
615 printf(" access time: %lld from SE: %s image %d/%d\n", fOpenedAt, fSE.Data(), fImage, fNreplicas);
616 printf(" open time: %g [sec]\n", fOpenTime);
617 printf(" file size: %g [MB], read size: %g [MB]\n", fSize/megabyte, fReadBytes/megabyte);
618 printf(" processing time [sec]: %g\n", fProcessingTime);
619 printf(" average throughput: %g [MB/sec]\n", fThroughput);
622 //______________________________________________________________________________
623 void AliAnalysisFileDescriptor::SavePrimitive(std::ostream &out, Option_t *)
625 // Stream info to file
626 const Double_t megabyte = 1048576.;
627 out << "#################################################################" << endl;
628 out << "pfn " << fPfn.Data() << endl;
629 out << "url " << fUrl.Data() << endl;
630 out << "se " << fSE.Data() << endl;
631 out << "image " << fImage << endl;
632 out << "nreplicas " << fNreplicas << endl;
633 out << "openstamp " << fOpenedAt << endl;
634 std::ios_base::fmtflags original_flags = out.flags();
635 out << setiosflags(std::ios::fixed) << std::setprecision(3);
636 out << "opentime " << fOpenTime << endl;
637 out << "runtime " << fProcessingTime << endl;
638 out << "filesize " << fSize/megabyte << endl;
639 out << "readsize " << fReadBytes/megabyte << endl;
640 out << "throughput " << fThroughput << endl;
641 out.flags(original_flags);