#include "AliStorageEventManager.h" #include #include #include #include #include #include #include "zmq.hpp" #include "AliESDEvent.h" #include "AliESDtrack.h" #include "AliTrackPointArray.h" #include "AliESDfriendTrack.h" #include "AliExternalTrackParam.h" #include "AliTrackerBase.h" #include "AliTracker.h" using namespace zmq; using namespace std; AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0; AliStorageEventManager::AliStorageEventManager() { //read config file TThread::Lock(); ifstream configFile (GetConfigFilePath()); if (configFile.is_open()) { string line; int from,to; while(configFile.good()) { getline(configFile,line); from = line.find("\"")+1; to = line.find_last_of("\""); if(line.find("STORAGE_SERVER=")==0) { fStorageServer=line.substr(from,to-from); } else if(line.find("EVENT_SERVER=")==0) { fEventServer=line.substr(from,to-from); } else if(line.find("STORAGE_SERVER_PORT=")==0) { fStorageServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("EVENT_SERVER_PORT=")==0) { fEventServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("STORAGE_CLIENT_PORT=")==0) { fStorageClientPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("XML_SERVER_PORT=")==0) { fXmlServerPort=atoi(line.substr(from,to-from).c_str()); } else if(line.find("ITS_POINTS_SERVER_PORT=")==0) { fItsPointsServerPort=atoi(line.substr(from,to-from).c_str()); cout<<"ITS port is:"<connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fStorageClientPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<setsockopt(ZMQ_SUBSCRIBE,"",0); try { fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fXmlServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<bind(Form("tcp://*:%d",fItsPointsServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<setsockopt(ZMQ_SUBSCRIBE,"",0); try { fSockets[ITS_POINTS_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fItsPointsServerPort)); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "< list,storageSockets socket) { //send size of the struct first int numberOfRecords = list.size(); message_t message(20); snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords); try{ fSockets[socket]->send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "<recv(tmpMessage);//empty message just to keep req-rep order } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "< (&list[0]); // message_t *reply = new message_t((void*)buffer, // sizeof(serverListStruct)*numberOfRecords,0); // fSockets[socket]->send(*reply); // if(reply){delete reply;} message_t reply(sizeof(serverListStruct)*numberOfRecords); memcpy(reply.data(), reinterpret_cast (&list[0]), sizeof(serverListStruct)*numberOfRecords); try{ fSockets[socket]->send(reply); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send vector -- "<send(*requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send serverRequestStruct -- "<send(*requestMessage); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<=0) { if(poll (&items[0], 1, timeout)==0) { delete requestMessage; return 0; } } delete requestMessage; return 1; } void AliStorageEventManager::Send(long message,storageSockets socket) { stringstream streamBuffer; streamBuffer << message; string stringBuffer = streamBuffer.str(); char *buffer = (char*)stringBuffer.c_str(); message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff); try{ fSockets[socket]->send(*replyMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send long -- "<send(*replyMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send bool -- "<send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send AliESDEvent -- "<ls(); cout<<"1"<GetListOfKeys()->Last()->GetTitle()); string *name = new string(file->GetListOfKeys()->Last()->GetTitle()); cout<<"treeTitle:"<data()<Get(name->data()))->Get("TreeR"); // TDirectoryFile *df = (TDirectoryFile*)file->Get(dfTitle->data()); //if(df) //{ // cout<<"directory file extracted"<Get("TreeR"); cout<<"2"<Branch("event",&name); cout<<"branch created:"<Print(); tree->Fill(); cout<<"ttree filled with name"<Print(); } else { tree = NULL; cout<<"file is empty"<send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send TFile -- "<files[i],socket);} } void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket) { cout<<"SENDING AS XML"<"<" << endl; for(int i=0;iGetNumberOfTracks();i++) { AliESDtrack *track = event->GetTrack(i); AliKalmanTrack *ITStrack = track->GetITStrack(); const AliTrackPointArray *array = track->GetTrackPointArray(); bufferStream << "\tGetMass()<<"\""; // bufferStream << "\tGetESDpid(); bufferStream << "\t pid=\""<PID()<<"\""; bufferStream << "\t energy=\""<E()<<"\""; bufferStream << "\t volumeID=\""<GetVolumeID()<<"\">" <GetX(); const float *y = array->GetY(); const float *z = array->GetZ(); int n = array->GetNPoints(); for(int j=0;jGetVolumeID()[j]<<"\">"<"<< x[j] <<""<"<< y[j] <<""<"<< z[j] <<""<"<"<"<send(message); } catch(const zmq::error_t &e) { cout<<"MANAGER -- send send xml -- "< AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector emptyVector;return emptyVector;}} //get size of the incomming message message_t sizeMessage; try{ fSockets[socket]->recv(&sizeMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "<(sizeMessage.data())); iss >> numberOfRecords; if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<send(*(new message_t()));//receive empty message just to keep req-rep order } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "<recv(response); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get vector -- "< receivedList(static_cast(response->data()), static_cast(response->data()) + numberOfRecords); if (response) {delete response;} return receivedList; } AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; if(timeout>=0){ try{(poll (&items[0], 1, timeout)==0);} catch(const zmq::error_t &e){ cout<<"EVENT MANAGER -- GetEvent():"<recv(message); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<size()+sizeof(UInt_t), message->data()); mess->InitMap(); mess->ReadClass();// get first the class stored in message mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT)); mess->ResetMap(); AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class())); if (data) { data->GetStdContent(); if(message){delete message;} return data; } else { if(message){delete message;} return NULL; } } TFile* AliStorageEventManager::GetFile(storageSockets socket,int timeout) { cout<<"get file"<=0){ try{(poll (&items[0], 1, timeout)==0);} catch(const zmq::error_t &e){ cout<<"EVENT MANAGER -- GetFile():"<recv(message); } catch (const zmq::error_t& e) { cout<<"MANAGER -- "<size()+sizeof(UInt_t), message->data()); //TMessage *mess = new TMessage(); //mess->SetReadMode(); mess->InitMap(); mess->ReadClass(); mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT)); mess->ResetMap(); //mess->ReadBuf(message->data(),message->size()+sizeof(UInt_t)); cout<<"reading file from buffer"<ReadObjectAny(TTree::Class())); //TFile* data = (TFile*)mess->ReadObject(TFile::Class()); if(tree) { cout<<"received a tree:"<Print(); std::string *dfTitle = new std::string(); tree->SetBranchAddress("event",&dfTitle); tree->GetEntry(0); cout<<"setting df's name to:"<data()<data(),dfTitle->data()); df->Add(tree); cout<<"added tree to directory file"<Write(); cout<<"created file:"<ls(); if(message){delete message;} if(df){delete df;} if(tree){delete tree;} return file; } else { cout<<"no tree found"<files[i] = GetFile(socket,timeout); } return files; } struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket) { struct serverRequestStruct *request = new struct serverRequestStruct; message_t *requestMessage = new message_t(); try{ fSockets[socket]->recv(requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get serverRequestStruct -- "<messageType = -1; return request; } request = static_cast(requestMessage->data()); return request; } struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout) { pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}} struct clientRequestStruct *request = new struct clientRequestStruct; message_t *requestMessage = new message_t(); try{ fSockets[socket]->recv(requestMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get clientRequestStruct -- "<(requestMessage->data()); return request; } bool AliStorageEventManager::GetBool(storageSockets socket) { message_t *response = new message_t(); try{ fSockets[socket]->recv(response); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get bool -- "<data(); if(!strcmp("true",result)){return true;} else{return false;} } long AliStorageEventManager::GetLong(storageSockets socket) { message_t *responseMessage = new message_t(); try{ fSockets[socket]->recv(responseMessage); } catch(const zmq::error_t &e) { cout<<"MANAGER -- get long -- "<(responseMessage->data())); delete responseMessage; } return result; }