1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netinet/in.h>
54 #include <netinet/tcp.h>
56 #include <rpc/types.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
66 #define MOD_BIN "MOD BIN\n"
67 #define MOD_ASC "MOD ASC\n"
68 #define GET_ONE "GET ONE\n"
69 #define GET_ALL "GET ALL\n"
72 ClassImp(AliHLTMonitoringReader);
73 ClassImp(AliHLTHOMERReader);
81 AliHLTHOMERReader::AliHLTHOMERReader()
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
94 fErrorConnection(~(unsigned int)0),
95 fEventRequestAdvanceTime(0)
97 // Reader implementation of the HOMER interface.
98 // The HLT Monitoring Environment including ROOT is
99 // a native interface to ship out data from the HLT chain.
100 // See pdf document shiped with the package
101 // for class documentation and tutorial.
107 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
109 AliHLTMonitoringReader(),
110 fCurrentEventType(~(homer_uint64)0),
111 fCurrentEventID(~(homer_uint64)0),
116 fTCPDataSourceCnt(0),
117 fShmDataSourceCnt(0),
118 fDataSourceMaxCnt(0),
120 fConnectionStatus(0),
121 fErrorConnection(~(unsigned int)0),
122 fEventRequestAdvanceTime(0)
124 // see header file for class documentation
125 // For reading from a TCP port
127 if ( !AllocDataSources(1) )
129 fErrorConnection = 0;
130 fConnectionStatus = ENOMEM;
133 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134 if ( fConnectionStatus )
135 fErrorConnection = 0;
140 fDataSources[0].fNdx = 0;
144 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
146 AliHLTMonitoringReader(),
147 fCurrentEventType(~(homer_uint64)0),
148 fCurrentEventID(~(homer_uint64)0),
153 fTCPDataSourceCnt(0),
154 fShmDataSourceCnt(0),
155 fDataSourceMaxCnt(0),
157 fConnectionStatus(0),
158 fErrorConnection(~(unsigned int)0),
159 fEventRequestAdvanceTime(0)
161 // see header file for class documentation
162 // For reading from multiple TCP ports
164 if ( !AllocDataSources(tcpCnt) )
166 fErrorConnection = 0;
167 fConnectionStatus = ENOMEM;
170 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
172 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173 if ( fConnectionStatus )
175 fErrorConnection = n;
178 fDataSources[n].fNdx = n;
182 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
184 AliHLTMonitoringReader(),
185 fCurrentEventType(~(homer_uint64)0),
186 fCurrentEventID(~(homer_uint64)0),
191 fTCPDataSourceCnt(0),
192 fShmDataSourceCnt(0),
193 fDataSourceMaxCnt(0),
195 fConnectionStatus(0),
196 fErrorConnection(~(unsigned int)0),
197 fEventRequestAdvanceTime(0)
199 // see header file for class documentation
200 // For reading from a System V shared memory segment
202 if ( !AllocDataSources(1) )
204 fErrorConnection = 0;
205 fConnectionStatus = ENOMEM;
208 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209 if ( fConnectionStatus )
210 fErrorConnection = 0;
215 fDataSources[0].fNdx = 0;
219 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
221 AliHLTMonitoringReader(),
222 fCurrentEventType(~(homer_uint64)0),
223 fCurrentEventID(~(homer_uint64)0),
228 fTCPDataSourceCnt(0),
229 fShmDataSourceCnt(0),
230 fDataSourceMaxCnt(0),
232 fConnectionStatus(0),
233 fErrorConnection(~(unsigned int)0),
234 fEventRequestAdvanceTime(0)
236 // see header file for class documentation
237 // For reading from multiple System V shared memory segments
239 if ( !AllocDataSources(shmCnt) )
241 fErrorConnection = 0;
242 fConnectionStatus = ENOMEM;
245 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
247 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248 if ( fConnectionStatus )
250 fErrorConnection = n;
253 fDataSources[n].fNdx = n;
257 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
258 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
260 AliHLTMonitoringReader(),
261 fCurrentEventType(~(homer_uint64)0),
262 fCurrentEventID(~(homer_uint64)0),
267 fTCPDataSourceCnt(0),
268 fShmDataSourceCnt(0),
269 fDataSourceMaxCnt(0),
271 fConnectionStatus(0),
272 fErrorConnection(~(unsigned int)0),
273 fEventRequestAdvanceTime(0)
275 // see header file for class documentation
276 // For reading from multiple TCP ports and multiple System V shared memory segments
278 if ( !AllocDataSources(tcpCnt+shmCnt) )
280 fErrorConnection = 0;
281 fConnectionStatus = ENOMEM;
284 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
286 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287 if ( fConnectionStatus )
289 fErrorConnection = n;
292 fDataSources[n].fNdx = n;
294 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
296 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297 if ( fConnectionStatus )
299 fErrorConnection = tcpCnt+n;
302 fDataSources[n].fNdx = n;
306 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
308 AliHLTMonitoringReader(),
309 fCurrentEventType(~(homer_uint64)0),
310 fCurrentEventID(~(homer_uint64)0),
315 fTCPDataSourceCnt(0),
316 fShmDataSourceCnt(0),
317 fDataSourceMaxCnt(0),
319 fConnectionStatus(0),
320 fErrorConnection(~(unsigned int)0),
321 fEventRequestAdvanceTime(0)
323 // see header file for class documentation
324 // For reading from a System V shared memory segment
326 if ( !AllocDataSources(1) )
328 fErrorConnection = 0;
329 fConnectionStatus = ENOMEM;
332 //fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
333 if ( fConnectionStatus )
334 fErrorConnection = 0;
339 fDataSources[0].fNdx = 0;
343 AliHLTHOMERReader::~AliHLTHOMERReader()
345 // see header file for class documentation
346 ReleaseCurrentEvent();
350 int AliHLTHOMERReader::ReadNextEvent()
352 // see header file for class documentation
353 // Read in the next available event
354 return ReadNextEvent( false, 0 );
357 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
359 // see header file for class documentation
360 // Read in the next available event
361 return ReadNextEvent( true, timeout );
364 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
366 // see header file for class documentation
367 // Return the size (in bytes) of the current event's data
368 // block with the given block index (starting at 0).
369 if ( ndx >= fBlockCnt )
371 return fBlocks[ndx].fLength;
374 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
376 // see header file for class documentation
377 // Return a pointer to the start of the current event's data
378 // block with the given block index (starting at 0).
379 if ( ndx >= fBlockCnt )
381 return fBlocks[ndx].fData;
384 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
386 // see header file for class documentation
387 // Return IP address or hostname of node which sent the
388 // current event's data block with the given block index
390 // For HOMER this is the ID of the node on which the subscriber
391 // that provided this data runs/ran.
392 if ( ndx >= fBlockCnt )
395 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
397 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
398 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
402 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
403 //return fBlocks[ndx].fOriginatingNodeID;
406 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
408 // see header file for class documentation
409 // Return byte order of the data stored in the
410 // current event's data block with the given block index (starting at 0).
411 // 0 is unknown alignment,
412 // 1 ist little endian,
413 // 2 is big endian. */
414 if ( ndx >= fBlockCnt )
416 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
417 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
420 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
422 // see header file for class documentation
423 // Return the alignment (in bytes) of the given datatype
424 // in the data stored in the current event's data block
425 // with the given block index (starting at 0).
426 // Possible values for the data type are
433 if ( ndx >= fBlockCnt )
435 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
437 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
438 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
441 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
443 // see header file for class documentation
444 if ( ndx >= fBlockCnt )
446 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
450 /* Return the type of the data in the current event's data
451 block with the given block index (starting at 0). */
452 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
454 // see header file for class documentation
455 if ( ndx >= fBlockCnt )
456 return ~(homer_uint64)0;
457 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
458 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
461 /* Return the origin of the data in the current event's data
462 block with the given block index (starting at 0). */
463 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
465 // see header file for class documentation
466 if ( ndx >= fBlockCnt )
467 return ~(homer_uint32)0;
468 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
469 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
472 /* Return a specification of the data in the current event's data
473 block with the given block index (starting at 0). */
474 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
476 // see header file for class documentation
477 if ( ndx >= fBlockCnt )
478 return ~(homer_uint32)0;
479 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
480 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
483 /* Find the next data block in the current event with the given
484 data type, origin, and specification. Returns the block's
486 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
487 homer_uint32 spec, unsigned long startNdx ) const
489 // see header file for class documentation
490 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
492 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
493 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
494 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
497 return ~(unsigned long)0;
500 /* Find the next data block in the current event with the given
501 data type, origin, and specification. Returns the block's
503 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
504 homer_uint32 spec, unsigned long startNdx ) const
506 // see header file for class documentation
507 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
509 bool found1=true, found2=true;
510 for ( unsigned i = 0; i < 8; i++ )
512 if ( type[i] != (char)0xFF )
521 for ( unsigned i = 0; i < 8; i++ )
523 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
524 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
531 for ( unsigned i = 0; i < 4; i++ )
533 if ( origin[i] != (char)0xFF )
542 for ( unsigned i = 0; i < 4; i++ )
544 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
545 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
552 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
553 if ( found1 && found2 &&
554 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
557 return ~(unsigned long)0;
560 /* Return the ID of the node that actually produced this data block.
561 This may be different from the node which sent the data to this
562 monitoring object as returned by GetBlockSendNodeID. */
563 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
565 // see header file for class documentation
566 if ( ndx >= fBlockCnt )
568 return fBlocks[ndx].fOriginatingNodeID;
572 void AliHLTHOMERReader::Init()
574 // see header file for class documentation
575 fCurrentEventType = ~(homer_uint64)0;
576 fCurrentEventID = ~(homer_uint64)0;
577 fMaxBlockCnt = fBlockCnt = 0;
580 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
584 fConnectionStatus = 0;
585 fErrorConnection = ~(unsigned int)0;
587 fEventRequestAdvanceTime = 0;
590 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
592 // see header file for class documentation
593 fDataSources = new DataSource[ sourceCnt ];
597 fDataSourceMaxCnt = sourceCnt;
601 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
603 // see header file for class documentation
605 he = gethostbyname( hostname );
608 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
609 return EADDRNOTAVAIL;
612 struct sockaddr_in remoteAddr;
613 remoteAddr.sin_family = AF_INET; // host byte order
614 remoteAddr.sin_port = htons(port); // short, network byte order
615 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
616 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
618 // Create socket and connect to target program on remote node
619 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
620 if ( source.fTCPConnection == -1 )
627 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
631 close( source.fTCPConnection );
635 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
636 if ( ret != (int)strlen(MOD_BIN) )
639 close( source.fTCPConnection );
643 char* tmpchar = new char[ strlen( hostname )+1 ];
646 close( source.fTCPConnection );
649 strcpy( tmpchar, hostname );
650 source.fHostname = tmpchar;
653 source.fTCPPort = port;
655 source.fDataSize = 0;
656 source.fDataRead = 0;
660 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
662 // see header file for class documentation
664 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
669 gethostname( tmpchar, MAXHOSTNAMELEN );
670 tmpchar[MAXHOSTNAMELEN]=(char)0;
671 source.fHostname = tmpchar;
673 source.fShmID = shmget( shmKey, shmSize, 0660 );
674 if ( source.fShmID == -1 )
677 delete [] source.fHostname;
681 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
683 if ( !source.fShmPtr )
686 shmctl( source.fShmID, IPC_RMID, NULL );
687 delete [] source.fHostname;
692 source.fShmKey = shmKey;
693 source.fShmSize = shmSize;
694 source.fDataSize = 0;
695 source.fDataRead = 0;
699 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
701 // see header file for class documentation
702 // a buffer data source is like a shm source apart from the shm attach and detach
703 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
704 // cleared right before sources are read but after the reading.
706 if ( !pBuffer || size<=0) return EINVAL;
708 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
713 gethostname( tmpchar, MAXHOSTNAMELEN );
714 tmpchar[MAXHOSTNAMELEN]=(char)0;
715 source.fHostname = tmpchar;
718 // the data buffer does not contain a size indicator in the first 4 bytes
719 // like the shm source buffer. Still we want to use the mechanism to invalidate/
720 // trigger by clearing the size indicator. Take the source.fShmSize variable.
721 source.fShmPtr = &source.fShmSize;
724 source.fShmSize = size;
725 source.fData = pBuffer;
726 source.fDataSize = 0;
727 source.fDataRead = 0;
731 void AliHLTHOMERReader::FreeDataSources()
733 // see header file for class documentation
734 for ( unsigned n=0; n < fDataSourceCnt; n++ )
736 if ( fDataSources[n].fType == kTCP )
737 FreeTCPDataSource( fDataSources[n] );
738 else if ( fDataSources[n].fType == kShm )
739 FreeShmDataSource( fDataSources[n] );
743 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
745 // see header file for class documentation
746 if ( source.fShmPtr )
747 shmdt( source.fShmPtr );
748 // if ( source.fShmID != -1 )
749 // shmctl( source.fShmID, IPC_RMID, NULL );
750 if ( source.fHostname )
751 delete [] source.fHostname;
755 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
757 // see header file for class documentation
758 if ( source.fTCPConnection )
759 close( source.fTCPConnection );
760 if ( source.fHostname )
761 delete [] source.fHostname;
765 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
767 // see header file for class documentation
768 if ( fDataSourceCnt<=0 )
770 // Clean up currently active event.
771 ReleaseCurrentEvent();
773 // Trigger all configured data sources
774 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
776 if ( fDataSources[n].fType == kTCP )
777 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
778 else if ( fDataSources[n].fType == kShm )
779 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
782 fErrorConnection = n;
783 fConnectionStatus=ret;
784 return fConnectionStatus;
787 // Now read in data from the configured data source
788 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
793 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
798 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
800 // if ( fDataSources[n].fType == kTCP )
801 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
803 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
806 // fErrorConnection = n;
807 // fConnectionStatus=ret;
808 // return fConnectionStatus;
811 //Check to see that all sources contributed data for the same event
812 homer_uint64 eventID;
813 homer_uint64 eventType;
814 eventID = GetSourceEventID( fDataSources[0] );
815 eventType = GetSourceEventType( fDataSources[0] );
816 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
818 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
820 fErrorConnection = n;
821 fConnectionStatus=56;//EBADRQC;
822 return fConnectionStatus;
825 // Find all the different data blocks contained in the data from all
827 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
829 ret = ParseSourceData( fDataSources[n] );
832 fErrorConnection = n;
833 fConnectionStatus=57;//EBADSLT;
834 return fConnectionStatus;
837 fCurrentEventID = eventID;
838 fCurrentEventType = eventType;
842 void AliHLTHOMERReader::ReleaseCurrentEvent()
844 // see header file for class documentation
845 // sources.fDataRead = 0;
847 fCurrentEventID = ~(homer_uint64)0;
848 fCurrentEventType = ~(homer_uint64)0;
849 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
851 if ( fDataSources[n].fData )
853 if ( fDataSources[n].fType == kTCP )
854 delete [] (homer_uint8*)fDataSources[n].fData;
855 fDataSources[n].fData = NULL;
857 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
861 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
863 if ( fBlocks[n].fOriginatingNodeID )
864 delete [] fBlocks[n].fOriginatingNodeID;
873 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
875 // see header file for class documentation
877 struct timeval oldSndTO, newSndTO;
880 socklen_t optlen=sizeof(oldSndTO);
881 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
886 if ( optlen!=sizeof(oldSndTO) )
890 newSndTO.tv_sec = timeoutUsec / 1000000;
891 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
892 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
898 // Send one event request
899 if ( !fEventRequestAdvanceTime )
901 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
903 if ( ret != (int)strlen(GET_ONE) )
906 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
914 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
915 if ( len>128 || len<0 )
918 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
922 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
924 if ( ret != (int)strlen(tmpCmd) )
927 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
935 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
937 // see header file for class documentation
938 // clear the size indicator in the first 4 bytes of the buffer to request data
939 // from the HOMER writer.
940 if ( source.fShmPtr )
942 *(homer_uint32*)( source.fShmPtr ) = 0;
949 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
951 // see header file for class documentation
959 unsigned firstConnection=~(unsigned)0;
960 for ( unsigned long n = 0; n < sourceCnt; n++ )
962 if ( sources[n].fDataSize == 0 // size specifier not yet read
963 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
966 FD_SET( sources[n].fTCPConnection, &conns );
967 if ( sources[n].fTCPConnection > highestConn )
968 highestConn = sources[n].fTCPConnection;
969 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
970 if ( firstConnection == ~(unsigned)0 )
975 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
980 struct timeval tv, *ptv;
983 tv.tv_sec = timeout / 1000000;
984 tv.tv_usec = timeout - (tv.tv_sec*1000000);
989 // wait until something is ready to be read
990 // either for timeout usecs or until eternity
992 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
995 fErrorConnection = firstConnection;
997 fConnectionStatus = errno;
999 fConnectionStatus = ETIMEDOUT;
1000 return fConnectionStatus;
1002 for ( unsigned n = 0; n < sourceCnt; n++ )
1004 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1006 if ( sources[n].fDataSize == 0 )
1008 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1009 if ( ret != sizeof(homer_uint32) )
1011 fErrorConnection = n;
1013 fConnectionStatus = errno;
1015 fConnectionStatus = ENOMSG;
1016 return fConnectionStatus;
1018 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1019 sources[n].fDataRead = 0;
1020 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1021 if ( !sources[n].fData )
1023 fErrorConnection = n;
1024 fConnectionStatus = ENOMEM;
1025 return fConnectionStatus;
1028 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1030 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1032 sources[n].fDataRead += ret;
1033 else if ( ret == 0 )
1035 fErrorConnection = n;
1036 fConnectionStatus = ECONNRESET;
1037 return fConnectionStatus;
1041 fErrorConnection = n;
1042 fConnectionStatus = errno;
1043 return fConnectionStatus;
1048 fErrorConnection = n;
1049 fConnectionStatus = ENXIO;
1050 return fConnectionStatus;
1061 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1063 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1064 // Send one event request
1065 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1066 if ( ret != strlen(GET_ONE) )
1070 // wait for and read back size specifier
1072 // The value transmitted is binary, in network byte order
1073 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1074 if ( ret != sizeof(sizeNBO) )
1078 // Convert back to host byte order
1079 source.fDataSize = ntohl( sizeNBO );
1080 source.fData = new homer_uint8[ source.fDataSize ];
1081 unsigned long dataRead=0, toRead;
1082 if ( !source.fData )
1085 // Read in data into buffer in order not to block connection
1086 while ( dataRead < source.fDataSize )
1088 if ( source.fDataSize-dataRead > 1024 )
1091 toRead = source.fDataSize-dataRead;
1092 ret = read( source.fTCPConnection, buffer, toRead );
1100 while ( dataRead < source.fDataSize )
1102 toRead = source.fDataSize-dataRead;
1103 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1106 else if ( ret == 0 && useTimeout )
1109 tv.tv_sec = timeout / 1000000;
1110 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1113 FD_SET( source.fTCPConnection, &conns );
1114 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1118 else if ( ret == 0 )
1135 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1141 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1143 // see header file for class documentation
1144 struct timeval tv1, tv2;
1148 gettimeofday( &tv1, NULL );
1153 for ( unsigned n = 0; n < sourceCnt; n++ )
1155 if ( !sources[n].fDataSize )
1157 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1160 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1161 if (sources[n].fType==kBuf)
1163 // the data buffer is already set to fData, just need to set fDataSize member
1164 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1165 TriggerShmSource( sources[n], 0, 0 );
1168 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1172 if ( found && useTimeout )
1173 gettimeofday( &tv1, NULL );
1174 if ( !all && useTimeout )
1176 gettimeofday( &tv2, NULL );
1177 unsigned long long tdiff;
1178 tdiff = tv2.tv_sec-tv1.tv_sec;
1180 tdiff += tv2.tv_usec-tv1.tv_usec;
1181 if ( tdiff > timeout )
1191 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1193 // see header file for class documentation
1196 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1197 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1198 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1201 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1202 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1204 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1205 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1206 fBlocks[fBlockCnt].fSource = source.fNdx;
1207 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1208 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1209 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1210 struct in_addr tmpA;
1211 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1212 char* addr = inet_ntoa( tmpA );
1213 char* tmpchar = new char[ strlen(addr)+1 ];
1216 strcpy( tmpchar, addr );
1217 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1218 descrOffset += descrLen;
1225 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1227 // see header file for class documentation
1228 DataBlock* newBlocks;
1229 newBlocks = new DataBlock[ newCnt ];
1232 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1233 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1234 if ( newCnt > fMaxBlockCnt )
1235 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1238 fBlocks = newBlocks;
1239 fMaxBlockCnt = newCnt;
1243 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1245 // see header file for class documentation
1246 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1247 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1250 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1252 // see header file for class documentation
1253 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1254 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1257 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1259 // see header file for class documentation
1260 if ( destFormat == sourceFormat )
1263 return ((source & 0xFFULL) << 56) |
1264 ((source & 0xFF00ULL) << 40) |
1265 ((source & 0xFF0000ULL) << 24) |
1266 ((source & 0xFF000000ULL) << 8) |
1267 ((source & 0xFF00000000ULL) >> 8) |
1268 ((source & 0xFF0000000000ULL) >> 24) |
1269 ((source & 0xFF000000000000ULL) >> 40) |
1270 ((source & 0xFF00000000000000ULL) >> 56);
1273 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1275 // see header file for class documentation
1276 if ( destFormat == sourceFormat )
1279 return ((source & 0xFFUL) << 24) |
1280 ((source & 0xFF00UL) << 8) |
1281 ((source & 0xFF0000UL) >> 8) |
1282 ((source & 0xFF000000UL) >> 24);
1285 AliHLTHOMERReader* AliHLTHOMERReaderCreate(const void* pBuffer, int size)
1287 // see header file for function documentation
1288 return new AliHLTHOMERReader(pBuffer, size);
1291 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1293 // see header file for function documentation
1294 if (pInstance) delete pInstance;
1298 ***************************************************************************
1300 ** $Author$ - Initial Version by Timm Morten Steinbeck
1304 ***************************************************************************