classes renamed to follow coding conventions
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
1 /************************************************************************
2 **
3 **
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck, 
8 ** timm@kip.uni-heidelberg.de
9 **
10 **
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
15 **
16 **
17 ** Newer versions of this file's package will be made available from 
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/ 
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
20 **
21 *************************************************************************/
22
23 /*
24 ***************************************************************************
25 **
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
27 **
28 ** $Id$ 
29 **
30 ***************************************************************************
31 */
32
33 /** @file   AliHLTHOMERReader.cxx
34     @author Timm Steinbeck
35     @date   Sep 14 2007
36     @brief  HLT Online Monitoring Environment including ROOT - Reader
37     @note   migrated from PubSub HLT-stable-20070905.141318 (rev 2375)    */
38
39 // see header file for class documentation
40 // or
41 // refer to README to build package
42 // or
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45 #include "AliHLTHOMERReader.h"
46 #include <stdio.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <netdb.h>
50 extern int h_errno;
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netinet/in.h>
54 #include <netinet/tcp.h>
55 #include <unistd.h>
56 #include <rpc/types.h>
57 #include <fcntl.h>
58 #include <sys/stat.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
61 #ifdef USE_ROOT
62 #include <Rtypes.h>
63 #endif
64
65
66 #define MOD_BIN "MOD BIN\n"
67 #define MOD_ASC "MOD ASC\n"
68 #define GET_ONE "GET ONE\n"
69 #define GET_ALL "GET ALL\n"
70
71 #ifdef USE_ROOT
72 ClassImp(AliHLTMonitoringReader);
73 ClassImp(AliHLTHOMERReader);
74 #endif
75
76
77
78
79
80 #ifdef USE_ROOT
81 AliHLTHOMERReader::AliHLTHOMERReader()
82   :
83   fCurrentEventType(~(homer_uint64)0),
84   fCurrentEventID(~(homer_uint64)0),
85   fBlockCnt(0),
86   fMaxBlockCnt(0),
87   fBlocks(NULL),
88   fDataSourceCnt(0),
89   fTCPDataSourceCnt(0),
90   fShmDataSourceCnt(0),
91   fDataSourceMaxCnt(0),
92   fDataSources(NULL),
93   fConnectionStatus(0),
94   fErrorConnection(~(unsigned int)0),
95   fEventRequestAdvanceTime(0)
96     {
97 // Reader implementation of the HOMER interface.
98 // The HLT Monitoring Environment including ROOT is
99 // a native interface to ship out data from the HLT chain.
100 // See pdf document shiped with the package
101 // for class documentation and tutorial.
102     Init();
103     }
104 #endif
105
106
107 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
108   :
109   AliHLTMonitoringReader(),
110   fCurrentEventType(~(homer_uint64)0),
111   fCurrentEventID(~(homer_uint64)0),
112   fBlockCnt(0),
113   fMaxBlockCnt(0),
114   fBlocks(NULL),
115   fDataSourceCnt(0),
116   fTCPDataSourceCnt(0),
117   fShmDataSourceCnt(0),
118   fDataSourceMaxCnt(0),
119   fDataSources(NULL),
120   fConnectionStatus(0),
121   fErrorConnection(~(unsigned int)0),
122   fEventRequestAdvanceTime(0)
123     {
124 // see header file for class documentation
125 // For reading from a TCP port
126     Init();
127     if ( !AllocDataSources(1) )
128         {
129         fErrorConnection = 0;
130         fConnectionStatus = ENOMEM;
131         return;
132         }
133     fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134     if ( fConnectionStatus )
135         fErrorConnection = 0;
136     else
137         {
138         fDataSourceCnt++;
139         fTCPDataSourceCnt++;
140         fDataSources[0].fNdx = 0;
141         }
142     }
143
144 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
145   :
146   AliHLTMonitoringReader(),
147   fCurrentEventType(~(homer_uint64)0),
148   fCurrentEventID(~(homer_uint64)0),
149   fBlockCnt(0),
150   fMaxBlockCnt(0),
151   fBlocks(NULL),
152   fDataSourceCnt(0),
153   fTCPDataSourceCnt(0),
154   fShmDataSourceCnt(0),
155   fDataSourceMaxCnt(0),
156   fDataSources(NULL),
157   fConnectionStatus(0),
158   fErrorConnection(~(unsigned int)0),
159   fEventRequestAdvanceTime(0)
160     {
161 // see header file for class documentation
162 // For reading from multiple TCP ports
163     Init();
164     if ( !AllocDataSources(tcpCnt) )
165         {
166         fErrorConnection = 0;
167         fConnectionStatus = ENOMEM;
168         return;
169         }
170     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
171         {
172         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173         if ( fConnectionStatus )
174             {
175             fErrorConnection = n;
176             return;
177             }
178         fDataSources[n].fNdx = n;
179         }
180     }
181
182 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
183   :
184   AliHLTMonitoringReader(),
185   fCurrentEventType(~(homer_uint64)0),
186   fCurrentEventID(~(homer_uint64)0),
187   fBlockCnt(0),
188   fMaxBlockCnt(0),
189   fBlocks(NULL),
190   fDataSourceCnt(0),
191   fTCPDataSourceCnt(0),
192   fShmDataSourceCnt(0),
193   fDataSourceMaxCnt(0),
194   fDataSources(NULL),
195   fConnectionStatus(0),
196   fErrorConnection(~(unsigned int)0),
197   fEventRequestAdvanceTime(0)
198     {
199 // see header file for class documentation
200 // For reading from a System V shared memory segment
201     Init();
202     if ( !AllocDataSources(1) )
203         {
204         fErrorConnection = 0;
205         fConnectionStatus = ENOMEM;
206         return;
207         }
208     fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209     if ( fConnectionStatus )
210         fErrorConnection = 0;
211     else
212         {
213         fDataSourceCnt++;
214         fShmDataSourceCnt++;
215         fDataSources[0].fNdx = 0;
216         }
217     }
218
219 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
220   :
221   AliHLTMonitoringReader(),
222   fCurrentEventType(~(homer_uint64)0),
223   fCurrentEventID(~(homer_uint64)0),
224   fBlockCnt(0),
225   fMaxBlockCnt(0),
226   fBlocks(NULL),
227   fDataSourceCnt(0),
228   fTCPDataSourceCnt(0),
229   fShmDataSourceCnt(0),
230   fDataSourceMaxCnt(0),
231   fDataSources(NULL),
232   fConnectionStatus(0),
233   fErrorConnection(~(unsigned int)0),
234   fEventRequestAdvanceTime(0)
235     {
236 // see header file for class documentation
237 // For reading from multiple System V shared memory segments
238     Init();
239     if ( !AllocDataSources(shmCnt) )
240         {
241         fErrorConnection = 0;
242         fConnectionStatus = ENOMEM;
243         return;
244         }
245     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
246         {
247         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248         if ( fConnectionStatus )
249             {
250             fErrorConnection = n;
251             return;
252             }
253         fDataSources[n].fNdx = n;
254         }
255     }
256
257 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports, 
258                           unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
259   :
260   AliHLTMonitoringReader(),
261   fCurrentEventType(~(homer_uint64)0),
262   fCurrentEventID(~(homer_uint64)0),
263   fBlockCnt(0),
264   fMaxBlockCnt(0),
265   fBlocks(NULL),
266   fDataSourceCnt(0),
267   fTCPDataSourceCnt(0),
268   fShmDataSourceCnt(0),
269   fDataSourceMaxCnt(0),
270   fDataSources(NULL),
271   fConnectionStatus(0),
272   fErrorConnection(~(unsigned int)0),
273   fEventRequestAdvanceTime(0)
274     {
275 // see header file for class documentation
276 // For reading from multiple TCP ports and multiple System V shared memory segments
277     Init();
278     if ( !AllocDataSources(tcpCnt+shmCnt) )
279         {
280         fErrorConnection = 0;
281         fConnectionStatus = ENOMEM;
282         return;
283         }
284     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
285         {
286         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287         if ( fConnectionStatus )
288             {
289             fErrorConnection = n;
290             return;
291             }
292         fDataSources[n].fNdx = n;
293         }
294     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
295         {
296         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297         if ( fConnectionStatus )
298             {
299             fErrorConnection = tcpCnt+n;
300             return;
301             }
302         fDataSources[n].fNdx = n;
303         }
304     }
305 AliHLTHOMERReader::~AliHLTHOMERReader()
306     {
307 // see header file for class documentation
308     ReleaseCurrentEvent();
309     FreeDataSources();
310     }
311
312 int  AliHLTHOMERReader::ReadNextEvent()
313     {
314 // see header file for class documentation
315 // Read in the next available event
316     return ReadNextEvent( false, 0 );
317     }
318
319 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
320     {
321 // see header file for class documentation
322 // Read in the next available event
323     return ReadNextEvent( true, timeout );
324     }
325
326 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
327     {
328 // see header file for class documentation
329 // Return the size (in bytes) of the current event's data
330 // block with the given block index (starting at 0).
331     if ( ndx >= fBlockCnt )
332         return 0;
333     return fBlocks[ndx].fLength;
334     }
335
336 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
337     {
338 // see header file for class documentation
339 // Return a pointer to the start of the current event's data
340 // block with the given block index (starting at 0).
341     if ( ndx >= fBlockCnt )
342         return NULL;
343     return fBlocks[ndx].fData;
344     }
345
346 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
347     {
348 // see header file for class documentation
349 // Return IP address or hostname of node which sent the 
350 // current event's data block with the given block index 
351 // (starting at 0).
352 // For HOMER this is the ID of the node on which the subscriber 
353 // that provided this data runs/ran.
354     if ( ndx >= fBlockCnt )
355         return NULL;
356 #ifdef DEBUG
357     if ( fBlocks[ndx].fSource >= fDataSourceCnt )
358         {
359         fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360                  __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
361         return NULL;
362         }
363 #endif
364     return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365     //return fBlocks[ndx].fOriginatingNodeID;
366     }
367
368 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
369     {
370 // see header file for class documentation
371 // Return byte order of the data stored in the 
372 // current event's data block with the given block index (starting at 0). 
373 //         0 is unknown alignment, 
374 //         1 ist little endian, 
375 //         2 is big endian. */
376     if ( ndx >= fBlockCnt )
377         return 0;
378     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
379     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
380     }
381
382 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
383     {
384 // see header file for class documentation
385 // Return the alignment (in bytes) of the given datatype 
386 // in the data stored in the current event's data block
387 // with the given block index (starting at 0). 
388 // Possible values for the data type are
389 //         0: homer_uint64
390 //         1: homer_uint32
391 //         2: uin16
392 //         3: homer_uint8
393 //         4: double
394 //         5: float
395     if ( ndx >= fBlockCnt )
396         return 0;
397     if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
398         return 0;
399     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
400     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
401     }
402
403 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
404     {
405 // see header file for class documentation
406     if ( ndx >= fBlockCnt )
407         return 0;
408     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
409     }
410
411 /* HOMER specific */
412 /* Return the type of the data in the current event's data
413    block with the given block index (starting at 0). */
414 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
415     {
416 // see header file for class documentation
417     if ( ndx >= fBlockCnt )
418         return ~(homer_uint64)0;
419     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
421     }
422
423 /* Return the origin of the data in the current event's data
424    block with the given block index (starting at 0). */
425 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
426     {
427 // see header file for class documentation
428     if ( ndx >= fBlockCnt )
429         return ~(homer_uint32)0;
430     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
432     }
433
434 /* Return a specification of the data in the current event's data
435    block with the given block index (starting at 0). */
436 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
437     {
438 // see header file for class documentation
439     if ( ndx >= fBlockCnt )
440         return ~(homer_uint32)0;
441     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
443     }
444
445 /* Find the next data block in the current event with the given
446    data type, origin, and specification. Returns the block's 
447    index. */
448 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin, 
449                                          homer_uint32 spec, unsigned long startNdx ) const
450     {
451 // see header file for class documentation
452     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
453         {
454         if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455              ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456              ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
457             return n;
458         }
459     return ~(unsigned long)0;
460     }
461
462 /* Find the next data block in the current event with the given
463    data type, origin, and specification. Returns the block's 
464    index. */
465 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4], 
466                                          homer_uint32 spec, unsigned long startNdx ) const
467     {
468 // see header file for class documentation
469     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
470         {
471         bool found1=true, found2=true;
472         for ( unsigned i = 0; i < 8; i++ )
473             {
474             if ( type[i] != (char)0xFF )
475                 {
476                 found1=false;
477                 break;
478                 }
479             }
480         if ( !found1 )
481             {
482             found1 = true;
483             for ( unsigned i = 0; i < 8; i++ )
484                 {
485                 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
487                     {
488                     found1=false;
489                     break;
490                     }
491                 }
492             }
493         for ( unsigned i = 0; i < 4; i++ )
494             {
495             if ( origin[i] != (char)0xFF )
496                 {
497                 found2 = false;
498                 break;
499                 }
500             }
501         if ( !found2 )
502             {
503             found2 = true;
504             for ( unsigned i = 0; i < 4; i++ )
505                 {
506                 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
508                     {
509                     found2=false;
510                     break;
511                     }
512                 }
513             }
514         //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
515         if ( found1 && found2 &&
516              ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
517             return n;
518         }
519     return ~(unsigned long)0;
520     }
521
522 /* Return the ID of the node that actually produced this data block.
523    This may be different from the node which sent the data to this
524    monitoring object as returned by GetBlockSendNodeID. */
525 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
526     {
527 // see header file for class documentation
528     if ( ndx >= fBlockCnt )
529         return NULL;
530     return fBlocks[ndx].fOriginatingNodeID;
531     }
532
533
534 void AliHLTHOMERReader::Init()
535     {
536 // see header file for class documentation
537     fCurrentEventType = ~(homer_uint64)0;
538     fCurrentEventID = ~(homer_uint64)0;
539     fMaxBlockCnt = fBlockCnt = 0;
540     fBlocks = NULL;
541         
542     fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
543     fDataSources = NULL;
544
545         
546     fConnectionStatus = 0;
547     fErrorConnection = ~(unsigned int)0;
548
549     fEventRequestAdvanceTime = 0;
550     }
551         
552 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
553     {
554 // see header file for class documentation
555     fDataSources = new DataSource[ sourceCnt ];
556     if ( !fDataSources )
557         return false;
558     fDataSourceCnt = 0;
559     fDataSourceMaxCnt = sourceCnt;
560     return true;
561     }
562
563 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
564     {
565 // see header file for class documentation
566     struct hostent* he;
567     he = gethostbyname( hostname );
568     if ( he == NULL )
569         {
570         //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571         return EADDRNOTAVAIL;
572         }
573
574     struct sockaddr_in remoteAddr;
575     remoteAddr.sin_family = AF_INET;    // host byte order 
576     remoteAddr.sin_port = htons(port);  // short, network byte order 
577     remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578     memset(&(remoteAddr.sin_zero), '\0', 8);  // zero the rest of the struct
579
580     // Create socket and connect to target program on remote node
581     source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582     if ( source.fTCPConnection == -1 )
583         {
584         return errno;
585         }
586
587     int ret;
588
589     ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
590     if ( ret == -1 )
591         {
592         ret=errno;
593         close( source.fTCPConnection );
594         return ret;
595         } 
596
597     ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598     if ( ret != (int)strlen(MOD_BIN) )
599         {
600         ret=errno;
601         close( source.fTCPConnection );
602         return ret;
603         }
604
605     char* tmpchar = new char[ strlen( hostname )+1 ];
606     if ( !tmpchar )
607         {
608         close( source.fTCPConnection );
609         return ENOMEM;
610         }
611     strcpy( tmpchar, hostname );
612     source.fHostname = tmpchar;
613
614     source.fType = kTCP;
615     source.fTCPPort = port;
616     source.fData = NULL;
617     source.fDataSize = 0;
618     source.fDataRead = 0;
619     return 0;
620     }
621
622 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
623     {
624 // see header file for class documentation
625     int ret;
626     char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
627     if ( !tmpchar )
628         {
629         return ENOMEM;
630         }
631     gethostname( tmpchar, MAXHOSTNAMELEN );
632     tmpchar[MAXHOSTNAMELEN]=(char)0;
633     source.fHostname = tmpchar;
634
635     source.fShmID = shmget( shmKey, shmSize, 0660 );
636     if ( source.fShmID == -1 )
637         {
638         ret = errno;
639         delete [] source.fHostname;
640         return ret;
641         }
642     
643     source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
644
645     if ( !source.fShmPtr )
646         {
647         ret = errno;
648         shmctl( source.fShmID, IPC_RMID, NULL );
649         delete [] source.fHostname;
650         return ret;
651         }
652
653     source.fType = kShm;
654     source.fShmKey = shmKey;
655     source.fShmSize = shmSize;
656     source.fDataSize = 0;
657     source.fDataRead = 0;
658     return 0;
659     }
660
661 void AliHLTHOMERReader::FreeDataSources()
662     {
663 // see header file for class documentation
664     for ( unsigned n=0; n < fDataSourceCnt; n++ )
665         {
666         if ( fDataSources[n].fType == kTCP )
667             FreeTCPDataSource( fDataSources[n] );
668         else
669             FreeShmDataSource( fDataSources[n] );
670         }
671     }
672
673 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
674     {
675 // see header file for class documentation
676     if ( source.fShmPtr )
677         shmdt( source.fShmPtr );
678 //     if ( source.fShmID != -1 )
679 //      shmctl( source.fShmID, IPC_RMID, NULL );
680     if ( source.fHostname )
681         delete [] source.fHostname;
682     return 0;
683     }
684
685 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
686     {
687 // see header file for class documentation
688     if ( source.fTCPConnection )
689         close( source.fTCPConnection );
690     if ( source.fHostname )
691         delete [] source.fHostname;
692     return 0;
693     }
694
695 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
696     {
697 // see header file for class documentation
698     if ( fDataSourceCnt<=0 )
699         return ENXIO;
700     // Clean up currently active event.
701     ReleaseCurrentEvent();
702     int ret;
703     // Trigger all configured data sources
704     for ( unsigned n = 0; n<fDataSourceCnt; n++ )
705         {
706         if ( fDataSources[n].fType == kTCP )
707             ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
708         else
709             ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
710         if ( ret )
711             {
712             fErrorConnection = n;
713             fConnectionStatus=ret;
714             return fConnectionStatus;
715             }
716         }
717     // Now read in data from the configured data source
718     ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
719     if ( ret )
720         {
721         return ret;
722         }
723     ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
724     if ( ret )
725         {
726         return ret;
727         }
728 //     for ( unsigned n = 0; n<fDataSourceCnt; n++ )
729 //      {
730 //      if ( fDataSources[n].fType == kTCP )
731 //          ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
732 //      else
733 //          ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
734 //      if ( ret )
735 //          {
736 //          fErrorConnection = n;
737 //          fConnectionStatus=ret;
738 //          return fConnectionStatus;
739 //          }
740 //      }
741     //Check to see that all sources contributed data for the same event
742     homer_uint64 eventID;
743     homer_uint64 eventType;
744     eventID = GetSourceEventID( fDataSources[0] );
745     eventType = GetSourceEventType( fDataSources[0] );
746     for ( unsigned n = 1; n < fDataSourceCnt; n++ )
747         {
748         if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
749             {
750             fErrorConnection = n;
751             fConnectionStatus=56;//EBADRQC;
752             return fConnectionStatus;
753             }
754         }
755     // Find all the different data blocks contained in the data from all
756     // the sources.
757     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
758         {
759         ret = ParseSourceData( fDataSources[n] );
760         if ( ret )
761             {
762             fErrorConnection = n;
763             fConnectionStatus=57;//EBADSLT;
764             return fConnectionStatus;
765             }
766         }
767     fCurrentEventID = eventID;
768     fCurrentEventType = eventType;
769     return 0;
770     }
771
772 void AliHLTHOMERReader::ReleaseCurrentEvent()
773     {
774 // see header file for class documentation
775     // sources.fDataRead = 0;
776     // fMaxBlockCnt
777     fCurrentEventID = ~(homer_uint64)0;
778     fCurrentEventType = ~(homer_uint64)0;
779     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
780         {
781         if ( fDataSources[n].fData )
782             {
783             if ( fDataSources[n].fType == kTCP )
784                 delete [] (homer_uint8*)fDataSources[n].fData;
785             fDataSources[n].fData = NULL;
786             }
787         fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
788         }
789     if ( fBlocks )
790         {
791         for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
792             {
793             if ( fBlocks[n].fOriginatingNodeID )
794                 delete [] fBlocks[n].fOriginatingNodeID;
795             }
796         delete [] fBlocks;
797         fBlocks=0;
798         fMaxBlockCnt = 0;
799         fBlockCnt=0;
800         }
801     }
802
803 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
804     {
805 // see header file for class documentation
806     int ret;
807     struct timeval oldSndTO, newSndTO;
808     if ( useTimeout )
809         {
810         socklen_t optlen=sizeof(oldSndTO);
811         ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
812         if ( ret )
813             {
814             return errno;
815             }
816         if ( optlen!=sizeof(oldSndTO) )
817             {
818             return ENXIO;
819             }
820         newSndTO.tv_sec = timeoutUsec / 1000000;
821         newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
822         ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
823         if ( ret )
824             {
825             return errno;
826             }
827         }
828     // Send one event request
829     if ( !fEventRequestAdvanceTime )
830         {
831         ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
832         
833         if ( ret != (int)strlen(GET_ONE) )
834             {
835             ret=errno;
836             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
837             return ret;
838             }
839         }
840     else
841         {
842         char tmpCmd[ 128 ];
843
844         int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
845         if ( len>128 || len<0 )
846             {
847             ret=EMSGSIZE;
848             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
849             return ret;
850             }
851         
852         ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
853         
854         if ( ret != (int)strlen(tmpCmd) )
855             {
856             ret=errno;
857             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
858             return ret;
859             }
860         
861         }
862     return 0;
863     }
864
865 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
866     {
867 // see header file for class documentation
868 // clear the size indicator in the first 4 bytes of the buffer to request data
869 // from the HOMER writer.
870     if ( source.fShmPtr )
871         {
872         *(homer_uint32*)( source.fShmPtr ) = 0;
873         return 0;
874         }
875     else
876         return EFAULT;
877     }
878
879 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
880     {
881 // see header file for class documentation
882     bool toRead = false;
883     do
884         {
885         fd_set conns;
886         FD_ZERO( &conns );
887         int highestConn=0;
888         toRead = false;
889         unsigned firstConnection=~(unsigned)0;
890         for ( unsigned long n = 0; n < sourceCnt; n++ )
891             {
892             if ( sources[n].fDataSize == 0 // size specifier not yet read
893                  || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
894                 {
895                 toRead = true;
896                 FD_SET( sources[n].fTCPConnection, &conns );
897                 if ( sources[n].fTCPConnection > highestConn )
898                     highestConn = sources[n].fTCPConnection;
899                 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
900                 if ( firstConnection == ~(unsigned)0 )
901                     firstConnection = n;
902                 }
903             else
904                 {
905                 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
906                 }
907             }
908         if ( toRead )
909             {
910             struct timeval tv, *ptv;
911             if ( useTimeout )
912                 {
913                 tv.tv_sec = timeout / 1000000;
914                 tv.tv_usec = timeout - (tv.tv_sec*1000000);
915                 ptv = &tv;
916                 }
917             else
918                 ptv = NULL;
919             // wait until something is ready to be read
920             // either for timeout usecs or until eternity
921             int ret;
922             ret = select( highestConn+1, &conns, NULL, NULL, ptv ); 
923             if ( ret <=0 )
924                 {
925                 fErrorConnection = firstConnection;
926                 if ( errno )
927                     fConnectionStatus = errno;
928                 else
929                     fConnectionStatus = ETIMEDOUT;
930                 return fConnectionStatus;
931                 }
932             for ( unsigned n = 0; n < sourceCnt; n++ )
933                 {
934                 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
935                     {
936                     if ( sources[n].fDataSize == 0 )
937                         {
938                         ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
939                         if ( ret != sizeof(homer_uint32) )
940                             {
941                             fErrorConnection = n;
942                             if ( errno )
943                                 fConnectionStatus = errno;
944                             else
945                                 fConnectionStatus = ENOMSG;
946                             return fConnectionStatus;
947                             }
948                         sources[n].fDataSize = ntohl( sources[n].fDataSize );
949                         sources[n].fDataRead = 0;
950                         sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
951                         if ( !sources[n].fData )
952                             {
953                             fErrorConnection = n;
954                             fConnectionStatus = ENOMEM;
955                             return fConnectionStatus;
956                             }
957                         }
958                     else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
959                         {
960                         ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
961                         if ( ret>0 )
962                             sources[n].fDataRead += ret;
963                         else if ( ret == 0 )
964                             {
965                             fErrorConnection = n;
966                             fConnectionStatus = ECONNRESET;
967                             return fConnectionStatus;
968                             }
969                         else
970                             {
971                             fErrorConnection = n;
972                             fConnectionStatus = errno;
973                             return fConnectionStatus;
974                             }
975                         }
976                     else
977                         {
978                         fErrorConnection = n;
979                         fConnectionStatus = ENXIO;
980                         return fConnectionStatus;
981                         }
982                     }
983                 }
984             }
985         }
986     while ( toRead );
987     return 0;
988     }
989
990 /*
991 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
992     {
993 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
994     // Send one event request
995     ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
996     if ( ret != strlen(GET_ONE) )
997         {
998         return errno;
999         }
1000     // wait for and read back size specifier
1001     unsigned sizeNBO;
1002     // The value transmitted is binary, in network byte order
1003     ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1004     if ( ret != sizeof(sizeNBO) )
1005         {
1006         return errno;
1007         }
1008     // Convert back to host byte order
1009     source.fDataSize = ntohl( sizeNBO );
1010     source.fData = new homer_uint8[ source.fDataSize ];
1011     unsigned long dataRead=0, toRead;
1012     if ( !source.fData )
1013         {
1014         char buffer[1024];
1015         // Read in data into buffer in order not to block connection
1016         while ( dataRead < source.fDataSize )
1017             {
1018             if ( source.fDataSize-dataRead > 1024 )
1019                 toRead = 1024;
1020             else
1021                 toRead = source.fDataSize-dataRead;
1022             ret = read( source.fTCPConnection, buffer, toRead );
1023             if ( ret > 0 )
1024                 dataRead += ret;
1025             else
1026                 return errno;
1027             }
1028         return ENOMEM;
1029         }
1030     while ( dataRead < source.fDataSize )
1031         {
1032         toRead = source.fDataSize-dataRead;
1033         ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1034         if ( ret > 0 )
1035             dataRead += ret;
1036         else if ( ret == 0 && useTimeout )
1037             {
1038             struct timeval tv;
1039             tv.tv_sec = timeout / 1000000;
1040             tv.tv_usec = timeout - (tv.tv_sec*1000000);
1041             fd_set conns;
1042             FD_ZERO( &conns );
1043             FD_SET( source.fTCPConnection, &conns );
1044             ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1045             if ( ret <=0 )
1046                 return errno;
1047             }
1048         else if ( ret == 0 )
1049             {
1050             if ( errno == EOK )
1051                 return ECONNRESET;
1052             else
1053                 return errno;
1054             }
1055         else
1056             {
1057             return errno;
1058             }
1059         }
1060     return 0;
1061     }
1062 */
1063
1064 /*
1065 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1066     {
1067     
1068     }
1069 */
1070
1071 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1072     {
1073 // see header file for class documentation
1074     struct timeval tv1, tv2;
1075     bool found=false;
1076     bool all=true;
1077     if ( useTimeout )
1078         gettimeofday( &tv1, NULL );
1079     do
1080         {
1081         found = false;
1082         all = true;
1083         for ( unsigned n = 0; n < sourceCnt; n++ )
1084             {
1085             if ( !sources[n].fDataSize )
1086                 all = false;
1087             if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1088                 {
1089                 found = true;
1090                 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1091                 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1092                 }
1093             }
1094         if ( found && useTimeout )
1095             gettimeofday( &tv1, NULL );
1096         if ( !all && useTimeout )
1097             {
1098             gettimeofday( &tv2, NULL );
1099             unsigned long long tdiff;
1100             tdiff = tv2.tv_sec-tv1.tv_sec;
1101             tdiff *= 1000000;
1102             tdiff += tv2.tv_usec-tv1.tv_usec;
1103             if ( tdiff > timeout )
1104                 return ETIMEDOUT;
1105             }
1106         if ( !all )
1107             usleep( 0 );
1108         }
1109     while ( !all );
1110     return 0;
1111     }
1112
1113 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1114     {
1115 // see header file for class documentation
1116     if ( source.fData )
1117         {
1118         homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1119         homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1120         int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1121         if ( ret )
1122             return ret;
1123         homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1124         for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1125             {
1126             homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1127             unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1128             fBlocks[fBlockCnt].fSource = source.fNdx;
1129             fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1130             fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1131             fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1132             struct in_addr tmpA;
1133             tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1134             char* addr = inet_ntoa( tmpA );
1135             char* tmpchar = new char[ strlen(addr)+1 ];
1136             if ( !tmpchar )
1137                 return ENOMEM;
1138             strcpy( tmpchar, addr );
1139             fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1140             descrOffset += descrLen;
1141             }
1142         return 0;
1143         }
1144     return EFAULT;
1145     }
1146         
1147 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1148     {
1149 // see header file for class documentation
1150     DataBlock* newBlocks;
1151     newBlocks = new DataBlock[ newCnt ];
1152     if ( !newBlocks )
1153         return ENOMEM;
1154     unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1155     memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1156     if ( newCnt > fMaxBlockCnt )
1157         memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1158     if ( fBlocks )
1159         delete [] fBlocks;
1160     fBlocks = newBlocks;
1161     fMaxBlockCnt = newCnt;
1162     return 0;
1163     }
1164
1165 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1166     {
1167 // see header file for class documentation
1168     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1169     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1170     }
1171
1172 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1173     {
1174 // see header file for class documentation
1175     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1176     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1177     }
1178
1179 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1180     {
1181 // see header file for class documentation
1182     if ( destFormat == sourceFormat )
1183         return source;
1184     else
1185         return ((source & 0xFFULL) << 56) | 
1186         ((source & 0xFF00ULL) << 40) | 
1187         ((source & 0xFF0000ULL) << 24) | 
1188         ((source & 0xFF000000ULL) << 8) | 
1189         ((source & 0xFF00000000ULL) >> 8) | 
1190         ((source & 0xFF0000000000ULL) >> 24) | 
1191         ((source & 0xFF000000000000ULL) >>  40) | 
1192         ((source & 0xFF00000000000000ULL) >> 56);
1193     }
1194
1195 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1196     {
1197 // see header file for class documentation
1198     if ( destFormat == sourceFormat )
1199         return source;
1200     else
1201         return ((source & 0xFFUL) << 24) | 
1202         ((source & 0xFF00UL) << 8) | 
1203         ((source & 0xFF0000UL) >> 8) | 
1204         ((source & 0xFF000000UL) >> 24);
1205     }
1206
1207 AliHLTHOMERReader* AliHLTHOMERReaderCreate(const void* pBuffer, int size)
1208     {
1209 // see header file for function documentation
1210       //return new AliHLTHOMERReader(pBuffer, size);
1211       return NULL;
1212     }
1213
1214 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1215     {
1216 // see header file for function documentation
1217       if (pInstance) delete pInstance;
1218     }
1219
1220 /*
1221 ***************************************************************************
1222 **
1223 ** $Author$ - Initial Version by Timm Morten Steinbeck
1224 **
1225 ** $Id$ 
1226 **
1227 ***************************************************************************
1228 */