4cd5e368e69cab2e98b5ff43a97b3f238561d2b2
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
1 /************************************************************************
2 **
3 **
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck, 
8 ** timm@kip.uni-heidelberg.de
9 **
10 **
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
15 **
16 **
17 ** Newer versions of this file's package will be made available from 
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/ 
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
20 **
21 *************************************************************************/
22
23 /*
24 ***************************************************************************
25 **
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
27 **
28 ** $Id$ 
29 **
30 ***************************************************************************
31 */
32
33 /** @file   AliHLTHOMERReader.cxx
34     @author Timm Steinbeck
35     @date   Sep 14 2007
36     @brief  HLT Online Monitoring Environment including ROOT - Reader
37     @note   migrated from PubSub HLT-stable-20070905.141318 (rev 2375)    */
38
39 // see header file for class documentation
40 // or
41 // refer to README to build package
42 // or
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45 #include "AliHLTHOMERReader.h"
46 //#include <stdio.h>
47 #ifdef __SUNPRO_CC
48 #include <string.h>
49 #else
50 #include <cstring>
51 #endif
52 #include <cerrno>
53 #include <netdb.h>
54 //#include <sys/types.h>
55 //#include <sys/socket.h>
56 //#include <netinet/in.h>
57 //#include <netinet/tcp.h>
58 #include <unistd.h>
59 #ifndef __CYGWIN__
60 #include <rpc/types.h>
61 #endif
62 #include <fcntl.h>
63 #include <sys/stat.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #ifdef USE_ROOT
67 #include <Rtypes.h>
68 #endif
69
70
71 #define MOD_BIN "MOD BIN\n"
72 #define MOD_ASC "MOD ASC\n"
73 #define GET_ONE "GET ONE\n"
74 #define GET_ALL "GET ALL\n"
75
76 // MAXHOSTNAMELEN not defined on macosx
77 // 686-apple-darwin9-gcc-4.0.1
78 #ifndef MAXHOSTNAMELEN
79 #define MAXHOSTNAMELEN 64
80 #endif
81
82 #ifdef USE_ROOT
83 ClassImp(AliHLTMonitoringReader);
84 ClassImp(AliHLTHOMERReader);
85 #endif
86
87
88
89
90
91 #ifdef USE_ROOT
92 AliHLTHOMERReader::AliHLTHOMERReader()
93   :
94   fCurrentEventType(~(homer_uint64)0),
95   fCurrentEventID(~(homer_uint64)0),
96   fBlockCnt(0),
97   fMaxBlockCnt(0),
98   fBlocks(NULL),
99   fDataSourceCnt(0),
100   fTCPDataSourceCnt(0),
101   fShmDataSourceCnt(0),
102   fDataSourceMaxCnt(0),
103   fDataSources(NULL),
104   fConnectionStatus(0),
105   fErrorConnection(~(unsigned int)0),
106   fEventRequestAdvanceTime(0)
107     {
108 // Reader implementation of the HOMER interface.
109 // The HLT Monitoring Environment including ROOT is
110 // a native interface to ship out data from the HLT chain.
111 // See pdf document shiped with the package
112 // for class documentation and tutorial.
113     Init();
114     }
115 #endif
116
117
118 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
119   :
120   AliHLTMonitoringReader(),
121   TObject(),
122   fCurrentEventType(~(homer_uint64)0),
123   fCurrentEventID(~(homer_uint64)0),
124   fBlockCnt(0),
125   fMaxBlockCnt(0),
126   fBlocks(NULL),
127   fDataSourceCnt(0),
128   fTCPDataSourceCnt(0),
129   fShmDataSourceCnt(0),
130   fDataSourceMaxCnt(0),
131   fDataSources(NULL),
132   fConnectionStatus(0),
133   fErrorConnection(~(unsigned int)0),
134   fEventRequestAdvanceTime(0)
135     {
136 // see header file for class documentation
137 // For reading from a TCP port
138     Init();
139     if ( !AllocDataSources(1) )
140         {
141         fErrorConnection = 0;
142         fConnectionStatus = ENOMEM;
143         return;
144         }
145     fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
146     if ( fConnectionStatus )
147         fErrorConnection = 0;
148     else
149         {
150         fDataSourceCnt++;
151         fTCPDataSourceCnt++;
152         fDataSources[0].fNdx = 0;
153         }
154     }
155
156 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports )
157   :
158   AliHLTMonitoringReader(),
159   TObject(),
160   fCurrentEventType(~(homer_uint64)0),
161   fCurrentEventID(~(homer_uint64)0),
162   fBlockCnt(0),
163   fMaxBlockCnt(0),
164   fBlocks(NULL),
165   fDataSourceCnt(0),
166   fTCPDataSourceCnt(0),
167   fShmDataSourceCnt(0),
168   fDataSourceMaxCnt(0),
169   fDataSources(NULL),
170   fConnectionStatus(0),
171   fErrorConnection(~(unsigned int)0),
172   fEventRequestAdvanceTime(0)
173     {
174 // see header file for class documentation
175 // For reading from multiple TCP ports
176     Init();
177     if ( !AllocDataSources(tcpCnt) )
178         {
179         fErrorConnection = 0;
180         fConnectionStatus = ENOMEM;
181         return;
182         }
183     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
184         {
185         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
186         if ( fConnectionStatus )
187             {
188             fErrorConnection = n;
189             return;
190             }
191         fDataSources[n].fNdx = n;
192         }
193     }
194
195 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
196   :
197   AliHLTMonitoringReader(),
198   TObject(),
199   fCurrentEventType(~(homer_uint64)0),
200   fCurrentEventID(~(homer_uint64)0),
201   fBlockCnt(0),
202   fMaxBlockCnt(0),
203   fBlocks(NULL),
204   fDataSourceCnt(0),
205   fTCPDataSourceCnt(0),
206   fShmDataSourceCnt(0),
207   fDataSourceMaxCnt(0),
208   fDataSources(NULL),
209   fConnectionStatus(0),
210   fErrorConnection(~(unsigned int)0),
211   fEventRequestAdvanceTime(0)
212     {
213 // see header file for class documentation
214 // For reading from a System V shared memory segment
215     Init();
216     if ( !AllocDataSources(1) )
217         {
218         fErrorConnection = 0;
219         fConnectionStatus = ENOMEM;
220         return;
221         }
222     fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
223     if ( fConnectionStatus )
224         fErrorConnection = 0;
225     else
226         {
227         fDataSourceCnt++;
228         fShmDataSourceCnt++;
229         fDataSources[0].fNdx = 0;
230         }
231     }
232
233 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
234   :
235   AliHLTMonitoringReader(),
236   TObject(),
237   fCurrentEventType(~(homer_uint64)0),
238   fCurrentEventID(~(homer_uint64)0),
239   fBlockCnt(0),
240   fMaxBlockCnt(0),
241   fBlocks(NULL),
242   fDataSourceCnt(0),
243   fTCPDataSourceCnt(0),
244   fShmDataSourceCnt(0),
245   fDataSourceMaxCnt(0),
246   fDataSources(NULL),
247   fConnectionStatus(0),
248   fErrorConnection(~(unsigned int)0),
249   fEventRequestAdvanceTime(0)
250     {
251 // see header file for class documentation
252 // For reading from multiple System V shared memory segments
253     Init();
254     if ( !AllocDataSources(shmCnt) )
255         {
256         fErrorConnection = 0;
257         fConnectionStatus = ENOMEM;
258         return;
259         }
260     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
261         {
262         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
263         if ( fConnectionStatus )
264             {
265             fErrorConnection = n;
266             return;
267             }
268         fDataSources[n].fNdx = n;
269         }
270     }
271
272 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports, 
273                           unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
274   :
275   AliHLTMonitoringReader(),
276   TObject(),
277   fCurrentEventType(~(homer_uint64)0),
278   fCurrentEventID(~(homer_uint64)0),
279   fBlockCnt(0),
280   fMaxBlockCnt(0),
281   fBlocks(NULL),
282   fDataSourceCnt(0),
283   fTCPDataSourceCnt(0),
284   fShmDataSourceCnt(0),
285   fDataSourceMaxCnt(0),
286   fDataSources(NULL),
287   fConnectionStatus(0),
288   fErrorConnection(~(unsigned int)0),
289   fEventRequestAdvanceTime(0)
290     {
291 // see header file for class documentation
292 // For reading from multiple TCP ports and multiple System V shared memory segments
293     Init();
294     if ( !AllocDataSources(tcpCnt+shmCnt) )
295         {
296         fErrorConnection = 0;
297         fConnectionStatus = ENOMEM;
298         return;
299         }
300     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
301         {
302         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
303         if ( fConnectionStatus )
304             {
305             fErrorConnection = n;
306             return;
307             }
308         fDataSources[n].fNdx = n;
309         }
310     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
311         {
312         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
313         if ( fConnectionStatus )
314             {
315             fErrorConnection = tcpCnt+n;
316             return;
317             }
318         fDataSources[n].fNdx = n;
319         }
320     }
321
322 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
323   :
324   AliHLTMonitoringReader(),
325   TObject(),
326   fCurrentEventType(~(homer_uint64)0),
327   fCurrentEventID(~(homer_uint64)0),
328   fBlockCnt(0),
329   fMaxBlockCnt(0),
330   fBlocks(NULL),
331   fDataSourceCnt(0),
332   fTCPDataSourceCnt(0),
333   fShmDataSourceCnt(0),
334   fDataSourceMaxCnt(0),
335   fDataSources(NULL),
336   fConnectionStatus(0),
337   fErrorConnection(~(unsigned int)0),
338   fEventRequestAdvanceTime(0)
339     {
340 // see header file for class documentation
341 // For reading from a System V shared memory segment
342     Init();
343     if ( !AllocDataSources(1) )
344         {
345         fErrorConnection = 0;
346         fConnectionStatus = ENOMEM;
347         return;
348         }
349     fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
350     if ( fConnectionStatus )
351         fErrorConnection = 0;
352     else
353         {
354         fDataSourceCnt++;
355         fShmDataSourceCnt++;
356         fDataSources[0].fNdx = 0;
357         }
358     }
359
360 AliHLTHOMERReader::~AliHLTHOMERReader()
361     {
362 // see header file for class documentation
363     ReleaseCurrentEvent();
364     FreeDataSources();
365
366     if (fDataSources)
367       delete [] fDataSources;
368     }
369
370 int  AliHLTHOMERReader::ReadNextEvent()
371     {
372 // see header file for class documentation
373 // Read in the next available event
374     return ReadNextEvent( false, 0 );
375     }
376
377 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
378     {
379 // see header file for class documentation
380 // Read in the next available event
381     return ReadNextEvent( true, timeout );
382     }
383
384 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
385     {
386 // see header file for class documentation
387 // Return the size (in bytes) of the current event's data
388 // block with the given block index (starting at 0).
389     if ( ndx >= fBlockCnt )
390         return 0;
391     return fBlocks[ndx].fLength;
392     }
393
394 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
395     {
396 // see header file for class documentation
397 // Return a pointer to the start of the current event's data
398 // block with the given block index (starting at 0).
399     if ( ndx >= fBlockCnt )
400         return NULL;
401     return fBlocks[ndx].fData;
402     }
403
404 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
405     {
406 // see header file for class documentation
407 // Return IP address or hostname of node which sent the 
408 // current event's data block with the given block index 
409 // (starting at 0).
410 // For HOMER this is the ID of the node on which the subscriber 
411 // that provided this data runs/ran.
412     if ( ndx >= fBlockCnt )
413         return NULL;
414 #ifdef DEBUG
415     if ( fBlocks[ndx].fSource >= fDataSourceCnt )
416         {
417         fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%u) >= fDataSourceCnt (%u)\n",
418                  __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
419         return NULL;
420         }
421 #endif
422     return fDataSources[ fBlocks[ndx].fSource ].fHostname;
423     //return fBlocks[ndx].fOriginatingNodeID;
424     }
425
426 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
427     {
428 // see header file for class documentation
429 // Return byte order of the data stored in the 
430 // current event's data block with the given block index (starting at 0). 
431 //         0 is unknown alignment, 
432 //         1 ist little endian, 
433 //         2 is big endian. */
434     if ( ndx >= fBlockCnt )
435         return 0;
436     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
437     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
438     }
439
440 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
441     {
442 // see header file for class documentation
443 // Return the alignment (in bytes) of the given datatype 
444 // in the data stored in the current event's data block
445 // with the given block index (starting at 0). 
446 // Possible values for the data type are
447 //         0: homer_uint64
448 //         1: homer_uint32
449 //         2: uin16
450 //         3: homer_uint8
451 //         4: double
452 //         5: float
453     if ( ndx >= fBlockCnt )
454         return 0;
455     if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
456         return 0;
457     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
458     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
459     }
460
461 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
462     {
463 // see header file for class documentation
464     if ( ndx >= fBlockCnt )
465         return 0;
466     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
467     }
468
469 /* HOMER specific */
470 /* Return the type of the data in the current event's data
471    block with the given block index (starting at 0). */
472 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
473     {
474 // see header file for class documentation
475     if ( ndx >= fBlockCnt )
476         return ~(homer_uint64)0;
477     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
478     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
479     }
480
481 /* Return the origin of the data in the current event's data
482    block with the given block index (starting at 0). */
483 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
484     {
485 // see header file for class documentation
486     if ( ndx >= fBlockCnt )
487         return ~(homer_uint32)0;
488     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
489     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
490     }
491
492 /* Return a specification of the data in the current event's data
493    block with the given block index (starting at 0). */
494 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
495     {
496 // see header file for class documentation
497     if ( ndx >= fBlockCnt )
498         return ~(homer_uint32)0;
499     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
500     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
501     }
502
503 /* Find the next data block in the current event with the given
504    data type, origin, and specification. Returns the block's 
505    index. */
506 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin, 
507                                          homer_uint32 spec, unsigned long startNdx ) const
508     {
509 // see header file for class documentation
510     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
511         {
512         if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
513              ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
514              ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
515             return n;
516         }
517     return ~(unsigned long)0;
518     }
519
520 /* Find the next data block in the current event with the given
521    data type, origin, and specification. Returns the block's 
522    index. */
523 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4], 
524                                          homer_uint32 spec, unsigned long startNdx ) const
525     {
526 // see header file for class documentation
527     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
528         {
529         bool found1=true, found2=true;
530         for ( unsigned i = 0; i < 8; i++ )
531             {
532             if ( type[i] != (char)0xFF )
533                 {
534                 found1=false;
535                 break;
536                 }
537             }
538         if ( !found1 )
539             {
540             found1 = true;
541             for ( unsigned i = 0; i < 8; i++ )
542                 {
543                 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
544                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
545                     {
546                     found1=false;
547                     break;
548                     }
549                 }
550             }
551         for ( unsigned i = 0; i < 4; i++ )
552             {
553             if ( origin[i] != (char)0xFF )
554                 {
555                 found2 = false;
556                 break;
557                 }
558             }
559         if ( !found2 )
560             {
561             found2 = true;
562             for ( unsigned i = 0; i < 4; i++ )
563                 {
564                 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
565                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
566                     {
567                     found2=false;
568                     break;
569                     }
570                 }
571             }
572         //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
573         if ( found1 && found2 &&
574              ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
575             return n;
576         }
577     return ~(unsigned long)0;
578     }
579
580 /* Return the ID of the node that actually produced this data block.
581    This may be different from the node which sent the data to this
582    monitoring object as returned by GetBlockSendNodeID. */
583 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
584     {
585 // see header file for class documentation
586     if ( ndx >= fBlockCnt )
587         return NULL;
588     return fBlocks[ndx].fOriginatingNodeID;
589     }
590
591
592 void AliHLTHOMERReader::Init()
593     {
594 // see header file for class documentation
595     fCurrentEventType = ~(homer_uint64)0;
596     fCurrentEventID = ~(homer_uint64)0;
597     fMaxBlockCnt = fBlockCnt = 0;
598     fBlocks = NULL;
599         
600     fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
601     fDataSources = NULL;
602
603         
604     fConnectionStatus = 0;
605     fErrorConnection = ~(unsigned int)0;
606
607     fEventRequestAdvanceTime = 0;
608     }
609         
610 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
611     {
612 // see header file for class documentation
613     fDataSources = new DataSource[ sourceCnt ];
614     if ( !fDataSources )
615         return false;
616     memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
617     fDataSourceCnt = 0;
618     fDataSourceMaxCnt = sourceCnt;
619     return true;
620     }
621
622 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
623     {
624 // see header file for class documentation
625     struct hostent* he;
626     he = gethostbyname( hostname );
627     if ( he == NULL )
628         {
629         //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
630         return EADDRNOTAVAIL;
631         }
632
633     struct sockaddr_in remoteAddr;
634     remoteAddr.sin_family = AF_INET;    // host byte order 
635     remoteAddr.sin_port = htons(port);  // short, network byte order 
636     remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
637     memset(&(remoteAddr.sin_zero), '\0', 8);  // zero the rest of the struct
638
639     // Create socket and connect to target program on remote node
640     source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
641     if ( source.fTCPConnection == -1 )
642         {
643         return errno;
644         }
645
646     int ret;
647
648     ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
649     if ( ret == -1 )
650         {
651         ret=errno;
652         close( source.fTCPConnection );
653         return ret;
654         } 
655
656     ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
657     if ( ret != (int)strlen(MOD_BIN) )
658         {
659         ret=errno;
660         close( source.fTCPConnection );
661         return ret;
662         }
663
664     unsigned hostnamelen=strlen( hostname );
665     char* tmpchar = new char[ hostnamelen+1 ];
666     if ( !tmpchar )
667         {
668         close( source.fTCPConnection );
669         return ENOMEM;
670         }
671     strncpy( tmpchar, hostname, hostnamelen );
672     tmpchar[hostnamelen]=0;
673     source.fHostname = tmpchar;
674
675     source.fType = kTCP;
676     source.fTCPPort = port;
677     source.fData = NULL;
678     source.fDataSize = 0;
679     source.fDataRead = 0;
680     return 0;
681     }
682
683 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
684     {
685 // see header file for class documentation
686     int ret;
687     char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
688     if ( !tmpchar )
689         {
690         return ENOMEM;
691         }
692     gethostname( tmpchar, MAXHOSTNAMELEN );
693     tmpchar[MAXHOSTNAMELEN]=(char)0;
694     source.fHostname = tmpchar;
695
696     source.fShmID = shmget( shmKey, shmSize, 0660 );
697     if ( source.fShmID == -1 )
698         {
699         ret = errno;
700         delete [] source.fHostname;
701         return ret;
702         }
703     
704     source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
705
706     if ( !source.fShmPtr )
707         {
708         ret = errno;
709         shmctl( source.fShmID, IPC_RMID, NULL );
710         delete [] source.fHostname;
711         return ret;
712         }
713
714     source.fType = kShm;
715     source.fShmKey = shmKey;
716     source.fShmSize = shmSize;
717     source.fDataSize = 0;
718     source.fDataRead = 0;
719     return 0;
720     }
721
722 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
723     {
724 // see header file for class documentation
725 // a buffer data source is like a shm source apart from the shm attach and detach
726 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
727 // cleared right before sources are read but after the reading.
728     //int ret;
729     if ( !pBuffer || size<=0) return EINVAL;
730
731     char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
732     if ( !tmpchar )
733         {
734         return ENOMEM;
735         }
736     gethostname( tmpchar, MAXHOSTNAMELEN );
737     tmpchar[MAXHOSTNAMELEN]=(char)0;
738     source.fHostname = tmpchar;
739
740     source.fShmID = -1;
741     // the data buffer does not contain a size indicator in the first 4 bytes
742     // like the shm source buffer. Still we want to use the mechanism to invalidate/
743     // trigger by clearing the size indicator. Take the source.fShmSize variable.
744     source.fShmPtr = &source.fShmSize;
745     source.fType = kBuf;
746     source.fShmKey = 0;
747     source.fShmSize = size;
748     source.fData = pBuffer;
749     source.fDataSize = 0;
750     source.fDataRead = 0;
751     return 0;
752     }
753
754 void AliHLTHOMERReader::FreeDataSources()
755     {
756 // see header file for class documentation
757     for ( unsigned n=0; n < fDataSourceCnt; n++ )
758         {
759         if ( fDataSources[n].fType == kTCP )
760             FreeTCPDataSource( fDataSources[n] );
761         else if ( fDataSources[n].fType == kShm )
762             FreeShmDataSource( fDataSources[n] );
763         if ( fDataSources[n].fHostname )
764           delete [] fDataSources[n].fHostname;
765         }
766     fDataSourceCnt=0;
767     }
768
769 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
770     {
771 // see header file for class documentation
772     if ( source.fShmPtr )
773       shmdt( (char*)source.fShmPtr );
774 //     if ( source.fShmID != -1 )
775 //      shmctl( source.fShmID, IPC_RMID, NULL );
776     return 0;
777     }
778
779 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
780     {
781 // see header file for class documentation
782     if ( source.fTCPConnection )
783         close( source.fTCPConnection );
784     return 0;
785     }
786
787 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
788     {
789 // see header file for class documentation
790     if ( fDataSourceCnt<=0 )
791         return ENXIO;
792     // Clean up currently active event.
793     ReleaseCurrentEvent();
794     int ret=0;
795     // Trigger all configured data sources
796     for ( unsigned n = 0; n<fDataSourceCnt; n++ ){
797       if ( fDataSources[n].fType == kTCP )
798         ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
799       else if ( fDataSources[n].fType == kShm )
800         ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
801       if ( ret )
802         {
803           fErrorConnection = n;
804           fConnectionStatus=ret;
805           return fConnectionStatus;
806         }
807     }
808     // Now read in data from the configured data source
809     ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
810     
811     if ( ret )
812       {
813         return ret;
814       }
815     ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
816     if ( ret )
817         {
818         return ret;
819         }
820 //     for ( unsigned n = 0; n<fDataSourceCnt; n++ )
821 //      {
822 //      if ( fDataSources[n].fType == kTCP )
823 //          ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
824 //      else
825 //          ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
826 //      if ( ret )
827 //          {
828 //          fErrorConnection = n;
829 //          fConnectionStatus=ret;
830 //          return fConnectionStatus;
831 //          }
832 //      }
833     //Check to see that all sources contributed data for the same event
834     homer_uint64 eventID;
835     homer_uint64 eventType;
836     if (!fDataSources[0].fData)
837       {
838         fErrorConnection = 0;
839         fConnectionStatus=56;//ENOBUF;
840         return fConnectionStatus;
841       }
842     eventID = GetSourceEventID( fDataSources[0] );
843     eventType = GetSourceEventType( fDataSources[0] );
844     for ( unsigned n = 1; n < fDataSourceCnt; n++ )
845         {
846         if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
847             {
848             fErrorConnection = n;
849             fConnectionStatus=56;//EBADRQC;
850             return fConnectionStatus;
851             }
852         }
853     // Find all the different data blocks contained in the data from all
854     // the sources.
855     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
856         {
857         ret = ParseSourceData( fDataSources[n] );
858         if ( ret )
859             {
860             fErrorConnection = n;
861             fConnectionStatus=57;//EBADSLT;
862             return ret;
863             }
864         }
865     fCurrentEventID = eventID;
866     fCurrentEventType = eventType;
867     return 0;
868     }
869
870 void AliHLTHOMERReader::ReleaseCurrentEvent()
871     {
872 // see header file for class documentation
873     // sources.fDataRead = 0;
874     // fMaxBlockCnt
875     fCurrentEventID = ~(homer_uint64)0;
876     fCurrentEventType = ~(homer_uint64)0;
877     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
878         {
879         if ( fDataSources[n].fData )
880             {
881             if ( fDataSources[n].fType == kTCP )
882                 delete [] (homer_uint8*)fDataSources[n].fData;
883             // do not reset the data pointer for kBuf sources since this
884             // can not be set again.
885             if ( fDataSources[n].fType != kBuf )
886               fDataSources[n].fData = NULL;
887             }
888         fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
889         }
890     if ( fBlocks )
891         {
892         for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
893             {
894             if ( fBlocks[n].fOriginatingNodeID )
895                 delete [] fBlocks[n].fOriginatingNodeID;
896             }
897         delete [] fBlocks;
898         fBlocks=0;
899         fMaxBlockCnt = 0;
900         fBlockCnt=0;
901         }
902     }
903
904 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
905     {
906 // see header file for class documentation
907     int ret=0;
908     struct timeval oldSndTO, newSndTO;
909     memset(&oldSndTO, 0, sizeof(oldSndTO));
910     memset(&newSndTO, 0, sizeof(newSndTO));
911     if ( useTimeout )
912         {
913         socklen_t optlen=sizeof(oldSndTO);
914         ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
915         if ( ret )
916             {
917             return errno;
918             }
919         if ( optlen!=sizeof(oldSndTO) )
920             {
921             return ENXIO;
922             }
923         newSndTO.tv_sec = timeoutUsec / 1000000;
924         newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
925         ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
926         if ( ret )
927             {
928             return errno;
929             }
930         }
931     // Send one event request
932     if ( !fEventRequestAdvanceTime )
933         {
934         ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
935         
936         if ( ret != (int)strlen(GET_ONE) )
937             {
938             ret=errno;
939             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
940             return ret;
941             }
942         }
943     else
944         {
945         char tmpCmd[ 128 ];
946
947         int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
948         if ( len>128 || len<0 )
949             {
950             ret=EMSGSIZE;
951             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
952             return ret;
953             }
954         
955         ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
956         
957         if ( ret != (int)strlen(tmpCmd) )
958             {
959             ret=errno;
960             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
961             return ret;
962             }
963         
964         }
965     return 0;
966     }
967
968 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
969     {
970 // see header file for class documentation
971 // clear the size indicator in the first 4 bytes of the buffer to request data
972 // from the HOMER writer.
973     if ( source.fShmPtr )
974         {
975         *(homer_uint32*)( source.fShmPtr ) = 0;
976         return 0;
977         }
978     else
979         return EFAULT;
980     }
981
982 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
983     {
984 // see header file for class documentation
985     bool toRead = false;
986     do
987         {
988         fd_set conns;
989         FD_ZERO( &conns );
990         int highestConn=0;
991         toRead = false;
992         unsigned firstConnection=~(unsigned)0;
993         for ( unsigned long n = 0; n < sourceCnt; n++ )
994             {
995             if ( sources[n].fDataSize == 0 // size specifier not yet read
996                  || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
997                 {
998                 toRead = true;
999                 FD_SET( sources[n].fTCPConnection, &conns );
1000                 if ( sources[n].fTCPConnection > highestConn )
1001                     highestConn = sources[n].fTCPConnection;
1002                 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
1003                 if ( firstConnection == ~(unsigned)0 )
1004                     firstConnection = n;
1005                 }
1006             else
1007                 {
1008                 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1009                 }
1010             }
1011         if ( toRead )
1012             {
1013             struct timeval tv, *ptv;
1014             if ( useTimeout )
1015                 {
1016                 tv.tv_sec = timeout / 1000000;
1017                 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1018                 ptv = &tv;
1019                 }
1020             else
1021                 ptv = NULL;
1022             // wait until something is ready to be read
1023             // either for timeout usecs or until eternity
1024             int ret;
1025             ret = select( highestConn+1, &conns, NULL, NULL, ptv ); 
1026             if ( ret <=0 )
1027                 {
1028                 fErrorConnection = firstConnection;
1029                 if ( errno )
1030                     fConnectionStatus = errno;
1031                 else
1032                     fConnectionStatus = ETIMEDOUT;
1033                 return fConnectionStatus;
1034                 }
1035             for ( unsigned n = 0; n < sourceCnt; n++ )
1036                 {
1037                 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1038                     {
1039                     if ( sources[n].fDataSize == 0 )
1040                         {
1041                         ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1042                         if ( ret != sizeof(homer_uint32) )
1043                             {
1044                             fErrorConnection = n;
1045                             if ( errno )
1046                                 fConnectionStatus = errno;
1047                             else
1048                                 fConnectionStatus = ENOMSG;
1049                             return fConnectionStatus;
1050                             }
1051                         sources[n].fDataSize = ntohl( sources[n].fDataSize );
1052                         sources[n].fDataRead = 0;
1053                         sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1054                         if ( !sources[n].fData )
1055                             {
1056                             fErrorConnection = n;
1057                             fConnectionStatus = ENOMEM;
1058                             return fConnectionStatus;
1059                             }
1060                         }
1061                     else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1062                         {
1063                         ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1064                         if ( ret>0 )
1065                             sources[n].fDataRead += ret;
1066                         else if ( ret == 0 )
1067                             {
1068                             fErrorConnection = n;
1069                             fConnectionStatus = ECONNRESET;
1070                             return fConnectionStatus;
1071                             }
1072                         else
1073                             {
1074                             fErrorConnection = n;
1075                             fConnectionStatus = errno;
1076                             return fConnectionStatus;
1077                             }
1078                         }
1079                     else
1080                         {
1081                         fErrorConnection = n;
1082                         fConnectionStatus = ENXIO;
1083                         return fConnectionStatus;
1084                         }
1085                     }
1086                 }
1087             }
1088         }
1089     while ( toRead );
1090     return 0;
1091     }
1092
1093 /*
1094 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1095     {
1096 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1097     // Send one event request
1098     ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1099     if ( ret != strlen(GET_ONE) )
1100         {
1101         return errno;
1102         }
1103     // wait for and read back size specifier
1104     unsigned sizeNBO;
1105     // The value transmitted is binary, in network byte order
1106     ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1107     if ( ret != sizeof(sizeNBO) )
1108         {
1109         return errno;
1110         }
1111     // Convert back to host byte order
1112     source.fDataSize = ntohl( sizeNBO );
1113     source.fData = new homer_uint8[ source.fDataSize ];
1114     unsigned long dataRead=0, toRead;
1115     if ( !source.fData )
1116         {
1117         char buffer[1024];
1118         // Read in data into buffer in order not to block connection
1119         while ( dataRead < source.fDataSize )
1120             {
1121             if ( source.fDataSize-dataRead > 1024 )
1122                 toRead = 1024;
1123             else
1124                 toRead = source.fDataSize-dataRead;
1125             ret = read( source.fTCPConnection, buffer, toRead );
1126             if ( ret > 0 )
1127                 dataRead += ret;
1128             else
1129                 return errno;
1130             }
1131         return ENOMEM;
1132         }
1133     while ( dataRead < source.fDataSize )
1134         {
1135         toRead = source.fDataSize-dataRead;
1136         ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1137         if ( ret > 0 )
1138             dataRead += ret;
1139         else if ( ret == 0 && useTimeout )
1140             {
1141             struct timeval tv;
1142             tv.tv_sec = timeout / 1000000;
1143             tv.tv_usec = timeout - (tv.tv_sec*1000000);
1144             fd_set conns;
1145             FD_ZERO( &conns );
1146             FD_SET( source.fTCPConnection, &conns );
1147             ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1148             if ( ret <=0 )
1149                 return errno;
1150             }
1151         else if ( ret == 0 )
1152             {
1153             if ( errno == EOK )
1154                 return ECONNRESET;
1155             else
1156                 return errno;
1157             }
1158         else
1159             {
1160             return errno;
1161             }
1162         }
1163     return 0;
1164     }
1165 */
1166
1167 /*
1168 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1169     {
1170     
1171     }
1172 */
1173
1174 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1175     {
1176 // see header file for class documentation
1177     struct timeval tv1, tv2;
1178     bool found=false;
1179     bool all=true;
1180     if ( useTimeout )
1181         gettimeofday( &tv1, NULL );
1182     do
1183         {
1184         found = false;
1185         all = true;
1186         for ( unsigned n = 0; n < sourceCnt; n++ )
1187             {
1188             if ( !sources[n].fDataSize )
1189                 all = false;
1190             if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1191                 {
1192                 found = true;
1193                 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1194                 if (sources[n].fType==kBuf)
1195                   {
1196                     // the data buffer is already set to fData, just need to set fDataSize member
1197                     // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1198                     TriggerShmSource( sources[n], 0, 0 );
1199                   } else 
1200                   {
1201                     sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1202                   }
1203                 }
1204             }
1205         if ( found && useTimeout )
1206             gettimeofday( &tv1, NULL );
1207         if ( !all && useTimeout )
1208             {
1209             gettimeofday( &tv2, NULL );
1210             unsigned long long tdiff;
1211             tdiff = tv2.tv_sec-tv1.tv_sec;
1212             tdiff *= 1000000;
1213             tdiff += tv2.tv_usec-tv1.tv_usec;
1214             if ( tdiff > timeout )
1215                 return ETIMEDOUT;
1216             }
1217         if ( !all )
1218             usleep( 0 );
1219         }
1220     while ( !all );
1221     return 0;
1222     }
1223
1224 int AliHLTHOMERReader::ParseSourceData( const DataSource& source )
1225     {
1226 // see header file for class documentation
1227     if ( source.fData )
1228         {
1229         homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1230         if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
1231         homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1232         // block count is not related to size of the data in the way the
1233         // following condition implies. But we can at least limit the block
1234         // count for the case the data is corrupted
1235         if (blockCnt>source.fDataSize) return EBADMSG;
1236         int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1237         if ( ret )
1238             return ret;
1239         homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1240         for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1241             {
1242             if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1243             homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1244             unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1245             if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1246             if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
1247             fBlocks[fBlockCnt].fSource = source.fNdx;
1248             fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1249             fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1250             fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1251             struct in_addr tmpA;
1252             tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1253             char* addr = inet_ntoa( tmpA );
1254             unsigned straddrlen=strlen(addr);
1255             char* tmpchar = new char[ straddrlen+1 ];
1256             if ( !tmpchar )
1257                 return ENOMEM;
1258             strncpy( tmpchar, addr, straddrlen );
1259             tmpchar[straddrlen]=0;
1260             fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1261             descrOffset += descrLen;
1262             }
1263         return 0;
1264         }
1265     return EFAULT;
1266     }
1267         
1268 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1269     {
1270 // see header file for class documentation
1271     DataBlock* newBlocks;
1272     newBlocks = new DataBlock[ newCnt ];
1273     if ( !newBlocks )
1274         return ENOMEM;
1275     unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1276     if ( fBlocks ) {
1277     memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1278     } else {
1279       fMaxBlockCnt=0;
1280     }
1281     if ( newCnt > fMaxBlockCnt )
1282         memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1283     if ( fBlocks )
1284         delete [] fBlocks;
1285     fBlocks = newBlocks;
1286     fMaxBlockCnt = newCnt;
1287     return 0;
1288     }
1289
1290 homer_uint64 AliHLTHOMERReader::GetSourceEventID( const DataSource& source )
1291     {
1292 // see header file for class documentation
1293     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1294     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1295     }
1296
1297 homer_uint64 AliHLTHOMERReader::GetSourceEventType( const DataSource& source )
1298     {
1299 // see header file for class documentation
1300     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1301     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1302     }
1303
1304 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1305     {
1306 // see header file for class documentation
1307     if ( destFormat == sourceFormat )
1308         return source;
1309     else
1310         return ((source & 0xFFULL) << 56) | 
1311         ((source & 0xFF00ULL) << 40) | 
1312         ((source & 0xFF0000ULL) << 24) | 
1313         ((source & 0xFF000000ULL) << 8) | 
1314         ((source & 0xFF00000000ULL) >> 8) | 
1315         ((source & 0xFF0000000000ULL) >> 24) | 
1316         ((source & 0xFF000000000000ULL) >>  40) | 
1317         ((source & 0xFF00000000000000ULL) >> 56);
1318     }
1319
1320 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1321     {
1322 // see header file for class documentation
1323     if ( destFormat == sourceFormat )
1324         return source;
1325     else
1326         return ((source & 0xFFUL) << 24) | 
1327         ((source & 0xFF00UL) << 8) | 
1328         ((source & 0xFF0000UL) >> 8) | 
1329         ((source & 0xFF000000UL) >> 24);
1330     }
1331
1332 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1333     {
1334       // see header file for function documentation
1335       return new AliHLTHOMERReader(hostname, port);
1336     }
1337
1338 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1339     {
1340       // see header file for function documentation
1341       return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1342     }
1343
1344 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1345     {
1346       // see header file for function documentation
1347       return new AliHLTHOMERReader(pBuffer, size);
1348     }
1349
1350 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1351     {
1352       // see header file for function documentation
1353       if (pInstance) delete pInstance;
1354     }
1355
1356 /*
1357 ***************************************************************************
1358 **
1359 ** $Author$ - Initial Version by Timm Morten Steinbeck
1360 **
1361 ** $Id$ 
1362 **
1363 ***************************************************************************
1364 */