15676c1e152d6a0cbce1af4e2cbce7e90ddcc646
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
1 /************************************************************************
2 **
3 **
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck, 
8 ** timm@kip.uni-heidelberg.de
9 **
10 **
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
15 **
16 **
17 ** Newer versions of this file's package will be made available from 
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/ 
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
20 **
21 *************************************************************************/
22
23 /*
24 ***************************************************************************
25 **
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
27 **
28 ** $Id$ 
29 **
30 ***************************************************************************
31 */
32
33 /** @file   AliHLTHOMERReader.cxx
34     @author Timm Steinbeck
35     @date   Sep 14 2007
36     @brief  HLT Online Monitoring Environment including ROOT - Reader
37     @note   migrated from PubSub HLT-stable-20070905.141318 (rev 2375)    */
38
39 // see header file for class documentation
40 // or
41 // refer to README to build package
42 // or
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45 #include "AliHLTHOMERReader.h"
46 #include <stdio.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <netdb.h>
50 extern int h_errno;
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netinet/in.h>
54 #include <netinet/tcp.h>
55 #include <unistd.h>
56 #include <rpc/types.h>
57 #include <fcntl.h>
58 #include <sys/stat.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
61 #ifdef USE_ROOT
62 #include <Rtypes.h>
63 #endif
64
65
66 #define MOD_BIN "MOD BIN\n"
67 #define MOD_ASC "MOD ASC\n"
68 #define GET_ONE "GET ONE\n"
69 #define GET_ALL "GET ALL\n"
70
71 #ifdef USE_ROOT
72 ClassImp(MonitoringReader);
73 ClassImp(HOMERReader);
74 #endif
75
76
77
78
79
80 #ifdef USE_ROOT
81 HOMERReader::HOMERReader()
82   :
83   fCurrentEventType(~(homer_uint64)0),
84   fCurrentEventID(~(homer_uint64)0),
85   fBlockCnt(0),
86   fMaxBlockCnt(0),
87   fBlocks(NULL),
88   fDataSourceCnt(0),
89   fTCPDataSourceCnt(0),
90   fShmDataSourceCnt(0),
91   fDataSourceMaxCnt(0),
92   fDataSources(NULL),
93   fConnectionStatus(0),
94   fErrorConnection(~(unsigned int)0),
95   fEventRequestAdvanceTime(0)
96     {
97 // Reader implementation of the HOMER interface.
98 // The HLT Monitoring Environment including ROOT is
99 // a native interface to ship out data from the HLT chain.
100 // See pdf document shiped with the package
101 // for class documentation and tutorial.
102     Init();
103     }
104 #endif
105
106
107 HOMERReader::HOMERReader( const char* hostname, unsigned short port )
108   :
109   MonitoringReader(),
110   fCurrentEventType(~(homer_uint64)0),
111   fCurrentEventID(~(homer_uint64)0),
112   fBlockCnt(0),
113   fMaxBlockCnt(0),
114   fBlocks(NULL),
115   fDataSourceCnt(0),
116   fTCPDataSourceCnt(0),
117   fShmDataSourceCnt(0),
118   fDataSourceMaxCnt(0),
119   fDataSources(NULL),
120   fConnectionStatus(0),
121   fErrorConnection(~(unsigned int)0),
122   fEventRequestAdvanceTime(0)
123     {
124 // see header file for class documentation
125 // For reading from a TCP port
126     Init();
127     if ( !AllocDataSources(1) )
128         {
129         fErrorConnection = 0;
130         fConnectionStatus = ENOMEM;
131         return;
132         }
133     fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134     if ( fConnectionStatus )
135         fErrorConnection = 0;
136     else
137         {
138         fDataSourceCnt++;
139         fTCPDataSourceCnt++;
140         fDataSources[0].fNdx = 0;
141         }
142     }
143
144 HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
145   :
146   MonitoringReader(),
147   fCurrentEventType(~(homer_uint64)0),
148   fCurrentEventID(~(homer_uint64)0),
149   fBlockCnt(0),
150   fMaxBlockCnt(0),
151   fBlocks(NULL),
152   fDataSourceCnt(0),
153   fTCPDataSourceCnt(0),
154   fShmDataSourceCnt(0),
155   fDataSourceMaxCnt(0),
156   fDataSources(NULL),
157   fConnectionStatus(0),
158   fErrorConnection(~(unsigned int)0),
159   fEventRequestAdvanceTime(0)
160     {
161 // see header file for class documentation
162 // For reading from multiple TCP ports
163     Init();
164     if ( !AllocDataSources(tcpCnt) )
165         {
166         fErrorConnection = 0;
167         fConnectionStatus = ENOMEM;
168         return;
169         }
170     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
171         {
172         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173         if ( fConnectionStatus )
174             {
175             fErrorConnection = n;
176             return;
177             }
178         fDataSources[n].fNdx = n;
179         }
180     }
181
182 HOMERReader::HOMERReader( key_t shmKey, int shmSize )
183   :
184   MonitoringReader(),
185   fCurrentEventType(~(homer_uint64)0),
186   fCurrentEventID(~(homer_uint64)0),
187   fBlockCnt(0),
188   fMaxBlockCnt(0),
189   fBlocks(NULL),
190   fDataSourceCnt(0),
191   fTCPDataSourceCnt(0),
192   fShmDataSourceCnt(0),
193   fDataSourceMaxCnt(0),
194   fDataSources(NULL),
195   fConnectionStatus(0),
196   fErrorConnection(~(unsigned int)0),
197   fEventRequestAdvanceTime(0)
198     {
199 // see header file for class documentation
200 // For reading from a System V shared memory segment
201     Init();
202     if ( !AllocDataSources(1) )
203         {
204         fErrorConnection = 0;
205         fConnectionStatus = ENOMEM;
206         return;
207         }
208     fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209     if ( fConnectionStatus )
210         fErrorConnection = 0;
211     else
212         {
213         fDataSourceCnt++;
214         fShmDataSourceCnt++;
215         fDataSources[0].fNdx = 0;
216         }
217     }
218
219 HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
220   :
221   MonitoringReader(),
222   fCurrentEventType(~(homer_uint64)0),
223   fCurrentEventID(~(homer_uint64)0),
224   fBlockCnt(0),
225   fMaxBlockCnt(0),
226   fBlocks(NULL),
227   fDataSourceCnt(0),
228   fTCPDataSourceCnt(0),
229   fShmDataSourceCnt(0),
230   fDataSourceMaxCnt(0),
231   fDataSources(NULL),
232   fConnectionStatus(0),
233   fErrorConnection(~(unsigned int)0),
234   fEventRequestAdvanceTime(0)
235     {
236 // see header file for class documentation
237 // For reading from multiple System V shared memory segments
238     Init();
239     if ( !AllocDataSources(shmCnt) )
240         {
241         fErrorConnection = 0;
242         fConnectionStatus = ENOMEM;
243         return;
244         }
245     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
246         {
247         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248         if ( fConnectionStatus )
249             {
250             fErrorConnection = n;
251             return;
252             }
253         fDataSources[n].fNdx = n;
254         }
255     }
256
257 HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports, 
258                           unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
259   :
260   MonitoringReader(),
261   fCurrentEventType(~(homer_uint64)0),
262   fCurrentEventID(~(homer_uint64)0),
263   fBlockCnt(0),
264   fMaxBlockCnt(0),
265   fBlocks(NULL),
266   fDataSourceCnt(0),
267   fTCPDataSourceCnt(0),
268   fShmDataSourceCnt(0),
269   fDataSourceMaxCnt(0),
270   fDataSources(NULL),
271   fConnectionStatus(0),
272   fErrorConnection(~(unsigned int)0),
273   fEventRequestAdvanceTime(0)
274     {
275 // see header file for class documentation
276 // For reading from multiple TCP ports and multiple System V shared memory segments
277     Init();
278     if ( !AllocDataSources(tcpCnt+shmCnt) )
279         {
280         fErrorConnection = 0;
281         fConnectionStatus = ENOMEM;
282         return;
283         }
284     for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
285         {
286         fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287         if ( fConnectionStatus )
288             {
289             fErrorConnection = n;
290             return;
291             }
292         fDataSources[n].fNdx = n;
293         }
294     for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
295         {
296         fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297         if ( fConnectionStatus )
298             {
299             fErrorConnection = tcpCnt+n;
300             return;
301             }
302         fDataSources[n].fNdx = n;
303         }
304     }
305 HOMERReader::~HOMERReader()
306     {
307 // see header file for class documentation
308     ReleaseCurrentEvent();
309     FreeDataSources();
310     }
311
312 int  HOMERReader::ReadNextEvent()
313     {
314 // see header file for class documentation
315 // Read in the next available event
316     return ReadNextEvent( false, 0 );
317     }
318
319 int HOMERReader::ReadNextEvent( unsigned long timeout )
320     {
321 // see header file for class documentation
322 // Read in the next available event
323     return ReadNextEvent( true, timeout );
324     }
325
326 unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
327     {
328 // see header file for class documentation
329 // Return the size (in bytes) of the current event's data
330 // block with the given block index (starting at 0).
331     if ( ndx >= fBlockCnt )
332         return 0;
333     return fBlocks[ndx].fLength;
334     }
335
336 const void* HOMERReader::GetBlockData( unsigned long ndx ) const
337     {
338 // see header file for class documentation
339 // Return a pointer to the start of the current event's data
340 // block with the given block index (starting at 0).
341     if ( ndx >= fBlockCnt )
342         return NULL;
343     return fBlocks[ndx].fData;
344     }
345
346 const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
347     {
348 // see header file for class documentation
349 // Return IP address or hostname of node which sent the 
350 // current event's data block with the given block index 
351 // (starting at 0).
352 // For HOMER this is the ID of the node on which the subscriber 
353 // that provided this data runs/ran.
354     if ( ndx >= fBlockCnt )
355         return NULL;
356 #ifdef DEBUG
357     if ( fBlocks[ndx].fSource >= fDataSourceCnt )
358         {
359         fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360                  __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
361         return NULL;
362         }
363 #endif
364     return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365     //return fBlocks[ndx].fOriginatingNodeID;
366     }
367
368 homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
369     {
370 // see header file for class documentation
371 // Return byte order of the data stored in the 
372 // current event's data block with the given block index (starting at 0). 
373 //         0 is unknown alignment, 
374 //         1 ist little endian, 
375 //         2 is big endian. */
376     if ( ndx >= fBlockCnt )
377         return 0;
378     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
379     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
380     }
381
382 homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
383     {
384 // see header file for class documentation
385 // Return the alignment (in bytes) of the given datatype 
386 // in the data stored in the current event's data block
387 // with the given block index (starting at 0). 
388 // Possible values for the data type are
389 //         0: homer_uint64
390 //         1: homer_uint32
391 //         2: uin16
392 //         3: homer_uint8
393 //         4: double
394 //         5: float
395     if ( ndx >= fBlockCnt )
396         return 0;
397     if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
398         return 0;
399     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
400     return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
401     }
402
403 homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
404     {
405 // see header file for class documentation
406     if ( ndx >= fBlockCnt )
407         return 0;
408     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
409     }
410
411 /* HOMER specific */
412 /* Return the type of the data in the current event's data
413    block with the given block index (starting at 0). */
414 homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
415     {
416 // see header file for class documentation
417     if ( ndx >= fBlockCnt )
418         return ~(homer_uint64)0;
419     //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420     return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
421     }
422
423 /* Return the origin of the data in the current event's data
424    block with the given block index (starting at 0). */
425 homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
426     {
427 // see header file for class documentation
428     if ( ndx >= fBlockCnt )
429         return ~(homer_uint32)0;
430     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
432     }
433
434 /* Return a specification of the data in the current event's data
435    block with the given block index (starting at 0). */
436 homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
437     {
438 // see header file for class documentation
439     if ( ndx >= fBlockCnt )
440         return ~(homer_uint32)0;
441     //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442     return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
443     }
444
445 /* Find the next data block in the current event with the given
446    data type, origin, and specification. Returns the block's 
447    index. */
448 unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin, 
449                                          homer_uint32 spec, unsigned long startNdx ) const
450     {
451 // see header file for class documentation
452     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
453         {
454         if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455              ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456              ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
457             return n;
458         }
459     return ~(unsigned long)0;
460     }
461
462 /* Find the next data block in the current event with the given
463    data type, origin, and specification. Returns the block's 
464    index. */
465 unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4], 
466                                          homer_uint32 spec, unsigned long startNdx ) const
467     {
468 // see header file for class documentation
469     for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
470         {
471         bool found1=true, found2=true;
472         for ( unsigned i = 0; i < 8; i++ )
473             {
474             if ( type[i] != (char)0xFF )
475                 {
476                 found1=false;
477                 break;
478                 }
479             }
480         if ( !found1 )
481             {
482             found1 = true;
483             for ( unsigned i = 0; i < 8; i++ )
484                 {
485                 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
487                     {
488                     found1=false;
489                     break;
490                     }
491                 }
492             }
493         for ( unsigned i = 0; i < 4; i++ )
494             {
495             if ( origin[i] != (char)0xFF )
496                 {
497                 found2 = false;
498                 break;
499                 }
500             }
501         if ( !found2 )
502             {
503             found2 = true;
504             for ( unsigned i = 0; i < 4; i++ )
505                 {
506                 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507                 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
508                     {
509                     found2=false;
510                     break;
511                     }
512                 }
513             }
514         //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
515         if ( found1 && found2 &&
516              ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
517             return n;
518         }
519     return ~(unsigned long)0;
520     }
521
522 /* Return the ID of the node that actually produced this data block.
523    This may be different from the node which sent the data to this
524    monitoring object as returned by GetBlockSendNodeID. */
525 const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
526     {
527 // see header file for class documentation
528     if ( ndx >= fBlockCnt )
529         return NULL;
530     return fBlocks[ndx].fOriginatingNodeID;
531     }
532
533
534 void HOMERReader::Init()
535     {
536 // see header file for class documentation
537     fCurrentEventType = ~(homer_uint64)0;
538     fCurrentEventID = ~(homer_uint64)0;
539     fMaxBlockCnt = fBlockCnt = 0;
540     fBlocks = NULL;
541         
542     fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
543     fDataSources = NULL;
544
545         
546     fConnectionStatus = 0;
547     fErrorConnection = ~(unsigned int)0;
548
549     fEventRequestAdvanceTime = 0;
550     }
551         
552 bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
553     {
554 // see header file for class documentation
555     fDataSources = new DataSource[ sourceCnt ];
556     if ( !fDataSources )
557         return false;
558     fDataSourceCnt = 0;
559     fDataSourceMaxCnt = sourceCnt;
560     return true;
561     }
562
563 int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
564     {
565 // see header file for class documentation
566     struct hostent* he;
567     he = gethostbyname( hostname );
568     if ( he == NULL )
569         {
570         //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571         return EADDRNOTAVAIL;
572         }
573
574     struct sockaddr_in remoteAddr;
575     remoteAddr.sin_family = AF_INET;    // host byte order 
576     remoteAddr.sin_port = htons(port);  // short, network byte order 
577     remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578     memset(&(remoteAddr.sin_zero), '\0', 8);  // zero the rest of the struct
579
580     // Create socket and connect to target program on remote node
581     source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582     if ( source.fTCPConnection == -1 )
583         {
584         return errno;
585         }
586
587     int ret;
588
589     ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
590     if ( ret == -1 )
591         {
592         ret=errno;
593         close( source.fTCPConnection );
594         return ret;
595         } 
596
597     ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598     if ( ret != (int)strlen(MOD_BIN) )
599         {
600         ret=errno;
601         close( source.fTCPConnection );
602         return ret;
603         }
604
605     char* tmpchar = new char[ strlen( hostname )+1 ];
606     if ( !tmpchar )
607         {
608         close( source.fTCPConnection );
609         return ENOMEM;
610         }
611     strcpy( tmpchar, hostname );
612     source.fHostname = tmpchar;
613
614     source.fType = kTCP;
615     source.fTCPPort = port;
616     source.fData = NULL;
617     source.fDataSize = 0;
618     source.fDataRead = 0;
619     return 0;
620     }
621
622 int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
623     {
624 // see header file for class documentation
625     int ret;
626     char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
627     if ( !tmpchar )
628         {
629         return ENOMEM;
630         }
631     gethostname( tmpchar, MAXHOSTNAMELEN );
632     tmpchar[MAXHOSTNAMELEN]=(char)0;
633     source.fHostname = tmpchar;
634
635     source.fShmID = shmget( shmKey, shmSize, 0660 );
636     if ( source.fShmID == -1 )
637         {
638         ret = errno;
639         delete [] source.fHostname;
640         return ret;
641         }
642     
643     source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
644
645     if ( !source.fShmPtr )
646         {
647         ret = errno;
648         shmctl( source.fShmID, IPC_RMID, NULL );
649         delete [] source.fHostname;
650         return ret;
651         }
652
653     source.fType = kShm;
654     source.fShmKey = shmKey;
655     source.fShmSize = shmSize;
656     source.fDataSize = 0;
657     source.fDataRead = 0;
658     return 0;
659     }
660
661 void HOMERReader::FreeDataSources()
662     {
663 // see header file for class documentation
664     for ( unsigned n=0; n < fDataSourceCnt; n++ )
665         {
666         if ( fDataSources[n].fType == kTCP )
667             FreeTCPDataSource( fDataSources[n] );
668         else
669             FreeShmDataSource( fDataSources[n] );
670         }
671     }
672
673 int HOMERReader::FreeShmDataSource( DataSource& source )
674     {
675 // see header file for class documentation
676     if ( source.fShmPtr )
677         shmdt( source.fShmPtr );
678 //     if ( source.fShmID != -1 )
679 //      shmctl( source.fShmID, IPC_RMID, NULL );
680     if ( source.fHostname )
681         delete [] source.fHostname;
682     return 0;
683     }
684
685 int HOMERReader::FreeTCPDataSource( DataSource& source )
686     {
687 // see header file for class documentation
688     if ( source.fTCPConnection )
689         close( source.fTCPConnection );
690     if ( source.fHostname )
691         delete [] source.fHostname;
692     return 0;
693     }
694
695 int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
696     {
697 // see header file for class documentation
698     if ( fDataSourceCnt<=0 )
699         return ENXIO;
700     // Clean up currently active event.
701     ReleaseCurrentEvent();
702     int ret;
703     // Trigger all configured data sources
704     for ( unsigned n = 0; n<fDataSourceCnt; n++ )
705         {
706         if ( fDataSources[n].fType == kTCP )
707             ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
708         else
709             ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
710         if ( ret )
711             {
712             fErrorConnection = n;
713             fConnectionStatus=ret;
714             return fConnectionStatus;
715             }
716         }
717     // Now read in data from the configured data source
718     ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
719     if ( ret )
720         {
721         return ret;
722         }
723     ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
724     if ( ret )
725         {
726         return ret;
727         }
728 //     for ( unsigned n = 0; n<fDataSourceCnt; n++ )
729 //      {
730 //      if ( fDataSources[n].fType == kTCP )
731 //          ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
732 //      else
733 //          ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
734 //      if ( ret )
735 //          {
736 //          fErrorConnection = n;
737 //          fConnectionStatus=ret;
738 //          return fConnectionStatus;
739 //          }
740 //      }
741     //Check to see that all sources contributed data for the same event
742     homer_uint64 eventID;
743     homer_uint64 eventType;
744     eventID = GetSourceEventID( fDataSources[0] );
745     eventType = GetSourceEventType( fDataSources[0] );
746     for ( unsigned n = 1; n < fDataSourceCnt; n++ )
747         {
748         if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
749             {
750             fErrorConnection = n;
751             fConnectionStatus=56;//EBADRQC;
752             return fConnectionStatus;
753             }
754         }
755     // Find all the different data blocks contained in the data from all
756     // the sources.
757     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
758         {
759         ret = ParseSourceData( fDataSources[n] );
760         if ( ret )
761             {
762             fErrorConnection = n;
763             fConnectionStatus=57;//EBADSLT;
764             return fConnectionStatus;
765             }
766         }
767     fCurrentEventID = eventID;
768     fCurrentEventType = eventType;
769     return 0;
770     }
771
772 void HOMERReader::ReleaseCurrentEvent()
773     {
774 // see header file for class documentation
775     // sources.fDataRead = 0;
776     // fMaxBlockCnt
777     fCurrentEventID = ~(homer_uint64)0;
778     fCurrentEventType = ~(homer_uint64)0;
779     for ( unsigned n = 0; n < fDataSourceCnt; n++ )
780         {
781         if ( fDataSources[n].fData )
782             {
783             if ( fDataSources[n].fType == kTCP )
784                 delete [] (homer_uint8*)fDataSources[n].fData;
785             fDataSources[n].fData = NULL;
786             }
787         fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
788         }
789     if ( fBlocks )
790         {
791         for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
792             {
793             if ( fBlocks[n].fOriginatingNodeID )
794                 delete [] fBlocks[n].fOriginatingNodeID;
795             }
796         delete [] fBlocks;
797         fBlocks=0;
798         fMaxBlockCnt = 0;
799         fBlockCnt=0;
800         }
801     }
802
803 int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
804     {
805 // see header file for class documentation
806     int ret;
807     struct timeval oldSndTO, newSndTO;
808     if ( useTimeout )
809         {
810         socklen_t optlen=sizeof(oldSndTO);
811         ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
812         if ( ret )
813             {
814             return errno;
815             }
816         if ( optlen!=sizeof(oldSndTO) )
817             {
818             return ENXIO;
819             }
820         newSndTO.tv_sec = timeoutUsec / 1000000;
821         newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
822         ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
823         if ( ret )
824             {
825             return errno;
826             }
827         }
828     // Send one event request
829     if ( !fEventRequestAdvanceTime )
830         {
831         ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
832         
833         if ( ret != (int)strlen(GET_ONE) )
834             {
835             ret=errno;
836             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
837             return ret;
838             }
839         }
840     else
841         {
842         char tmpCmd[ 128 ];
843
844         int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
845         if ( len>128 || len<0 )
846             {
847             ret=EMSGSIZE;
848             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
849             return ret;
850             }
851         
852         ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
853         
854         if ( ret != (int)strlen(tmpCmd) )
855             {
856             ret=errno;
857             setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
858             return ret;
859             }
860         
861         }
862     return 0;
863     }
864
865 int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
866     {
867 // see header file for class documentation
868     if ( source.fShmPtr )
869         {
870         *(homer_uint32*)( source.fShmPtr ) = 0;
871         return 0;
872         }
873     else
874         return EFAULT;
875     }
876
877 int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
878     {
879 // see header file for class documentation
880     bool toRead = false;
881     do
882         {
883         fd_set conns;
884         FD_ZERO( &conns );
885         int highestConn=0;
886         toRead = false;
887         unsigned firstConnection=~(unsigned)0;
888         for ( unsigned long n = 0; n < sourceCnt; n++ )
889             {
890             if ( sources[n].fDataSize == 0 // size specifier not yet read
891                  || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
892                 {
893                 toRead = true;
894                 FD_SET( sources[n].fTCPConnection, &conns );
895                 if ( sources[n].fTCPConnection > highestConn )
896                     highestConn = sources[n].fTCPConnection;
897                 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
898                 if ( firstConnection == ~(unsigned)0 )
899                     firstConnection = n;
900                 }
901             else
902                 {
903                 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
904                 }
905             }
906         if ( toRead )
907             {
908             struct timeval tv, *ptv;
909             if ( useTimeout )
910                 {
911                 tv.tv_sec = timeout / 1000000;
912                 tv.tv_usec = timeout - (tv.tv_sec*1000000);
913                 ptv = &tv;
914                 }
915             else
916                 ptv = NULL;
917             // wait until something is ready to be read
918             // either for timeout usecs or until eternity
919             int ret;
920             ret = select( highestConn+1, &conns, NULL, NULL, ptv ); 
921             if ( ret <=0 )
922                 {
923                 fErrorConnection = firstConnection;
924                 if ( errno )
925                     fConnectionStatus = errno;
926                 else
927                     fConnectionStatus = ETIMEDOUT;
928                 return fConnectionStatus;
929                 }
930             for ( unsigned n = 0; n < sourceCnt; n++ )
931                 {
932                 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
933                     {
934                     if ( sources[n].fDataSize == 0 )
935                         {
936                         ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
937                         if ( ret != sizeof(homer_uint32) )
938                             {
939                             fErrorConnection = n;
940                             if ( errno )
941                                 fConnectionStatus = errno;
942                             else
943                                 fConnectionStatus = ENOMSG;
944                             return fConnectionStatus;
945                             }
946                         sources[n].fDataSize = ntohl( sources[n].fDataSize );
947                         sources[n].fDataRead = 0;
948                         sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
949                         if ( !sources[n].fData )
950                             {
951                             fErrorConnection = n;
952                             fConnectionStatus = ENOMEM;
953                             return fConnectionStatus;
954                             }
955                         }
956                     else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
957                         {
958                         ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
959                         if ( ret>0 )
960                             sources[n].fDataRead += ret;
961                         else if ( ret == 0 )
962                             {
963                             fErrorConnection = n;
964                             fConnectionStatus = ECONNRESET;
965                             return fConnectionStatus;
966                             }
967                         else
968                             {
969                             fErrorConnection = n;
970                             fConnectionStatus = errno;
971                             return fConnectionStatus;
972                             }
973                         }
974                     else
975                         {
976                         fErrorConnection = n;
977                         fConnectionStatus = ENXIO;
978                         return fConnectionStatus;
979                         }
980                     }
981                 }
982             }
983         }
984     while ( toRead );
985     return 0;
986     }
987
988 /*
989 int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
990     {
991 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
992     // Send one event request
993     ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
994     if ( ret != strlen(GET_ONE) )
995         {
996         return errno;
997         }
998     // wait for and read back size specifier
999     unsigned sizeNBO;
1000     // The value transmitted is binary, in network byte order
1001     ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1002     if ( ret != sizeof(sizeNBO) )
1003         {
1004         return errno;
1005         }
1006     // Convert back to host byte order
1007     source.fDataSize = ntohl( sizeNBO );
1008     source.fData = new homer_uint8[ source.fDataSize ];
1009     unsigned long dataRead=0, toRead;
1010     if ( !source.fData )
1011         {
1012         char buffer[1024];
1013         // Read in data into buffer in order not to block connection
1014         while ( dataRead < source.fDataSize )
1015             {
1016             if ( source.fDataSize-dataRead > 1024 )
1017                 toRead = 1024;
1018             else
1019                 toRead = source.fDataSize-dataRead;
1020             ret = read( source.fTCPConnection, buffer, toRead );
1021             if ( ret > 0 )
1022                 dataRead += ret;
1023             else
1024                 return errno;
1025             }
1026         return ENOMEM;
1027         }
1028     while ( dataRead < source.fDataSize )
1029         {
1030         toRead = source.fDataSize-dataRead;
1031         ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1032         if ( ret > 0 )
1033             dataRead += ret;
1034         else if ( ret == 0 && useTimeout )
1035             {
1036             struct timeval tv;
1037             tv.tv_sec = timeout / 1000000;
1038             tv.tv_usec = timeout - (tv.tv_sec*1000000);
1039             fd_set conns;
1040             FD_ZERO( &conns );
1041             FD_SET( source.fTCPConnection, &conns );
1042             ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1043             if ( ret <=0 )
1044                 return errno;
1045             }
1046         else if ( ret == 0 )
1047             {
1048             if ( errno == EOK )
1049                 return ECONNRESET;
1050             else
1051                 return errno;
1052             }
1053         else
1054             {
1055             return errno;
1056             }
1057         }
1058     return 0;
1059     }
1060 */
1061
1062 /*
1063 int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1064     {
1065     
1066     }
1067 */
1068
1069 int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1070     {
1071 // see header file for class documentation
1072     struct timeval tv1, tv2;
1073     bool found=false;
1074     bool all=true;
1075     if ( useTimeout )
1076         gettimeofday( &tv1, NULL );
1077     do
1078         {
1079         found = false;
1080         all = true;
1081         for ( unsigned n = 0; n < sourceCnt; n++ )
1082             {
1083             if ( !sources[n].fDataSize )
1084                 all = false;
1085             if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1086                 {
1087                 found = true;
1088                 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1089                 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1090                 }
1091             }
1092         if ( found && useTimeout )
1093             gettimeofday( &tv1, NULL );
1094         if ( !all && useTimeout )
1095             {
1096             gettimeofday( &tv2, NULL );
1097             unsigned long long tdiff;
1098             tdiff = tv2.tv_sec-tv1.tv_sec;
1099             tdiff *= 1000000;
1100             tdiff += tv2.tv_usec-tv1.tv_usec;
1101             if ( tdiff > timeout )
1102                 return ETIMEDOUT;
1103             }
1104         if ( !all )
1105             usleep( 0 );
1106         }
1107     while ( !all );
1108     return 0;
1109     }
1110
1111 int HOMERReader::ParseSourceData( DataSource& source )
1112     {
1113 // see header file for class documentation
1114     if ( source.fData )
1115         {
1116         homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1117         homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1118         int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1119         if ( ret )
1120             return ret;
1121         homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1122         for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1123             {
1124             homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1125             unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1126             fBlocks[fBlockCnt].fSource = source.fNdx;
1127             fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1128             fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1129             fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1130             struct in_addr tmpA;
1131             tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1132             char* addr = inet_ntoa( tmpA );
1133             char* tmpchar = new char[ strlen(addr)+1 ];
1134             if ( !tmpchar )
1135                 return ENOMEM;
1136             strcpy( tmpchar, addr );
1137             fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1138             descrOffset += descrLen;
1139             }
1140         return 0;
1141         }
1142     return EFAULT;
1143     }
1144         
1145 int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1146     {
1147 // see header file for class documentation
1148     DataBlock* newBlocks;
1149     newBlocks = new DataBlock[ newCnt ];
1150     if ( !newBlocks )
1151         return ENOMEM;
1152     unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1153     memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1154     if ( newCnt > fMaxBlockCnt )
1155         memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1156     if ( fBlocks )
1157         delete [] fBlocks;
1158     fBlocks = newBlocks;
1159     fMaxBlockCnt = newCnt;
1160     return 0;
1161     }
1162
1163 homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1164     {
1165 // see header file for class documentation
1166     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1167     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1168     }
1169
1170 homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1171     {
1172 // see header file for class documentation
1173     homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1174     return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1175     }
1176
1177 homer_uint64 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1178     {
1179 // see header file for class documentation
1180     if ( destFormat == sourceFormat )
1181         return source;
1182     else
1183         return ((source & 0xFFULL) << 56) | 
1184         ((source & 0xFF00ULL) << 40) | 
1185         ((source & 0xFF0000ULL) << 24) | 
1186         ((source & 0xFF000000ULL) << 8) | 
1187         ((source & 0xFF00000000ULL) >> 8) | 
1188         ((source & 0xFF0000000000ULL) >> 24) | 
1189         ((source & 0xFF000000000000ULL) >>  40) | 
1190         ((source & 0xFF00000000000000ULL) >> 56);
1191     }
1192
1193 homer_uint32 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1194     {
1195 // see header file for class documentation
1196     if ( destFormat == sourceFormat )
1197         return source;
1198     else
1199         return ((source & 0xFFUL) << 24) | 
1200         ((source & 0xFF00UL) << 8) | 
1201         ((source & 0xFF0000UL) >> 8) | 
1202         ((source & 0xFF000000UL) >> 24);
1203     }
1204 /*
1205 ***************************************************************************
1206 **
1207 ** $Author$ - Initial Version by Timm Morten Steinbeck
1208 **
1209 ** $Id$ 
1210 **
1211 ***************************************************************************
1212 */