Adding base classes for zmq networking and modificiations to event server
authorquark <quark@cern.ch>
Fri, 30 May 2014 14:21:01 +0000 (16:21 +0200)
committerMihai Niculescu <quark@cern.ch>
Fri, 30 May 2014 14:28:33 +0000 (16:28 +0200)
13 files changed:
MONITOR/AliNetMessage.cxx [new file with mode: 0644]
MONITOR/AliNetMessage.h [new file with mode: 0644]
MONITOR/AliRecoServer.cxx
MONITOR/AliRecoServer.h
MONITOR/AliRecoServerThread.cxx
MONITOR/AliRecoServerThread.h
MONITOR/AliSocket.cxx [new file with mode: 0644]
MONITOR/AliSocket.h [new file with mode: 0644]
MONITOR/AliThreadedSocket.cxx [new file with mode: 0644]
MONITOR/AliThreadedSocket.h [new file with mode: 0644]
MONITOR/CMakelibMONITORzmq.pkg
MONITOR/MONITORLinkDef.h
MONITOR/MONITORzmqLinkDef.h

diff --git a/MONITOR/AliNetMessage.cxx b/MONITOR/AliNetMessage.cxx
new file mode 100644 (file)
index 0000000..b298d39
--- /dev/null
@@ -0,0 +1,209 @@
+#include <TVirtualStreamerInfo.h>
+#include <Bytes.h>
+#include <TFile.h>
+#include <TClass.h>
+
+#include "AliNetMessage.h"
+
+Bool_t AliNetMessage::fgEvolution = kFALSE;
+
+ClassImp(AliNetMessage)
+
+//______________________________________________________________________________
+AliNetMessage::AliNetMessage(UInt_t what) 
+  :
+  TBufferFile(kWrite),
+  fWhat(what),
+  fClass(0),
+  fBufUncompressed(0), 
+  fInfos(NULL), 
+  fEvolution(kFALSE)
+{
+   // Create a AliNetMessage object for storing objects. The "what" integer
+   // describes the type of message. Predifined ROOT system message types
+   // can be found in MessageTypes.h. Make sure your own message types are
+   // unique from the ROOT defined message types (i.e. 0 - 10000 are
+   // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
+   // will wait for an acknowledgement from the remote side. This makes
+   // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
+   // the message will be compressed in TSocket using the zip algorithm
+   // (only if message is > 256 bytes).
+
+   // space at the beginning of the message reserved for the message length
+   UInt_t   reserved = 0;
+   *this << reserved;
+
+   *this << what;
+
+   SetBit(kCannotHandleMemberWiseStreaming);
+}
+
+
+//______________________________________________________________________________
+AliNetMessage::AliNetMessage(void *buf, Int_t bufsize)
+  :
+  TBufferFile(kRead, bufsize, buf),
+  fWhat(0),
+  fClass(0),
+  fBufUncompressed(0), 
+  fInfos(NULL), 
+  fEvolution(kFALSE)
+{
+   // Create a AliNetMessage object for reading objects. The objects will be
+   // read from buf. Use the What() method to get the message type.
+
+   // skip space at the beginning of the message reserved for the message length
+   fBufCur += sizeof(UInt_t);
+
+   *this >> fWhat;
+
+   if (fWhat == kMESS_OBJECT) {
+      InitMap();
+      fClass = ReadClass();     // get first the class stored in message
+      SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
+      ResetMap();
+   } else {
+      fClass = 0;
+   }
+}
+
+//______________________________________________________________________________
+AliNetMessage::~AliNetMessage()
+{
+   // Clean up
+  Reset();
+}
+
+//______________________________________________________________________________
+void AliNetMessage::EnableSchemaEvolutionForAll(Bool_t enable)
+{
+   // Static function enabling or disabling the automatic schema evolution.
+   // By default schema evolution support is off.
+
+   fgEvolution = enable;
+}
+
+//______________________________________________________________________________
+Bool_t AliNetMessage::UsesSchemaEvolutionForAll()
+{
+   // Static function returning status of global schema evolution.
+
+   return fgEvolution;
+}
+
+//______________________________________________________________________________
+void AliNetMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
+{
+   // Force writing the TStreamerInfo to the message.
+
+   if (fgEvolution || fEvolution) {
+      if (!fInfos) fInfos = new TList();
+                               fInfos->Add(info);
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::Forward()
+{
+   // Change a buffer that was received into one that can be send, i.e.
+   // forward a just received message.
+
+   if (IsReading()) {
+      SetWriteMode();
+      SetBufferOffset(fBufSize);
+      SetBit(kCannotHandleMemberWiseStreaming);
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::TagStreamerInfo(TVirtualStreamerInfo *info)
+{
+   // Remember that the StreamerInfo is being used in writing.
+
+   if (fgEvolution || fEvolution) {
+      if (!fInfos) fInfos = new TList();
+      fInfos->Add(info);
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::IncrementLevel(TVirtualStreamerInfo *info)
+{
+   // Increment level.
+
+   TBufferFile::IncrementLevel(info);
+
+   if (!info) return;
+   if (fgEvolution || fEvolution) {
+      if (!fInfos) fInfos = new TList();
+
+      // add the streamer info, but only once
+      // this assumes that there is only one version
+      if (fInfos->FindObject(info->GetName())==NULL) {
+                               fInfos->Add(info);
+      }
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::Reset()
+{
+   // Reset the message buffer so we can use (i.e. fill) it again.
+
+   SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
+   ResetMap();
+
+   if (fBufUncompressed) {
+     delete [] fBufUncompressed;
+     fBufUncompressed=NULL;
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::SetLength() const
+{
+   // Set the message length at the beginning of the message buffer.
+
+   if (IsWriting()) {
+      char *buf = Buffer();
+      *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
+   }
+}
+
+//______________________________________________________________________________
+void AliNetMessage::SetWhat(UInt_t what)
+{
+   // Using this method one can change the message type a-posteriory.
+   // In case you OR "what" with kMESS_ACK, the message will wait for
+   // an acknowledgement from the remote side. This makes the sending
+   // process synchronous.
+
+   fWhat = what;
+
+   char *buf = Buffer();
+   buf += sizeof(UInt_t);   // skip reserved length space
+   tobuf(buf, what);
+}
+
+//______________________________________________________________________________
+void AliNetMessage::WriteObject(const TObject *obj)
+{
+   // Write object to message buffer.
+   // When support for schema evolution is enabled the list of TStreamerInfo
+   // used to stream this object is kept in fInfos. This information is used
+   // by TSocket::Send that sends this list through the socket. This list is in
+   // turn used by TSocket::Recv to store the TStreamerInfo objects in the
+   // relevant TClass in case the TClass does not know yet about a particular
+   // class version. This feature is implemented to support clients and servers
+   // with either different ROOT versions or different user classes versions.
+
+   if (fgEvolution || fEvolution) {
+      if (fInfos)
+         fInfos->Clear();
+      else
+         fInfos = new TList();
+   }
+
+   WriteObjectAny(obj, TObject::Class());
+}
+
diff --git a/MONITOR/AliNetMessage.h b/MONITOR/AliNetMessage.h
new file mode 100644 (file)
index 0000000..c23f570
--- /dev/null
@@ -0,0 +1,57 @@
+#ifndef AliNetMessage_H
+#define AliNetMessage_H
+
+// adapted from AliHLTMessage
+
+#include <TBufferFile.h>
+
+#ifndef ROOT_MessageTypes
+#include <MessageTypes.h>
+#endif
+#ifndef ROOT_TBits
+#include <TBits.h>
+#endif
+
+class AliNetMessage : public TBufferFile
+{
+public:
+       AliNetMessage(UInt_t what = kMESS_ANY);
+       AliNetMessage(void *buf, Int_t bufsize);
+       virtual ~AliNetMessage();
+       
+        void     ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force);
+   void     Forward();
+   void                TagStreamerInfo(TVirtualStreamerInfo *info);
+   TClass  *GetClass() const { return fClass;}
+   void     IncrementLevel(TVirtualStreamerInfo* info);
+   void     Reset();
+   void     Reset(UInt_t what) { SetWhat(what); Reset(); }
+   UInt_t   What() const { return fWhat; }
+   void                        SetLength() const;
+   void     SetWhat(UInt_t what);
+
+   void     EnableSchemaEvolution(Bool_t enable = kTRUE) { fEvolution = enable; }
+   Bool_t   UsesSchemaEvolution() const { return fEvolution; }
+   void     WriteObject(const TObject *obj);
+
+   static void   EnableSchemaEvolutionForAll(Bool_t enable = kTRUE);
+   static Bool_t UsesSchemaEvolutionForAll();
+
+   const TList* GetStreamerInfos() const {return fInfos;}
+
+private:
+       AliNetMessage(const AliNetMessage &); 
+       void operator=(const AliNetMessage &);
+       
+   UInt_t   fWhat;        //!Message type
+   TClass  *fClass;       //!If message is kMESS_OBJECT pointer to object's class
+   char    *fBufUncompressed; //!Uncompressed buffer
+   TList   *fInfos;       //Array of TStreamerInfo used in WriteObject
+   Bool_t   fEvolution;   //True if support for schema evolution required
+
+   static Bool_t fgEvolution;  //True if global support for schema evolution required
+  
+       ClassDef(AliNetMessage, 1);
+};
+#endif
index 6909784..125c513 100644 (file)
@@ -17,6 +17,8 @@
 #include <AliReconstruction.h>
 #include <AliTPCRecoParam.h>
 
+#include <zmq.hpp>
+
 #include "AliEventServerUtil.h"
 #include "AliRecoServer.h"
 #include "AliRecoServerThread.h"
@@ -32,7 +34,7 @@ AliRecoServer::AliRecoServer()
   fSettings(0),
   fRecoTh(0)
 {
-       fContext = new zmq::context_t(1);
+       fContext = new zmq::context_t;
 }
 
 AliRecoServer::~AliRecoServer()
index 378a33f..2d73a55 100644 (file)
@@ -9,7 +9,6 @@
 #ifndef __AliRecoServer_H__
 #define __AliRecoServer_H__
 
-#include <zmq.hpp>
 #include <TObjString.h>
 #include <TQObject.h>
 #include <RQ_OBJECT.h>
@@ -19,6 +18,11 @@ class AliCDBManager;
 class AliReconstruction;
 class AliRecoServerThread;
 
+namespace zmq
+{
+       class context_t;
+}
+
 class AliRecoServer : public TQObject
 {
        RQ_OBJECT("AliRecoServer")
index e490dc0..97ec5f8 100644 (file)
@@ -9,8 +9,6 @@
 #include <RVersion.h>
 #include <stdlib.h>
 
-#include <zmq.hpp>
-
 #include <TCondition.h>
 #include <TBufferFile.h>
 #include <TMessage.h>
 #include <TStreamerInfo.h>
 #include <TThread.h>
 
-
 #include <AliESDEvent.h>
 #include <AliESDfriend.h>
 #include <AliRawReader.h>
 #include <AliRunLoader.h>
 #include <AliReconstruction.h>
 
+#include <AliNetMessage.h>
+#include <AliSocket.h>
+
+#include <zmq.hpp>
+
 #include "AliRecoServerThread.h"
 
-ClassImp(AliRecoServerThread);
+ClassImp(AliRecoServerThread)
 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
-  : TQObject(),
-               fContext(0),
+  : AliThreadedSocket(context, AliThreadedSocket::WRITE),
        fReco(0),
-       fHost("tcp://*:5051"),
-    fThread(0),
     fCond(0)
 {
-       fContext = context;
        fReco = reco;
 }
 
@@ -45,108 +43,14 @@ AliRecoServerThread::~AliRecoServerThread()
        Stop();
 }
 
-Bool_t AliRecoServerThread::Start(const char* host)
-{
-       if(!fThread){
-       fHost = host;
-       fCond = new TCondition(0);
-       fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*)  this );
-       fThread->Run();
-  
-       return kTRUE;
-       }
-       
-       return kFALSE;  
-}
-
-Int_t AliRecoServerThread::Stop()
-{
-       fCond->Signal();
-  return 0;
-}
-
-Bool_t AliRecoServerThread::ForceStop()
+Bool_t AliRecoServerThread::Start(const char* endpoint)
 {
-       if(fThread){
-               fThread->Kill();
-               fThread->Delete();
-               fThread=0;
-               
-               return kTRUE;
-       }
+       fHost = endpoint;
        
-       return kFALSE;
+       return AliThreadedSocket::Start();
 }
 
-void AliRecoServerThread::Finished(Int_t status)
-{
-  Emit("Finished(Int_t)", status);
-}
-
-void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
-{
-       //printf("Sending Streamer Infos....\n");
-
-       // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
-   // in the object in the message is in the fInfos list of the message.
-   // We send only the TStreamerInfos not yet sent on this socket.
-       TList* infos = mess->GetStreamerInfos();
-   
-      TIter next(infos);
-      TStreamerInfo *info;
-      TList *minilist = 0;
-      while ((info = (TStreamerInfo*)next())) {
-         Int_t uid = info->GetNumber();
-         if (!minilist) minilist = new TList();
-         
-         minilist->Add(info);
-      }
-      
-      if (minilist) {
-         TMessage messinfo(kMESS_STREAMERINFO);
-         messinfo.WriteObject(minilist);
-         delete minilist;
-         if (messinfo.GetStreamerInfos())
-            messinfo.GetStreamerInfos()->Clear();
-          
-                                       int bufsize = messinfo.Length();
-               char* buf = (char*) malloc(bufsize * sizeof(char));
-          memcpy(buf, messinfo.Buffer(), bufsize);
-
-               // send!
-          zmq::message_t message((void*)buf, bufsize, 0, 0);
-                     
-         if (sock->send(message, ZMQ_SNDMORE))
-            Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
-      }
-
-   return;
-}
-
-void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
-{
-  if(!event) return;
-
-  TMessage tmess(kMESS_OBJECT);
-  tmess.Reset();
-  tmess.WriteObject(event);
-
-  TMessage::EnableSchemaEvolutionForAll(kTRUE);
-  SendStreamerInfos(&tmess, socket);
-
-  int bufsize = tmess.Length();
-  char* buf = (char*) malloc(bufsize * sizeof(char));
-  memcpy(buf, tmess.Buffer(), bufsize);
-
-  // send!
-  zmq::message_t message((void*)buf, bufsize, 0, 0);
-  socket->send(message);
-
-}
-
-
-void* AliRecoServerThread::RunThreaded(void* arg)
+void* AliRecoServerThread::RunThrdWrite(void* arg)
 {
        TThread::SetCancelAsynchronous();
        TThread::SetCancelOn();
@@ -155,10 +59,11 @@ void* AliRecoServerThread::RunThreaded(void* arg)
        
        const char* host = recoTh->GetHost();
        zmq::context_t* context = recoTh->GetContext();
-       AliReconstruction* reco   = recoTh->GetReconstruction();
+       AliReconstruction* reco = recoTh->GetReconstruction();
 
-       zmq::socket_t publisher(*context, ZMQ_PUB);
-       publisher.bind(host);
+ // generate a publish socket
+       AliSocket publisher(context, ZMQ_PUB);
+       publisher.Bind(host);
        
   if(reco==0) return 0;
   
@@ -179,8 +84,13 @@ void* AliRecoServerThread::RunThreaded(void* arg)
       
       if (status)
       {
-                               event = reco->GetESDEvent();
-                               SendEvent(event, &publisher);
+                   event = reco->GetESDEvent();
+                   
+       AliNetMessage tmess(kMESS_OBJECT);
+                       tmess.Reset();
+                       tmess.WriteObject(event);
+                               
+                               publisher.Send(tmess);
 
                        sleep(1);
      }
index 66e6d88..814860f 100644 (file)
 #include <TMutex.h>
 #include <TCondition.h>
 
+#include "AliThreadedSocket.h"
+
 class TCondition;
-class TMessage;
 class TThread;
 
 class AliReconstruction;
 class AliESDEvent;
 
-namespace zmq{
-       class context_t;
-       class socket_t;
-}
-
-class AliRecoServerThread : public TQObject
+class AliRecoServerThread : public AliThreadedSocket
 {
 public:
   AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco);
   virtual ~AliRecoServerThread();
 
-       Bool_t Start(const char* host);
-       Int_t Stop();
-       Bool_t ForceStop(); // imediate kill it, use it with rarely and with caution
 
-       zmq::context_t*                 GetContext() { return fContext; }
+       Bool_t Start(const char* endpoint);
+
+       const char* GetHost() const { return fHost.Data(); }    
        AliReconstruction*      GetReconstruction() { return fReco; }
-       const char*                                     GetHost() { return fHost.Data(); }
-       TCondition*                                                     Condition() { return fCond; }
+       TCondition*                                     Condition() { return fCond; }
        
-       void Finished(Int_t status); // *SIGNAL*
-
 private:
-       static void* RunThreaded(void* arg);
-       static void SendStreamerInfos(TMessage* mess, zmq::socket_t *sock);
-       static void SendEvent(AliESDEvent* event, zmq::socket_t* socket);
+       static void* RunThrdWrite(void* arg);
        
-       // shared
-       zmq::context_t* fContext;
        AliReconstruction* fReco;
 
        // local        
-       TString                                                                         fHost;
-  TThread*                                                             fThread;
   TCondition*                                          fCond; // condition whether to stop reco/clean exit thread
+  TString fHost;
 
 private:
   AliRecoServerThread(const AliRecoServerThread&);            // Not implemented
diff --git a/MONITOR/AliSocket.cxx b/MONITOR/AliSocket.cxx
new file mode 100644 (file)
index 0000000..23af362
--- /dev/null
@@ -0,0 +1,79 @@
+#include <cstring>
+
+#include <zmq.hpp>
+
+#include "AliNetMessage.h"
+#include "AliSocket.h"
+
+void __freeBuffer (void *data, void *hint)
+{
+    free(data);
+}
+
+
+ClassImp(AliSocket);
+AliSocket::AliSocket(zmq::context_t *context,int type)
+       : TObject(),
+       fContext(context)
+{
+       fSocket = new zmq::socket_t(*context,type);
+}
+
+AliSocket::~AliSocket()
+{
+       fSocket->close();
+}
+
+void AliSocket::Bind(const char* endpoint)
+{
+       fEndPoint = endpoint;
+       
+       fSocket->bind(endpoint);
+}
+
+void AliSocket::Connect(const char* endpoint)
+{
+       fEndPoint = endpoint;
+       
+       fSocket->connect(endpoint);
+}
+
+void AliSocket::Subscribe(const char* filter)
+{
+       fSocket->setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter) );
+}
+
+bool AliSocket::Recv(AliNetMessage *&mess, int flags)
+{
+       zmq::message_t message;
+         
+       if(!fSocket->recv(&message, flags))
+               return false;   
+       
+       int bufSize = (int)message.size();
+               
+       // buffer will be adopted by AliNetMessage, no need to free it
+       char* buf = new char[bufSize];
+       memcpy(buf, (char*)message.data(), bufSize);
+       
+       mess = new AliNetMessage(buf, bufSize);
+
+       return true;
+}
+
+bool AliSocket::Send(AliNetMessage &mess, int flags)
+{
+       //NOTE: this is already done by AliNetMessage, should we do this too?
+       // send length of the message
+       int bufSize = mess.BufferSize();
+       
+       // we need to copy it elsewhere because zmq takes ownership of the buffer data
+       char* buf = new char[bufSize];
+       memcpy(buf, (char*)mess.Buffer(), bufSize);
+
+       zmq::message_t message(buf, bufSize, __freeBuffer, NULL);
+               
+       //fwrite(mess.Buffer(), sizeof(char), bufSize, stdout);
+       
+       return fSocket->send(message, flags);
+}
diff --git a/MONITOR/AliSocket.h b/MONITOR/AliSocket.h
new file mode 100644 (file)
index 0000000..7dafdfd
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef ALISOCKET_H
+#define ALISOCKET_H
+
+#include <TObject.h>
+#include <TString.h>
+
+class AliNetMessage;
+
+namespace zmq
+{
+       class context_t;
+       class socket_t;
+}
+
+class AliSocket : public TObject
+{
+public:
+       AliSocket(zmq::context_t* context, int type);
+       virtual ~AliSocket();
+       
+       void Bind(const char* endpoint);
+       void Connect(const char* endpoint);
+       void Subscribe(const char* filter);
+       
+  bool Recv(AliNetMessage *&mess, int flags = 0);
+  bool Send(AliNetMessage &message, int flags = 0);
+  
+private:
+       AliSocket(const AliSocket &); // Not implemented
+       void operator=(const AliSocket &); // Not implemented
+       
+       zmq::context_t *fContext; //! the zmq context
+       zmq::socket_t *fSocket; //! the socket
+       TString fEndPoint; //!
+       
+  ClassDef(AliSocket, 0);  
+};
+
+#endif
diff --git a/MONITOR/AliThreadedSocket.cxx b/MONITOR/AliThreadedSocket.cxx
new file mode 100644 (file)
index 0000000..916bf09
--- /dev/null
@@ -0,0 +1,114 @@
+#include <zmq.hpp>
+
+#include <TThread.h>
+
+#include "AliNetMessage.h"
+#include "AliSocket.h"
+#include "AliThreadedSocket.h"
+
+ClassImp(AliThreadedSocket)
+AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode)
+       : TQObject(),
+       fThread(0),
+       fContext(context),
+       fOpenMode(mode)
+{
+
+}
+
+AliThreadedSocket::~AliThreadedSocket()
+{
+       Stop();
+}
+
+Bool_t AliThreadedSocket::Start()
+{
+       if(!fThread){
+               if(fOpenMode==READ)
+               fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdRead, (void*)  this );
+       else
+               fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdWrite,(void*)  this );
+               
+       if(fThread->Run()==0){ 
+               Emit("Started()");
+               return kTRUE; 
+       }
+       }
+       
+       return kFALSE;
+}
+
+Bool_t AliThreadedSocket::Stop()
+{
+       Emit("Stopped()");
+       return kTRUE;
+}
+
+Bool_t AliThreadedSocket::Kill()
+{
+       if(fThread){
+               if(fThread->Kill()!=0) return kFALSE;
+               fThread->Delete();
+               fThread=0;
+               
+               Emit("Stopped()");
+               return kTRUE;
+       }
+}
+
+void AliThreadedSocket::Continue()
+{
+       
+}
+
+zmq::context_t* AliThreadedSocket::GetContext() const
+{
+       return fContext;
+}
+
+TThread* AliThreadedSocket::GetThread() const
+{
+       return fThread;
+}
+
+void AliThreadedSocket::Started()
+{
+       Emit("Started()");
+}
+
+void AliThreadedSocket::Stopped()
+{
+       Emit("Stopped()");
+}
+
+void* AliThreadedSocket::RunThrdRead(void* arg)
+{
+       AliNetMessage* mess=0;
+       AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
+       zmq::context_t* context = thsock->GetContext();
+       
+       AliSocket sock(context, ZMQ_SUB);
+               
+       do{
+               sock.Recv(mess);
+       }
+       while(mess==0);
+       
+       thsock->Stopped();
+}
+
+void* AliThreadedSocket::RunThrdWrite(void* arg)
+{
+       AliNetMessage* mess=0;
+       AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
+       zmq::context_t* context = thsock->GetContext();
+       
+       AliSocket sock(context, ZMQ_PUB);
+       
+       do{
+               sock.Send(*mess);
+       }
+       while(1);
+       
+       thsock->Emit("Stopped()");
+}
diff --git a/MONITOR/AliThreadedSocket.h b/MONITOR/AliThreadedSocket.h
new file mode 100644 (file)
index 0000000..677eb58
--- /dev/null
@@ -0,0 +1,57 @@
+// Main authors: Mihai Niculescu 2014
+
+/**************************************************************************
+ * Copyright(c) 1998-2008, ALICE Experiment at CERN, all rights reserved. *
+ * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for          *
+ * full copyright notice.                                                 *
+ **************************************************************************/
+
+#ifndef AliThreadedSocket_H
+#define AliThreadedSocket_H
+
+#include <TQObject.h>
+
+class TThread;
+class AliNetMessage;
+
+namespace zmq {
+       class context_t;
+}
+
+class AliThreadedSocket : public TQObject
+{
+public:
+       enum EOpenMode{READ, WRITE};
+
+       AliThreadedSocket(zmq::context_t *context, EOpenMode mode);
+       virtual ~AliThreadedSocket();
+
+       Bool_t Start();
+       Bool_t Stop();
+       Bool_t Kill();
+       
+       zmq::context_t* GetContext() const;
+       TThread* GetThread() const;
+               
+       void Started(); // *SIGNAL*
+       void Stopped(); // *SIGNAL*
+
+       void Continue();
+       
+protected:
+  AliThreadedSocket(const AliThreadedSocket&);            // Not implemented
+  AliThreadedSocket& operator=(const AliThreadedSocket&); // Not implemented
+
+       // reimplement these in a derived class
+       static void* RunThrdRead(void* arg);
+       static void* RunThrdWrite(void* arg);
+
+       zmq::context_t* fContext;
+       TThread* fThread;
+       EOpenMode fOpenMode;
+
+
+  ClassDef(AliThreadedSocket, 0);  
+
+};
+#endif
index 8c9b516..15bf9a4 100644 (file)
 # SHLIBS - Shared Libraries and objects for linking (Executables only)           #
 #--------------------------------------------------------------------------------#
 
-set ( SRCS   
-    AliEventServerWindow.cxx
+set ( SRCS
+    AliNetMessage.cxx
+    AliSocket.cxx
+    AliThreadedSocket.cxx
     AliRecoServerThread.cxx
     AliRecoServer.cxx
+    AliEventServerWindow.cxx
     )
 
 string ( REPLACE ".cxx" ".h" CINTHDRS "${SRCS}" )
@@ -38,7 +41,7 @@ string ( REPLACE ".cxx" ".h" HDRS "${SRCS}" )
 
 set ( DHDR  MONITORzmqLinkDef.h)
 
-set ( EINCLUDE  STEER/STEERBase )#"${ZEROMQ_INCLUDE_DIR}")
+set ( EINCLUDE STEER/STEERBase ) # ${ZEROMQ_INCLUDE_DIR})
 
 set(DIMDIR $ENV{DIMDIR})
 set(ODIR $ENV{ODIR})
@@ -55,7 +58,7 @@ if(DIMDIR)
   set ( CXXFLAGS  "-DALI_DIM ${CXXFLAGS}")
 endif(DIMDIR)
 
-set ( ELIBS ${ELIBS} ${ZEROMQ_LIBRARY} )
+set (ELIBS ${ELIBS} ${ZEROMQ_LIBRARIES} )
 
 # this must be set in a better way
 set ( ELIBSDIR ${ELIBSDIR}  /usr/lib64)
index ee26e77..cdeb0b2 100644 (file)
 #pragma link off all functions;
 
 #pragma link C++ class AliEventServerPreferencesWindow+;
-#pragma link C++ class  AliQAHistNavigator+;
-#pragma link C++ class  AliQAHistViewer+;
-#pragma link C++ class  AliQADirList+;
-#pragma link C++ class  AliQADirListItem+;
-#pragma link C++ class  AliOnlineRecoTrigger+;
-#pragma link C++ class  TerminateSignalHandler+;
+#pragma link C++ class AliQAHistNavigator+;
+#pragma link C++ class AliQAHistViewer+;
+#pragma link C++ class AliQADirList+;
+#pragma link C++ class AliQADirListItem+;
+#pragma link C++ class AliOnlineRecoTrigger+;
+#pragma link C++ class TerminateSignalHandler+;
 
 #pragma link C++ class AliDimIntNotifier+;
 #pragma link C++ class AliChildProcTerminator+;
index 9ff59c1..4dad292 100644 (file)
@@ -9,8 +9,11 @@
 #pragma link off all classes;
 #pragma link off all functions;
 
-#pragma link C++ class AliEventServerWindow+;
+#pragma link C++ class AliNetMessage+;
+#pragma link C++ class AliSocket+;
+#pragma link C++ class AliThreadedSocket+;
 #pragma link C++ class AliRecoServerThread+;
+#pragma link C++ class AliEventServerWindow+;
 #pragma link C++ class AliRecoServer+;
 
 #endif