1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
54 //#include <sys/types.h>
55 //#include <sys/socket.h>
56 //#include <netinet/in.h>
57 //#include <netinet/tcp.h>
60 #include <rpc/types.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
71 #define MOD_BIN "MOD BIN\n"
72 #define MOD_ASC "MOD ASC\n"
73 #define GET_ONE "GET ONE\n"
74 #define GET_ALL "GET ALL\n"
76 // MAXHOSTNAMELEN not defined on macosx
77 // 686-apple-darwin9-gcc-4.0.1
78 #ifndef MAXHOSTNAMELEN
79 #define MAXHOSTNAMELEN 64
83 ClassImp(AliHLTMonitoringReader);
84 ClassImp(AliHLTHOMERReader);
92 AliHLTHOMERReader::AliHLTHOMERReader()
94 fCurrentEventType(~(homer_uint64)0),
95 fCurrentEventID(~(homer_uint64)0),
100 fTCPDataSourceCnt(0),
101 fShmDataSourceCnt(0),
102 fDataSourceMaxCnt(0),
104 fConnectionStatus(0),
105 fErrorConnection(~(unsigned int)0),
106 fEventRequestAdvanceTime(0)
108 // Reader implementation of the HOMER interface.
109 // The HLT Monitoring Environment including ROOT is
110 // a native interface to ship out data from the HLT chain.
111 // See pdf document shiped with the package
112 // for class documentation and tutorial.
118 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
120 AliHLTMonitoringReader(),
122 fCurrentEventType(~(homer_uint64)0),
123 fCurrentEventID(~(homer_uint64)0),
128 fTCPDataSourceCnt(0),
129 fShmDataSourceCnt(0),
130 fDataSourceMaxCnt(0),
132 fConnectionStatus(0),
133 fErrorConnection(~(unsigned int)0),
134 fEventRequestAdvanceTime(0)
136 // see header file for class documentation
137 // For reading from a TCP port
139 if ( !AllocDataSources(1) )
141 fErrorConnection = 0;
142 fConnectionStatus = ENOMEM;
145 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
146 if ( fConnectionStatus )
147 fErrorConnection = 0;
152 fDataSources[0].fNdx = 0;
156 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports )
158 AliHLTMonitoringReader(),
160 fCurrentEventType(~(homer_uint64)0),
161 fCurrentEventID(~(homer_uint64)0),
166 fTCPDataSourceCnt(0),
167 fShmDataSourceCnt(0),
168 fDataSourceMaxCnt(0),
170 fConnectionStatus(0),
171 fErrorConnection(~(unsigned int)0),
172 fEventRequestAdvanceTime(0)
174 // see header file for class documentation
175 // For reading from multiple TCP ports
177 if ( !AllocDataSources(tcpCnt) )
179 fErrorConnection = 0;
180 fConnectionStatus = ENOMEM;
183 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
185 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
186 if ( fConnectionStatus )
188 fErrorConnection = n;
191 fDataSources[n].fNdx = n;
195 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
197 AliHLTMonitoringReader(),
199 fCurrentEventType(~(homer_uint64)0),
200 fCurrentEventID(~(homer_uint64)0),
205 fTCPDataSourceCnt(0),
206 fShmDataSourceCnt(0),
207 fDataSourceMaxCnt(0),
209 fConnectionStatus(0),
210 fErrorConnection(~(unsigned int)0),
211 fEventRequestAdvanceTime(0)
213 // see header file for class documentation
214 // For reading from a System V shared memory segment
216 if ( !AllocDataSources(1) )
218 fErrorConnection = 0;
219 fConnectionStatus = ENOMEM;
222 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
223 if ( fConnectionStatus )
224 fErrorConnection = 0;
229 fDataSources[0].fNdx = 0;
233 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
235 AliHLTMonitoringReader(),
237 fCurrentEventType(~(homer_uint64)0),
238 fCurrentEventID(~(homer_uint64)0),
243 fTCPDataSourceCnt(0),
244 fShmDataSourceCnt(0),
245 fDataSourceMaxCnt(0),
247 fConnectionStatus(0),
248 fErrorConnection(~(unsigned int)0),
249 fEventRequestAdvanceTime(0)
251 // see header file for class documentation
252 // For reading from multiple System V shared memory segments
254 if ( !AllocDataSources(shmCnt) )
256 fErrorConnection = 0;
257 fConnectionStatus = ENOMEM;
260 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
262 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
263 if ( fConnectionStatus )
265 fErrorConnection = n;
268 fDataSources[n].fNdx = n;
272 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports,
273 unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
275 AliHLTMonitoringReader(),
277 fCurrentEventType(~(homer_uint64)0),
278 fCurrentEventID(~(homer_uint64)0),
283 fTCPDataSourceCnt(0),
284 fShmDataSourceCnt(0),
285 fDataSourceMaxCnt(0),
287 fConnectionStatus(0),
288 fErrorConnection(~(unsigned int)0),
289 fEventRequestAdvanceTime(0)
291 // see header file for class documentation
292 // For reading from multiple TCP ports and multiple System V shared memory segments
294 if ( !AllocDataSources(tcpCnt+shmCnt) )
296 fErrorConnection = 0;
297 fConnectionStatus = ENOMEM;
300 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
302 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
303 if ( fConnectionStatus )
305 fErrorConnection = n;
308 fDataSources[n].fNdx = n;
310 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
312 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
313 if ( fConnectionStatus )
315 fErrorConnection = tcpCnt+n;
318 fDataSources[n].fNdx = n;
322 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
324 AliHLTMonitoringReader(),
326 fCurrentEventType(~(homer_uint64)0),
327 fCurrentEventID(~(homer_uint64)0),
332 fTCPDataSourceCnt(0),
333 fShmDataSourceCnt(0),
334 fDataSourceMaxCnt(0),
336 fConnectionStatus(0),
337 fErrorConnection(~(unsigned int)0),
338 fEventRequestAdvanceTime(0)
340 // see header file for class documentation
341 // For reading from a System V shared memory segment
343 if ( !AllocDataSources(1) )
345 fErrorConnection = 0;
346 fConnectionStatus = ENOMEM;
349 fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
350 if ( fConnectionStatus )
351 fErrorConnection = 0;
356 fDataSources[0].fNdx = 0;
360 AliHLTHOMERReader::~AliHLTHOMERReader()
362 // see header file for class documentation
363 ReleaseCurrentEvent();
367 delete [] fDataSources;
370 int AliHLTHOMERReader::ReadNextEvent()
372 // see header file for class documentation
373 // Read in the next available event
374 return ReadNextEvent( false, 0 );
377 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
379 // see header file for class documentation
380 // Read in the next available event
381 return ReadNextEvent( true, timeout );
384 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
386 // see header file for class documentation
387 // Return the size (in bytes) of the current event's data
388 // block with the given block index (starting at 0).
389 if ( ndx >= fBlockCnt )
391 return fBlocks[ndx].fLength;
394 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
396 // see header file for class documentation
397 // Return a pointer to the start of the current event's data
398 // block with the given block index (starting at 0).
399 if ( ndx >= fBlockCnt )
401 return fBlocks[ndx].fData;
404 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
406 // see header file for class documentation
407 // Return IP address or hostname of node which sent the
408 // current event's data block with the given block index
410 // For HOMER this is the ID of the node on which the subscriber
411 // that provided this data runs/ran.
412 if ( ndx >= fBlockCnt )
415 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
417 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%u) >= fDataSourceCnt (%u)\n",
418 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
422 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
423 //return fBlocks[ndx].fOriginatingNodeID;
426 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
428 // see header file for class documentation
429 // Return byte order of the data stored in the
430 // current event's data block with the given block index (starting at 0).
431 // 0 is unknown alignment,
432 // 1 ist little endian,
433 // 2 is big endian. */
434 if ( ndx >= fBlockCnt )
436 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
437 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
440 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
442 // see header file for class documentation
443 // Return the alignment (in bytes) of the given datatype
444 // in the data stored in the current event's data block
445 // with the given block index (starting at 0).
446 // Possible values for the data type are
453 if ( ndx >= fBlockCnt )
455 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
457 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
458 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
461 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
463 // see header file for class documentation
464 if ( ndx >= fBlockCnt )
466 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
470 /* Return the type of the data in the current event's data
471 block with the given block index (starting at 0). */
472 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
474 // see header file for class documentation
475 if ( ndx >= fBlockCnt )
476 return ~(homer_uint64)0;
477 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
478 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
481 /* Return the origin of the data in the current event's data
482 block with the given block index (starting at 0). */
483 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
485 // see header file for class documentation
486 if ( ndx >= fBlockCnt )
487 return ~(homer_uint32)0;
488 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
489 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
492 /* Return a specification of the data in the current event's data
493 block with the given block index (starting at 0). */
494 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
496 // see header file for class documentation
497 if ( ndx >= fBlockCnt )
498 return ~(homer_uint32)0;
499 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
500 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
503 homer_uint64 AliHLTHOMERReader::GetBlockBirthSeconds( unsigned long ndx ) const
505 // see header file for class documentation
506 if ( ndx >= fBlockCnt )
508 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kBirth_s_64b_Offset);
511 homer_uint64 AliHLTHOMERReader::GetBlockBirthMicroSeconds( unsigned long ndx ) const
513 // see header file for class documentation
514 if ( ndx >= fBlockCnt )
516 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kBirth_us_64b_Offset);
519 /* Find the next data block in the current event with the given
520 data type, origin, and specification. Returns the block's
522 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
523 homer_uint32 spec, unsigned long startNdx ) const
525 // see header file for class documentation
526 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
528 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
529 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
530 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
533 return ~(unsigned long)0;
536 /* Find the next data block in the current event with the given
537 data type, origin, and specification. Returns the block's
539 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
540 homer_uint32 spec, unsigned long startNdx ) const
542 // see header file for class documentation
543 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
545 bool found1=true, found2=true;
546 for ( unsigned i = 0; i < 8; i++ )
548 if ( type[i] != (char)0xFF )
557 for ( unsigned i = 0; i < 8; i++ )
559 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
560 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
567 for ( unsigned i = 0; i < 4; i++ )
569 if ( origin[i] != (char)0xFF )
578 for ( unsigned i = 0; i < 4; i++ )
580 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
581 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
588 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
589 if ( found1 && found2 &&
590 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
593 return ~(unsigned long)0;
596 /* Return the ID of the node that actually produced this data block.
597 This may be different from the node which sent the data to this
598 monitoring object as returned by GetBlockSendNodeID. */
599 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
601 // see header file for class documentation
602 if ( ndx >= fBlockCnt )
604 return fBlocks[ndx].fOriginatingNodeID;
608 void AliHLTHOMERReader::Init()
610 // see header file for class documentation
611 fCurrentEventType = ~(homer_uint64)0;
612 fCurrentEventID = ~(homer_uint64)0;
613 fMaxBlockCnt = fBlockCnt = 0;
616 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
620 fConnectionStatus = 0;
621 fErrorConnection = ~(unsigned int)0;
623 fEventRequestAdvanceTime = 0;
626 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
628 // see header file for class documentation
629 fDataSources = new DataSource[ sourceCnt ];
632 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
634 fDataSourceMaxCnt = sourceCnt;
638 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
640 // see header file for class documentation
642 he = gethostbyname( hostname );
645 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
646 return EADDRNOTAVAIL;
649 struct sockaddr_in remoteAddr;
650 remoteAddr.sin_family = AF_INET; // host byte order
651 remoteAddr.sin_port = htons(port); // short, network byte order
652 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
653 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
655 // Create socket and connect to target program on remote node
656 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
657 if ( source.fTCPConnection == -1 )
664 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
668 close( source.fTCPConnection );
672 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
673 if ( ret != (int)strlen(MOD_BIN) )
676 close( source.fTCPConnection );
680 unsigned hostnamelen=strlen( hostname );
681 char* tmpchar = new char[ hostnamelen+1 ];
684 close( source.fTCPConnection );
687 strncpy( tmpchar, hostname, hostnamelen );
688 tmpchar[hostnamelen]=0;
689 source.fHostname = tmpchar;
692 source.fTCPPort = port;
694 source.fDataSize = 0;
695 source.fDataRead = 0;
699 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
701 // see header file for class documentation
703 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
708 gethostname( tmpchar, MAXHOSTNAMELEN );
709 tmpchar[MAXHOSTNAMELEN]=(char)0;
710 source.fHostname = tmpchar;
712 source.fShmID = shmget( shmKey, shmSize, 0660 );
713 if ( source.fShmID == -1 )
716 delete [] source.fHostname;
720 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
722 if ( !source.fShmPtr )
725 shmctl( source.fShmID, IPC_RMID, NULL );
726 delete [] source.fHostname;
731 source.fShmKey = shmKey;
732 source.fShmSize = shmSize;
733 source.fDataSize = 0;
734 source.fDataRead = 0;
738 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
740 // see header file for class documentation
741 // a buffer data source is like a shm source apart from the shm attach and detach
742 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
743 // cleared right before sources are read but after the reading.
745 if ( !pBuffer || size<=0) return EINVAL;
747 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
752 gethostname( tmpchar, MAXHOSTNAMELEN );
753 tmpchar[MAXHOSTNAMELEN]=(char)0;
754 source.fHostname = tmpchar;
757 // the data buffer does not contain a size indicator in the first 4 bytes
758 // like the shm source buffer. Still we want to use the mechanism to invalidate/
759 // trigger by clearing the size indicator. Take the source.fShmSize variable.
760 source.fShmPtr = &source.fShmSize;
763 source.fShmSize = size;
764 source.fData = pBuffer;
765 source.fDataSize = 0;
766 source.fDataRead = 0;
770 void AliHLTHOMERReader::FreeDataSources()
772 // see header file for class documentation
773 for ( unsigned n=0; n < fDataSourceCnt; n++ )
775 if ( fDataSources[n].fType == kTCP )
776 FreeTCPDataSource( fDataSources[n] );
777 else if ( fDataSources[n].fType == kShm )
778 FreeShmDataSource( fDataSources[n] );
779 if ( fDataSources[n].fHostname )
780 delete [] fDataSources[n].fHostname;
785 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
787 // see header file for class documentation
788 if ( source.fShmPtr )
789 shmdt( (char*)source.fShmPtr );
790 // if ( source.fShmID != -1 )
791 // shmctl( source.fShmID, IPC_RMID, NULL );
795 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
797 // see header file for class documentation
798 if ( source.fTCPConnection )
799 close( source.fTCPConnection );
803 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
805 // see header file for class documentation
806 if ( fDataSourceCnt<=0 )
808 // Clean up currently active event.
809 ReleaseCurrentEvent();
811 // Trigger all configured data sources
812 for ( unsigned n = 0; n<fDataSourceCnt; n++ ){
813 if ( fDataSources[n].fType == kTCP )
814 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
815 else if ( fDataSources[n].fType == kShm )
816 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
819 fErrorConnection = n;
820 fConnectionStatus=ret;
821 return fConnectionStatus;
824 // Now read in data from the configured data source
825 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
831 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
836 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
838 // if ( fDataSources[n].fType == kTCP )
839 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
841 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
844 // fErrorConnection = n;
845 // fConnectionStatus=ret;
846 // return fConnectionStatus;
849 //Check to see that all sources contributed data for the same event
850 homer_uint64 eventID;
851 homer_uint64 eventType;
852 if (!fDataSources[0].fData)
854 fErrorConnection = 0;
855 fConnectionStatus=56;//ENOBUF;
856 return fConnectionStatus;
858 eventID = GetSourceEventID( fDataSources[0] );
859 eventType = GetSourceEventType( fDataSources[0] );
860 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
862 if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
864 fErrorConnection = n;
865 fConnectionStatus=56;//EBADRQC;
866 return fConnectionStatus;
869 // Find all the different data blocks contained in the data from all
871 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
873 ret = ParseSourceData( fDataSources[n] );
876 fErrorConnection = n;
877 fConnectionStatus=57;//EBADSLT;
881 fCurrentEventID = eventID;
882 fCurrentEventType = eventType;
886 void AliHLTHOMERReader::ReleaseCurrentEvent()
888 // see header file for class documentation
889 // sources.fDataRead = 0;
891 fCurrentEventID = ~(homer_uint64)0;
892 fCurrentEventType = ~(homer_uint64)0;
893 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
895 if ( fDataSources[n].fData )
897 if ( fDataSources[n].fType == kTCP )
898 delete [] (homer_uint8*)fDataSources[n].fData;
899 // do not reset the data pointer for kBuf sources since this
900 // can not be set again.
901 if ( fDataSources[n].fType != kBuf )
902 fDataSources[n].fData = NULL;
904 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
908 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
910 if ( fBlocks[n].fOriginatingNodeID )
911 delete [] fBlocks[n].fOriginatingNodeID;
920 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
922 // see header file for class documentation
924 struct timeval oldSndTO, newSndTO;
925 memset(&oldSndTO, 0, sizeof(oldSndTO));
926 memset(&newSndTO, 0, sizeof(newSndTO));
929 socklen_t optlen=sizeof(oldSndTO);
930 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
935 if ( optlen!=sizeof(oldSndTO) )
939 newSndTO.tv_sec = timeoutUsec / 1000000;
940 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
941 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
947 // Send one event request
948 if ( !fEventRequestAdvanceTime )
950 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
952 if ( ret != (int)strlen(GET_ONE) )
955 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
963 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
964 if ( len>128 || len<0 )
967 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
971 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
973 if ( ret != (int)strlen(tmpCmd) )
976 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
984 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
986 // see header file for class documentation
987 // clear the size indicator in the first 4 bytes of the buffer to request data
988 // from the HOMER writer.
989 if ( source.fShmPtr )
991 *(homer_uint32*)( source.fShmPtr ) = 0;
998 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1000 // see header file for class documentation
1001 bool toRead = false;
1008 unsigned firstConnection=~(unsigned)0;
1009 for ( unsigned long n = 0; n < sourceCnt; n++ )
1011 if ( sources[n].fDataSize == 0 // size specifier not yet read
1012 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
1015 FD_SET( sources[n].fTCPConnection, &conns );
1016 if ( sources[n].fTCPConnection > highestConn )
1017 highestConn = sources[n].fTCPConnection;
1018 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
1019 if ( firstConnection == ~(unsigned)0 )
1020 firstConnection = n;
1024 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1029 struct timeval tv, *ptv;
1032 tv.tv_sec = timeout / 1000000;
1033 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1038 // wait until something is ready to be read
1039 // either for timeout usecs or until eternity
1041 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1044 fErrorConnection = firstConnection;
1046 fConnectionStatus = errno;
1048 fConnectionStatus = ETIMEDOUT;
1049 return fConnectionStatus;
1051 for ( unsigned n = 0; n < sourceCnt; n++ )
1053 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1055 if ( sources[n].fDataSize == 0 )
1057 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1058 if ( ret != sizeof(homer_uint32) )
1060 fErrorConnection = n;
1062 fConnectionStatus = errno;
1064 fConnectionStatus = ENOMSG;
1065 return fConnectionStatus;
1067 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1068 sources[n].fDataRead = 0;
1069 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1070 if ( !sources[n].fData )
1072 fErrorConnection = n;
1073 fConnectionStatus = ENOMEM;
1074 return fConnectionStatus;
1077 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1079 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1081 sources[n].fDataRead += ret;
1082 else if ( ret == 0 )
1084 fErrorConnection = n;
1085 fConnectionStatus = ECONNRESET;
1086 return fConnectionStatus;
1090 fErrorConnection = n;
1091 fConnectionStatus = errno;
1092 return fConnectionStatus;
1097 fErrorConnection = n;
1098 fConnectionStatus = ENXIO;
1099 return fConnectionStatus;
1110 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1112 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1113 // Send one event request
1114 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1115 if ( ret != strlen(GET_ONE) )
1119 // wait for and read back size specifier
1121 // The value transmitted is binary, in network byte order
1122 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1123 if ( ret != sizeof(sizeNBO) )
1127 // Convert back to host byte order
1128 source.fDataSize = ntohl( sizeNBO );
1129 source.fData = new homer_uint8[ source.fDataSize ];
1130 unsigned long dataRead=0, toRead;
1131 if ( !source.fData )
1134 // Read in data into buffer in order not to block connection
1135 while ( dataRead < source.fDataSize )
1137 if ( source.fDataSize-dataRead > 1024 )
1140 toRead = source.fDataSize-dataRead;
1141 ret = read( source.fTCPConnection, buffer, toRead );
1149 while ( dataRead < source.fDataSize )
1151 toRead = source.fDataSize-dataRead;
1152 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1155 else if ( ret == 0 && useTimeout )
1158 tv.tv_sec = timeout / 1000000;
1159 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1162 FD_SET( source.fTCPConnection, &conns );
1163 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1167 else if ( ret == 0 )
1184 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1190 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1192 // see header file for class documentation
1193 struct timeval tv1, tv2;
1197 gettimeofday( &tv1, NULL );
1202 for ( unsigned n = 0; n < sourceCnt; n++ )
1204 if ( !sources[n].fDataSize )
1206 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1209 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1210 if (sources[n].fType==kBuf)
1212 // the data buffer is already set to fData, just need to set fDataSize member
1213 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1214 TriggerShmSource( sources[n], 0, 0 );
1217 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1221 if ( found && useTimeout )
1222 gettimeofday( &tv1, NULL );
1223 if ( !all && useTimeout )
1225 gettimeofday( &tv2, NULL );
1226 unsigned long long tdiff;
1227 tdiff = tv2.tv_sec-tv1.tv_sec;
1229 tdiff += tv2.tv_usec-tv1.tv_usec;
1230 if ( tdiff > timeout )
1240 int AliHLTHOMERReader::ParseSourceData( const DataSource& source )
1242 // see header file for class documentation
1245 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1246 if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
1247 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1248 // block count is not related to size of the data in the way the
1249 // following condition implies. But we can at least limit the block
1250 // count for the case the data is corrupted
1251 if (blockCnt>source.fDataSize) return EBADMSG;
1252 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1255 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1256 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1258 if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1259 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1260 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1261 if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1262 if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
1263 fBlocks[fBlockCnt].fSource = source.fNdx;
1264 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1265 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1266 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1267 struct in_addr tmpA;
1268 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1269 char* addr = inet_ntoa( tmpA );
1270 unsigned straddrlen=strlen(addr);
1271 char* tmpchar = new char[ straddrlen+1 ];
1274 strncpy( tmpchar, addr, straddrlen );
1275 tmpchar[straddrlen]=0;
1276 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1277 descrOffset += descrLen;
1284 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1286 // see header file for class documentation
1287 DataBlock* newBlocks;
1288 newBlocks = new DataBlock[ newCnt ];
1291 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1293 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1297 if ( newCnt > fMaxBlockCnt )
1298 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1301 fBlocks = newBlocks;
1302 fMaxBlockCnt = newCnt;
1306 homer_uint64 AliHLTHOMERReader::GetSourceEventID( const DataSource& source )
1308 // see header file for class documentation
1309 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1310 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1313 homer_uint64 AliHLTHOMERReader::GetSourceEventType( const DataSource& source )
1315 // see header file for class documentation
1316 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1317 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1320 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1322 // see header file for class documentation
1323 if ( destFormat == sourceFormat )
1326 return ((source & 0xFFULL) << 56) |
1327 ((source & 0xFF00ULL) << 40) |
1328 ((source & 0xFF0000ULL) << 24) |
1329 ((source & 0xFF000000ULL) << 8) |
1330 ((source & 0xFF00000000ULL) >> 8) |
1331 ((source & 0xFF0000000000ULL) >> 24) |
1332 ((source & 0xFF000000000000ULL) >> 40) |
1333 ((source & 0xFF00000000000000ULL) >> 56);
1336 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1338 // see header file for class documentation
1339 if ( destFormat == sourceFormat )
1342 return ((source & 0xFFUL) << 24) |
1343 ((source & 0xFF00UL) << 8) |
1344 ((source & 0xFF0000UL) >> 8) |
1345 ((source & 0xFF000000UL) >> 24);
1348 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1350 // see header file for function documentation
1351 return new AliHLTHOMERReader(hostname, port);
1354 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1356 // see header file for function documentation
1357 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1360 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1362 // see header file for function documentation
1363 return new AliHLTHOMERReader(pBuffer, size);
1366 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1368 // see header file for function documentation
1369 if (pInstance) delete pInstance;
1373 ***************************************************************************
1375 ** $Author$ - Initial Version by Timm Morten Steinbeck
1379 ***************************************************************************