Modified EveBase to compile with zmq<3.0. Corrected ThreadedSocket implementation.
authorquark <quark@cern.ch>
Thu, 19 Jun 2014 11:01:44 +0000 (13:01 +0200)
committerquark <quark@cern.ch>
Thu, 19 Jun 2014 11:01:44 +0000 (13:01 +0200)
EVE/EveBase/AliEveEventManager.cxx
MONITOR/AliRecoServerThread.cxx
MONITOR/AliRecoServerThread.h
MONITOR/AliThreadedSocket.cxx
MONITOR/AliThreadedSocket.h

index 5e8b000..2409073 100644 (file)
@@ -128,7 +128,7 @@ bool AliEveEventManager::ConnectToServer(const char* host, int port)
 
 #ifdef ZMQ
        // make a zeromq socket
-    fgSubContext = new zmq::context_t;
+         fgSubContext = new zmq::context_t(1);
     fgSubSock = new AliSocket(&*fgSubContext, ZMQ_SUB);
     fgSubSock->Subscribe("");
     fgSubSock->Connect(Form("%s:%d", host, port) );
index 97ec5f8..4dbbe3b 100644 (file)
@@ -31,7 +31,7 @@
 
 ClassImp(AliRecoServerThread)
 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
-  : AliThreadedSocket(context, AliThreadedSocket::WRITE),
+  : AliThreadedSocket(context, AliThreadedSocket::kWRITE),
        fReco(0),
     fCond(0)
 {
@@ -50,41 +50,35 @@ Bool_t AliRecoServerThread::Start(const char* endpoint)
        return AliThreadedSocket::Start();
 }
 
-void* AliRecoServerThread::RunThrdWrite(void* arg)
+void AliRecoServerThread::RunThrdWrite()
 {
        TThread::SetCancelAsynchronous();
        TThread::SetCancelOn();
        
-       AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
-       
-       const char* host = recoTh->GetHost();
-       zmq::context_t* context = recoTh->GetContext();
-       AliReconstruction* reco = recoTh->GetReconstruction();
-
  // generate a publish socket
-       AliSocket publisher(context, ZMQ_PUB);
-       publisher.Bind(host);
+       AliSocket publisher(fContext, ZMQ_PUB);
+       publisher.Bind(fHost);
        
-  if(reco==0) return 0;
+  if(fReco==0) return;
   
   AliESDEvent* event;
   
-       reco->Begin(NULL);
-  if (reco->GetAbort() != TSelector::kContinue) return 0;
+       fReco->Begin(NULL);
+  if (fReco->GetAbort() != TSelector::kContinue) return;
   
-  reco->SlaveBegin(NULL);
-       if (reco->GetAbort() != TSelector::kContinue) return 0;
+  fReco->SlaveBegin(NULL);
+       if (fReco->GetAbort() != TSelector::kContinue) return;
   
   //******* The loop over events
     Int_t iEvent = 0;
-    while ( reco->HasNextEventAfter(iEvent) ) {
+    while ( fReco->HasNextEventAfter(iEvent) ) {
       // check if process has enough resources 
-      if (!reco->HasEnoughResources(iEvent)) break;
-      Bool_t status = reco->ProcessEvent(iEvent);
+      if (!fReco->HasEnoughResources(iEvent)) break;
+      Bool_t status = fReco->ProcessEvent(iEvent);
       
       if (status)
       {
-                   event = reco->GetESDEvent();
+                   event = fReco->GetESDEvent();
                    
        AliNetMessage tmess(kMESS_OBJECT);
                        tmess.Reset();
@@ -95,18 +89,18 @@ void* AliRecoServerThread::RunThrdWrite(void* arg)
                        sleep(1);
      }
       else {
-        reco->Abort("ProcessEvent",TSelector::kAbortFile);
+        fReco->Abort("ProcessEvent",TSelector::kAbortFile);
       }
                
-      reco->CleanProcessedEvent();
-      if(recoTh->Condition()->TimedWaitRelative(500)==0){
+      fReco->CleanProcessedEvent();
+      if(fCond->TimedWaitRelative(500)==0){
                                break;
                        }                       
       iEvent++;
     }
-    reco->SlaveTerminate();
-    if (reco->GetAbort() != TSelector::kContinue) return 0;
-    reco->Terminate();
-    if (reco->GetAbort() != TSelector::kContinue) return 0;
+    fReco->SlaveTerminate();
+    if (fReco->GetAbort() != TSelector::kContinue) return;
+    fReco->Terminate();
+    if (fReco->GetAbort() != TSelector::kContinue) return;
   
 }
index 814860f..74bcc36 100644 (file)
@@ -36,7 +36,7 @@ public:
        TCondition*                                     Condition() { return fCond; }
        
 private:
-       static void* RunThrdWrite(void* arg);
+       void RunThrdWrite();
        
        AliReconstruction* fReco;
 
index 916bf09..56566b0 100644 (file)
@@ -18,17 +18,15 @@ AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode)
 
 AliThreadedSocket::~AliThreadedSocket()
 {
+       Wait();
        Stop();
 }
 
 Bool_t AliThreadedSocket::Start()
 {
        if(!fThread){
-               if(fOpenMode==READ)
-               fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdRead, (void*)  this );
-       else
-               fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdWrite,(void*)  this );
-               
+               fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &Dispatch, (void*)  this );
+  
        if(fThread->Run()==0){ 
                Emit("Started()");
                return kTRUE; 
@@ -40,6 +38,11 @@ Bool_t AliThreadedSocket::Start()
 
 Bool_t AliThreadedSocket::Stop()
 {
+       if(fThread){
+               fThread->Delete();
+               fThread=0;
+       }
+
        Emit("Stopped()");
        return kTRUE;
 }
@@ -61,6 +64,14 @@ void AliThreadedSocket::Continue()
        
 }
 
