1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
55 //#include <sys/types.h>
56 //#include <sys/socket.h>
57 //#include <netinet/in.h>
58 //#include <netinet/tcp.h>
61 #include <rpc/types.h>
65 #include <netinet/in.h>
66 #include <arpa/inet.h>
72 #define MOD_BIN "MOD BIN\n"
73 #define MOD_ASC "MOD ASC\n"
74 #define GET_ONE "GET ONE\n"
75 #define GET_ALL "GET ALL\n"
77 // MAXHOSTNAMELEN not defined on macosx
78 // 686-apple-darwin9-gcc-4.0.1
79 #ifndef MAXHOSTNAMELEN
80 #define MAXHOSTNAMELEN 64
84 ClassImp(AliHLTMonitoringReader);
85 ClassImp(AliHLTHOMERReader);
// Default constructor. The visible initialisers set both event identifiers
// to the all-ones value, clear the TCP/shm/max source counters and the
// connection status, and set fErrorConnection to all-ones. No data source
// is added here.
93 AliHLTHOMERReader::AliHLTHOMERReader()
95 fCurrentEventType(~(homer_uint64)0),
96 fCurrentEventID(~(homer_uint64)0),
101 fTCPDataSourceCnt(0),
102 fShmDataSourceCnt(0),
103 fDataSourceMaxCnt(0),
105 fConnectionStatus(0),
106 fErrorConnection(~(unsigned int)0),
107 fEventRequestAdvanceTime(0)
109 // Reader implementation of the HOMER interface.
110 // The HLT Monitoring Environment including ROOT is
111 // a native interface to ship out data from the HLT chain.
112 // See the pdf document shipped with the package
113 // for class documentation and tutorial.
// Constructor for a single TCP data source.
//  hostname  passed on to AddDataSource(), which resolves it and connects
//  port      TCP port, passed on to AddDataSource()
// The outcome is reported through fConnectionStatus (0 on success, ENOMEM
// if the source table cannot be allocated, otherwise the AddDataSource()
// result) and fErrorConnection (set to 0, the index of the only source,
// on failure).
119 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
121 AliHLTMonitoringReader(),
122 fCurrentEventType(~(homer_uint64)0),
123 fCurrentEventID(~(homer_uint64)0),
128 fTCPDataSourceCnt(0),
129 fShmDataSourceCnt(0),
130 fDataSourceMaxCnt(0),
132 fConnectionStatus(0),
133 fErrorConnection(~(unsigned int)0),
134 fEventRequestAdvanceTime(0)
136 // see header file for class documentation
137 // For reading from a TCP port
// one slot is enough: this reader has exactly one source
139 if ( !AllocDataSources(1) )
141 fErrorConnection = 0;
142 fConnectionStatus = ENOMEM;
145 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
146 if ( fConnectionStatus )
147 fErrorConnection = 0;
// fNdx records the slot of this source inside fDataSources
152 fDataSources[0].fNdx = 0;
// Constructor for several TCP data sources.
//  tcpCnt     number of entries in hostnames and ports
//  hostnames  array of tcpCnt host names
//  ports      array of tcpCnt TCP ports
// Sources are added in order; fDataSourceCnt and fTCPDataSourceCnt are
// advanced by the loop increment after each source. When AddDataSource()
// reports an error, fErrorConnection receives the index of that source and
// fConnectionStatus the error code.
156 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
158 AliHLTMonitoringReader(),
159 fCurrentEventType(~(homer_uint64)0),
160 fCurrentEventID(~(homer_uint64)0),
165 fTCPDataSourceCnt(0),
166 fShmDataSourceCnt(0),
167 fDataSourceMaxCnt(0),
169 fConnectionStatus(0),
170 fErrorConnection(~(unsigned int)0),
171 fEventRequestAdvanceTime(0)
173 // see header file for class documentation
174 // For reading from multiple TCP ports
176 if ( !AllocDataSources(tcpCnt) )
178 fErrorConnection = 0;
179 fConnectionStatus = ENOMEM;
182 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
184 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
185 if ( fConnectionStatus )
187 fErrorConnection = n;
// remember the slot so that blocks can be mapped back to their source
190 fDataSources[n].fNdx = n;
// Constructor for a single System V shared memory data source.
//  shmKey   key of the segment, passed on to AddDataSource()
//  shmSize  size of the segment, passed on to AddDataSource()
// The outcome is reported through fConnectionStatus (0 on success, ENOMEM
// if the source table cannot be allocated, otherwise the AddDataSource()
// result) and fErrorConnection (set to 0 on failure).
194 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
196 AliHLTMonitoringReader(),
197 fCurrentEventType(~(homer_uint64)0),
198 fCurrentEventID(~(homer_uint64)0),
203 fTCPDataSourceCnt(0),
204 fShmDataSourceCnt(0),
205 fDataSourceMaxCnt(0),
207 fConnectionStatus(0),
208 fErrorConnection(~(unsigned int)0),
209 fEventRequestAdvanceTime(0)
211 // see header file for class documentation
212 // For reading from a System V shared memory segment
214 if ( !AllocDataSources(1) )
216 fErrorConnection = 0;
217 fConnectionStatus = ENOMEM;
220 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
221 if ( fConnectionStatus )
222 fErrorConnection = 0;
// fNdx records the slot of this source inside fDataSources
227 fDataSources[0].fNdx = 0;
// Constructor for several System V shared memory data sources.
//  shmCnt    number of entries in shmKeys and shmSizes
//  shmKeys   array of shmCnt segment keys
//  shmSizes  array of shmCnt segment sizes
// Sources are added in order; fDataSourceCnt and fShmDataSourceCnt are
// advanced by the loop increment after each source. When AddDataSource()
// reports an error, fErrorConnection receives the index of that source and
// fConnectionStatus the error code.
231 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
233 AliHLTMonitoringReader(),
234 fCurrentEventType(~(homer_uint64)0),
235 fCurrentEventID(~(homer_uint64)0),
240 fTCPDataSourceCnt(0),
241 fShmDataSourceCnt(0),
242 fDataSourceMaxCnt(0),
244 fConnectionStatus(0),
245 fErrorConnection(~(unsigned int)0),
246 fEventRequestAdvanceTime(0)
248 // see header file for class documentation
249 // For reading from multiple System V shared memory segments
251 if ( !AllocDataSources(shmCnt) )
253 fErrorConnection = 0;
254 fConnectionStatus = ENOMEM;
257 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
259 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
260 if ( fConnectionStatus )
262 fErrorConnection = n;
// remember the slot so that blocks can be mapped back to their source
265 fDataSources[n].fNdx = n;
269 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
270 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
272 AliHLTMonitoringReader(),
273 fCurrentEventType(~(homer_uint64)0),
274 fCurrentEventID(~(homer_uint64)0),
279 fTCPDataSourceCnt(0),
280 fShmDataSourceCnt(0),
281 fDataSourceMaxCnt(0),
283 fConnectionStatus(0),
284 fErrorConnection(~(unsigned int)0),
285 fEventRequestAdvanceTime(0)
287 // see header file for class documentation
288 // For reading from multiple TCP ports and multiple System V shared memory segments
290 if ( !AllocDataSources(tcpCnt+shmCnt) )
292 fErrorConnection = 0;
293 fConnectionStatus = ENOMEM;
296 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
298 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
299 if ( fConnectionStatus )
301 fErrorConnection = n;
304 fDataSources[n].fNdx = n;
306 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
308 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
309 if ( fConnectionStatus )
311 fErrorConnection = tcpCnt+n;
314 fDataSources[n].fNdx = n;
// Constructor for a single in-memory buffer data source.
//  pBuffer  start of the buffer holding the event data; the const qualifier
//           is cast away only to match the AddDataSource() signature
//  size     size of the buffer in bytes
// The outcome is reported through fConnectionStatus (0 on success, ENOMEM
// if the source table cannot be allocated, otherwise the AddDataSource()
// result) and fErrorConnection (set to 0 on failure).
318 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
320 AliHLTMonitoringReader(),
321 fCurrentEventType(~(homer_uint64)0),
322 fCurrentEventID(~(homer_uint64)0),
327 fTCPDataSourceCnt(0),
328 fShmDataSourceCnt(0),
329 fDataSourceMaxCnt(0),
331 fConnectionStatus(0),
332 fErrorConnection(~(unsigned int)0),
333 fEventRequestAdvanceTime(0)
335 // see header file for class documentation
336 // For reading from a caller-provided memory buffer
338 if ( !AllocDataSources(1) )
340 fErrorConnection = 0;
341 fConnectionStatus = ENOMEM;
344 fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
345 if ( fConnectionStatus )
346 fErrorConnection = 0;
// fNdx records the slot of this source inside fDataSources
351 fDataSources[0].fNdx = 0;
// Destructor. Drops the data of the event that is currently held, if any.
355 AliHLTHOMERReader::~AliHLTHOMERReader()
357 // see header file for class documentation
358 ReleaseCurrentEvent();
362 int AliHLTHOMERReader::ReadNextEvent()
364 // see header file for class documentation
365 // Read in the next available event
366 return ReadNextEvent( false, 0 );
369 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
371 // see header file for class documentation
372 // Read in the next available event
373 return ReadNextEvent( true, timeout );
// Accessor for the fLength member of block ndx of the current event.
// Indices >= fBlockCnt are handled by the guard before fBlocks is touched.
376 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
378 // see header file for class documentation
379 // Return the size (in bytes) of the current event's data
380 // block with the given block index (starting at 0).
381 if ( ndx >= fBlockCnt )
383 return fBlocks[ndx].fLength;
// Accessor for the fData member of block ndx of the current event.
// Indices >= fBlockCnt are handled by the guard before fBlocks is touched.
386 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
388 // see header file for class documentation
389 // Return a pointer to the start of the current event's data
390 // block with the given block index (starting at 0).
391 if ( ndx >= fBlockCnt )
393 return fBlocks[ndx].fData;
// Looks up the host name of the data source that delivered block ndx:
// the block's fSource is used as an index into fDataSources and that
// entry's fHostname is returned.
396 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
398 // see header file for class documentation
399 // Return IP address or hostname of node which sent the
400 // current event's data block with the given block index
402 // For HOMER this is the ID of the node on which the subscriber
403 // that provided this data runs/ran.
404 if ( ndx >= fBlockCnt )
// consistency check: a block must never refer to a source slot that is not in use
407 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
409 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
410 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
414 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
415 //return fBlocks[ndx].fOriginatingNodeID;
// Reads the byte order attribute of block ndx: the byte at offset
// kByteOrderAttribute_8b_Offset inside the block's descriptor (fMetaData).
418 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
420 // see header file for class documentation
421 // Return byte order of the data stored in the
422 // current event's data block with the given block index (starting at 0).
423 // 0 is unknown alignment,
424 // 1 is little endian,
425 // 2 is big endian.
426 if ( ndx >= fBlockCnt )
428 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
429 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
// Reads one alignment attribute of block ndx: the byte at offset
// kAlignment_8b_StartOffset+dataType inside the block's descriptor.
// dataType selects the attribute relative to kAlignment_8b_StartOffset;
// values beyond the float alignment entry are rejected by the second guard.
432 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
434 // see header file for class documentation
435 // Return the alignment (in bytes) of the given datatype
436 // in the data stored in the current event's data block
437 // with the given block index (starting at 0).
438 // Possible values for the data type are
445 if ( ndx >= fBlockCnt )
447 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
449 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
450 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
// Reads the status flags of block ndx: the 64 bit word with index
// kStatusFlags_64b_Offset inside the block's descriptor (fMetaData).
// NOTE(review): the word is returned as stored; no byte order conversion
// is applied here.
453 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
455 // see header file for class documentation
456 if ( ndx >= fBlockCnt )
458 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
462 /* Return the type of the data in the current event's data
463 block with the given block index (starting at 0). */
464 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
466 // see header file for class documentation
467 if ( ndx >= fBlockCnt )
468 return ~(homer_uint64)0;
469 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
470 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
473 /* Return the origin of the data in the current event's data
474 block with the given block index (starting at 0). */
475 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
477 // see header file for class documentation
478 if ( ndx >= fBlockCnt )
479 return ~(homer_uint32)0;
480 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
481 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
484 /* Return a specification of the data in the current event's data
485 block with the given block index (starting at 0). */
486 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
488 // see header file for class documentation
489 if ( ndx >= fBlockCnt )
490 return ~(homer_uint32)0;
491 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
492 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
495 /* Find the next data block in the current event with the given
496 data type, origin, and specification. Returns the block's
498 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
499 homer_uint32 spec, unsigned long startNdx ) const
501 // see header file for class documentation
502 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
504 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
505 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
506 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
509 return ~(unsigned long)0;
512 /* Find the next data block in the current event with the given
513 data type, origin, and specification. Returns the block's
// Character array variant of the block search.
//  type      8 bytes compared with the bytes of descriptor word kType_64b_Offset
//  origin    4 bytes compared with the first bytes of descriptor word kSubType1_64b_Offset
//  spec      compared with descriptor word kSubType2_64b_Offset narrowed to
//            32 bit; 0xFFFFFFFF matches any specification
//  startNdx  index of the first block to examine
// A block is accepted when found1, found2 and the spec test all hold;
// ~(unsigned long)0 is returned when the scan ends without a match.
515 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
516 homer_uint32 spec, unsigned long startNdx ) const
518 // see header file for class documentation
519 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
521 bool found1=true, found2=true;
// scan the type bytes for values different from 0xFF
// NOTE(review): presumably an all-0xFF type is the wildcard - confirm
522 for ( unsigned i = 0; i < 8; i++ )
524 if ( type[i] != (char)0xFF )
// byte-wise comparison of the requested type with the descriptor
533 for ( unsigned i = 0; i < 8; i++ )
535 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
536 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
// same two steps for the 4 origin bytes
543 for ( unsigned i = 0; i < 4; i++ )
545 if ( origin[i] != (char)0xFF )
554 for ( unsigned i = 0; i < 4; i++ )
556 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
557 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
564 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
565 if ( found1 && found2 &&
566 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
569 return ~(unsigned long)0;
572 /* Return the ID of the node that actually produced this data block.
573 This may be different from the node which sent the data to this
574 monitoring object as returned by GetBlockSendNodeID. */
// Accessor for the fOriginatingNodeID string of block ndx. The string is
// filled in by ParseSourceData() from the producer node field of the block
// descriptor (dotted address notation as produced by inet_ntoa).
575 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
577 // see header file for class documentation
578 if ( ndx >= fBlockCnt )
580 return fBlocks[ndx].fOriginatingNodeID;
// Resets the reader state to the same values the constructors use:
// all-ones event identifiers, zeroed block and data source counters,
// no connection error.
584 void AliHLTHOMERReader::Init()
586 // see header file for class documentation
587 fCurrentEventType = ~(homer_uint64)0;
588 fCurrentEventID = ~(homer_uint64)0;
589 fMaxBlockCnt = fBlockCnt = 0;
592 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
596 fConnectionStatus = 0;
// all-ones: no connection is flagged as faulty
597 fErrorConnection = ~(unsigned int)0;
599 fEventRequestAdvanceTime = 0;
// Allocates the table of data source descriptors.
//  sourceCnt  number of DataSource entries to allocate
// The new table is zero-filled and its capacity is stored in
// fDataSourceMaxCnt. The constructors treat a false return value as an
// out-of-memory condition.
// NOTE(review): a table allocated by an earlier call is not released here.
602 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
604 // see header file for class documentation
605 fDataSources = new DataSource[ sourceCnt ];
// zero-fill so that every pointer member starts out as NULL
608 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
610 fDataSourceMaxCnt = sourceCnt;
// Sets up a TCP data source.
//  hostname  name of the remote host, resolved with gethostbyname()
//  port      TCP port on the remote host
//  source    descriptor that receives the socket, a private copy of the
//            host name and the port
// Steps: resolve the host, create a stream socket, connect, switch the peer
// to binary mode by sending MOD_BIN, then fill in the descriptor.
// EADDRNOTAVAIL is returned when the host name cannot be resolved.
// NOTE(review): gethostbyname() returns a pointer to static data and is
// not reentrant.
614 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
616 // see header file for class documentation
618 he = gethostbyname( hostname );
621 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
622 return EADDRNOTAVAIL;
625 struct sockaddr_in remoteAddr;
626 remoteAddr.sin_family = AF_INET; // host byte order
627 remoteAddr.sin_port = htons(port); // short, network byte order
// first address of the resolved host
628 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
629 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
631 // Create socket and connect to target program on remote node
632 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
633 if ( source.fTCPConnection == -1 )
640 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
// the socket is closed again on the failure paths below
644 close( source.fTCPConnection );
// request binary transfer mode; a short write counts as failure
648 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
649 if ( ret != (int)strlen(MOD_BIN) )
652 close( source.fTCPConnection );
// keep a private copy of the host name for GetBlockSendNodeID()
656 char* tmpchar = new char[ strlen( hostname )+1 ];
659 close( source.fTCPConnection );
662 strcpy( tmpchar, hostname );
663 source.fHostname = tmpchar;
666 source.fTCPPort = port;
// nothing received yet
668 source.fDataSize = 0;
669 source.fDataRead = 0;
673 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
675 // see header file for class documentation
677 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
682 gethostname( tmpchar, MAXHOSTNAMELEN );
683 tmpchar[MAXHOSTNAMELEN]=(char)0;
684 source.fHostname = tmpchar;
686 source.fShmID = shmget( shmKey, shmSize, 0660 );
687 if ( source.fShmID == -1 )
690 delete [] source.fHostname;
694 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
696 if ( !source.fShmPtr )
699 shmctl( source.fShmID, IPC_RMID, NULL );
700 delete [] source.fHostname;
705 source.fShmKey = shmKey;
706 source.fShmSize = shmSize;
707 source.fDataSize = 0;
708 source.fDataRead = 0;
// Sets up a data source that reads from a caller-provided memory buffer.
//  pBuffer  start of the buffer; stored in source.fData, no copy is made
//  size     size of the buffer in bytes; stored in source.fShmSize
//  source   descriptor to fill
// EINVAL is returned for a null buffer or a non-positive size.
712 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
714 // see header file for class documentation
715 // a buffer data source is like a shm source apart from the shm attach and detach
716 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
717 // cleared right before sources are read but after the reading.
719 if ( !pBuffer || size<=0) return EINVAL;
// the local host name serves as sender id for blocks from this source
721 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
726 gethostname( tmpchar, MAXHOSTNAMELEN );
727 tmpchar[MAXHOSTNAMELEN]=(char)0;
728 source.fHostname = tmpchar;
731 // the data buffer does not contain a size indicator in the first 4 bytes
732 // like the shm source buffer. Still we want to use the mechanism to invalidate/
733 // trigger by clearing the size indicator. Take the source.fShmSize variable.
734 source.fShmPtr = &source.fShmSize;
737 source.fShmSize = size;
738 source.fData = pBuffer;
// nothing read yet
739 source.fDataSize = 0;
740 source.fDataRead = 0;
744 void AliHLTHOMERReader::FreeDataSources()
746 // see header file for class documentation
747 for ( unsigned n=0; n < fDataSourceCnt; n++ )
749 if ( fDataSources[n].fType == kTCP )
750 FreeTCPDataSource( fDataSources[n] );
751 else if ( fDataSources[n].fType == kShm )
752 FreeShmDataSource( fDataSources[n] );
756 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
758 // see header file for class documentation
759 if ( source.fShmPtr )
760 shmdt( source.fShmPtr );
761 // if ( source.fShmID != -1 )
762 // shmctl( source.fShmID, IPC_RMID, NULL );
763 if ( source.fHostname )
764 delete [] source.fHostname;
768 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
770 // see header file for class documentation
771 if ( source.fTCPConnection )
772 close( source.fTCPConnection );
773 if ( source.fHostname )
774 delete [] source.fHostname;
// Reads the next event from all configured data sources.
//  useTimeout  whether timeout applies
//  timeout     timeout, forwarded to the trigger and read functions
// Steps: release the previous event, trigger every source, read the TCP
// sources and then the shm sources, check that all sources delivered the
// same event id and type, parse the block descriptors of every source.
// On the error paths visible here fErrorConnection is set to the index of
// the offending source and fConnectionStatus to the error code, which is
// also returned.
778 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
780 // see header file for class documentation
781 if ( fDataSourceCnt<=0 )
783 // Clean up currently active event.
784 ReleaseCurrentEvent();
786 // Trigger all configured data sources
787 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
789 if ( fDataSources[n].fType == kTCP )
790 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
791 else if ( fDataSources[n].fType == kShm )
792 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
795 fErrorConnection = n;
796 fConnectionStatus=ret;
797 return fConnectionStatus;
800 // Now read in data from the configured data source
// the TCP sources are the first fTCPDataSourceCnt table entries
801 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
// the shm sources follow directly behind the TCP sources
806 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
811 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
813 // if ( fDataSources[n].fType == kTCP )
814 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
816 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
819 // fErrorConnection = n;
820 // fConnectionStatus=ret;
821 // return fConnectionStatus;
824 //Check to see that all sources contributed data for the same event
825 homer_uint64 eventID;
826 homer_uint64 eventType;
// the first source delivers the reference id and type, so it must have data
827 if (!fDataSources[0].fData)
829 fErrorConnection = 0;
830 fConnectionStatus=56;//ENOBUF;
831 return fConnectionStatus;
833 eventID = GetSourceEventID( fDataSources[0] );
834 eventType = GetSourceEventType( fDataSources[0] );
835 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
837 if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
839 fErrorConnection = n;
840 fConnectionStatus=56;//EBADRQC;
841 return fConnectionStatus;
844 // Find all the different data blocks contained in the data from all
846 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
848 ret = ParseSourceData( fDataSources[n] );
851 fErrorConnection = n;
852 fConnectionStatus=57;//EBADSLT;
// the event is complete: publish its id and type
856 fCurrentEventID = eventID;
857 fCurrentEventType = eventType;
// Drops the event currently held: resets the event id and type to the
// all-ones value, releases the per-source event data and frees the
// originating node strings of the blocks.
861 void AliHLTHOMERReader::ReleaseCurrentEvent()
863 // see header file for class documentation
864 // sources.fDataRead = 0;
866 fCurrentEventID = ~(homer_uint64)0;
867 fCurrentEventType = ~(homer_uint64)0;
868 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
870 if ( fDataSources[n].fData )
// only TCP sources own their buffer (allocated in ReadDataFromTCPSources);
// shm and buffer sources point into memory that is not owned by the reader
872 if ( fDataSources[n].fType == kTCP )
873 delete [] (homer_uint8*)fDataSources[n].fData;
874 // do not reset the data pointer for kBuf sources since this
875 // can not be set again.
876 if ( fDataSources[n].fType != kBuf )
877 fDataSources[n].fData = NULL;
879 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
// the node id strings are allocated per block in ParseSourceData()
883 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
885 if ( fBlocks[n].fOriginatingNodeID )
886 delete [] fBlocks[n].fOriginatingNodeID;
// Requests one event from a TCP source.
//  source       descriptor of the TCP source
//  useTimeout   whether timeoutUsec applies
//  timeoutUsec  send timeout in microseconds
// The socket's send timeout (SO_SNDTIMEO) is saved and replaced, the request
// is written, and the saved timeout is restored on the error paths visible
// here. The request is GET_ONE when fEventRequestAdvanceTime is 0, otherwise
// a "FIRST ORBIT EVENT" command carrying that value.
895 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
897 // see header file for class documentation
899 struct timeval oldSndTO, newSndTO;
902 socklen_t optlen=sizeof(oldSndTO);
903 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
// sanity check: the kernel must have filled a complete struct timeval
908 if ( optlen!=sizeof(oldSndTO) )
// split the microsecond value into seconds and remaining microseconds
912 newSndTO.tv_sec = timeoutUsec / 1000000;
913 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
914 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
920 // Send one event request
921 if ( !fEventRequestAdvanceTime )
923 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
// a short write counts as failure
925 if ( ret != (int)strlen(GET_ONE) )
928 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// NOTE(review): "0x%Lu" prints the value in decimal behind a literal "0x",
// and 'L' is not the standard length modifier for unsigned long long (that
// is "ll") - confirm the format the server expects before changing it.
936 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
// NOTE(review): snprintf() truncates as soon as len >= 128, so len == 128
// passes this test although the command has lost its last character.
937 if ( len>128 || len<0 )
940 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
944 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
946 if ( ret != (int)strlen(tmpCmd) )
949 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// Requests new data from a shared memory style source by writing 0 to the
// 32 bit word fShmPtr points to. The two unnamed parameters (timeout flag
// and value) are ignored. For buffer sources fShmPtr refers to the
// descriptor's own fShmSize member (see AddDataSource for buffers), so the
// write clears that member instead of shared memory.
957 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
959 // see header file for class documentation
960 // clear the size indicator in the first 4 bytes of the buffer to request data
961 // from the HOMER writer.
962 if ( source.fShmPtr )
964 *(homer_uint32*)( source.fShmPtr ) = 0;
// Reads the pending event data from a set of TCP sources with select().
//  sourceCnt   number of entries in sources
//  sources     first TCP source descriptor
//  useTimeout  whether timeout applies to select()
//  timeout     timeout in microseconds
// Each source first delivers a 32 bit size in network byte order, followed
// by that many bytes of payload which are collected into a newly allocated
// buffer (fData). On the error paths visible here fErrorConnection is set to
// the index of the failing source and fConnectionStatus is set and returned.
971 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
973 // see header file for class documentation
// index of the first source still waited for; reported when select() fails
981 unsigned firstConnection=~(unsigned)0;
982 for ( unsigned long n = 0; n < sourceCnt; n++ )
984 if ( sources[n].fDataSize == 0 // size specifier not yet read
985 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
988 FD_SET( sources[n].fTCPConnection, &conns );
989 if ( sources[n].fTCPConnection > highestConn )
990 highestConn = sources[n].fTCPConnection;
// NOTE(review): F_SETFL replaces the complete set of file status flags
991 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
992 if ( firstConnection == ~(unsigned)0 )
// back to blocking mode
997 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1002 struct timeval tv, *ptv;
// split the microsecond value into seconds and remaining microseconds
1005 tv.tv_sec = timeout / 1000000;
1006 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1011 // wait until something is ready to be read
1012 // either for timeout usecs or until eternity
1014 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1017 fErrorConnection = firstConnection;
1019 fConnectionStatus = errno;
1021 fConnectionStatus = ETIMEDOUT;
1022 return fConnectionStatus;
1024 for ( unsigned n = 0; n < sourceCnt; n++ )
1026 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
// phase 1: size specifier
// NOTE(review): the 4 size bytes must arrive in a single read(); a partial
// read is treated as an error.
1028 if ( sources[n].fDataSize == 0 )
1030 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1031 if ( ret != sizeof(homer_uint32) )
1033 fErrorConnection = n;
1035 fConnectionStatus = errno;
1037 fConnectionStatus = ENOMSG;
1038 return fConnectionStatus;
// the size arrives in network byte order
1040 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1041 sources[n].fDataRead = 0;
1042 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1043 if ( !sources[n].fData )
1045 fErrorConnection = n;
1046 fConnectionStatus = ENOMEM;
1047 return fConnectionStatus;
// phase 2: payload, appended behind what has been received so far
1050 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1052 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1054 sources[n].fDataRead += ret;
// read() returning 0 means the peer closed the connection
1055 else if ( ret == 0 )
1057 fErrorConnection = n;
1058 fConnectionStatus = ECONNRESET;
1059 return fConnectionStatus;
1063 fErrorConnection = n;
1064 fConnectionStatus = errno;
1065 return fConnectionStatus;
1070 fErrorConnection = n;
1071 fConnectionStatus = ENXIO;
1072 return fConnectionStatus;
// NOTE(review): the two definitions below look like an older implementation
// that serves one source per call. The select() call further down takes
// only four arguments, so this code presumably sits inside a disabled
// preprocessor section - confirm before touching it.
//
// Single-source TCP read: sends GET_ONE, reads the 32 bit size specifier
// (network byte order), allocates a buffer of that size and reads the
// payload into it.
1083 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1085 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1086 // Send one event request
1087 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1088 if ( ret != strlen(GET_ONE) )
1092 // wait for and read back size specifier
1094 // The value transmitted is binary, in network byte order
1095 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1096 if ( ret != sizeof(sizeNBO) )
1100 // Convert back to host byte order
1101 source.fDataSize = ntohl( sizeNBO );
1102 source.fData = new homer_uint8[ source.fDataSize ];
1103 unsigned long dataRead=0, toRead;
1104 if ( !source.fData )
// allocation failed: the payload is still drained in chunks of at most
// 1024 bytes so that the connection does not stall
1107 // Read in data into buffer in order not to block connection
1108 while ( dataRead < source.fDataSize )
1110 if ( source.fDataSize-dataRead > 1024 )
1113 toRead = source.fDataSize-dataRead;
1114 ret = read( source.fTCPConnection, buffer, toRead );
// regular case: read until the announced number of bytes has arrived
1122 while ( dataRead < source.fDataSize )
1124 toRead = source.fDataSize-dataRead;
1125 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1128 else if ( ret == 0 && useTimeout )
1131 tv.tv_sec = timeout / 1000000;
1132 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1135 FD_SET( source.fTCPConnection, &conns );
1136 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1140 else if ( ret == 0 )
// single-source counterpart for shared memory; its body is not visible here
1157 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
// Polls a set of shared memory style sources until their data is available.
//  sourceCnt   number of entries in sources
//  sources     first shm/buffer source descriptor
//  useTimeout  whether timeout applies
//  timeout     timeout in microseconds, compared with the time elapsed
//              since tv1 was last taken
// A source counts as ready once the 32 bit word at fShmPtr is non-zero;
// that word is the size of the data and is copied to fDataSize.
1163 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1165 // see header file for class documentation
1166 struct timeval tv1, tv2;
// tv1 is the reference point for the timeout
1170 gettimeofday( &tv1, NULL );
1175 for ( unsigned n = 0; n < sourceCnt; n++ )
// sources that already have a size were read in an earlier pass
1177 if ( !sources[n].fDataSize )
1179 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1182 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1183 if (sources[n].fType==kBuf)
1185 // the data buffer is already set to fData, just need to set fDataSize member
1186 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1187 TriggerShmSource( sources[n], 0, 0 );
// shm sources: the payload starts right behind the 32 bit size word
1190 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
// progress was made: restart the timeout period
1194 if ( found && useTimeout )
1195 gettimeofday( &tv1, NULL );
1196 if ( !all && useTimeout )
1198 gettimeofday( &tv2, NULL );
// elapsed time since tv1
1199 unsigned long long tdiff;
1200 tdiff = tv2.tv_sec-tv1.tv_sec;
1202 tdiff += tv2.tv_usec-tv1.tv_usec;
1203 if ( tdiff > timeout )
// Walks the block descriptors contained in the data of one source and
// appends an entry to fBlocks for every block found.
//  source  data source whose fData/fDataSize describe the received data
// EBADMSG is returned for an unknown byte order, an implausible block count
// or a descriptor that does not fit into the data; 126 is returned when a
// descriptor does not carry HOMER_BLOCK_DESCRIPTOR_TYPEID.
1213 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1215 // see header file for class documentation
// the byte order attribute decides whether 64 bit fields must be swapped
1218 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1219 if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
// in the event header the SubType2 word holds the number of blocks
1220 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1221 // block count is not related to size of the data in the way the
1222 // following condition implies. But we can at least limit the block
1223 // count for the case the data is corrupted
1224 if (blockCnt>source.fDataSize) return EBADMSG;
// make room for the blocks of this source behind the existing entries
1225 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
// byte offset of the first block descriptor, relative to the start of the data
1228 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1229 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
// NOTE(review): descrOffset is a byte offset while kLength_64b_Offset is
// used as an index of 64 bit words two lines below, so this check does not
// guarantee that the length field lies inside the data.
1231 if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1232 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
// NOTE(review): the 64 bit length is narrowed to unsigned here
1233 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1234 if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1235 if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
// remember which source delivered the block (see GetBlockSendNodeID)
1236 fBlocks[fBlockCnt].fSource = source.fNdx;
// NOTE(review): offset and size of the payload are taken from the
// descriptor without a check against fDataSize
1237 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1238 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1239 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
// convert the producer node address to a string for GetBlockCreateNodeID()
// NOTE(review): unlike the other descriptor fields this word is not passed
// through Swap()
1240 struct in_addr tmpA;
1241 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
// inet_ntoa() returns a pointer to a static buffer, hence the copy
1242 char* addr = inet_ntoa( tmpA );
1243 char* tmpchar = new char[ strlen(addr)+1 ];
1246 strcpy( tmpchar, addr );
1247 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
// advance to the next descriptor
1248 descrOffset += descrLen;
1255 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1257 // see header file for class documentation
1258 DataBlock* newBlocks;
1259 newBlocks = new DataBlock[ newCnt ];
1262 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1263 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1264 if ( newCnt > fMaxBlockCnt )
1265 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1268 fBlocks = newBlocks;
1269 fMaxBlockCnt = newCnt;
1273 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1275 // see header file for class documentation
1276 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1277 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1280 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1282 // see header file for class documentation
1283 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1284 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1287 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1289 // see header file for class documentation
1290 if ( destFormat == sourceFormat )
1293 return ((source & 0xFFULL) << 56) |
1294 ((source & 0xFF00ULL) << 40) |
1295 ((source & 0xFF0000ULL) << 24) |
1296 ((source & 0xFF000000ULL) << 8) |
1297 ((source & 0xFF00000000ULL) >> 8) |
1298 ((source & 0xFF0000000000ULL) >> 24) |
1299 ((source & 0xFF000000000000ULL) >> 40) |
1300 ((source & 0xFF00000000000000ULL) >> 56);
1303 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1305 // see header file for class documentation
1306 if ( destFormat == sourceFormat )
1309 return ((source & 0xFFUL) << 24) |
1310 ((source & 0xFF00UL) << 8) |
1311 ((source & 0xFF0000UL) >> 8) |
1312 ((source & 0xFF000000UL) >> 24);
1315 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1317 // see header file for function documentation
1318 return new AliHLTHOMERReader(hostname, port);
1321 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1323 // see header file for function documentation
1324 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1327 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1329 // see header file for function documentation
1330 return new AliHLTHOMERReader(pBuffer, size);
1333 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1335 // see header file for function documentation
1336 if (pInstance) delete pInstance;
1340 ***************************************************************************
1342 ** $Author$ - Initial Version by Timm Morten Steinbeck
1346 ***************************************************************************