1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
51 //#include <sys/types.h>
52 //#include <sys/socket.h>
53 //#include <netinet/in.h>
54 //#include <netinet/tcp.h>
56 #include <rpc/types.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
66 #define MOD_BIN "MOD BIN\n"
67 #define MOD_ASC "MOD ASC\n"
68 #define GET_ONE "GET ONE\n"
69 #define GET_ALL "GET ALL\n"
71 // MAXHOSTNAMELEN not defined on macosx
72 // 686-apple-darwin9-gcc-4.0.1
73 #ifndef MAXHOSTNAMELEN
74 #define MAXHOSTNAMELEN 64
78 ClassImp(AliHLTMonitoringReader);
79 ClassImp(AliHLTHOMERReader);
87 AliHLTHOMERReader::AliHLTHOMERReader()
89 fCurrentEventType(~(homer_uint64)0),
90 fCurrentEventID(~(homer_uint64)0),
100 fErrorConnection(~(unsigned int)0),
101 fEventRequestAdvanceTime(0)
103 // Reader implementation of the HOMER interface.
104 // The HLT Monitoring Environment including ROOT is
105 // a native interface to ship out data from the HLT chain.
106 // See pdf document shiped with the package
107 // for class documentation and tutorial.
113 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
115 AliHLTMonitoringReader(),
116 fCurrentEventType(~(homer_uint64)0),
117 fCurrentEventID(~(homer_uint64)0),
122 fTCPDataSourceCnt(0),
123 fShmDataSourceCnt(0),
124 fDataSourceMaxCnt(0),
126 fConnectionStatus(0),
127 fErrorConnection(~(unsigned int)0),
128 fEventRequestAdvanceTime(0)
130 // see header file for class documentation
131 // For reading from a TCP port
133 if ( !AllocDataSources(1) )
135 fErrorConnection = 0;
136 fConnectionStatus = ENOMEM;
139 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
140 if ( fConnectionStatus )
141 fErrorConnection = 0;
146 fDataSources[0].fNdx = 0;
150 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
152 AliHLTMonitoringReader(),
153 fCurrentEventType(~(homer_uint64)0),
154 fCurrentEventID(~(homer_uint64)0),
159 fTCPDataSourceCnt(0),
160 fShmDataSourceCnt(0),
161 fDataSourceMaxCnt(0),
163 fConnectionStatus(0),
164 fErrorConnection(~(unsigned int)0),
165 fEventRequestAdvanceTime(0)
167 // see header file for class documentation
168 // For reading from multiple TCP ports
170 if ( !AllocDataSources(tcpCnt) )
172 fErrorConnection = 0;
173 fConnectionStatus = ENOMEM;
176 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
178 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
179 if ( fConnectionStatus )
181 fErrorConnection = n;
184 fDataSources[n].fNdx = n;
188 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
190 AliHLTMonitoringReader(),
191 fCurrentEventType(~(homer_uint64)0),
192 fCurrentEventID(~(homer_uint64)0),
197 fTCPDataSourceCnt(0),
198 fShmDataSourceCnt(0),
199 fDataSourceMaxCnt(0),
201 fConnectionStatus(0),
202 fErrorConnection(~(unsigned int)0),
203 fEventRequestAdvanceTime(0)
205 // see header file for class documentation
206 // For reading from a System V shared memory segment
208 if ( !AllocDataSources(1) )
210 fErrorConnection = 0;
211 fConnectionStatus = ENOMEM;
214 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
215 if ( fConnectionStatus )
216 fErrorConnection = 0;
221 fDataSources[0].fNdx = 0;
225 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
227 AliHLTMonitoringReader(),
228 fCurrentEventType(~(homer_uint64)0),
229 fCurrentEventID(~(homer_uint64)0),
234 fTCPDataSourceCnt(0),
235 fShmDataSourceCnt(0),
236 fDataSourceMaxCnt(0),
238 fConnectionStatus(0),
239 fErrorConnection(~(unsigned int)0),
240 fEventRequestAdvanceTime(0)
242 // see header file for class documentation
243 // For reading from multiple System V shared memory segments
245 if ( !AllocDataSources(shmCnt) )
247 fErrorConnection = 0;
248 fConnectionStatus = ENOMEM;
251 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
253 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
254 if ( fConnectionStatus )
256 fErrorConnection = n;
259 fDataSources[n].fNdx = n;
263 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
264 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
266 AliHLTMonitoringReader(),
267 fCurrentEventType(~(homer_uint64)0),
268 fCurrentEventID(~(homer_uint64)0),
273 fTCPDataSourceCnt(0),
274 fShmDataSourceCnt(0),
275 fDataSourceMaxCnt(0),
277 fConnectionStatus(0),
278 fErrorConnection(~(unsigned int)0),
279 fEventRequestAdvanceTime(0)
281 // see header file for class documentation
282 // For reading from multiple TCP ports and multiple System V shared memory segments
284 if ( !AllocDataSources(tcpCnt+shmCnt) )
286 fErrorConnection = 0;
287 fConnectionStatus = ENOMEM;
290 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
292 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
293 if ( fConnectionStatus )
295 fErrorConnection = n;
298 fDataSources[n].fNdx = n;
300 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
302 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
303 if ( fConnectionStatus )
305 fErrorConnection = tcpCnt+n;
308 fDataSources[n].fNdx = n;
312 AliHLTHOMERReader::AliHLTHOMERReader( const void* /*pBuffer*/, int /*size*/ )
314 AliHLTMonitoringReader(),
315 fCurrentEventType(~(homer_uint64)0),
316 fCurrentEventID(~(homer_uint64)0),
321 fTCPDataSourceCnt(0),
322 fShmDataSourceCnt(0),
323 fDataSourceMaxCnt(0),
325 fConnectionStatus(0),
326 fErrorConnection(~(unsigned int)0),
327 fEventRequestAdvanceTime(0)
329 // see header file for class documentation
330 // For reading from a System V shared memory segment
332 if ( !AllocDataSources(1) )
334 fErrorConnection = 0;
335 fConnectionStatus = ENOMEM;
338 //fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
339 if ( fConnectionStatus )
340 fErrorConnection = 0;
345 fDataSources[0].fNdx = 0;
349 AliHLTHOMERReader::~AliHLTHOMERReader()
351 // see header file for class documentation
352 ReleaseCurrentEvent();
356 int AliHLTHOMERReader::ReadNextEvent()
358 // see header file for class documentation
359 // Read in the next available event
360 return ReadNextEvent( false, 0 );
363 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
365 // see header file for class documentation
366 // Read in the next available event
367 return ReadNextEvent( true, timeout );
370 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
372 // see header file for class documentation
373 // Return the size (in bytes) of the current event's data
374 // block with the given block index (starting at 0).
375 if ( ndx >= fBlockCnt )
377 return fBlocks[ndx].fLength;
380 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
382 // see header file for class documentation
383 // Return a pointer to the start of the current event's data
384 // block with the given block index (starting at 0).
385 if ( ndx >= fBlockCnt )
387 return fBlocks[ndx].fData;
390 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
392 // see header file for class documentation
393 // Return IP address or hostname of node which sent the
394 // current event's data block with the given block index
396 // For HOMER this is the ID of the node on which the subscriber
397 // that provided this data runs/ran.
398 if ( ndx >= fBlockCnt )
401 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
403 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
404 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
408 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
409 //return fBlocks[ndx].fOriginatingNodeID;
412 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
414 // see header file for class documentation
415 // Return byte order of the data stored in the
416 // current event's data block with the given block index (starting at 0).
417 // 0 is unknown alignment,
418 // 1 ist little endian,
419 // 2 is big endian. */
420 if ( ndx >= fBlockCnt )
422 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
423 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
426 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
428 // see header file for class documentation
429 // Return the alignment (in bytes) of the given datatype
430 // in the data stored in the current event's data block
431 // with the given block index (starting at 0).
432 // Possible values for the data type are
439 if ( ndx >= fBlockCnt )
441 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
443 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
444 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
447 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
449 // see header file for class documentation
450 if ( ndx >= fBlockCnt )
452 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
456 /* Return the type of the data in the current event's data
457 block with the given block index (starting at 0). */
458 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
460 // see header file for class documentation
461 if ( ndx >= fBlockCnt )
462 return ~(homer_uint64)0;
463 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
464 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
467 /* Return the origin of the data in the current event's data
468 block with the given block index (starting at 0). */
469 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
471 // see header file for class documentation
472 if ( ndx >= fBlockCnt )
473 return ~(homer_uint32)0;
474 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
475 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
478 /* Return a specification of the data in the current event's data
479 block with the given block index (starting at 0). */
480 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
482 // see header file for class documentation
483 if ( ndx >= fBlockCnt )
484 return ~(homer_uint32)0;
485 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
486 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
489 /* Find the next data block in the current event with the given
490 data type, origin, and specification. Returns the block's
492 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
493 homer_uint32 spec, unsigned long startNdx ) const
495 // see header file for class documentation
496 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
498 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
499 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
500 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
503 return ~(unsigned long)0;
506 /* Find the next data block in the current event with the given
507 data type, origin, and specification. Returns the block's
509 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
510 homer_uint32 spec, unsigned long startNdx ) const
512 // see header file for class documentation
513 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
515 bool found1=true, found2=true;
516 for ( unsigned i = 0; i < 8; i++ )
518 if ( type[i] != (char)0xFF )
527 for ( unsigned i = 0; i < 8; i++ )
529 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
530 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
537 for ( unsigned i = 0; i < 4; i++ )
539 if ( origin[i] != (char)0xFF )
548 for ( unsigned i = 0; i < 4; i++ )
550 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
551 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
558 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
559 if ( found1 && found2 &&
560 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
563 return ~(unsigned long)0;
566 /* Return the ID of the node that actually produced this data block.
567 This may be different from the node which sent the data to this
568 monitoring object as returned by GetBlockSendNodeID. */
569 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
571 // see header file for class documentation
572 if ( ndx >= fBlockCnt )
574 return fBlocks[ndx].fOriginatingNodeID;
578 void AliHLTHOMERReader::Init()
580 // see header file for class documentation
581 fCurrentEventType = ~(homer_uint64)0;
582 fCurrentEventID = ~(homer_uint64)0;
583 fMaxBlockCnt = fBlockCnt = 0;
586 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
590 fConnectionStatus = 0;
591 fErrorConnection = ~(unsigned int)0;
593 fEventRequestAdvanceTime = 0;
596 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
598 // see header file for class documentation
599 fDataSources = new DataSource[ sourceCnt ];
602 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
604 fDataSourceMaxCnt = sourceCnt;
608 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
610 // see header file for class documentation
612 he = gethostbyname( hostname );
615 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
616 return EADDRNOTAVAIL;
619 struct sockaddr_in remoteAddr;
620 remoteAddr.sin_family = AF_INET; // host byte order
621 remoteAddr.sin_port = htons(port); // short, network byte order
622 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
623 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
625 // Create socket and connect to target program on remote node
626 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
627 if ( source.fTCPConnection == -1 )
634 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
638 close( source.fTCPConnection );
642 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
643 if ( ret != (int)strlen(MOD_BIN) )
646 close( source.fTCPConnection );
650 char* tmpchar = new char[ strlen( hostname )+1 ];
653 close( source.fTCPConnection );
656 strcpy( tmpchar, hostname );
657 source.fHostname = tmpchar;
660 source.fTCPPort = port;
662 source.fDataSize = 0;
663 source.fDataRead = 0;
667 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
669 // see header file for class documentation
671 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
676 gethostname( tmpchar, MAXHOSTNAMELEN );
677 tmpchar[MAXHOSTNAMELEN]=(char)0;
678 source.fHostname = tmpchar;
680 source.fShmID = shmget( shmKey, shmSize, 0660 );
681 if ( source.fShmID == -1 )
684 delete [] source.fHostname;
688 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
690 if ( !source.fShmPtr )
693 shmctl( source.fShmID, IPC_RMID, NULL );
694 delete [] source.fHostname;
699 source.fShmKey = shmKey;
700 source.fShmSize = shmSize;
701 source.fDataSize = 0;
702 source.fDataRead = 0;
706 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
708 // see header file for class documentation
709 // a buffer data source is like a shm source apart from the shm attach and detach
710 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
711 // cleared right before sources are read but after the reading.
713 if ( !pBuffer || size<=0) return EINVAL;
715 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
720 gethostname( tmpchar, MAXHOSTNAMELEN );
721 tmpchar[MAXHOSTNAMELEN]=(char)0;
722 source.fHostname = tmpchar;
725 // the data buffer does not contain a size indicator in the first 4 bytes
726 // like the shm source buffer. Still we want to use the mechanism to invalidate/
727 // trigger by clearing the size indicator. Take the source.fShmSize variable.
728 source.fShmPtr = &source.fShmSize;
731 source.fShmSize = size;
732 source.fData = pBuffer;
733 source.fDataSize = 0;
734 source.fDataRead = 0;
738 void AliHLTHOMERReader::FreeDataSources()
740 // see header file for class documentation
741 for ( unsigned n=0; n < fDataSourceCnt; n++ )
743 if ( fDataSources[n].fType == kTCP )
744 FreeTCPDataSource( fDataSources[n] );
745 else if ( fDataSources[n].fType == kShm )
746 FreeShmDataSource( fDataSources[n] );
750 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
752 // see header file for class documentation
753 if ( source.fShmPtr )
754 shmdt( source.fShmPtr );
755 // if ( source.fShmID != -1 )
756 // shmctl( source.fShmID, IPC_RMID, NULL );
757 if ( source.fHostname )
758 delete [] source.fHostname;
762 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
764 // see header file for class documentation
765 if ( source.fTCPConnection )
766 close( source.fTCPConnection );
767 if ( source.fHostname )
768 delete [] source.fHostname;
772 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
774 // see header file for class documentation
775 if ( fDataSourceCnt<=0 )
777 // Clean up currently active event.
778 ReleaseCurrentEvent();
780 // Trigger all configured data sources
781 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
783 if ( fDataSources[n].fType == kTCP )
784 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
785 else if ( fDataSources[n].fType == kShm )
786 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
789 fErrorConnection = n;
790 fConnectionStatus=ret;
791 return fConnectionStatus;
794 // Now read in data from the configured data source
795 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
800 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
805 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
807 // if ( fDataSources[n].fType == kTCP )
808 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
810 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
813 // fErrorConnection = n;
814 // fConnectionStatus=ret;
815 // return fConnectionStatus;
818 //Check to see that all sources contributed data for the same event
819 homer_uint64 eventID;
820 homer_uint64 eventType;
821 eventID = GetSourceEventID( fDataSources[0] );
822 eventType = GetSourceEventType( fDataSources[0] );
823 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
825 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
827 fErrorConnection = n;
828 fConnectionStatus=56;//EBADRQC;
829 return fConnectionStatus;
832 // Find all the different data blocks contained in the data from all
834 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
836 ret = ParseSourceData( fDataSources[n] );
839 fErrorConnection = n;
840 fConnectionStatus=57;//EBADSLT;
841 return fConnectionStatus;
844 fCurrentEventID = eventID;
845 fCurrentEventType = eventType;
849 void AliHLTHOMERReader::ReleaseCurrentEvent()
851 // see header file for class documentation
852 // sources.fDataRead = 0;
854 fCurrentEventID = ~(homer_uint64)0;
855 fCurrentEventType = ~(homer_uint64)0;
856 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
858 if ( fDataSources[n].fData )
860 if ( fDataSources[n].fType == kTCP )
861 delete [] (homer_uint8*)fDataSources[n].fData;
862 fDataSources[n].fData = NULL;
864 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
868 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
870 if ( fBlocks[n].fOriginatingNodeID )
871 delete [] fBlocks[n].fOriginatingNodeID;
880 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
882 // see header file for class documentation
884 struct timeval oldSndTO, newSndTO;
887 socklen_t optlen=sizeof(oldSndTO);
888 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
893 if ( optlen!=sizeof(oldSndTO) )
897 newSndTO.tv_sec = timeoutUsec / 1000000;
898 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
899 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
905 // Send one event request
906 if ( !fEventRequestAdvanceTime )
908 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
910 if ( ret != (int)strlen(GET_ONE) )
913 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
921 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
922 if ( len>128 || len<0 )
925 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
929 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
931 if ( ret != (int)strlen(tmpCmd) )
934 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
942 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
944 // see header file for class documentation
945 // clear the size indicator in the first 4 bytes of the buffer to request data
946 // from the HOMER writer.
947 if ( source.fShmPtr )
949 *(homer_uint32*)( source.fShmPtr ) = 0;
956 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
958 // see header file for class documentation
966 unsigned firstConnection=~(unsigned)0;
967 for ( unsigned long n = 0; n < sourceCnt; n++ )
969 if ( sources[n].fDataSize == 0 // size specifier not yet read
970 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
973 FD_SET( sources[n].fTCPConnection, &conns );
974 if ( sources[n].fTCPConnection > highestConn )
975 highestConn = sources[n].fTCPConnection;
976 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
977 if ( firstConnection == ~(unsigned)0 )
982 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
987 struct timeval tv, *ptv;
990 tv.tv_sec = timeout / 1000000;
991 tv.tv_usec = timeout - (tv.tv_sec*1000000);
996 // wait until something is ready to be read
997 // either for timeout usecs or until eternity
999 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1002 fErrorConnection = firstConnection;
1004 fConnectionStatus = errno;
1006 fConnectionStatus = ETIMEDOUT;
1007 return fConnectionStatus;
1009 for ( unsigned n = 0; n < sourceCnt; n++ )
1011 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1013 if ( sources[n].fDataSize == 0 )
1015 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1016 if ( ret != sizeof(homer_uint32) )
1018 fErrorConnection = n;
1020 fConnectionStatus = errno;
1022 fConnectionStatus = ENOMSG;
1023 return fConnectionStatus;
1025 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1026 sources[n].fDataRead = 0;
1027 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1028 if ( !sources[n].fData )
1030 fErrorConnection = n;
1031 fConnectionStatus = ENOMEM;
1032 return fConnectionStatus;
1035 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1037 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1039 sources[n].fDataRead += ret;
1040 else if ( ret == 0 )
1042 fErrorConnection = n;
1043 fConnectionStatus = ECONNRESET;
1044 return fConnectionStatus;
1048 fErrorConnection = n;
1049 fConnectionStatus = errno;
1050 return fConnectionStatus;
1055 fErrorConnection = n;
1056 fConnectionStatus = ENXIO;
1057 return fConnectionStatus;
1068 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1070 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1071 // Send one event request
1072 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1073 if ( ret != strlen(GET_ONE) )
1077 // wait for and read back size specifier
1079 // The value transmitted is binary, in network byte order
1080 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1081 if ( ret != sizeof(sizeNBO) )
1085 // Convert back to host byte order
1086 source.fDataSize = ntohl( sizeNBO );
1087 source.fData = new homer_uint8[ source.fDataSize ];
1088 unsigned long dataRead=0, toRead;
1089 if ( !source.fData )
1092 // Read in data into buffer in order not to block connection
1093 while ( dataRead < source.fDataSize )
1095 if ( source.fDataSize-dataRead > 1024 )
1098 toRead = source.fDataSize-dataRead;
1099 ret = read( source.fTCPConnection, buffer, toRead );
1107 while ( dataRead < source.fDataSize )
1109 toRead = source.fDataSize-dataRead;
1110 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1113 else if ( ret == 0 && useTimeout )
1116 tv.tv_sec = timeout / 1000000;
1117 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1120 FD_SET( source.fTCPConnection, &conns );
1121 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1125 else if ( ret == 0 )
1142 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1148 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1150 // see header file for class documentation
1151 struct timeval tv1, tv2;
1155 gettimeofday( &tv1, NULL );
1160 for ( unsigned n = 0; n < sourceCnt; n++ )
1162 if ( !sources[n].fDataSize )
1164 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1167 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1168 if (sources[n].fType==kBuf)
1170 // the data buffer is already set to fData, just need to set fDataSize member
1171 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1172 TriggerShmSource( sources[n], 0, 0 );
1175 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1179 if ( found && useTimeout )
1180 gettimeofday( &tv1, NULL );
1181 if ( !all && useTimeout )
1183 gettimeofday( &tv2, NULL );
1184 unsigned long long tdiff;
1185 tdiff = tv2.tv_sec-tv1.tv_sec;
1187 tdiff += tv2.tv_usec-tv1.tv_usec;
1188 if ( tdiff > timeout )
1198 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1200 // see header file for class documentation
1203 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1204 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1205 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1208 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1209 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1211 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1212 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1213 fBlocks[fBlockCnt].fSource = source.fNdx;
1214 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1215 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1216 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1217 struct in_addr tmpA;
1218 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1219 char* addr = inet_ntoa( tmpA );
1220 char* tmpchar = new char[ strlen(addr)+1 ];
1223 strcpy( tmpchar, addr );
1224 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1225 descrOffset += descrLen;
1232 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1234 // see header file for class documentation
1235 DataBlock* newBlocks;
1236 newBlocks = new DataBlock[ newCnt ];
1239 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1240 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1241 if ( newCnt > fMaxBlockCnt )
1242 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1245 fBlocks = newBlocks;
1246 fMaxBlockCnt = newCnt;
1250 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1252 // see header file for class documentation
1253 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1254 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1257 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1259 // see header file for class documentation
1260 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1261 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1264 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1266 // see header file for class documentation
1267 if ( destFormat == sourceFormat )
1270 return ((source & 0xFFULL) << 56) |
1271 ((source & 0xFF00ULL) << 40) |
1272 ((source & 0xFF0000ULL) << 24) |
1273 ((source & 0xFF000000ULL) << 8) |
1274 ((source & 0xFF00000000ULL) >> 8) |
1275 ((source & 0xFF0000000000ULL) >> 24) |
1276 ((source & 0xFF000000000000ULL) >> 40) |
1277 ((source & 0xFF00000000000000ULL) >> 56);
1280 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1282 // see header file for class documentation
1283 if ( destFormat == sourceFormat )
1286 return ((source & 0xFFUL) << 24) |
1287 ((source & 0xFF00UL) << 8) |
1288 ((source & 0xFF0000UL) >> 8) |
1289 ((source & 0xFF000000UL) >> 24);
1292 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1294 // see header file for function documentation
1295 return new AliHLTHOMERReader(hostname, port);
1298 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1300 // see header file for function documentation
1301 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1304 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1306 // see header file for function documentation
1307 return new AliHLTHOMERReader(pBuffer, size);
1310 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1312 // see header file for function documentation
1313 if (pInstance) delete pInstance;
1317 ***************************************************************************
1319 ** $Author$ - Initial Version by Timm Morten Steinbeck
1323 ***************************************************************************