if(tmpMessage){delete tmpMessage;}
}
-void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
+bool AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket,int timeout)
{
+
+ if(timeout>=0)
+ {
+ pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}};
+ if(poll (&items[0], 1, timeout)==0){
+ cout<<"Event Manager -- couldn't send serverRequestStruct -- timeout"<<endl;
+ return 0;
+ }
+ }
+
char *buffer = (char*)(request);
message_t *requestMessage = new message_t((void*)buffer,
sizeof(struct serverRequestStruct)
catch(const zmq::error_t &e)
{
cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
+ return 0;
}
+ return 1;
}
bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
{
- pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
-
+ if(timeout>=0){
+ pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}};
+ if(poll (&items[0], 1, timeout)==0){return 0;}
+ }
char *buffer = (char*)(request);
- message_t *requestMessage = new message_t((void*)buffer,
- sizeof(struct clientRequestStruct),freeBuff);
+ message_t *requestMessage = new message_t((void*)buffer,sizeof(struct clientRequestStruct),freeBuff);
try{
fSockets[socket]->send(*requestMessage);
return 0;
}
- if(timeout>=0)
- {
- if(poll (&items[0], 1, timeout)==0)
- {
- delete requestMessage;
- return 0;
- }
- }
+
delete requestMessage;
return 1;
}
TMessage tmess(kMESS_OBJECT);
tmess.Reset();
tmess.WriteObject(event);
- // TMessage::EnableSchemaEvolutionForAll(kTRUE);
+ //TMessage::EnableSchemaEvolutionForAll(kTRUE);
int bufsize = tmess.Length();
char* buf = (char*) malloc(bufsize * sizeof(char));
try{(poll (&items[0], 1, timeout)==0);}
catch(const zmq::error_t &e){
cout<<"EVENT MANAGER -- GetEvent():"<<e.what()<<endl;
- return NULL;
+ return NULL;
}
}
message_t* message = new message_t();