// Implementation of AliStorageEventManager: a manager object (static instance
// pointer fManagerInstance) that exchanges messages over the ZeroMQ sockets
// held in fSockets[], indexed by storageSockets values.
//
// What is visible in this file:
//  - Constructor: under TThread::Lock(), opens the file returned by
//    GetConfigFilePath() and scans it line by line. For lines that start with
//    STORAGE_SERVER=, EVENT_SERVER=, STORAGE_SERVER_PORT=, EVENT_SERVER_PORT=,
//    STORAGE_CLIENT_PORT= or XML_SERVER_PORT=, the value is the text between
//    the first and the last double quote on the line; port values go through
//    atoi(). Prints "EVENT MANAGER -- Unable to open config file" otherwise.
//  - Socket setup: connect()/bind() calls on tcp:// endpoints built with
//    Form() from the configured hosts and ports; EVENTS_SERVER_SUB also gets
//    an empty ZMQ_SUBSCRIBE option. zmq::error_t is caught and reported.
//  - Send(list, socket): sends the record count as decimal text in a 20-byte
//    message, receives one message (per the inline comment, an empty one that
//    keeps the req-rep order), then sends the raw bytes of the vector
//    (sizeof(serverListStruct) * numberOfRecords, copied with memcpy).
//  - A Send variant with a timeout: after sending, polls the socket when
//    timeout >= 0 and returns 0 if poll() reports nothing, 1 otherwise
//    (its signature is not visible here).
//  - Send(long, socket): formats the value through a stringstream and sends
//    it. Further Send variants report themselves as "send bool",
//    "send AliESDEvent" and "send xml" in their error messages.
//  - GetServerListVector(socket): receives the record count, prints
//    "MANAGER -- list is empty" when it is 0, sends an empty message, then
//    receives the payload and builds the returned vector from it.
//  - GetEvent(socket, timeout): returns NULL when timeout >= 0 and poll()
//    reports nothing; otherwise receives a message, reads an AliESDEvent from
//    it with ReadObjectAny(AliESDEvent::Class()), calls GetStdContent() on a
//    non-null result, deletes the message and returns the event (or NULL).
//  - GetServerStruct / GetClientStruct: receive a message and return a
//    request struct pointer; GetClientStruct returns NULL on poll timeout.
//  - GetBool(socket): true only if the received payload compares equal to
//    "true" via strcmp. GetLong(socket): returns a long parsed from the reply.
//
// NOTE(review): this copy of the file looks damaged -- the #include targets
// are missing, statements of the form cout<<"MANAGER -- "<... are cut off
// mid-expression, and the layout is collapsed so that each // comment
// swallows the rest of its physical line. Restore the file from version
// control before making any code change; the code below is left untouched.
// NOTE(review): Send(long) passes sizeof(stringBuffer) (the size of the
// std::string object) as the message length rather than the number of
// characters -- looks unintended, confirm against the receiver (GetLong).
// NOTE(review): Send(long) hands stringBuffer.c_str() to message_t together
// with the freeBuff callback while the std::string still owns that memory --
// presumably a lifetime/double-free hazard; verify what freeBuff does.
// NOTE(review): GetServerListVector sends *(new message_t()) and never
// deletes it -- presumably a leak per call; confirm.
#include "AliStorageEventManager.h" #include #include #include #include #include #include #include "zmq.hpp" #include "AliESDEvent.h" #include "AliESDtrack.h" #include "AliTrackPointArray.h" #include "AliESDfriendTrack.h" #include "AliExternalTrackParam.h" #include "AliTrackerBase.h" #include "AliTracker.h" using namespace zmq; using namespace std; AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0; AliStorageEventManager::AliStorageEventManager() { //read config file TThread::Lock(); ifstream configFile (GetConfigFilePath()); if (configFile.is_open()) { string line; int from,to; while(configFile.good()) { getline(configFile,line); from = line.find("\"")+1; to = line.find_last_of("\""); if(line.find("STORAGE_SERVER=")==0) { fStorageServer=line.substr(from,to-from); } else if(line.find("EVENT_SERVER=")==0) { fEventServer=line.substr(from,to-from); } else if(line.find("STORAGE_SERVER_PORT=")==0) { fStorageServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("EVENT_SERVER_PORT=")==0) { fEventServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("STORAGE_CLIENT_PORT=")==0) { fStorageClientPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("XML_SERVER_PORT=")==0) { fXmlServerPort=atoi(line.substr(from,to-from).c_str()); } } if(configFile.eof()){configFile.clear();} configFile.close(); } else{cout<<"EVENT MANAGER -- Unable to open config file"<connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- 
"<setsockopt(ZMQ_SUBSCRIBE,"",0); try { fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fXmlServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "< list,storageSockets socket) { //send size of the struct first int numberOfRecords = list.size(); message_t message(20); snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords); try{ fSockets[socket]->send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "<recv(tmpMessage);//empty message just to keep req-rep order } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "< (&list[0]); // message_t *reply = new message_t((void*)buffer, // sizeof(serverListStruct)*numberOfRecords,0); // fSockets[socket]->send(*reply); // if(reply){delete reply;} message_t reply(sizeof(serverListStruct)*numberOfRecords); memcpy(reply.data(), reinterpret_cast (&list[0]), sizeof(serverListStruct)*numberOfRecords); try{ fSockets[socket]->send(reply); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "<send(*requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send serverRequestStruct -- "<send(*requestMessage); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<=0) { if(poll (&items[0], 1, timeout)==0) { delete requestMessage; return 0; } } delete requestMessage; return 1; } void AliStorageEventManager::Send(long message,storageSockets socket) { stringstream streamBuffer; streamBuffer << message; string stringBuffer = streamBuffer.str(); char *buffer = (char*)stringBuffer.c_str(); message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff); try{ fSockets[socket]->send(*replyMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send long -- "<send(*replyMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send bool -- "<send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send 
AliESDEvent -- "<"<" << endl; for(int i=0;iGetNumberOfTracks();i++) { AliESDtrack *track = event->GetTrack(i); bufferStream << "\tGetMass()<<"\">" <GetTrackPointArray(); if(array) { const float *x = array->GetX(); const float *y = array->GetY(); const float *z = array->GetZ(); int n = array->GetNPoints(); for(int j=0;j"<"<< x[j] <<""<"<< y[j] <<""<"<< z[j] <<""<"<"<"<send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send send xml -- "< AliStorageEventManager::GetServerListVector(storageSockets socket) { //get size of the incomming message message_t sizeMessage; try{ fSockets[socket]->recv(&sizeMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "<(sizeMessage.data())); iss >> numberOfRecords; if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<send(*(new message_t()));//receive empty message just to keep req-rep order } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "<recv(response); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "< receivedList(static_cast(response->data()), static_cast(response->data()) + numberOfRecords); if (response) {delete response;} return receivedList; } AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; if(timeout>=0) { if(poll (&items[0], 1, timeout)==0) { return NULL; } } message_t* message = new message_t(); try { fSockets[socket]->recv(message); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<size()+sizeof(UInt_t), message->data()); mess->InitMap(); mess->ReadClass();// get first the class stored in message mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT)); mess->ResetMap(); AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class())); if (data) { data->GetStdContent(); if(message){delete message;} return data; } else { if(message){delete message;} return NULL; } } struct serverRequestStruct* 
AliStorageEventManager::GetServerStruct(storageSockets socket) { struct serverRequestStruct *request = new struct serverRequestStruct; message_t *requestMessage = new message_t(); try{ fSockets[socket]->recv(requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get serverRequestStruct -- "<(requestMessage->data()); return request; } struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}} struct clientRequestStruct *request = new struct clientRequestStruct; message_t *requestMessage = new message_t(); try{ fSockets[socket]->recv(requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get clientRequestStruct -- "<(requestMessage->data()); return request; } bool AliStorageEventManager::GetBool(storageSockets socket) { message_t *response = new message_t(); try{ fSockets[socket]->recv(response); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get bool -- "<data(); if(!strcmp("true",result)){return true;} else{return false;} } long AliStorageEventManager::GetLong(storageSockets socket) { message_t *responseMessage = new message_t(); try{ fSockets[socket]->recv(responseMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get long -- "<(responseMessage->data())); delete responseMessage; } return result; }