AliStorageEventManager::AliStorageEventManager()
{
- //read config file
- TThread::Lock();
- ifstream configFile (GetConfigFilePath());
+ //read config file
+ TThread::Lock();
+ ifstream configFile (GetConfigFilePath());
- if (configFile.is_open())
- {
- string line;
- int from,to;
- while(configFile.good())
- {
- getline(configFile,line);
- from = line.find("\"")+1;
- to = line.find_last_of("\"");
- if(line.find("STORAGE_SERVER=")==0)
- {
- fStorageServer=line.substr(from,to-from);
- }
- else if(line.find("EVENT_SERVER=")==0)
- {
- fEventServer=line.substr(from,to-from);
- }
- else if(line.find("STORAGE_SERVER_PORT=")==0)
- {
- fStorageServerPort=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("EVENT_SERVER_PORT=")==0)
- {
- fEventServerPort=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("STORAGE_CLIENT_PORT=")==0)
- {
- fStorageClientPort=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("XML_SERVER_PORT=")==0)
- {
- fXmlServerPort=atoi(line.substr(from,to-from).c_str());
- }
- }
- if(configFile.eof())
- {
- configFile.clear();
- }
- configFile.close();
- }
- else
- {
- cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
- }
- TThread::UnLock();
-
- for(int i=0;i<NUMBER_OF_SOCKETS;i++)
- {
- fContexts[i] = new context_t();
- }
+ if (configFile.is_open())
+ {
+ string line;
+ int from,to;
+ while(configFile.good())
+ {
+ getline(configFile,line);
+ from = line.find("\"")+1;
+ to = line.find_last_of("\"");
+ if(line.find("STORAGE_SERVER=")==0)
+ {
+ fStorageServer=line.substr(from,to-from);
+ }
+ else if(line.find("EVENT_SERVER=")==0)
+ {
+ fEventServer=line.substr(from,to-from);
+ }
+ else if(line.find("STORAGE_SERVER_PORT=")==0)
+ {
+ fStorageServerPort=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("EVENT_SERVER_PORT=")==0)
+ {
+ fEventServerPort=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("STORAGE_CLIENT_PORT=")==0)
+ {
+ fStorageClientPort=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("XML_SERVER_PORT=")==0)
+ {
+ fXmlServerPort=atoi(line.substr(from,to-from).c_str());
+ }
+ }
+ if(configFile.eof()){configFile.clear();}
+ configFile.close();
+ }
+ else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
+ TThread::UnLock();
+
+ for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
}
AliStorageEventManager::~AliStorageEventManager()
{
- if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
+ if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
}
AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
{
- TThread::Lock();
- if(fManagerInstance==0)
- {
- fManagerInstance = new AliStorageEventManager();
- }
- TThread::UnLock();
- return fManagerInstance;
+ TThread::Lock();
+ if(fManagerInstance==0)
+ {
+ fManagerInstance = new AliStorageEventManager();
+ }
+ TThread::UnLock();
+ return fManagerInstance;
}
void freeBuff (void *data, void *hint)
{
- // free(data);
+ // free(data);
}
bool AliStorageEventManager::CreateSocket(storageSockets socket)
void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
{
- //send size of the struct first
- int numberOfRecords = list.size();
- message_t message(20);
- snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
-
- fSockets[socket]->send(message);
- if(numberOfRecords==0)return;
+ //send size of the struct first
+ int numberOfRecords = list.size();
+ message_t message(20);
+ snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
+ try{
+ fSockets[socket]->send(message);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
+ }
+ //if(numberOfRecords==0)return;
message_t *tmpMessage = new message_t();
- fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
- // //prepare message with event's list
- // char *buffer = reinterpret_cast<char*> (&list[0]);
- // message_t *reply = new message_t((void*)buffer,
- // sizeof(serverListStruct)*numberOfRecords,0);
- // fSockets[socket]->send(*reply);
- // if(reply){delete reply;}
-
- message_t reply(sizeof(serverListStruct)*numberOfRecords);
- memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
-
- fSockets[socket]->send(reply);
+ try{
+ fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
+ }
+ // //prepare message with event's list
+ // char *buffer = reinterpret_cast<char*> (&list[0]);
+ // message_t *reply = new message_t((void*)buffer,
+ // sizeof(serverListStruct)*numberOfRecords,0);
+ // fSockets[socket]->send(*reply);
+ // if(reply){delete reply;}
+
+ message_t reply(sizeof(serverListStruct)*numberOfRecords);
+ memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
+
+ try{
+ fSockets[socket]->send(reply);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
+ }
if(tmpMessage){delete tmpMessage;}
}
void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
{
- char *buffer = (char*)(request);
- message_t *requestMessage = new message_t((void*)buffer,
- sizeof(struct serverRequestStruct)
- +sizeof(struct listRequestStruct)
- +sizeof(struct eventStruct),freeBuff);
- fSockets[socket]->send(*requestMessage);
+ char *buffer = (char*)(request);
+ message_t *requestMessage = new message_t((void*)buffer,
+ sizeof(struct serverRequestStruct)
+ +sizeof(struct listRequestStruct)
+ +sizeof(struct eventStruct),freeBuff);
+ try{
+ fSockets[socket]->send(*requestMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
+ }
}
bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
{
- pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
-
-
- char *buffer = (char*)(request);
- message_t *requestMessage = new message_t((void*)buffer,
- sizeof(struct clientRequestStruct),freeBuff);
-
- try
- {
- fSockets[socket]->send(*requestMessage);
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- cout<<e.num()<<endl;
- if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
-
- CreateSocket(socket);
- delete requestMessage;
- return 0;
-
- }
- if(timeout>=0)
- {
- if(poll (&items[0], 1, timeout)==0)
- {
- delete requestMessage;
- return 0;
- }
- }
- delete requestMessage;
- return 1;
+ pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
+
+
+ char *buffer = (char*)(request);
+ message_t *requestMessage = new message_t((void*)buffer,
+ sizeof(struct clientRequestStruct),freeBuff);
+
+ try{
+ fSockets[socket]->send(*requestMessage);
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ cout<<e.num()<<endl;
+ if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
+
+ CreateSocket(socket);
+ delete requestMessage;
+ return 0;
+
+ }
+ if(timeout>=0)
+ {
+ if(poll (&items[0], 1, timeout)==0)
+ {
+ delete requestMessage;
+ return 0;
+ }
+ }
+ delete requestMessage;
+ return 1;
}
void AliStorageEventManager::Send(long message,storageSockets socket)
{
- stringstream streamBuffer;
- streamBuffer << message;
- string stringBuffer = streamBuffer.str();
- char *buffer = (char*)stringBuffer.c_str();
- message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
+ stringstream streamBuffer;
+ streamBuffer << message;
+ string stringBuffer = streamBuffer.str();
+ char *buffer = (char*)stringBuffer.c_str();
+ message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
- fSockets[socket]->send(*replyMessage);
- delete replyMessage;
- streamBuffer.str(string());
- streamBuffer.clear();
+ try{
+ fSockets[socket]->send(*replyMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send long -- "<<e.what()<<endl;
+ }
+ delete replyMessage;
+ streamBuffer.str(string());
+ streamBuffer.clear();
}
void AliStorageEventManager::Send(bool message,storageSockets socket)
{
- char *buffer;
- if(message==true)
- {
- buffer = (char*)("true");
- }
- else
- {
- buffer = (char*)("false");
- }
- message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
- fSockets[socket]->send(*replyMessage);
- delete replyMessage;
+ char *buffer;
+ if(message==true)
+ {
+ buffer = (char*)("true");
+ }
+ else
+ {
+ buffer = (char*)("false");
+ }
+ message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
+ try{
+ fSockets[socket]->send(*replyMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
+ }
+ delete replyMessage;
}
void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
{
- TMessage tmess(kMESS_OBJECT);
- tmess.Reset();
- tmess.WriteObject(event);
- TMessage::EnableSchemaEvolutionForAll(kTRUE);
-
- int bufsize = tmess.Length();
- char* buf = (char*) malloc(bufsize * sizeof(char));
- memcpy(buf, tmess.Buffer(), bufsize);
-
- message_t message((void*)buf, bufsize, freeBuff);
- fSockets[socket]->send(message);
+ TMessage tmess(kMESS_OBJECT);
+ tmess.Reset();
+ tmess.WriteObject(event);
+ TMessage::EnableSchemaEvolutionForAll(kTRUE);
+
+ int bufsize = tmess.Length();
+ char* buf = (char*) malloc(bufsize * sizeof(char));
+ memcpy(buf, tmess.Buffer(), bufsize);
+
+ message_t message((void*)buf, bufsize, freeBuff);
+ try{
+ fSockets[socket]->send(message);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
+ }
}
void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
{
- cout<<"SENDING AS XML"<<endl;
- stringstream bufferStream;
- bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
- bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
-
- for(int i=0;i<event->GetNumberOfTracks();i++)
- {
- AliESDtrack *track = event->GetTrack(i);
- bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
- const AliTrackPointArray *array = track->GetTrackPointArray();
-
- if(array)
- {
- const float *x = array->GetX();
- const float *y = array->GetY();
- const float *z = array->GetZ();
- int n = array->GetNPoints();
-
- for(int j=0;j<n;j++)
- {
- bufferStream <<"\t\t<point>"<<endl;
- bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
- bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
- bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
- bufferStream <<"\t\t</point>"<<endl;
- }
- }
- else cout<<"no array"<<endl;
-
- bufferStream << "\t</track>"<<endl;
- }
-
- bufferStream << "</ESD>"<<endl;
-
- string bufferString = bufferStream.str();
- message_t message(bufferString.size());
- memcpy (message.data(), bufferString.data(), bufferString.size());
-
- fSockets[socket]->send(message);
- cout<<"xml sent"<<endl;
+ cout<<"SENDING AS XML"<<endl;
+ stringstream bufferStream;
+ bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
+ bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
+
+ for(int i=0;i<event->GetNumberOfTracks();i++)
+ {
+ AliESDtrack *track = event->GetTrack(i);
+ bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
+ const AliTrackPointArray *array = track->GetTrackPointArray();
+
+ if(array)
+ {
+ const float *x = array->GetX();
+ const float *y = array->GetY();
+ const float *z = array->GetZ();
+ int n = array->GetNPoints();
+
+ for(int j=0;j<n;j++)
+ {
+ bufferStream <<"\t\t<point>"<<endl;
+ bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
+ bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
+ bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
+ bufferStream <<"\t\t</point>"<<endl;
+ }
+ }
+ else cout<<"no array"<<endl;
+
+ bufferStream << "\t</track>"<<endl;
+ }
+
+ bufferStream << "</ESD>"<<endl;
+
+ string bufferString = bufferStream.str();
+ message_t message(bufferString.size());
+ memcpy (message.data(), bufferString.data(), bufferString.size());
+
+ try{
+ fSockets[socket]->send(message);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
+ }
+ cout<<"xml sent"<<endl;
}
vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
{
- //get size of the incomming message
- message_t sizeMessage;
- fSockets[socket]->recv(&sizeMessage);
- int numberOfRecords;
- istringstream iss(static_cast<char*>(sizeMessage.data()));
- iss >> numberOfRecords;
-
- if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
-
- fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
-
-//get list of events
- message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
- fSockets[socket]->recv(response);
-
- vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
-
- if (response) {delete response;}
- return receivedList;
+ //get size of the incomming message
+ message_t sizeMessage;
+
+ try{
+ fSockets[socket]->recv(&sizeMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
+ }
+ int numberOfRecords;
+ istringstream iss(static_cast<char*>(sizeMessage.data()));
+ iss >> numberOfRecords;
+
+ if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
+
+ try{
+ fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
+ }
+ //get list of events
+ message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
+ try{
+ fSockets[socket]->recv(response);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
+ }
+
+ vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
+
+ if (response) {delete response;}
+ return receivedList;
}
AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
{
- message_t* message = new message_t();
-
- try
- {
- fSockets[socket]->recv(message);
- }
- catch (const zmq::error_t& e)
- {
- cout<<"MANAGER -- "<<e.what()<<endl;
- return NULL;
- }
-
- TBufferFile *mess = new TBufferFile(TBuffer::kRead,
- message->size()+sizeof(UInt_t),
- message->data());
- mess->InitMap();
- mess->ReadClass();// get first the class stored in message
- mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
- mess->ResetMap();
-
- AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
-
- if (data)
- {
- data->GetStdContent();
- if(message){delete message;}
- return data;
- }
- else
- {
- if(message){delete message;}
- return NULL;
- }
+ pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
+
+ if(timeout>=0)
+ {
+ if(poll (&items[0], 1, timeout)==0)
+ {
+ return NULL;
+ }
+ }
+
+ message_t* message = new message_t();
+
+ try
+ {
+ fSockets[socket]->recv(message);
+ }
+ catch (const zmq::error_t& e)
+ {
+ cout<<"MANAGER -- "<<e.what()<<endl;
+ return NULL;
+ }
+
+ TBufferFile *mess = new TBufferFile(TBuffer::kRead,
+ message->size()+sizeof(UInt_t),
+ message->data());
+ mess->InitMap();
+ mess->ReadClass();// get first the class stored in message
+ mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
+ mess->ResetMap();
+
+ AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
+
+ if (data)
+ {
+ data->GetStdContent();
+ if(message){delete message;}
+ return data;
+ }
+ else
+ {
+ if(message){delete message;}
+ return NULL;
+ }
}
struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
{
- struct serverRequestStruct *request = new struct serverRequestStruct;
- message_t *requestMessage = new message_t();
- fSockets[socket]->recv(requestMessage);
- request = static_cast<struct serverRequestStruct*>(requestMessage->data());
- return request;
+ struct serverRequestStruct *request = new struct serverRequestStruct;
+ message_t *requestMessage = new message_t();
+ try{
+ fSockets[socket]->recv(requestMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
+ }
+ request = static_cast<struct serverRequestStruct*>(requestMessage->data());
+ return request;
}
struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
{
- struct clientRequestStruct *request = new struct clientRequestStruct;
- message_t *requestMessage = new message_t();
- fSockets[socket]->recv(requestMessage);
- request = static_cast<struct clientRequestStruct*>(requestMessage->data());
- return request;
+ struct clientRequestStruct *request = new struct clientRequestStruct;
+ message_t *requestMessage = new message_t();
+ try{
+ fSockets[socket]->recv(requestMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
+ }
+ request = static_cast<struct clientRequestStruct*>(requestMessage->data());
+ return request;
}
bool AliStorageEventManager::GetBool(storageSockets socket)
{
- message_t *response = new message_t();
- fSockets[socket]->recv(response);
- char *result = (char*)response->data();
-
- if(!strcmp("true",result)){return true;}
- else{return false;}
+ message_t *response = new message_t();
+ try{
+ fSockets[socket]->recv(response);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
+ }
+ char *result = (char*)response->data();
+
+ if(!strcmp("true",result)){return true;}
+ else{return false;}
}
long AliStorageEventManager::GetLong(storageSockets socket)
{
- message_t *responseMessage = new message_t();
- fSockets[socket]->recv(responseMessage);
+ message_t *responseMessage = new message_t();
+ try{
+ fSockets[socket]->recv(responseMessage);
+ }
+ catch(const zmq::error_t &e)
+ {
+ cout<<"MANAGER -- get long -- "<<e.what()<<endl;
+ }
long result = 0;
result = (long)atoi(static_cast<char*>(responseMessage->data()));
delete responseMessage;
}
- return result;
+ return result;
}