1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
55 //#include <sys/types.h>
56 //#include <sys/socket.h>
57 //#include <netinet/in.h>
58 //#include <netinet/tcp.h>
61 #include <rpc/types.h>
65 #include <netinet/in.h>
66 #include <arpa/inet.h>
72 #define MOD_BIN "MOD BIN\n"
73 #define MOD_ASC "MOD ASC\n"
74 #define GET_ONE "GET ONE\n"
75 #define GET_ALL "GET ALL\n"
77 // MAXHOSTNAMELEN not defined on macosx
78 // 686-apple-darwin9-gcc-4.0.1
79 #ifndef MAXHOSTNAMELEN
80 #define MAXHOSTNAMELEN 64
84 ClassImp(AliHLTMonitoringReader);
85 ClassImp(AliHLTHOMERReader);
93 AliHLTHOMERReader::AliHLTHOMERReader()
95 fCurrentEventType(~(homer_uint64)0),
96 fCurrentEventID(~(homer_uint64)0),
101 fTCPDataSourceCnt(0),
102 fShmDataSourceCnt(0),
103 fDataSourceMaxCnt(0),
105 fConnectionStatus(0),
106 fErrorConnection(~(unsigned int)0),
107 fEventRequestAdvanceTime(0)
109 // Reader implementation of the HOMER interface.
110 // The HLT Monitoring Environment including ROOT is
111 // a native interface to ship out data from the HLT chain.
112 // See pdf document shiped with the package
113 // for class documentation and tutorial.
119 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
121 AliHLTMonitoringReader(),
122 fCurrentEventType(~(homer_uint64)0),
123 fCurrentEventID(~(homer_uint64)0),
128 fTCPDataSourceCnt(0),
129 fShmDataSourceCnt(0),
130 fDataSourceMaxCnt(0),
132 fConnectionStatus(0),
133 fErrorConnection(~(unsigned int)0),
134 fEventRequestAdvanceTime(0)
136 // see header file for class documentation
137 // For reading from a TCP port
139 if ( !AllocDataSources(1) )
141 fErrorConnection = 0;
142 fConnectionStatus = ENOMEM;
145 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
146 if ( fConnectionStatus )
147 fErrorConnection = 0;
152 fDataSources[0].fNdx = 0;
156 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
158 AliHLTMonitoringReader(),
159 fCurrentEventType(~(homer_uint64)0),
160 fCurrentEventID(~(homer_uint64)0),
165 fTCPDataSourceCnt(0),
166 fShmDataSourceCnt(0),
167 fDataSourceMaxCnt(0),
169 fConnectionStatus(0),
170 fErrorConnection(~(unsigned int)0),
171 fEventRequestAdvanceTime(0)
173 // see header file for class documentation
174 // For reading from multiple TCP ports
176 if ( !AllocDataSources(tcpCnt) )
178 fErrorConnection = 0;
179 fConnectionStatus = ENOMEM;
182 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
184 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
185 if ( fConnectionStatus )
187 fErrorConnection = n;
190 fDataSources[n].fNdx = n;
194 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
196 AliHLTMonitoringReader(),
197 fCurrentEventType(~(homer_uint64)0),
198 fCurrentEventID(~(homer_uint64)0),
203 fTCPDataSourceCnt(0),
204 fShmDataSourceCnt(0),
205 fDataSourceMaxCnt(0),
207 fConnectionStatus(0),
208 fErrorConnection(~(unsigned int)0),
209 fEventRequestAdvanceTime(0)
211 // see header file for class documentation
212 // For reading from a System V shared memory segment
214 if ( !AllocDataSources(1) )
216 fErrorConnection = 0;
217 fConnectionStatus = ENOMEM;
220 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
221 if ( fConnectionStatus )
222 fErrorConnection = 0;
227 fDataSources[0].fNdx = 0;
231 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
233 AliHLTMonitoringReader(),
234 fCurrentEventType(~(homer_uint64)0),
235 fCurrentEventID(~(homer_uint64)0),
240 fTCPDataSourceCnt(0),
241 fShmDataSourceCnt(0),
242 fDataSourceMaxCnt(0),
244 fConnectionStatus(0),
245 fErrorConnection(~(unsigned int)0),
246 fEventRequestAdvanceTime(0)
248 // see header file for class documentation
249 // For reading from multiple System V shared memory segments
251 if ( !AllocDataSources(shmCnt) )
253 fErrorConnection = 0;
254 fConnectionStatus = ENOMEM;
257 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
259 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
260 if ( fConnectionStatus )
262 fErrorConnection = n;
265 fDataSources[n].fNdx = n;
269 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
270 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
272 AliHLTMonitoringReader(),
273 fCurrentEventType(~(homer_uint64)0),
274 fCurrentEventID(~(homer_uint64)0),
279 fTCPDataSourceCnt(0),
280 fShmDataSourceCnt(0),
281 fDataSourceMaxCnt(0),
283 fConnectionStatus(0),
284 fErrorConnection(~(unsigned int)0),
285 fEventRequestAdvanceTime(0)
287 // see header file for class documentation
288 // For reading from multiple TCP ports and multiple System V shared memory segments
290 if ( !AllocDataSources(tcpCnt+shmCnt) )
292 fErrorConnection = 0;
293 fConnectionStatus = ENOMEM;
296 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
298 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
299 if ( fConnectionStatus )
301 fErrorConnection = n;
304 fDataSources[n].fNdx = n;
306 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
308 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
309 if ( fConnectionStatus )
311 fErrorConnection = tcpCnt+n;
314 fDataSources[n].fNdx = n;
318 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
320 AliHLTMonitoringReader(),
321 fCurrentEventType(~(homer_uint64)0),
322 fCurrentEventID(~(homer_uint64)0),
327 fTCPDataSourceCnt(0),
328 fShmDataSourceCnt(0),
329 fDataSourceMaxCnt(0),
331 fConnectionStatus(0),
332 fErrorConnection(~(unsigned int)0),
333 fEventRequestAdvanceTime(0)
335 // see header file for class documentation
336 // For reading from a System V shared memory segment
338 if ( !AllocDataSources(1) )
340 fErrorConnection = 0;
341 fConnectionStatus = ENOMEM;
344 fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
345 if ( fConnectionStatus )
346 fErrorConnection = 0;
351 fDataSources[0].fNdx = 0;
355 AliHLTHOMERReader::~AliHLTHOMERReader()
357 // see header file for class documentation
358 ReleaseCurrentEvent();
362 delete [] fDataSources;
365 int AliHLTHOMERReader::ReadNextEvent()
367 // see header file for class documentation
368 // Read in the next available event
369 return ReadNextEvent( false, 0 );
372 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
374 // see header file for class documentation
375 // Read in the next available event
376 return ReadNextEvent( true, timeout );
379 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
381 // see header file for class documentation
382 // Return the size (in bytes) of the current event's data
383 // block with the given block index (starting at 0).
384 if ( ndx >= fBlockCnt )
386 return fBlocks[ndx].fLength;
389 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
391 // see header file for class documentation
392 // Return a pointer to the start of the current event's data
393 // block with the given block index (starting at 0).
394 if ( ndx >= fBlockCnt )
396 return fBlocks[ndx].fData;
399 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
401 // see header file for class documentation
402 // Return IP address or hostname of node which sent the
403 // current event's data block with the given block index
405 // For HOMER this is the ID of the node on which the subscriber
406 // that provided this data runs/ran.
407 if ( ndx >= fBlockCnt )
410 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
412 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
413 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
417 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
418 //return fBlocks[ndx].fOriginatingNodeID;
421 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
423 // see header file for class documentation
424 // Return byte order of the data stored in the
425 // current event's data block with the given block index (starting at 0).
426 // 0 is unknown alignment,
427 // 1 ist little endian,
428 // 2 is big endian. */
429 if ( ndx >= fBlockCnt )
431 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
432 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
435 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
437 // see header file for class documentation
438 // Return the alignment (in bytes) of the given datatype
439 // in the data stored in the current event's data block
440 // with the given block index (starting at 0).
441 // Possible values for the data type are
448 if ( ndx >= fBlockCnt )
450 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
452 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
453 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
456 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
458 // see header file for class documentation
459 if ( ndx >= fBlockCnt )
461 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
465 /* Return the type of the data in the current event's data
466 block with the given block index (starting at 0). */
467 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
469 // see header file for class documentation
470 if ( ndx >= fBlockCnt )
471 return ~(homer_uint64)0;
472 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
473 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
476 /* Return the origin of the data in the current event's data
477 block with the given block index (starting at 0). */
478 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
480 // see header file for class documentation
481 if ( ndx >= fBlockCnt )
482 return ~(homer_uint32)0;
483 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
484 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
487 /* Return a specification of the data in the current event's data
488 block with the given block index (starting at 0). */
489 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
491 // see header file for class documentation
492 if ( ndx >= fBlockCnt )
493 return ~(homer_uint32)0;
494 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
495 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
498 /* Find the next data block in the current event with the given
499 data type, origin, and specification. Returns the block's
501 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
502 homer_uint32 spec, unsigned long startNdx ) const
504 // see header file for class documentation
505 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
507 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
508 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
509 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
512 return ~(unsigned long)0;
515 /* Find the next data block in the current event with the given
516 data type, origin, and specification. Returns the block's
518 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
519 homer_uint32 spec, unsigned long startNdx ) const
521 // see header file for class documentation
522 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
524 bool found1=true, found2=true;
525 for ( unsigned i = 0; i < 8; i++ )
527 if ( type[i] != (char)0xFF )
536 for ( unsigned i = 0; i < 8; i++ )
538 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
539 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
546 for ( unsigned i = 0; i < 4; i++ )
548 if ( origin[i] != (char)0xFF )
557 for ( unsigned i = 0; i < 4; i++ )
559 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
560 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
567 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
568 if ( found1 && found2 &&
569 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
572 return ~(unsigned long)0;
575 /* Return the ID of the node that actually produced this data block.
576 This may be different from the node which sent the data to this
577 monitoring object as returned by GetBlockSendNodeID. */
578 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
580 // see header file for class documentation
581 if ( ndx >= fBlockCnt )
583 return fBlocks[ndx].fOriginatingNodeID;
587 void AliHLTHOMERReader::Init()
589 // see header file for class documentation
590 fCurrentEventType = ~(homer_uint64)0;
591 fCurrentEventID = ~(homer_uint64)0;
592 fMaxBlockCnt = fBlockCnt = 0;
595 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
599 fConnectionStatus = 0;
600 fErrorConnection = ~(unsigned int)0;
602 fEventRequestAdvanceTime = 0;
605 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
607 // see header file for class documentation
608 fDataSources = new DataSource[ sourceCnt ];
611 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
613 fDataSourceMaxCnt = sourceCnt;
617 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
619 // see header file for class documentation
621 he = gethostbyname( hostname );
624 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
625 return EADDRNOTAVAIL;
628 struct sockaddr_in remoteAddr;
629 remoteAddr.sin_family = AF_INET; // host byte order
630 remoteAddr.sin_port = htons(port); // short, network byte order
631 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
632 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
634 // Create socket and connect to target program on remote node
635 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
636 if ( source.fTCPConnection == -1 )
643 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
647 close( source.fTCPConnection );
651 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
652 if ( ret != (int)strlen(MOD_BIN) )
655 close( source.fTCPConnection );
659 char* tmpchar = new char[ strlen( hostname )+1 ];
662 close( source.fTCPConnection );
665 strcpy( tmpchar, hostname );
666 source.fHostname = tmpchar;
669 source.fTCPPort = port;
671 source.fDataSize = 0;
672 source.fDataRead = 0;
676 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
678 // see header file for class documentation
680 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
685 gethostname( tmpchar, MAXHOSTNAMELEN );
686 tmpchar[MAXHOSTNAMELEN]=(char)0;
687 source.fHostname = tmpchar;
689 source.fShmID = shmget( shmKey, shmSize, 0660 );
690 if ( source.fShmID == -1 )
693 delete [] source.fHostname;
697 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
699 if ( !source.fShmPtr )
702 shmctl( source.fShmID, IPC_RMID, NULL );
703 delete [] source.fHostname;
708 source.fShmKey = shmKey;
709 source.fShmSize = shmSize;
710 source.fDataSize = 0;
711 source.fDataRead = 0;
715 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
717 // see header file for class documentation
718 // a buffer data source is like a shm source apart from the shm attach and detach
719 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
720 // cleared right before sources are read but after the reading.
722 if ( !pBuffer || size<=0) return EINVAL;
724 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
729 gethostname( tmpchar, MAXHOSTNAMELEN );
730 tmpchar[MAXHOSTNAMELEN]=(char)0;
731 source.fHostname = tmpchar;
734 // the data buffer does not contain a size indicator in the first 4 bytes
735 // like the shm source buffer. Still we want to use the mechanism to invalidate/
736 // trigger by clearing the size indicator. Take the source.fShmSize variable.
737 source.fShmPtr = &source.fShmSize;
740 source.fShmSize = size;
741 source.fData = pBuffer;
742 source.fDataSize = 0;
743 source.fDataRead = 0;
747 void AliHLTHOMERReader::FreeDataSources()
749 // see header file for class documentation
750 for ( unsigned n=0; n < fDataSourceCnt; n++ )
752 if ( fDataSources[n].fType == kTCP )
753 FreeTCPDataSource( fDataSources[n] );
754 else if ( fDataSources[n].fType == kShm )
755 FreeShmDataSource( fDataSources[n] );
756 if ( fDataSources[n].fHostname )
757 delete [] fDataSources[n].fHostname;
762 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
764 // see header file for class documentation
765 if ( source.fShmPtr )
766 shmdt( (char*)source.fShmPtr );
767 // if ( source.fShmID != -1 )
768 // shmctl( source.fShmID, IPC_RMID, NULL );
772 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
774 // see header file for class documentation
775 if ( source.fTCPConnection )
776 close( source.fTCPConnection );
780 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
782 // see header file for class documentation
783 if ( fDataSourceCnt<=0 )
785 // Clean up currently active event.
786 ReleaseCurrentEvent();
788 // Trigger all configured data sources
789 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
791 if ( fDataSources[n].fType == kTCP )
792 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
793 else if ( fDataSources[n].fType == kShm )
794 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
797 fErrorConnection = n;
798 fConnectionStatus=ret;
799 return fConnectionStatus;
802 // Now read in data from the configured data source
803 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
808 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
813 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
815 // if ( fDataSources[n].fType == kTCP )
816 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
818 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
821 // fErrorConnection = n;
822 // fConnectionStatus=ret;
823 // return fConnectionStatus;
826 //Check to see that all sources contributed data for the same event
827 homer_uint64 eventID;
828 homer_uint64 eventType;
829 if (!fDataSources[0].fData)
831 fErrorConnection = 0;
832 fConnectionStatus=56;//ENOBUF;
833 return fConnectionStatus;
835 eventID = GetSourceEventID( fDataSources[0] );
836 eventType = GetSourceEventType( fDataSources[0] );
837 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
839 if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
841 fErrorConnection = n;
842 fConnectionStatus=56;//EBADRQC;
843 return fConnectionStatus;
846 // Find all the different data blocks contained in the data from all
848 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
850 ret = ParseSourceData( fDataSources[n] );
853 fErrorConnection = n;
854 fConnectionStatus=57;//EBADSLT;
858 fCurrentEventID = eventID;
859 fCurrentEventType = eventType;
863 void AliHLTHOMERReader::ReleaseCurrentEvent()
865 // see header file for class documentation
866 // sources.fDataRead = 0;
868 fCurrentEventID = ~(homer_uint64)0;
869 fCurrentEventType = ~(homer_uint64)0;
870 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
872 if ( fDataSources[n].fData )
874 if ( fDataSources[n].fType == kTCP )
875 delete [] (homer_uint8*)fDataSources[n].fData;
876 // do not reset the data pointer for kBuf sources since this
877 // can not be set again.
878 if ( fDataSources[n].fType != kBuf )
879 fDataSources[n].fData = NULL;
881 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
885 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
887 if ( fBlocks[n].fOriginatingNodeID )
888 delete [] fBlocks[n].fOriginatingNodeID;
897 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
899 // see header file for class documentation
901 struct timeval oldSndTO, newSndTO;
904 socklen_t optlen=sizeof(oldSndTO);
905 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
910 if ( optlen!=sizeof(oldSndTO) )
914 newSndTO.tv_sec = timeoutUsec / 1000000;
915 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
916 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
922 // Send one event request
923 if ( !fEventRequestAdvanceTime )
925 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
927 if ( ret != (int)strlen(GET_ONE) )
930 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
938 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
939 if ( len>128 || len<0 )
942 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
946 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
948 if ( ret != (int)strlen(tmpCmd) )
951 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
959 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
961 // see header file for class documentation
962 // clear the size indicator in the first 4 bytes of the buffer to request data
963 // from the HOMER writer.
964 if ( source.fShmPtr )
966 *(homer_uint32*)( source.fShmPtr ) = 0;
973 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
975 // see header file for class documentation
983 unsigned firstConnection=~(unsigned)0;
984 for ( unsigned long n = 0; n < sourceCnt; n++ )
986 if ( sources[n].fDataSize == 0 // size specifier not yet read
987 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
990 FD_SET( sources[n].fTCPConnection, &conns );
991 if ( sources[n].fTCPConnection > highestConn )
992 highestConn = sources[n].fTCPConnection;
993 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
994 if ( firstConnection == ~(unsigned)0 )
999 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1004 struct timeval tv, *ptv;
1007 tv.tv_sec = timeout / 1000000;
1008 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1013 // wait until something is ready to be read
1014 // either for timeout usecs or until eternity
1016 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1019 fErrorConnection = firstConnection;
1021 fConnectionStatus = errno;
1023 fConnectionStatus = ETIMEDOUT;
1024 return fConnectionStatus;
1026 for ( unsigned n = 0; n < sourceCnt; n++ )
1028 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1030 if ( sources[n].fDataSize == 0 )
1032 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1033 if ( ret != sizeof(homer_uint32) )
1035 fErrorConnection = n;
1037 fConnectionStatus = errno;
1039 fConnectionStatus = ENOMSG;
1040 return fConnectionStatus;
1042 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1043 sources[n].fDataRead = 0;
1044 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1045 if ( !sources[n].fData )
1047 fErrorConnection = n;
1048 fConnectionStatus = ENOMEM;
1049 return fConnectionStatus;
1052 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1054 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1056 sources[n].fDataRead += ret;
1057 else if ( ret == 0 )
1059 fErrorConnection = n;
1060 fConnectionStatus = ECONNRESET;
1061 return fConnectionStatus;
1065 fErrorConnection = n;
1066 fConnectionStatus = errno;
1067 return fConnectionStatus;
1072 fErrorConnection = n;
1073 fConnectionStatus = ENXIO;
1074 return fConnectionStatus;
1085 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1087 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1088 // Send one event request
1089 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1090 if ( ret != strlen(GET_ONE) )
1094 // wait for and read back size specifier
1096 // The value transmitted is binary, in network byte order
1097 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1098 if ( ret != sizeof(sizeNBO) )
1102 // Convert back to host byte order
1103 source.fDataSize = ntohl( sizeNBO );
1104 source.fData = new homer_uint8[ source.fDataSize ];
1105 unsigned long dataRead=0, toRead;
1106 if ( !source.fData )
1109 // Read in data into buffer in order not to block connection
1110 while ( dataRead < source.fDataSize )
1112 if ( source.fDataSize-dataRead > 1024 )
1115 toRead = source.fDataSize-dataRead;
1116 ret = read( source.fTCPConnection, buffer, toRead );
1124 while ( dataRead < source.fDataSize )
1126 toRead = source.fDataSize-dataRead;
1127 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1130 else if ( ret == 0 && useTimeout )
1133 tv.tv_sec = timeout / 1000000;
1134 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1137 FD_SET( source.fTCPConnection, &conns );
1138 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1142 else if ( ret == 0 )
1159 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1165 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1167 // see header file for class documentation
1168 struct timeval tv1, tv2;
1172 gettimeofday( &tv1, NULL );
1177 for ( unsigned n = 0; n < sourceCnt; n++ )
1179 if ( !sources[n].fDataSize )
1181 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1184 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1185 if (sources[n].fType==kBuf)
1187 // the data buffer is already set to fData, just need to set fDataSize member
1188 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1189 TriggerShmSource( sources[n], 0, 0 );
1192 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1196 if ( found && useTimeout )
1197 gettimeofday( &tv1, NULL );
1198 if ( !all && useTimeout )
1200 gettimeofday( &tv2, NULL );
1201 unsigned long long tdiff;
1202 tdiff = tv2.tv_sec-tv1.tv_sec;
1204 tdiff += tv2.tv_usec-tv1.tv_usec;
1205 if ( tdiff > timeout )
1215 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1217 // see header file for class documentation
1220 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1221 if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
1222 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1223 // block count is not related to size of the data in the way the
1224 // following condition implies. But we can at least limit the block
1225 // count for the case the data is corrupted
1226 if (blockCnt>source.fDataSize) return EBADMSG;
1227 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1230 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1231 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1233 if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1234 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1235 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1236 if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1237 if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
1238 fBlocks[fBlockCnt].fSource = source.fNdx;
1239 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1240 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1241 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1242 struct in_addr tmpA;
1243 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1244 char* addr = inet_ntoa( tmpA );
1245 char* tmpchar = new char[ strlen(addr)+1 ];
1248 strcpy( tmpchar, addr );
1249 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1250 descrOffset += descrLen;
1257 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1259 // see header file for class documentation
1260 DataBlock* newBlocks;
1261 newBlocks = new DataBlock[ newCnt ];
1264 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1265 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1266 if ( newCnt > fMaxBlockCnt )
1267 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1270 fBlocks = newBlocks;
1271 fMaxBlockCnt = newCnt;
1275 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1277 // see header file for class documentation
1278 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1279 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1282 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1284 // see header file for class documentation
1285 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1286 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1289 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1291 // see header file for class documentation
1292 if ( destFormat == sourceFormat )
1295 return ((source & 0xFFULL) << 56) |
1296 ((source & 0xFF00ULL) << 40) |
1297 ((source & 0xFF0000ULL) << 24) |
1298 ((source & 0xFF000000ULL) << 8) |
1299 ((source & 0xFF00000000ULL) >> 8) |
1300 ((source & 0xFF0000000000ULL) >> 24) |
1301 ((source & 0xFF000000000000ULL) >> 40) |
1302 ((source & 0xFF00000000000000ULL) >> 56);
1305 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1307 // see header file for class documentation
1308 if ( destFormat == sourceFormat )
1311 return ((source & 0xFFUL) << 24) |
1312 ((source & 0xFF00UL) << 8) |
1313 ((source & 0xFF0000UL) >> 8) |
1314 ((source & 0xFF000000UL) >> 24);
1317 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1319 // see header file for function documentation
1320 return new AliHLTHOMERReader(hostname, port);
1323 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1325 // see header file for function documentation
1326 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1329 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1331 // see header file for function documentation
1332 return new AliHLTHOMERReader(pBuffer, size);
1335 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1337 // see header file for function documentation
1338 if (pInstance) delete pInstance;
1342 ***************************************************************************
1344 ** $Author$ - Initial Version by Timm Morten Steinbeck
1348 ***************************************************************************