// AliStorageEventManager -- implementation file: config parsing in the
// constructor, ZeroMQ socket setup, and Send*/Get* wrappers that move data
// through fSockets[...].
//
// NOTE(review): this copy of the file looks extraction-damaged. The whole
// translation unit sits on a few physical lines, so every `//` comment
// swallows the rest of its line, and text that stood between a '<' and the
// following '>' is gone (header names after `#include`, template arguments of
// vector / static_cast / reinterpret_cast, `<<e.what()<<endl`-style chains,
// and whole stretches of code in between). Restore the file from version
// control before compiling or modifying it. The notes below describe only
// what is still readable here.
//
// Constructor
//   Calls TThread::Lock(), opens GetConfigFilePath() and reads it line by
//   line. For each line the text between the first and the last double quote
//   is extracted; depending on the key the line starts with it is stored in
//   fStorageServer (STORAGE_SERVER=), fEventServer (EVENT_SERVER=), or parsed
//   with atoi into fStorageServerPort (STORAGE_SERVER_PORT=), fEventServerPort
//   (EVENT_SERVER_PORT=), fStorageClientPort (STORAGE_CLIENT_PORT=) or
//   fXmlServerPort (XML_SERVER_PORT=). Prints
//   "EVENT MANAGER -- Unable to open config file" when the file cannot be
//   opened. The rest of the constructor is missing in this copy.
//   NOTE(review): a matching line without any double quote appears to end up
//   storing the whole line (find() returns npos for both positions) -- confirm.
//
// Socket setup (function header and case labels missing in this copy)
//   Readable fragments, each wrapped in try/catch(const zmq::error_t&):
//     connect tcp://fStorageServer:fStorageServerPort
//     bind    tcp://*:fStorageServerPort
//     connect tcp://fStorageServer:fStorageClientPort
//     bind    tcp://*:fStorageClientPort
//     bind    tcp://*:fEventServerPort
//     setsockopt(ZMQ_SUBSCRIBE,"",0) then
//             fSockets[EVENTS_SERVER_SUB] connect tcp://fEventServer:fEventServerPort
//     bind    tcp://*:fXmlServerPort
//
// Send(list, socket) (parameter type stripped)
//   Sends list.size() as decimal text in a 20-byte message and returns if it
//   is 0. Otherwise receives one message into `new message_t` (never deleted
//   -- leak), then sends sizeof(serverListStruct)*numberOfRecords bytes
//   starting at &list[0] through a message_t built with free function 0, and
//   deletes that message object.
//   NOTE(review): the message wraps the vector's storage without copying;
//   confirm ZeroMQ is done with the buffer before the function returns.
//
// Send(struct serverRequestStruct*, socket)
//   Sends sizeof(serverRequestStruct)+sizeof(listRequestStruct)+
//   sizeof(eventStruct) bytes starting at `request`. requestMessage is
//   allocated with new and never deleted (leak).
//   NOTE(review): the byte count is larger than one serverRequestStruct --
//   confirm what the caller's buffer really holds.
//
// Send(struct clientRequestStruct*, socket, timeout)  (middle part missing)
//   Sends sizeof(clientRequestStruct) bytes starting at `request`; the
//   readable tail polls the socket for ZMQ_POLLIN with `timeout` and returns
//   0 when poll() returns 0, otherwise 1. No delete of requestMessage is
//   visible -- NOTE(review): likely leaked, confirm against the intact file.
//
// Send(long, socket)
//   Formats the value as decimal text with a stringstream, but the message
//   size passed is sizeof(long), not the length of that text.
//   NOTE(review): the two sizes agree only by accident; longer values are
//   truncated, shorter ones read past the string. The message also wraps the
//   local string's buffer without copying.
//
// Send(bool, socket)
//   Sends the literal "true" or "false" with size sizeof(char*).
//   NOTE(review): sizeof(char*) is the pointer size, not the literal length.
//
// Send(AliESDEvent*, socket)
//   Serialises the event into a TMessage, copies Length() bytes into a
//   malloc'ed buffer and sends it via message_t(buf, bufsize, 0, 0).
//   NOTE(review): with free function 0 nothing visible ever frees `buf`.
//
// SendAsXml(AliESDEvent*, socket)  (string literals stripped)
//   Prints "SENDING AS XML", loops over event->GetNumberOfTracks(), writes
//   each track's GetMass() and, when GetTrackPointArray() is non-null, the
//   X/Y/Z of its GetNPoints() points into bufferStream, sends a message and
//   prints "xml sent".
//
// GetServerListVector(socket)  (return type and middle part missing)
//   Receives a message and parses numberOfRecords from it; prints
//   "MANAGER -- list is empty" when it is 0. Sends `*(new message_t())`
//   (never deleted -- leak), receives sizeof(serverListStruct)*numberOfRecords
//   bytes into a heap message_t and builds the returned vector from its data.
//   No delete of `response` is visible.
//
// GetEvent(socket, timeout, tmpTree)  (part after the catch is missing)
//   Receives one message and reads an AliESDEvent from it with
//   ReadObjectAny(AliESDEvent::Class()). When that succeeds: writes the object
//   to a new TTree "esdTree", fills it, creates a new AliESDEvent, calls
//   ReadFromTree + GetEntry(0), deletes the deserialised object and the
//   message, stores the tree in *tmpTree when tmpTree is non-null, and returns
//   the new event. Otherwise deletes the message and returns NULL.
//   NOTE(review): when tmpTree is null the tree is never deleted (the delete
//   is commented out); `mess` is not deleted in the readable part; `timeout`
//   is not referenced in the readable part.
//
// GetServerStruct(socket) / GetClientStruct(socket)  (cast type stripped)
//   Allocate a request struct with new, then overwrite that pointer with the
//   data pointer of a freshly received heap message_t and return it. The
//   struct from new is leaked and the message is never deleted; the returned
//   pointer refers to the message's data.
//
// GetBool(socket)
//   Receives a message and returns true iff strcmp("true", data) == 0.
//   The message is never deleted. NOTE(review): assumes the payload is
//   NUL-terminated -- confirm against Send(bool).
//
// GetLong(socket)  (cast type stripped)
//   Receives a message and returns atoi(data) converted to long. The message
//   is never deleted. NOTE(review): same NUL-termination assumption as GetBool.
#include "AliStorageEventManager.h" #include #include #include #include #include #include #include "zmq.hpp" #include "AliESDEvent.h" #include "AliESDtrack.h" #include "AliTrackPointArray.h" #include "AliESDfriendTrack.h" #include "AliExternalTrackParam.h" #include "AliTrackerBase.h" #include "AliTracker.h" using namespace zmq; using namespace std; AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0; AliStorageEventManager::AliStorageEventManager() { //read config file TThread::Lock(); ifstream configFile (GetConfigFilePath()); if (configFile.is_open()) { string line; int from,to; while(configFile.good()) { getline(configFile,line); from = line.find("\"")+1; to = line.find_last_of("\""); if(line.find("STORAGE_SERVER=")==0) { fStorageServer=line.substr(from,to-from); } else if(line.find("EVENT_SERVER=")==0) { fEventServer=line.substr(from,to-from); } else if(line.find("STORAGE_SERVER_PORT=")==0) { fStorageServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("EVENT_SERVER_PORT=")==0) { fEventServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("STORAGE_CLIENT_PORT=")==0) { fStorageClientPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("XML_SERVER_PORT=")==0) { fXmlServerPort=atoi(line.substr(from,to-from).c_str()); } } if(configFile.eof()) { configFile.clear(); } configFile.close(); } else { cout<<"EVENT MANAGER -- Unable to open config file"<connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- 
"<setsockopt(ZMQ_SUBSCRIBE,"",0); try { fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fXmlServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "< list,storageSockets socket) { //send size of the struct first int numberOfRecords = list.size(); message_t message(20); snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords); fSockets[socket]->send(message); if(numberOfRecords==0)return; fSockets[socket]->recv((new message_t));//empty message just to keep req-rep order //prepare message with event's list char *buffer = reinterpret_cast (&list[0]); message_t *reply = new message_t((void*)buffer, sizeof(serverListStruct)*numberOfRecords,0); fSockets[socket]->send(*reply); if(reply){delete reply;} } void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket) { char *buffer = (char*)(request); message_t *requestMessage = new message_t((void*)buffer, sizeof(struct serverRequestStruct) +sizeof(struct listRequestStruct) +sizeof(struct eventStruct),0); fSockets[socket]->send(*requestMessage); } bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; char *buffer = (char*)(request); message_t *requestMessage = new message_t((void*)buffer, sizeof(struct clientRequestStruct),0); try { fSockets[socket]->send(*requestMessage); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<=0) { if(poll (&items[0], 1, timeout)==0) { return 0; } } return 1; } void AliStorageEventManager::Send(long message,storageSockets socket) { stringstream streamBuffer; streamBuffer << message; string stringBuffer = streamBuffer.str(); char *buffer = (char*)stringBuffer.c_str(); message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0); fSockets[socket]->send(*replyMessage); delete replyMessage; 
streamBuffer.str(string()); streamBuffer.clear(); } void AliStorageEventManager::Send(bool message,storageSockets socket) { char *buffer; if(message==true) { buffer = (char*)("true"); } else { buffer = (char*)("false"); } message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0); fSockets[socket]->send(*replyMessage); delete replyMessage; } void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket) { TMessage tmess(kMESS_OBJECT); tmess.Reset(); tmess.WriteObject(event); TMessage::EnableSchemaEvolutionForAll(kTRUE); int bufsize = tmess.Length(); char* buf = (char*) malloc(bufsize * sizeof(char)); memcpy(buf, tmess.Buffer(), bufsize); zmq::message_t message((void*)buf, bufsize, 0, 0); fSockets[socket]->send(message); } void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket) { cout<<"SENDING AS XML"<"<" << endl; for(int i=0;iGetNumberOfTracks();i++) { AliESDtrack *track = event->GetTrack(i); //double hgfhgf[size]; //track->GetESDpid(pid); bufferStream << "\tGetMass()<<"\">" <GetTrackPointArray(); if(array) { const float *x = array->GetX(); const float *y = array->GetY(); const float *z = array->GetZ(); int n = array->GetNPoints(); for(int j=0;j"<"<< x[j] <<""<"<< y[j] <<""<"<< z[j] <<""<"<"<"<send(message); cout<<"xml sent"< AliStorageEventManager::GetServerListVector(storageSockets socket) { //get size of the incomming message message_t sizeMessage; fSockets[socket]->recv(&sizeMessage); int numberOfRecords; istringstream iss(static_cast(sizeMessage.data())); iss >> numberOfRecords; if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<send(*(new message_t()));//receive empty message just to keep req-rep order //get list of events message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords); fSockets[socket]->recv(response); vector receivedList(static_cast(response->data()), static_cast(response->data()) + numberOfRecords); return receivedList; } AliESDEvent* 
AliStorageEventManager::GetEvent(storageSockets socket,int timeout,TTree **tmpTree) { message_t* message = new message_t(); try { fSockets[socket]->recv(message); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<size()+sizeof(UInt_t), message->data()); mess->InitMap(); mess->ReadClass();// get first the class stored in message mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT)); mess->ResetMap(); AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class())); if (data) { TTree* tree= new TTree("esdTree", "esdTree"); data->WriteToTree(tree); tree->Fill(); AliESDEvent* event= new AliESDEvent(); event->ReadFromTree(tree); tree->GetEntry(0); if(data){delete data;} //if(tree){delete tree;} if(message){delete message;} if(tmpTree){*tmpTree = tree;} return event; } else { if(message){delete message;} return NULL; } } struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket) { struct serverRequestStruct *request = new struct serverRequestStruct; message_t *requestMessage = new message_t(); fSockets[socket]->recv(requestMessage); request = static_cast(requestMessage->data()); return request; } struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket) { struct clientRequestStruct *request = new struct clientRequestStruct; message_t *requestMessage = new message_t(); fSockets[socket]->recv(requestMessage); request = static_cast(requestMessage->data()); return request; } bool AliStorageEventManager::GetBool(storageSockets socket) { message_t *response = new message_t(); fSockets[socket]->recv(response); char *result = (char*)response->data(); if(!strcmp("true",result)){return true;} else{return false;} } long AliStorageEventManager::GetLong(storageSockets socket) { message_t *responseMessage = new message_t; fSockets[socket]->recv(responseMessage); return (long)atoi(static_cast(responseMessage->data())); }