//Storage Manager:
fStoragePopup = new TGPopupMenu(gClient->GetRoot());
#ifdef ZMQ
- fStoragePopup->AddEntry("&List events",kStorageListEvents);
+// fStoragePopup->AddEntry("&List events",kStorageListEvents);
fStoragePopup->AddEntry("&Mark event",kStorageMarkEvent);
+
+ gEve->GetBrowser()->StartEmbedding(0);
+ fListEventsTab = AliStorageAdministratorPanelListEvents::GetInstance();
+ gEve->GetBrowser()->StopEmbedding("List");
+
+ fListEventsTab->Connect("SelectedEvent()","AliEveConfigManager",this,"SetEventInEventManager()");
+
#endif
fStoragePopup->Connect("Activated(Int_t)","AliEveConfigManager",
}
*/
+ /*
case kStorageListEvents:
{
#ifdef ZMQ
break;
}
+ */
case kStorageMarkEvent:
{
#ifdef ZMQ
void AliEveConfigManager::SetEventInEventManager()
{
AliEveEventManager *manager = AliEveEventManager::GetMaster();
- AliESDEvent *event = fListEventsWindow->GetSelectedEvent();
+ AliESDEvent *event = fListEventsTab->GetSelectedEvent();
if(event)
{
AliEveConfigManager();
AliStorageAdministratorPanelListEvents *fListEventsWindow;
+ AliStorageAdministratorPanelListEvents *fListEventsTab;
AliEveConfigManager(const AliEveConfigManager&); // Not implemented
fCurrentTree[0]=0;
fCurrentTree[1]=0;
AliESDEvent *tmpEvent = NULL;
- TTree *tmpTree = NULL;
while(1)
{
- //if(tmpEvent){delete tmpEvent;tmpEvent=0;}
- tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB,-1,&tmpTree);
-
+ tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB);
if(tmpEvent)
{
if(tmpEvent->GetRunNumber()>=0)
fMutex.Lock();
if(fEventInUse == 0){fWritingToEventIndex = 1;}
else if(fEventInUse == 1){fWritingToEventIndex = 0;}
- cout<<"Writing to:"<<fWritingToEventIndex<<endl;
+ cout<<"Received new event"<<endl;
if(fCurrentEvent[fWritingToEventIndex])
{
- cout<<"DELETING:"<<fCurrentEvent[fWritingToEventIndex]<<endl;
delete fCurrentEvent[fWritingToEventIndex];
fCurrentEvent[fWritingToEventIndex]=0;
- delete fCurrentTree[fWritingToEventIndex];
}
fCurrentEvent[fWritingToEventIndex] = tmpEvent;
- fCurrentTree[fWritingToEventIndex] = tmpTree;
fIsNewEventAvaliable = true;
fMutex.UnLock();
}
void AliEveEventManager::GotoEvent(Int_t event)
{
+ cout<<"Go to event:"<<event<<endl;
// Load data for specified event.
// If event is out of range an exception is thrown and old state
// is preserved.
if (fExternalCtrl)
{
// throw (kEH + "Event-loop is under external control.");
-
#ifdef ZMQ
-
- if (fESD) {
- int runNumber=fESD->GetRunNumber();
- int eventNumber=fESD->GetEventNumberInFile();
- struct serverRequestStruct *requestMessage = new struct serverRequestStruct;
- struct eventStruct eventToLoad;
- eventToLoad.runNumber = runNumber;
- // // careful! check if exists!
- eventToLoad.eventNumber = eventNumber;
-
- if (event == -1) {
- requestMessage->messageType = REQUEST_GET_LAST_EVENT;
- }
- else if (event == 0) {
- requestMessage->messageType = REQUEST_GET_FIRST_EVENT;
- }
- else if (event == 1) {
- requestMessage->messageType = REQUEST_GET_PREV_EVENT;
- }
- else if (event == 2) {
- requestMessage->messageType = REQUEST_GET_NEXT_EVENT;
- }
-
- requestMessage->event = eventToLoad;
-
- AliStorageEventManager *eventManager =
- AliStorageEventManager::GetEventManagerInstance();
- AliESDEvent *resultEvent = NULL;
-
- eventManager->CreateSocket(SERVER_COMMUNICATION_REQ);
- fMutex.Lock();
- eventManager->Send(requestMessage,SERVER_COMMUNICATION_REQ);
- resultEvent = eventManager->GetEvent(SERVER_COMMUNICATION_REQ);
-
- if(resultEvent)
- {
- cout<<"Event Manager -- first/last or prev/next event loaded "<<resultEvent->GetRunNumber() <<endl;
- DestroyElements();
- InitOCDB(resultEvent->GetRunNumber());
- SetEvent(0,0,resultEvent,0);
- fMutex.UnLock();
-
- }
- else{cout<<"No first/last event is avaliable."<<endl;}
- }
- else {
-
- }
-
-
+ if (fESD)
+ {
+ // create new server request:
+ struct serverRequestStruct *requestMessage = new struct serverRequestStruct;
+
+ // set request type:
+ if (event == -1) {requestMessage->messageType = REQUEST_GET_LAST_EVENT;}
+ else if (event == 0) {requestMessage->messageType = REQUEST_GET_FIRST_EVENT;}
+ else if (event == 1) {requestMessage->messageType = REQUEST_GET_PREV_EVENT;}
+ else if (event == 2) {requestMessage->messageType = REQUEST_GET_NEXT_EVENT;}
+
+ // set event struct:
+ struct eventStruct eventToLoad;
+ eventToLoad.runNumber = fESD->GetRunNumber();
+ eventToLoad.eventNumber = fESD->GetEventNumberInFile();
+ requestMessage->event = eventToLoad;
+
+ // create event manager:
+ AliStorageEventManager *eventManager =
+ AliStorageEventManager::GetEventManagerInstance();
+ AliESDEvent *resultEvent = NULL;
+
+ eventManager->CreateSocket(SERVER_COMMUNICATION_REQ);
+ fMutex.Lock();
+
+ // send request and receive event:
+ eventManager->Send(requestMessage,SERVER_COMMUNICATION_REQ);
+ resultEvent = eventManager->GetEvent(SERVER_COMMUNICATION_REQ);
+
+ if(resultEvent)
+ {
+ DestroyElements();
+ InitOCDB(resultEvent->GetRunNumber());
+ SetEvent(0,0,resultEvent,0);
+ }
+ else
+ {
+ if(event==-1){cout<<"\n\nWARNING -- No last event is avaliable.\n\n"<<endl;}
+ if(event==0){cout<<"\n\nWARNING -- No first event is avaliable.\n\n"<<endl;}
+ if(event==1){cout<<"\n\nWARNING -- No previous event is avaliable.\n\n"<<endl;}
+ if(event==2){cout<<"\n\nWARNING -- No next event is avaliable.\n\n"<<endl;}
+ }
+
+ fMutex.UnLock();
+ }
+ else
+ {
+ cout<<"\n\nWARNING -- No event has been already loaded. Loading the most recent event...\n\n"<<endl;
+
+ struct serverRequestStruct *requestMessage = new struct serverRequestStruct;
+ requestMessage->messageType = REQUEST_GET_LAST_EVENT;
+
+ AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
+ eventManager->CreateSocket(SERVER_COMMUNICATION_REQ);
+ AliESDEvent *resultEvent = NULL;
+
+ fMutex.Lock();
+ eventManager->Send(requestMessage,SERVER_COMMUNICATION_REQ);
+ resultEvent = eventManager->GetEvent(SERVER_COMMUNICATION_REQ);
+
+ if(resultEvent)
+ {
+ fESD=resultEvent;
+ DestroyElements();
+ InitOCDB(resultEvent->GetRunNumber());
+ SetEvent(0,0,resultEvent,0);
+ }
+ else{cout<<"\n\nWARNING -- The most recent event is not avaliable.\n\n"<<endl;}
+ fMutex.UnLock();
+ }
#endif
}
void AliEveEventManager::PrepareForNewEvent(AliESDEvent *event)
{
DestroyElements();
-
InitOCDB(event->GetRunNumber());
-
printf("======================= setting event to %d\n", fEventId);
SetEvent(0,0,event,0);
}
static const TEveException kEH("AliEveEventManager::NextEvent ");
- if (fAutoLoadTimerRunning)
- {
- throw (kEH + "Event auto-load timer is running.");
- }
-
+ if (fAutoLoadTimerRunning){throw (kEH + "Event auto-load timer is running.");}
+
if (fExternalCtrl)
{
#ifdef ZMQ
-
- cout<<fIsNewEventAvaliable<<"\t"<<"\t"<<fWritingToEventIndex<<endl;
-
- if(fIsNewEventAvaliable)
- {
- cout<<"new event"<<endl;
- fMutex.Lock();
- if(fWritingToEventIndex == 0) fEventInUse = 0;
- else if(fWritingToEventIndex == 1) fEventInUse = 1;
- cout<<"Using:"<<fEventInUse<<endl;
-
- if(fCurrentEvent[fEventInUse])
+ if(fIsNewEventAvaliable)
{
- if(fCurrentEvent[fEventInUse]->GetRunNumber() >= 0)
- {
- printf("======================= setting event to %d\n", fEventId);
-
- DestroyElements();
- InitOCDB(fCurrentEvent[fEventInUse]->GetRunNumber());
- SetEvent(0,0,fCurrentEvent[fEventInUse],0);
- }
+ fMutex.Lock();
+ if(fWritingToEventIndex == 0) fEventInUse = 0;
+ else if(fWritingToEventIndex == 1) fEventInUse = 1;
+
+ if(fCurrentEvent[fEventInUse])
+ {
+ if(fCurrentEvent[fEventInUse]->GetRunNumber() >= 0)
+ {
+ printf("======================= setting event to %d\n", fEventId);
+
+ DestroyElements();
+ InitOCDB(fCurrentEvent[fEventInUse]->GetRunNumber());
+ SetEvent(0,0,fCurrentEvent[fEventInUse],0);
+ }
+ }
+ fIsNewEventAvaliable = false;
+ fMutex.UnLock();
}
- fIsNewEventAvaliable = false;
- fMutex.UnLock();
- }
- else{cout<<"No new event is avaliable."<<endl;}
-
-#endif
+ else{cout<<"No new event is avaliable."<<endl;}
+#endif
}
else if ((fESDTree!=0) || (fHLTESDTree!=0))
{
{
fStorageStatus->SetText("Storage: DOWN");
fMarkEvent->SetEnabled(false);
+ fNextEvent->SetEnabled(false);
+ fLastEvent->SetEnabled(false);
+ fPrevEvent->SetEnabled(false);
+ fFirstEvent->SetEnabled(false);
}
else if(state == 1)
{
fStorageStatus->SetText("Storage: OK");
fMarkEvent->SetEnabled(true);
+ fNextEvent->SetEnabled(true);
+ fLastEvent->SetEnabled(true);
+ fPrevEvent->SetEnabled(true);
+ fFirstEvent->SetEnabled(true);
}
}
cfLogbook->SetLayoutManager(new TGMatrixLayout(cfLogbook, 0, 2));
TGLabel* lbLogbookHost = new TGLabel(cfLogbook, "Host:");
- fEntryLogbookHost = new TGTextEntry(cfLogbook, "pcaldbl501");
+ fEntryLogbookHost = new TGTextEntry(cfLogbook, "localhost");
fEntryLogbookHost->Resize(150,0);
TGLabel* lbLogbookPort = new TGLabel(cfLogbook, "Port:");
fEntryLogbookPort = new TGTextEntry(cfLogbook, "3306");
// return full path to the server configuration file
inline const char* GetPathToServerConf()
{
- return Form("%s/MONITOR/%s",
+ return Form("%s/MONITOR/alieventserver/%s",
gSystem->Getenv("ALICE_ROOT"),
ALIEVENTSERVER_CONF);
}
vector<serverListStruct> receivedList = fEventManager->GetServerListVector(fServerSocket);
- cout<<"PANEL:"<<receivedList[0].runNumber<<endl;
- cout<<"VECTOR SIZE:"<<receivedList.size()<<endl;
-
-//do something with list of maching events
- cout<<"Received list of perm events"<<endl;
-
for(unsigned int i=0;i<receivedList.size();i++)
{
fEventsList->InsertEntry(Form("%d %d %s %d %d ",
//get run and event number from selected row
int selectedEventNumber = fEventsList->GetSelected()-1;
-
- cout<<"SELECTED:"<<selectedEventNumber<<endl;
if(selectedEventNumber<0)return;
//remove oldest event
cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
- fDatabase->RemoveEvent(fDatabase->GetOldestEvent());
+ fDatabase->RemoveEventsWithPath(oldestEventPath);
+// fDatabase->RemoveEvent(oldestEvent);
}
}
}
return data;
}
-struct eventStruct AliStorageDatabase::GetOldestEvent()
-{
- struct eventStruct oldestEvent = {0,0};
-
- TSQLResult *result = fServer->Query(Form("SELECT * FROM %s ORDER BY run_number,event_number;",fTable.c_str()));
-
- TSQLRow *row;
- if((row = result->Next()))
- {
- oldestEvent.runNumber = atoi(row->GetField(0));
- oldestEvent.eventNumber = atoi(row->GetField(1));
-
- delete row;
- }
- else
- {
- cout<<"DATABASE -- NO OLDEST EVENT FOUND. Storage may be corrupted."<<endl;
- }
- return oldestEvent;
-}
-
void AliStorageDatabase::RemoveEvent(struct eventStruct event)
{
TSQLResult* res;
res = fServer->Query(Form("DELETE FROM %s WHERE run_number = %d AND event_number = %d",fTable.c_str(),event.runNumber,event.eventNumber));
delete res;
+}
+void AliStorageDatabase::RemoveEventsWithPath(string path)
+{
+ TSQLResult *res = fServer->Query(Form("DELETE FROM %s WHERE file_path = \"%s\";",fTable.c_str(),path.c_str()));
+ delete res;
}
string AliStorageDatabase::GetFilePath(struct eventStruct event)
AliESDEvent* AliStorageDatabase::GetNextEvent(struct eventStruct event)
{
+ cout<<"Database:"<<event.runNumber<<"\t"<<event.eventNumber<<endl;
+
TSQLResult *result = fServer->Query(Form("SELECT * FROM %s ORDER BY run_number,event_number;",fTable.c_str()));
TSQLRow *row;
return NULL;
}
+struct eventStruct AliStorageDatabase::GetOldestEvent()
+{
+ TSQLResult *result = fServer->Query(Form("SELECT * FROM %s ORDER BY run_number,event_number;",fTable.c_str()));
+
+ TSQLRow *row;
+ struct eventStruct oldestEvent = {0,0};
+
+ if((row = result->Next()))
+ {
+ oldestEvent.runNumber = atoi(row->GetField(0));
+ oldestEvent.eventNumber = atoi(row->GetField(1));
+ delete row;
+ }
+ else
+ {
+ cout<<"DATABASE -- NO OLDEST EVENT FOUND. Storage may be corrupted."<<endl;
+ }
+ return oldestEvent;
+}
AliESDEvent* AliStorageDatabase::GetLastEvent()
{
AliESDEvent* AliStorageDatabase::GetFirstEvent()
{
+ cout<<"Database - first"<<endl;
TSQLResult *result = fServer->Query(Form("SELECT * FROM %s ORDER BY run_number,event_number DESC;",fTable.c_str()));
TSQLRow *row;
- struct eventStruct lastEvent = {0,0};
+ struct eventStruct firstEvent = {0,0};
while((row = result->Next()))
{
- lastEvent.runNumber = atoi(row->GetField(0));
- lastEvent.eventNumber = atoi(row->GetField(1));
+ firstEvent.runNumber = atoi(row->GetField(0));
+ firstEvent.eventNumber = atoi(row->GetField(1));
delete row;
}
- cout<<"Last event is:"<<lastEvent.eventNumber<<endl;
- return GetEvent(lastEvent);
+ cout<<"First event is:"<<firstEvent.eventNumber<<endl;
+ return GetEvent(firstEvent);
}
bool MarkEvent(struct eventStruct event);
void RemoveEvent(struct eventStruct event);
+ void RemoveEventsWithPath(std::string path);
std::string GetFilePath(struct eventStruct event);
struct eventStruct GetOldestEvent();
std::vector<serverListStruct> GetList(struct listRequestStruct listStruct);
}
-void __freeBuff (void *data, void *hint)
+void freeBuff (void *data, void *hint)
{
- free(data);
+ // free(data);
}
bool AliStorageEventManager::CreateSocket(storageSockets socket)
{
- switch(socket)
- {
- case SERVER_COMMUNICATION_REQ:
- {
- fSockets[SERVER_COMMUNICATION_REQ] =
- new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
- try
- {
- fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
-
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- case SERVER_COMMUNICATION_REP:
- {
- fSockets[SERVER_COMMUNICATION_REP] =
- new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
- try
- {
- fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- case CLIENT_COMMUNICATION_REQ:
- {
- fSockets[CLIENT_COMMUNICATION_REQ] =
- new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
- try
- {
- fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- case CLIENT_COMMUNICATION_REP:
- {
- fSockets[CLIENT_COMMUNICATION_REP] =
- new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
- try
- {
- fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- case EVENTS_SERVER_PUB:
- {
- fSockets[EVENTS_SERVER_PUB] =
- new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
- try
- {
- fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- case EVENTS_SERVER_SUB:
- {
- fSockets[EVENTS_SERVER_SUB] =
- new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
- fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
- try
- {
- fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
-
- }
- }
- break;
- case XML_PUB:
- {
- fSockets[XML_PUB] =
- new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
- try
- {
- fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return 0;
- }
- }
- break;
- default:break;
- }
- return 1;
+ cout<<"Creating socket:"<<socket<<endl;
+
+ switch(socket)
+ {
+ case SERVER_COMMUNICATION_REQ:
+ {
+ fSockets[SERVER_COMMUNICATION_REQ] =
+ new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
+ try
+ {
+ fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ case SERVER_COMMUNICATION_REP:
+ {
+ fSockets[SERVER_COMMUNICATION_REP] =
+ new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
+ try
+ {
+ fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ case CLIENT_COMMUNICATION_REQ:
+ {
+ fSockets[CLIENT_COMMUNICATION_REQ] =
+ new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
+ try
+ {
+ fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ case CLIENT_COMMUNICATION_REP:
+ {
+ fSockets[CLIENT_COMMUNICATION_REP] =
+ new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
+ try
+ {
+ fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ case EVENTS_SERVER_PUB:
+ {
+ fSockets[EVENTS_SERVER_PUB] =
+ new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
+ try
+ {
+ fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ case EVENTS_SERVER_SUB:
+ {
+ fSockets[EVENTS_SERVER_SUB] =
+ new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
+ fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
+ try
+ {
+ fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+
+ }
+ }
+ break;
+ case XML_PUB:
+ {
+ fSockets[XML_PUB] =
+ new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
+ try
+ {
+ fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return 0;
+ }
+ }
+ break;
+ default:break;
+ }
+ return 1;
}
void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
fSockets[socket]->send(message);
if(numberOfRecords==0)return;
- fSockets[socket]->recv((new message_t));//empty message just to keep req-rep order
-
+ message_t *tmpMessage = new message_t();
+ fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
+
// //prepare message with event's list
// char *buffer = reinterpret_cast<char*> (&list[0]);
// message_t *reply = new message_t((void*)buffer,
// fSockets[socket]->send(*reply);
// if(reply){delete reply;}
- zmq::message_t reply(sizeof(serverListStruct)*numberOfRecords);
+ message_t reply(sizeof(serverListStruct)*numberOfRecords);
memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
fSockets[socket]->send(reply);
-
+ if(tmpMessage){delete tmpMessage;}
}
void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
message_t *requestMessage = new message_t((void*)buffer,
sizeof(struct serverRequestStruct)
+sizeof(struct listRequestStruct)
- +sizeof(struct eventStruct),0);
+ +sizeof(struct eventStruct),freeBuff);
fSockets[socket]->send(*requestMessage);
}
char *buffer = (char*)(request);
message_t *requestMessage = new message_t((void*)buffer,
- sizeof(struct clientRequestStruct),0);
+ sizeof(struct clientRequestStruct),freeBuff);
try
{
streamBuffer << message;
string stringBuffer = streamBuffer.str();
char *buffer = (char*)stringBuffer.c_str();
- message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
+ message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
+
fSockets[socket]->send(*replyMessage);
delete replyMessage;
streamBuffer.str(string());
{
buffer = (char*)("false");
}
- message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
+ message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
fSockets[socket]->send(*replyMessage);
delete replyMessage;
}
char* buf = (char*) malloc(bufsize * sizeof(char));
memcpy(buf, tmess.Buffer(), bufsize);
- zmq::message_t message((void*)buf, bufsize, 0, 0);
+ message_t message((void*)buf, bufsize, freeBuff);
fSockets[socket]->send(message);
}
for(int i=0;i<event->GetNumberOfTracks();i++)
{
AliESDtrack *track = event->GetTrack(i);
- //double hgfhgf[size];
- //track->GetESDpid(pid);
bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
const AliTrackPointArray *array = track->GetTrackPointArray();
return receivedList;
}
-AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout,TTree **tmpTree)
+AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
{
message_t* message = new message_t();
mess->ResetMap();
AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
-
+
if (data)
{
TTree* tree= new TTree("esdTree", "esdTree");
event->ReadFromTree(tree);
tree->GetEntry(0);
if(data){delete data;}
- //if(tree){delete tree;}
+ if(tree){delete tree;}
if(message){delete message;}
- if(tmpTree){*tmpTree = tree;}
- return event;
+ return event;
}
else
{
long AliStorageEventManager::GetLong(storageSockets socket)
{
- message_t *responseMessage = new message_t;
+ message_t *responseMessage = new message_t();
fSockets[socket]->recv(responseMessage);
- return (long)atoi(static_cast<char*>(responseMessage->data()));
+
+ long result = 0;
+
+ if(responseMessage)
+ {
+ result = (long)atoi(static_cast<char*>(responseMessage->data()));
+ delete responseMessage;
+ }
+ return result;
}
void SendAsXml(AliESDEvent *event,storageSockets socket);
std::vector<serverListStruct> GetServerListVector(storageSockets socket);
- AliESDEvent* GetEvent(storageSockets socket,int timeout=-1,TTree **tmpTree=0);
+ AliESDEvent* GetEvent(storageSockets socket,int timeout=-1);
struct serverRequestStruct* GetServerStruct(storageSockets socket);
struct clientRequestStruct* GetClientStruct(storageSockets socket);
long GetLong(storageSockets socket);
}
case REQUEST_GET_NEXT_EVENT:
{
+ cout<<"NEXT EVENT request received"<<endl;
AliESDEvent *event = fDatabase->GetNextEvent(request->event);
eventManager->Send(event,socket);
delete event;
currentTree[0]=0;
currentTree[1]=0;
AliESDEvent *tmpEvent;
- TTree *tmpTree = NULL;
while(1)
{
//if(tmpEvent){delete tmpEvent;tmpEvent=0;}
- tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB,-1,&tmpTree);
+ tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB);
if(tmpEvent)
{
myMutex.Lock();
if(eventInUse == 0){writingToEventIndex = 1;}
else if(eventInUse == 1){writingToEventIndex = 0;}
- cout<<"Writing to:"<<writingToEventIndex<<endl;
+
if(currentEvent[writingToEventIndex])
{
- cout<<"DELETING:"<<currentEvent[writingToEventIndex]<<endl;
delete currentEvent[writingToEventIndex];
currentEvent[writingToEventIndex]=0;
- delete currentTree[writingToEventIndex];
}
currentEvent[writingToEventIndex] = tmpEvent;
- currentTree[writingToEventIndex] = tmpTree;
isNewEventAvaliable = true;
myMutex.UnLock();
}
-#ifdef ZMQ
#include "AliStorageEventManager.h"
-#endif
#include <iostream>
#include <TFile.h>
+#include <TThread.h>
+
+
+using namespace std;
+
+AliESDEvent *fCurrentEvent[2];
+TTree *fCurrentTree[2];
+TMutex fMutex;
+int fEventInUse = 1;
+int fWritingToEventIndex = 0;
+bool fIsNewEventAvaliable = false;
int main()
{
-#ifdef ZMQ
- AliStorageEventManager *manager = AliStorageEventManager::GetEventManagerInstance();
- manager->CreateSocket(EVENTS_SERVER_SUB);
- AliESDEvent *event = manager->GetEvent(EVENTS_SERVER_SUB);
-
- std::cout<<"Received event:"<<event->GetEventNumberInFile()<<std::endl;
-#endif
+ AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
+ eventManager->CreateSocket(EVENTS_SERVER_SUB);
+
+ fCurrentEvent[0]=0;
+ fCurrentEvent[1]=0;
+ fCurrentTree[0]=0;
+ fCurrentTree[1]=0;
+ AliESDEvent *tmpEvent = NULL;
+
+ while(1)
+ {
+ tmpEvent = eventManager->GetEvent(EVENTS_SERVER_SUB);
+
+ if(tmpEvent)
+ {
+ if(tmpEvent->GetRunNumber()>=0)
+ {
+ fMutex.Lock();
+ if(fEventInUse == 0){fWritingToEventIndex = 1;}
+ else if(fEventInUse == 1){fWritingToEventIndex = 0;}
+ cout<<"Received new event"<<endl;
+ if(fCurrentEvent[fWritingToEventIndex])
+ {
+ delete fCurrentEvent[fWritingToEventIndex];
+ fCurrentEvent[fWritingToEventIndex]=0;
+ }
+ fCurrentEvent[fWritingToEventIndex] = tmpEvent;
+ fIsNewEventAvaliable = true;
+ fMutex.UnLock();
+ }
+ }
+ }
return 0;
}
PASS="storage123"
TABLE="events"
STORAGE_PATH="/Users/Jerus/storedFiles"
-MAX_SIZE="10000000"
+MAX_SIZE="30000000"
MAX_OCCUPATION="80"
REMOVE_PERCENT="60"
EVENTS_IN_FILE="5"