#ifdef ZMQ
// make a zeromq socket
- fgSubContext = new zmq::context_t;
+ fgSubContext = new zmq::context_t(1);
fgSubSock = new AliSocket(&*fgSubContext, ZMQ_SUB);
fgSubSock->Subscribe("");
fgSubSock->Connect(Form("%s:%d", host, port) );
ClassImp(AliRecoServerThread)
AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
- : AliThreadedSocket(context, AliThreadedSocket::WRITE),
+ : AliThreadedSocket(context, AliThreadedSocket::kWRITE),
fReco(0),
fCond(0)
{
return AliThreadedSocket::Start();
}
-void* AliRecoServerThread::RunThrdWrite(void* arg)
+void AliRecoServerThread::RunThrdWrite()
{
TThread::SetCancelAsynchronous();
TThread::SetCancelOn();
- AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
-
- const char* host = recoTh->GetHost();
- zmq::context_t* context = recoTh->GetContext();
- AliReconstruction* reco = recoTh->GetReconstruction();
-
// generate a publish socket
- AliSocket publisher(context, ZMQ_PUB);
- publisher.Bind(host);
+ AliSocket publisher(fContext, ZMQ_PUB);
+ publisher.Bind(fHost);
- if(reco==0) return 0;
+ if(fReco==0) return;
AliESDEvent* event;
- reco->Begin(NULL);
- if (reco->GetAbort() != TSelector::kContinue) return 0;
+ fReco->Begin(NULL);
+ if (fReco->GetAbort() != TSelector::kContinue) return;
- reco->SlaveBegin(NULL);
- if (reco->GetAbort() != TSelector::kContinue) return 0;
+ fReco->SlaveBegin(NULL);
+ if (fReco->GetAbort() != TSelector::kContinue) return;
//******* The loop over events
Int_t iEvent = 0;
- while ( reco->HasNextEventAfter(iEvent) ) {
+ while ( fReco->HasNextEventAfter(iEvent) ) {
// check if process has enough resources
- if (!reco->HasEnoughResources(iEvent)) break;
- Bool_t status = reco->ProcessEvent(iEvent);
+ if (!fReco->HasEnoughResources(iEvent)) break;
+ Bool_t status = fReco->ProcessEvent(iEvent);
if (status)
{
- event = reco->GetESDEvent();
+ event = fReco->GetESDEvent();
AliNetMessage tmess(kMESS_OBJECT);
tmess.Reset();
sleep(1);
}
else {
- reco->Abort("ProcessEvent",TSelector::kAbortFile);
+ fReco->Abort("ProcessEvent",TSelector::kAbortFile);
}
- reco->CleanProcessedEvent();
- if(recoTh->Condition()->TimedWaitRelative(500)==0){
+ fReco->CleanProcessedEvent();
+ if(fCond->TimedWaitRelative(500)==0){
break;
}
iEvent++;
}
- reco->SlaveTerminate();
- if (reco->GetAbort() != TSelector::kContinue) return 0;
- reco->Terminate();
- if (reco->GetAbort() != TSelector::kContinue) return 0;
+ fReco->SlaveTerminate();
+ if (fReco->GetAbort() != TSelector::kContinue) return;
+ fReco->Terminate();
+ if (fReco->GetAbort() != TSelector::kContinue) return;
}
TCondition* Condition() { return fCond; }
private:
- static void* RunThrdWrite(void* arg);
+ void RunThrdWrite();
AliReconstruction* fReco;
AliThreadedSocket::~AliThreadedSocket()
{
+ Wait();
Stop();
}
Bool_t AliThreadedSocket::Start()
{
if(!fThread){
- if(fOpenMode==READ)
- fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdRead, (void*) this );
- else
- fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdWrite,(void*) this );
-
+ fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &Dispatch, (void*) this );
+
if(fThread->Run()==0){
Emit("Started()");
return kTRUE;
Bool_t AliThreadedSocket::Stop()
{
+ if(fThread){
+ fThread->Delete();
+ fThread=0;
+ }
+
Emit("Stopped()");
return kTRUE;
}
}
+void AliThreadedSocket::Wait()
+{
+ if(fThread && fThread->GetState()==TThread::kRunningState)
+ {
+ fThread->Join();
+ }
+}
+
zmq::context_t* AliThreadedSocket::GetContext() const
{
return fContext;
Emit("Stopped()");
}
-void* AliThreadedSocket::RunThrdRead(void* arg)
+void AliThreadedSocket::RunThrdRead()
{
AliNetMessage* mess=0;
- AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
- zmq::context_t* context = thsock->GetContext();
-
- AliSocket sock(context, ZMQ_SUB);
+ AliSocket sock(fContext, ZMQ_SUB);
do{
sock.Recv(mess);
}
while(mess==0);
- thsock->Stopped();
+ Stopped();
}
-void* AliThreadedSocket::RunThrdWrite(void* arg)
+void AliThreadedSocket::RunThrdWrite()
{
AliNetMessage* mess=0;
- AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
- zmq::context_t* context = thsock->GetContext();
-
- AliSocket sock(context, ZMQ_PUB);
+ AliSocket sock(fContext, ZMQ_PUB);
do{
sock.Send(*mess);
}
while(1);
- thsock->Emit("Stopped()");
+ Stopped();
}
class AliThreadedSocket : public TQObject
{
public:
- enum EOpenMode{READ, WRITE};
+ enum EOpenMode{kREAD, kWRITE};
AliThreadedSocket(zmq::context_t *context, EOpenMode mode);
virtual ~AliThreadedSocket();
Bool_t Start();
Bool_t Stop();
Bool_t Kill();
+ void Wait();
zmq::context_t* GetContext() const;
TThread* GetThread() const;
+ EOpenMode GetMode() const { return fOpenMode; }
void Started(); // *SIGNAL*
void Stopped(); // *SIGNAL*
AliThreadedSocket(const AliThreadedSocket&); // Not implemented
AliThreadedSocket& operator=(const AliThreadedSocket&); // Not implemented
- // reimplement these in a derived class
- static void* RunThrdRead(void* arg);
- static void* RunThrdWrite(void* arg);
+ // Reimplement these in a derived Class
+ virtual void RunThrdRead(); // function to run in a thread when in Read mode
+ virtual void RunThrdWrite(); // function to run in a thread when in Write mode
- zmq::context_t* fContext;
TThread* fThread;
+ zmq::context_t* fContext;
EOpenMode fOpenMode;
+private:
+ static void* Dispatch(void* arg)
+ {
+ AliThreadedSocket* th = static_cast<AliThreadedSocket*>(arg);
+
+ if(th->GetMode()==kREAD)
+ th->RunThrdRead();
+ else
+ th->RunThrdWrite();
+
+ return NULL;
+ }
ClassDef(AliThreadedSocket, 0);
-
};
#endif