+void AliThreadedSocket::Wait()
+{
+       if(fThread && fThread->GetState()==TThread::kRunningState)
+       {
+               fThread->Join();
+       }
+}
+
 zmq::context_t* AliThreadedSocket::GetContext() const
 {
        return fContext;
@@ -81,34 +92,28 @@ void AliThreadedSocket::Stopped()
        Emit("Stopped()");
 }
 
-void* AliThreadedSocket::RunThrdRead(void* arg)
+void AliThreadedSocket::RunThrdRead()
 {
        AliNetMessage* mess=0;
-       AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
-       zmq::context_t* context = thsock->GetContext();
-       
-       AliSocket sock(context, ZMQ_SUB);
+       AliSocket sock(fContext, ZMQ_SUB);
                
        do{
                sock.Recv(mess);
        }
        while(mess==0);
        
-       thsock->Stopped();
+       Stopped();
 }
 
-void* AliThreadedSocket::RunThrdWrite(void* arg)
+void AliThreadedSocket::RunThrdWrite()
 {
        AliNetMessage* mess=0;
-       AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
-       zmq::context_t* context = thsock->GetContext();
-       
-       AliSocket sock(context, ZMQ_PUB);
+       AliSocket sock(fContext, ZMQ_PUB);
        
        do{
                sock.Send(*mess);
        }
        while(1);
        
-       thsock->Emit("Stopped()");
+       Stopped();
 }
index 677eb58..c360f39 100644 (file)
@@ -21,7 +21,7 @@ namespace zmq {
 class AliThreadedSocket : public TQObject
 {
 public:
-       enum EOpenMode{READ, WRITE};
+       enum EOpenMode{kREAD, kWRITE};
 
        AliThreadedSocket(zmq::context_t *context, EOpenMode mode);
        virtual ~AliThreadedSocket();
@@ -29,9 +29,11 @@ public:
        Bool_t Start();
        Bool_t Stop();
        Bool_t Kill();
+       void Wait();
        
        zmq::context_t* GetContext() const;
        TThread* GetThread() const;
+       EOpenMode GetMode() const { return fOpenMode; }
                
        void Started(); // *SIGNAL*
        void Stopped(); // *SIGNAL*
@@ -42,16 +44,27 @@ protected:
   AliThreadedSocket(const AliThreadedSocket&);            // Not implemented
   AliThreadedSocket& operator=(const AliThreadedSocket&); // Not implemented
 
-       // reimplement these in a derived class
-       static void* RunThrdRead(void* arg);
-       static void* RunThrdWrite(void* arg);
+       // Reimplement these in a derived Class
+       virtual void RunThrdRead(); // function to run in a thread when in Read mode
+       virtual void RunThrdWrite(); // function to run in a thread when in Write mode
 
-       zmq::context_t* fContext;
        TThread* fThread;
+       zmq::context_t* fContext;
        EOpenMode fOpenMode;
 
+private:
+       static void* Dispatch(void* arg)
+       {
+               AliThreadedSocket* th = static_cast<AliThreadedSocket*>(arg);
+               
+               if(th->GetMode()==kREAD)
+                       th->RunThrdRead();
+               else
+                       th->RunThrdWrite();
+                       
+                       return NULL;
+       }
 
   ClassDef(AliThreadedSocket, 0);  
-
 };
 #endif