#include <TEnv.h>
#include <TString.h>
#include <TMap.h>
+#include <TROOT.h>
#ifdef ZMQ
#include "AliStorageEventManager.h"
fPEventSelector(0),
fSubManagers (0),
fAutoLoadTimerRunning(kFALSE),
+ fMutex(new TMutex()),
fgSubSock(EVENTS_SERVER_SUB),
fEventInUse(1),
fWritingToEventIndex(0),
fIsNewEventAvaliable(false),
- fStorageDown(false)
-
+ fStorageDown(false),
+ fFinished(false)
{
// Constructor with event-id.
AliEveEventManager::~AliEveEventManager()
{
// Destructor.
+
+ fFinished = true;
+
+ if(fEventListenerThread)
+ {
+ fEventListenerThread->Kill();
+ delete fEventListenerThread;
+ }
+ if(fStorageManagerWatcherThread)
+ {
+ fStorageManagerWatcherThread->Kill();
+ delete fStorageManagerWatcherThread;
+ }
+
+
fAutoLoadTimer->Stop();
fAutoLoadTimer->Disconnect("Timeout");
+ fAutoLoadTimer->Disconnect("AutoLoadNextEvent");
AliEveEventManager *manager = AliEveEventManager::GetCurrent();
manager->Disconnect("StorageManagerOk");
manager->Disconnect("StorageManagerDown");
if(fSubManagers){delete fSubManagers;}
-
+ if(fMutex){delete fMutex;}
+
if (fIsOpen)
{
Close();
fCurrentTree[1]=0;
AliESDEvent *tmpEvent = NULL;
- while(1)
+ while(!fFinished)
{
tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB);
if(tmpEvent)
- {
+ {
if(tmpEvent->GetRunNumber()>=0)
- {
- fMutex.Lock();
+ {
+ fMutex->Lock();
if(fEventInUse == 0){fWritingToEventIndex = 1;}
else if(fEventInUse == 1){fWritingToEventIndex = 0;}
cout<<"Received new event"<<endl;
if(fCurrentEvent[fWritingToEventIndex])
- {
+ {
delete fCurrentEvent[fWritingToEventIndex];
fCurrentEvent[fWritingToEventIndex]=0;
- }
+ }
fCurrentEvent[fWritingToEventIndex] = tmpEvent;
fIsNewEventAvaliable = true;
- NewEventLoaded();
- fMutex.UnLock();
- }
-
- }
- }
+ NewEventLoaded();
+ fMutex->UnLock();
+ }
+ }
+ }
#endif
}
struct clientRequestStruct *request = new struct clientRequestStruct;
request->messageType = REQUEST_CONNECTION;
- while (1)
+ while (!fFinished)
{
if(eventManager->Send(request,CLIENT_COMMUNICATION_REQ,5000))
{
{
StorageManagerDown();
cout<<"WARNING -- Storage Manager is DOWN!!"<<endl;
- fStorageDown = kTRUE;
-
+ fStorageDown = kTRUE;
}
sleep(1);
}
AliESDEvent *resultEvent = NULL;
eventManager->CreateSocket(SERVER_COMMUNICATION_REQ);
- fMutex.Lock();
+ fMutex->Lock();
// send request and receive event:
eventManager->Send(requestMessage,SERVER_COMMUNICATION_REQ);
if(event==2){cout<<"\n\nWARNING -- No next event is avaliable.\n\n"<<endl;}
}
- fMutex.UnLock();
+ fMutex->UnLock();
}
else
{
eventManager->CreateSocket(SERVER_COMMUNICATION_REQ);
AliESDEvent *resultEvent = NULL;
- fMutex.Lock();
+ fMutex->Lock();
eventManager->Send(requestMessage,SERVER_COMMUNICATION_REQ);
resultEvent = eventManager->GetEvent(SERVER_COMMUNICATION_REQ);
SetEvent(0,0,resultEvent,0);
}
else{cout<<"\n\nWARNING -- The most recent event is not avaliable.\n\n"<<endl;}
- fMutex.UnLock();
+ fMutex->UnLock();
}
#endif
#ifdef ZMQ
if(fIsNewEventAvaliable)
{
- fMutex.Lock();
+ fMutex->Lock();
if(fWritingToEventIndex == 0) fEventInUse = 0;
else if(fWritingToEventIndex == 1) fEventInUse = 1;
DestroyElements();
InitOCDB(fCurrentEvent[fEventInUse]->GetRunNumber());
SetEvent(0,0,fCurrentEvent[fEventInUse],0);
-
+
}
}
fIsNewEventAvaliable = false;
- fMutex.UnLock();
+ fMutex->UnLock();
}
else
- {
- cout<<"No new event is avaliable."<<endl;
- NoEventLoaded();
- }
+ {
+ cout<<"No new event is avaliable."<<endl;
+ NoEventLoaded();
+ }
#endif
}
else if ((fESDTree!=0) || (fHLTESDTree!=0))