]> git.uio.no Git - u/mrichter/AliRoot.git/blame - HLT/BASE/HOMER/AliHLTHOMERReader.cxx
changing some names to fulfill coding rules
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
CommitLineData
2be16a33 1/************************************************************************
2**
3**
4** This file is property of and copyright by the Technical Computer
5** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6** University, Heidelberg, Germany, 2001
7** This file has been written by Timm Morten Steinbeck,
8** timm@kip.uni-heidelberg.de
9**
10**
11** See the file license.txt for details regarding usage, modification,
12** distribution and warranty.
13** Important: This file is provided without any warranty, including
14** fitness for any particular purpose.
15**
16**
17** Newer versions of this file's package will be made available from
18** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19** or the corresponding page of the Heidelberg Alice Level 3 group.
20**
21*************************************************************************/
22
23/*
24***************************************************************************
25**
26** $Author$ - Initial Version by Timm Morten Steinbeck
27**
28** $Id$
29**
30***************************************************************************
31*/
32
9e91e956 33/** @file AliHLTHOMERReader.cxx
2be16a33 34 @author Timm Steinbeck
35 @date Sep 14 2007
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38
04a939f7 39// see header file for class documentation
2be16a33 40// or
41// refer to README to build package
42// or
43// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45#include "AliHLTHOMERReader.h"
6a8e0bb4 46//#include <stdio.h>
5a1c4a5e 47#ifdef __SUNPRO_CC
48#include <string.h>
49#else
6a8e0bb4 50#include <cstring>
5a1c4a5e 51#endif
6a8e0bb4 52#include <cerrno>
2be16a33 53#include <netdb.h>
54extern int h_errno;
6a8e0bb4 55//#include <sys/types.h>
56//#include <sys/socket.h>
57//#include <netinet/in.h>
58//#include <netinet/tcp.h>
2be16a33 59#include <unistd.h>
72ae28cd 60#ifndef __CYGWIN__
2be16a33 61#include <rpc/types.h>
72ae28cd 62#endif
2be16a33 63#include <fcntl.h>
64#include <sys/stat.h>
65#include <netinet/in.h>
66#include <arpa/inet.h>
67#ifdef USE_ROOT
68#include <Rtypes.h>
69#endif
70
71
72#define MOD_BIN "MOD BIN\n"
73#define MOD_ASC "MOD ASC\n"
74#define GET_ONE "GET ONE\n"
75#define GET_ALL "GET ALL\n"
76
27011446 77// MAXHOSTNAMELEN not defined on macosx
78// 686-apple-darwin9-gcc-4.0.1
79#ifndef MAXHOSTNAMELEN
80#define MAXHOSTNAMELEN 64
81#endif
82
2be16a33 83#ifdef USE_ROOT
fe1edbae 84ClassImp(AliHLTMonitoringReader);
85ClassImp(AliHLTHOMERReader);
2be16a33 86#endif
87
88
89
90
91
92#ifdef USE_ROOT
fe1edbae 93AliHLTHOMERReader::AliHLTHOMERReader()
746d2a94 94 :
95 fCurrentEventType(~(homer_uint64)0),
96 fCurrentEventID(~(homer_uint64)0),
97 fBlockCnt(0),
98 fMaxBlockCnt(0),
99 fBlocks(NULL),
100 fDataSourceCnt(0),
101 fTCPDataSourceCnt(0),
102 fShmDataSourceCnt(0),
103 fDataSourceMaxCnt(0),
04a939f7 104 fDataSources(NULL),
105 fConnectionStatus(0),
106 fErrorConnection(~(unsigned int)0),
107 fEventRequestAdvanceTime(0)
2be16a33 108 {
1d47dbac 109// Reader implementation of the HOMER interface.
110// The HLT Monitoring Environment including ROOT is
111// a native interface to ship out data from the HLT chain.
112// See pdf document shiped with the package
113// for class documentation and tutorial.
2be16a33 114 Init();
115 }
116#endif
117
118
fe1edbae 119AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
746d2a94 120 :
fe1edbae 121 AliHLTMonitoringReader(),
cfa641b1 122 TObject(),
746d2a94 123 fCurrentEventType(~(homer_uint64)0),
124 fCurrentEventID(~(homer_uint64)0),
125 fBlockCnt(0),
126 fMaxBlockCnt(0),
127 fBlocks(NULL),
128 fDataSourceCnt(0),
129 fTCPDataSourceCnt(0),
130 fShmDataSourceCnt(0),
131 fDataSourceMaxCnt(0),
04a939f7 132 fDataSources(NULL),
133 fConnectionStatus(0),
134 fErrorConnection(~(unsigned int)0),
135 fEventRequestAdvanceTime(0)
2be16a33 136 {
04a939f7 137// see header file for class documentation
138// For reading from a TCP port
2be16a33 139 Init();
140 if ( !AllocDataSources(1) )
141 {
142 fErrorConnection = 0;
143 fConnectionStatus = ENOMEM;
144 return;
145 }
146 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
147 if ( fConnectionStatus )
148 fErrorConnection = 0;
149 else
150 {
151 fDataSourceCnt++;
152 fTCPDataSourceCnt++;
153 fDataSources[0].fNdx = 0;
154 }
155 }
156
fe1edbae 157AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
746d2a94 158 :
fe1edbae 159 AliHLTMonitoringReader(),
cfa641b1 160 TObject(),
746d2a94 161 fCurrentEventType(~(homer_uint64)0),
162 fCurrentEventID(~(homer_uint64)0),
163 fBlockCnt(0),
164 fMaxBlockCnt(0),
165 fBlocks(NULL),
166 fDataSourceCnt(0),
167 fTCPDataSourceCnt(0),
168 fShmDataSourceCnt(0),
169 fDataSourceMaxCnt(0),
04a939f7 170 fDataSources(NULL),
171 fConnectionStatus(0),
172 fErrorConnection(~(unsigned int)0),
173 fEventRequestAdvanceTime(0)
2be16a33 174 {
04a939f7 175// see header file for class documentation
176// For reading from multiple TCP ports
2be16a33 177 Init();
178 if ( !AllocDataSources(tcpCnt) )
179 {
180 fErrorConnection = 0;
181 fConnectionStatus = ENOMEM;
182 return;
183 }
184 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
185 {
186 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
187 if ( fConnectionStatus )
188 {
189 fErrorConnection = n;
190 return;
191 }
192 fDataSources[n].fNdx = n;
193 }
194 }
195
fe1edbae 196AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
746d2a94 197 :
fe1edbae 198 AliHLTMonitoringReader(),
cfa641b1 199 TObject(),
746d2a94 200 fCurrentEventType(~(homer_uint64)0),
201 fCurrentEventID(~(homer_uint64)0),
202 fBlockCnt(0),
203 fMaxBlockCnt(0),
204 fBlocks(NULL),
205 fDataSourceCnt(0),
206 fTCPDataSourceCnt(0),
207 fShmDataSourceCnt(0),
208 fDataSourceMaxCnt(0),
04a939f7 209 fDataSources(NULL),
210 fConnectionStatus(0),
211 fErrorConnection(~(unsigned int)0),
212 fEventRequestAdvanceTime(0)
2be16a33 213 {
04a939f7 214// see header file for class documentation
215// For reading from a System V shared memory segment
2be16a33 216 Init();
217 if ( !AllocDataSources(1) )
218 {
219 fErrorConnection = 0;
220 fConnectionStatus = ENOMEM;
221 return;
222 }
223 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
224 if ( fConnectionStatus )
225 fErrorConnection = 0;
226 else
227 {
228 fDataSourceCnt++;
229 fShmDataSourceCnt++;
230 fDataSources[0].fNdx = 0;
231 }
232 }
233
fe1edbae 234AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 235 :
fe1edbae 236 AliHLTMonitoringReader(),
cfa641b1 237 TObject(),
746d2a94 238 fCurrentEventType(~(homer_uint64)0),
239 fCurrentEventID(~(homer_uint64)0),
240 fBlockCnt(0),
241 fMaxBlockCnt(0),
242 fBlocks(NULL),
243 fDataSourceCnt(0),
244 fTCPDataSourceCnt(0),
245 fShmDataSourceCnt(0),
246 fDataSourceMaxCnt(0),
04a939f7 247 fDataSources(NULL),
248 fConnectionStatus(0),
249 fErrorConnection(~(unsigned int)0),
250 fEventRequestAdvanceTime(0)
2be16a33 251 {
04a939f7 252// see header file for class documentation
253// For reading from multiple System V shared memory segments
2be16a33 254 Init();
255 if ( !AllocDataSources(shmCnt) )
256 {
257 fErrorConnection = 0;
258 fConnectionStatus = ENOMEM;
259 return;
260 }
261 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
262 {
263 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
264 if ( fConnectionStatus )
265 {
266 fErrorConnection = n;
267 return;
268 }
269 fDataSources[n].fNdx = n;
270 }
271 }
272
fe1edbae 273AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
2be16a33 274 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 275 :
fe1edbae 276 AliHLTMonitoringReader(),
cfa641b1 277 TObject(),
746d2a94 278 fCurrentEventType(~(homer_uint64)0),
279 fCurrentEventID(~(homer_uint64)0),
280 fBlockCnt(0),
281 fMaxBlockCnt(0),
282 fBlocks(NULL),
283 fDataSourceCnt(0),
284 fTCPDataSourceCnt(0),
285 fShmDataSourceCnt(0),
286 fDataSourceMaxCnt(0),
04a939f7 287 fDataSources(NULL),
288 fConnectionStatus(0),
289 fErrorConnection(~(unsigned int)0),
290 fEventRequestAdvanceTime(0)
2be16a33 291 {
04a939f7 292// see header file for class documentation
293// For reading from multiple TCP ports and multiple System V shared memory segments
2be16a33 294 Init();
295 if ( !AllocDataSources(tcpCnt+shmCnt) )
296 {
297 fErrorConnection = 0;
298 fConnectionStatus = ENOMEM;
299 return;
300 }
301 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
302 {
303 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
304 if ( fConnectionStatus )
305 {
306 fErrorConnection = n;
307 return;
308 }
309 fDataSources[n].fNdx = n;
310 }
311 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
312 {
313 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
314 if ( fConnectionStatus )
315 {
316 fErrorConnection = tcpCnt+n;
317 return;
318 }
319 fDataSources[n].fNdx = n;
320 }
321 }
8c617325 322
29e6fd20 323AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
8c617325 324 :
325 AliHLTMonitoringReader(),
cfa641b1 326 TObject(),
8c617325 327 fCurrentEventType(~(homer_uint64)0),
328 fCurrentEventID(~(homer_uint64)0),
329 fBlockCnt(0),
330 fMaxBlockCnt(0),
331 fBlocks(NULL),
332 fDataSourceCnt(0),
333 fTCPDataSourceCnt(0),
334 fShmDataSourceCnt(0),
335 fDataSourceMaxCnt(0),
336 fDataSources(NULL),
337 fConnectionStatus(0),
338 fErrorConnection(~(unsigned int)0),
339 fEventRequestAdvanceTime(0)
340 {
341// see header file for class documentation
342// For reading from a System V shared memory segment
343 Init();
344 if ( !AllocDataSources(1) )
345 {
346 fErrorConnection = 0;
347 fConnectionStatus = ENOMEM;
348 return;
349 }
29e6fd20 350 fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
8c617325 351 if ( fConnectionStatus )
352 fErrorConnection = 0;
353 else
354 {
355 fDataSourceCnt++;
aa5178e8 356 fShmDataSourceCnt++;
8c617325 357 fDataSources[0].fNdx = 0;
358 }
359 }
360
fe1edbae 361AliHLTHOMERReader::~AliHLTHOMERReader()
2be16a33 362 {
04a939f7 363// see header file for class documentation
2be16a33 364 ReleaseCurrentEvent();
365 FreeDataSources();
242e6536 366
367 if (fDataSources)
368 delete [] fDataSources;
2be16a33 369 }
370
fe1edbae 371int AliHLTHOMERReader::ReadNextEvent()
2be16a33 372 {
04a939f7 373// see header file for class documentation
374// Read in the next available event
2be16a33 375 return ReadNextEvent( false, 0 );
376 }
377
fe1edbae 378int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
2be16a33 379 {
04a939f7 380// see header file for class documentation
381// Read in the next available event
2be16a33 382 return ReadNextEvent( true, timeout );
383 }
384
fe1edbae 385unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
2be16a33 386 {
04a939f7 387// see header file for class documentation
388// Return the size (in bytes) of the current event's data
389// block with the given block index (starting at 0).
2be16a33 390 if ( ndx >= fBlockCnt )
391 return 0;
392 return fBlocks[ndx].fLength;
393 }
394
fe1edbae 395const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
2be16a33 396 {
04a939f7 397// see header file for class documentation
398// Return a pointer to the start of the current event's data
399// block with the given block index (starting at 0).
2be16a33 400 if ( ndx >= fBlockCnt )
401 return NULL;
402 return fBlocks[ndx].fData;
403 }
404
fe1edbae 405const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
2be16a33 406 {
04a939f7 407// see header file for class documentation
408// Return IP address or hostname of node which sent the
409// current event's data block with the given block index
410// (starting at 0).
411// For HOMER this is the ID of the node on which the subscriber
412// that provided this data runs/ran.
2be16a33 413 if ( ndx >= fBlockCnt )
414 return NULL;
415#ifdef DEBUG
416 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
417 {
937cec81 418 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%u) >= fDataSourceCnt (%u)\n",
2be16a33 419 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
420 return NULL;
421 }
422#endif
423 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
424 //return fBlocks[ndx].fOriginatingNodeID;
425 }
426
fe1edbae 427homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
2be16a33 428 {
04a939f7 429// see header file for class documentation
430// Return byte order of the data stored in the
431// current event's data block with the given block index (starting at 0).
432// 0 is unknown alignment,
433// 1 ist little endian,
434// 2 is big endian. */
2be16a33 435 if ( ndx >= fBlockCnt )
436 return 0;
437 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
438 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
439 }
04a939f7 440
fe1edbae 441homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
2be16a33 442 {
04a939f7 443// see header file for class documentation
444// Return the alignment (in bytes) of the given datatype
445// in the data stored in the current event's data block
446// with the given block index (starting at 0).
447// Possible values for the data type are
448// 0: homer_uint64
449// 1: homer_uint32
450// 2: uin16
451// 3: homer_uint8
452// 4: double
453// 5: float
2be16a33 454 if ( ndx >= fBlockCnt )
455 return 0;
456 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
457 return 0;
458 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
459 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
460 }
461
fe1edbae 462homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
2be16a33 463 {
04a939f7 464// see header file for class documentation
2be16a33 465 if ( ndx >= fBlockCnt )
466 return 0;
467 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
468 }
469
470/* HOMER specific */
471/* Return the type of the data in the current event's data
472 block with the given block index (starting at 0). */
fe1edbae 473homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
2be16a33 474 {
04a939f7 475// see header file for class documentation
2be16a33 476 if ( ndx >= fBlockCnt )
477 return ~(homer_uint64)0;
478 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
479 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
480 }
481
482/* Return the origin of the data in the current event's data
483 block with the given block index (starting at 0). */
fe1edbae 484homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
2be16a33 485 {
04a939f7 486// see header file for class documentation
2be16a33 487 if ( ndx >= fBlockCnt )
488 return ~(homer_uint32)0;
489 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
490 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
491 }
492
493/* Return a specification of the data in the current event's data
494 block with the given block index (starting at 0). */
fe1edbae 495homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
2be16a33 496 {
04a939f7 497// see header file for class documentation
2be16a33 498 if ( ndx >= fBlockCnt )
499 return ~(homer_uint32)0;
500 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
501 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
502 }
503
504/* Find the next data block in the current event with the given
505 data type, origin, and specification. Returns the block's
506 index. */
fe1edbae 507unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
2be16a33 508 homer_uint32 spec, unsigned long startNdx ) const
509 {
04a939f7 510// see header file for class documentation
2be16a33 511 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
512 {
513 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
514 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
515 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
516 return n;
517 }
518 return ~(unsigned long)0;
519 }
520
521/* Find the next data block in the current event with the given
522 data type, origin, and specification. Returns the block's
523 index. */
fe1edbae 524unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
2be16a33 525 homer_uint32 spec, unsigned long startNdx ) const
526 {
04a939f7 527// see header file for class documentation
2be16a33 528 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
529 {
530 bool found1=true, found2=true;
531 for ( unsigned i = 0; i < 8; i++ )
532 {
533 if ( type[i] != (char)0xFF )
534 {
535 found1=false;
536 break;
537 }
538 }
539 if ( !found1 )
540 {
541 found1 = true;
542 for ( unsigned i = 0; i < 8; i++ )
543 {
544 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
545 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
546 {
547 found1=false;
548 break;
549 }
550 }
551 }
552 for ( unsigned i = 0; i < 4; i++ )
553 {
554 if ( origin[i] != (char)0xFF )
555 {
556 found2 = false;
557 break;
558 }
559 }
560 if ( !found2 )
561 {
562 found2 = true;
563 for ( unsigned i = 0; i < 4; i++ )
564 {
565 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
566 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
567 {
568 found2=false;
569 break;
570 }
571 }
572 }
573 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
574 if ( found1 && found2 &&
575 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
576 return n;
577 }
578 return ~(unsigned long)0;
579 }
580
581/* Return the ID of the node that actually produced this data block.
582 This may be different from the node which sent the data to this
583 monitoring object as returned by GetBlockSendNodeID. */
fe1edbae 584const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
2be16a33 585 {
04a939f7 586// see header file for class documentation
2be16a33 587 if ( ndx >= fBlockCnt )
588 return NULL;
589 return fBlocks[ndx].fOriginatingNodeID;
590 }
591
592
fe1edbae 593void AliHLTHOMERReader::Init()
2be16a33 594 {
04a939f7 595// see header file for class documentation
2be16a33 596 fCurrentEventType = ~(homer_uint64)0;
597 fCurrentEventID = ~(homer_uint64)0;
598 fMaxBlockCnt = fBlockCnt = 0;
599 fBlocks = NULL;
600
601 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
602 fDataSources = NULL;
603
604
605 fConnectionStatus = 0;
606 fErrorConnection = ~(unsigned int)0;
607
04a939f7 608 fEventRequestAdvanceTime = 0;
2be16a33 609 }
610
fe1edbae 611bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
2be16a33 612 {
04a939f7 613// see header file for class documentation
2be16a33 614 fDataSources = new DataSource[ sourceCnt ];
615 if ( !fDataSources )
616 return false;
6a8e0bb4 617 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
2be16a33 618 fDataSourceCnt = 0;
619 fDataSourceMaxCnt = sourceCnt;
620 return true;
621 }
622
fe1edbae 623int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
2be16a33 624 {
04a939f7 625// see header file for class documentation
2be16a33 626 struct hostent* he;
627 he = gethostbyname( hostname );
628 if ( he == NULL )
629 {
630 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
631 return EADDRNOTAVAIL;
632 }
633
634 struct sockaddr_in remoteAddr;
635 remoteAddr.sin_family = AF_INET; // host byte order
636 remoteAddr.sin_port = htons(port); // short, network byte order
637 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
638 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
639
640 // Create socket and connect to target program on remote node
641 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
642 if ( source.fTCPConnection == -1 )
643 {
644 return errno;
645 }
646
647 int ret;
648
649 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
650 if ( ret == -1 )
651 {
652 ret=errno;
653 close( source.fTCPConnection );
654 return ret;
655 }
656
657 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
658 if ( ret != (int)strlen(MOD_BIN) )
659 {
660 ret=errno;
661 close( source.fTCPConnection );
662 return ret;
663 }
664
c9926985 665 unsigned hostnamelen=strlen( hostname );
666 char* tmpchar = new char[ hostnamelen+1 ];
2be16a33 667 if ( !tmpchar )
668 {
669 close( source.fTCPConnection );
670 return ENOMEM;
671 }
c9926985 672 strncpy( tmpchar, hostname, hostnamelen );
673 tmpchar[hostnamelen]=0;
2be16a33 674 source.fHostname = tmpchar;
675
676 source.fType = kTCP;
677 source.fTCPPort = port;
678 source.fData = NULL;
679 source.fDataSize = 0;
680 source.fDataRead = 0;
681 return 0;
682 }
683
fe1edbae 684int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
2be16a33 685 {
04a939f7 686// see header file for class documentation
2be16a33 687 int ret;
688 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
689 if ( !tmpchar )
690 {
691 return ENOMEM;
692 }
693 gethostname( tmpchar, MAXHOSTNAMELEN );
694 tmpchar[MAXHOSTNAMELEN]=(char)0;
695 source.fHostname = tmpchar;
696
697 source.fShmID = shmget( shmKey, shmSize, 0660 );
698 if ( source.fShmID == -1 )
699 {
700 ret = errno;
701 delete [] source.fHostname;
702 return ret;
703 }
704
705 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
706
707 if ( !source.fShmPtr )
708 {
709 ret = errno;
710 shmctl( source.fShmID, IPC_RMID, NULL );
711 delete [] source.fHostname;
712 return ret;
713 }
714
715 source.fType = kShm;
716 source.fShmKey = shmKey;
717 source.fShmSize = shmSize;
718 source.fDataSize = 0;
719 source.fDataRead = 0;
720 return 0;
721 }
722
8c617325 723int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
724 {
725// see header file for class documentation
726// a buffer data source is like a shm source apart from the shm attach and detach
727// procedure. Furthermore, the size indicator at the beginning of the buffer is not
728// cleared right before sources are read but after the reading.
d76bc02a 729 //int ret;
8c617325 730 if ( !pBuffer || size<=0) return EINVAL;
731
732 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
733 if ( !tmpchar )
734 {
735 return ENOMEM;
736 }
737 gethostname( tmpchar, MAXHOSTNAMELEN );
738 tmpchar[MAXHOSTNAMELEN]=(char)0;
739 source.fHostname = tmpchar;
740
741 source.fShmID = -1;
742 // the data buffer does not contain a size indicator in the first 4 bytes
743 // like the shm source buffer. Still we want to use the mechanism to invalidate/
744 // trigger by clearing the size indicator. Take the source.fShmSize variable.
745 source.fShmPtr = &source.fShmSize;
746 source.fType = kBuf;
747 source.fShmKey = 0;
748 source.fShmSize = size;
749 source.fData = pBuffer;
750 source.fDataSize = 0;
751 source.fDataRead = 0;
752 return 0;
753 }
754
fe1edbae 755void AliHLTHOMERReader::FreeDataSources()
2be16a33 756 {
04a939f7 757// see header file for class documentation
2be16a33 758 for ( unsigned n=0; n < fDataSourceCnt; n++ )
759 {
760 if ( fDataSources[n].fType == kTCP )
761 FreeTCPDataSource( fDataSources[n] );
8c617325 762 else if ( fDataSources[n].fType == kShm )
2be16a33 763 FreeShmDataSource( fDataSources[n] );
242e6536 764 if ( fDataSources[n].fHostname )
765 delete [] fDataSources[n].fHostname;
2be16a33 766 }
242e6536 767 fDataSourceCnt=0;
2be16a33 768 }
769
fe1edbae 770int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
2be16a33 771 {
04a939f7 772// see header file for class documentation
2be16a33 773 if ( source.fShmPtr )
6001c5d5 774 shmdt( (char*)source.fShmPtr );
2be16a33 775// if ( source.fShmID != -1 )
776// shmctl( source.fShmID, IPC_RMID, NULL );
2be16a33 777 return 0;
778 }
779
fe1edbae 780int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
2be16a33 781 {
04a939f7 782// see header file for class documentation
2be16a33 783 if ( source.fTCPConnection )
784 close( source.fTCPConnection );
2be16a33 785 return 0;
786 }
787
fe1edbae 788int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
2be16a33 789 {
04a939f7 790// see header file for class documentation
2be16a33 791 if ( fDataSourceCnt<=0 )
792 return ENXIO;
793 // Clean up currently active event.
794 ReleaseCurrentEvent();
29e6fd20 795 int ret=0;
2be16a33 796 // Trigger all configured data sources
cfa641b1 797 for ( unsigned n = 0; n<fDataSourceCnt; n++ ){
798 if ( fDataSources[n].fType == kTCP )
799 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
800 else if ( fDataSources[n].fType == kShm )
801 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
802 if ( ret )
2be16a33 803 {
cfa641b1 804 fErrorConnection = n;
805 fConnectionStatus=ret;
806 return fConnectionStatus;
2be16a33 807 }
cfa641b1 808 }
2be16a33 809 // Now read in data from the configured data source
810 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
cfa641b1 811
2be16a33 812 if ( ret )
cfa641b1 813 {
2be16a33 814 return ret;
cfa641b1 815 }
2be16a33 816 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
817 if ( ret )
818 {
819 return ret;
820 }
821// for ( unsigned n = 0; n<fDataSourceCnt; n++ )
822// {
823// if ( fDataSources[n].fType == kTCP )
824// ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
825// else
826// ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
827// if ( ret )
828// {
829// fErrorConnection = n;
830// fConnectionStatus=ret;
831// return fConnectionStatus;
832// }
833// }
834 //Check to see that all sources contributed data for the same event
835 homer_uint64 eventID;
836 homer_uint64 eventType;
29e6fd20 837 if (!fDataSources[0].fData)
838 {
839 fErrorConnection = 0;
840 fConnectionStatus=56;//ENOBUF;
841 return fConnectionStatus;
842 }
2be16a33 843 eventID = GetSourceEventID( fDataSources[0] );
844 eventType = GetSourceEventType( fDataSources[0] );
845 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
846 {
29e6fd20 847 if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
2be16a33 848 {
849 fErrorConnection = n;
746d2a94 850 fConnectionStatus=56;//EBADRQC;
2be16a33 851 return fConnectionStatus;
852 }
853 }
854 // Find all the different data blocks contained in the data from all
855 // the sources.
856 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
857 {
858 ret = ParseSourceData( fDataSources[n] );
859 if ( ret )
860 {
861 fErrorConnection = n;
746d2a94 862 fConnectionStatus=57;//EBADSLT;
aa5178e8 863 return ret;
2be16a33 864 }
865 }
866 fCurrentEventID = eventID;
867 fCurrentEventType = eventType;
868 return 0;
869 }
870
fe1edbae 871void AliHLTHOMERReader::ReleaseCurrentEvent()
2be16a33 872 {
04a939f7 873// see header file for class documentation
2be16a33 874 // sources.fDataRead = 0;
875 // fMaxBlockCnt
876 fCurrentEventID = ~(homer_uint64)0;
877 fCurrentEventType = ~(homer_uint64)0;
878 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
879 {
880 if ( fDataSources[n].fData )
881 {
882 if ( fDataSources[n].fType == kTCP )
883 delete [] (homer_uint8*)fDataSources[n].fData;
29e6fd20 884 // do not reset the data pointer for kBuf sources since this
885 // can not be set again.
886 if ( fDataSources[n].fType != kBuf )
887 fDataSources[n].fData = NULL;
2be16a33 888 }
889 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
890 }
891 if ( fBlocks )
892 {
893 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
894 {
895 if ( fBlocks[n].fOriginatingNodeID )
896 delete [] fBlocks[n].fOriginatingNodeID;
897 }
898 delete [] fBlocks;
899 fBlocks=0;
900 fMaxBlockCnt = 0;
901 fBlockCnt=0;
902 }
903 }
904
fe1edbae 905int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
2be16a33 906 {
04a939f7 907// see header file for class documentation
620aa46c 908 int ret=0;
2be16a33 909 struct timeval oldSndTO, newSndTO;
620aa46c 910 memset(&oldSndTO, 0, sizeof(oldSndTO));
911 memset(&newSndTO, 0, sizeof(newSndTO));
2be16a33 912 if ( useTimeout )
913 {
914 socklen_t optlen=sizeof(oldSndTO);
915 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
916 if ( ret )
917 {
918 return errno;
919 }
920 if ( optlen!=sizeof(oldSndTO) )
921 {
922 return ENXIO;
923 }
3a7c0444 924 newSndTO.tv_sec = timeoutUsec / 1000000;
925 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
2be16a33 926 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
927 if ( ret )
928 {
929 return errno;
930 }
931 }
932 // Send one event request
04a939f7 933 if ( !fEventRequestAdvanceTime )
2be16a33 934 {
935 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
936
937 if ( ret != (int)strlen(GET_ONE) )
938 {
939 ret=errno;
940 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
941 return ret;
942 }
943 }
944 else
945 {
946 char tmpCmd[ 128 ];
947
04a939f7 948 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
2be16a33 949 if ( len>128 || len<0 )
950 {
951 ret=EMSGSIZE;
952 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
953 return ret;
954 }
955
956 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
957
958 if ( ret != (int)strlen(tmpCmd) )
959 {
960 ret=errno;
961 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
962 return ret;
963 }
964
965 }
966 return 0;
967 }
968
6a8e0bb4 969int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
2be16a33 970 {
04a939f7 971// see header file for class documentation
fe1edbae 972// clear the size indicator in the first 4 bytes of the buffer to request data
973// from the HOMER writer.
2be16a33 974 if ( source.fShmPtr )
975 {
976 *(homer_uint32*)( source.fShmPtr ) = 0;
977 return 0;
978 }
979 else
980 return EFAULT;
981 }
982
fe1edbae 983int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
2be16a33 984 {
3a7c0444 985// see header file for class documentation
2be16a33 986 bool toRead = false;
987 do
988 {
989 fd_set conns;
990 FD_ZERO( &conns );
991 int highestConn=0;
992 toRead = false;
993 unsigned firstConnection=~(unsigned)0;
994 for ( unsigned long n = 0; n < sourceCnt; n++ )
995 {
996 if ( sources[n].fDataSize == 0 // size specifier not yet read
997 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
998 {
999 toRead = true;
1000 FD_SET( sources[n].fTCPConnection, &conns );
1001 if ( sources[n].fTCPConnection > highestConn )
1002 highestConn = sources[n].fTCPConnection;
1003 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
1004 if ( firstConnection == ~(unsigned)0 )
1005 firstConnection = n;
1006 }
1007 else
1008 {
1009 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1010 }
1011 }
1012 if ( toRead )
1013 {
1014 struct timeval tv, *ptv;
1015 if ( useTimeout )
1016 {
1017 tv.tv_sec = timeout / 1000000;
1018 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1019 ptv = &tv;
1020 }
1021 else
1022 ptv = NULL;
1023 // wait until something is ready to be read
1024 // either for timeout usecs or until eternity
1025 int ret;
1026 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1027 if ( ret <=0 )
1028 {
1029 fErrorConnection = firstConnection;
1030 if ( errno )
1031 fConnectionStatus = errno;
1032 else
1033 fConnectionStatus = ETIMEDOUT;
1034 return fConnectionStatus;
1035 }
1036 for ( unsigned n = 0; n < sourceCnt; n++ )
1037 {
1038 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1039 {
1040 if ( sources[n].fDataSize == 0 )
1041 {
1042 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1043 if ( ret != sizeof(homer_uint32) )
1044 {
1045 fErrorConnection = n;
1046 if ( errno )
1047 fConnectionStatus = errno;
1048 else
1049 fConnectionStatus = ENOMSG;
1050 return fConnectionStatus;
1051 }
1052 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1053 sources[n].fDataRead = 0;
1054 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1055 if ( !sources[n].fData )
1056 {
1057 fErrorConnection = n;
1058 fConnectionStatus = ENOMEM;
1059 return fConnectionStatus;
1060 }
1061 }
1062 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1063 {
1064 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1065 if ( ret>0 )
1066 sources[n].fDataRead += ret;
1067 else if ( ret == 0 )
1068 {
1069 fErrorConnection = n;
1070 fConnectionStatus = ECONNRESET;
1071 return fConnectionStatus;
1072 }
1073 else
1074 {
1075 fErrorConnection = n;
1076 fConnectionStatus = errno;
1077 return fConnectionStatus;
1078 }
1079 }
1080 else
1081 {
1082 fErrorConnection = n;
1083 fConnectionStatus = ENXIO;
1084 return fConnectionStatus;
1085 }
1086 }
1087 }
1088 }
1089 }
1090 while ( toRead );
1091 return 0;
1092 }
1093
1094/*
fe1edbae 1095int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
2be16a33 1096 {
1097#warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1098 // Send one event request
1099 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1100 if ( ret != strlen(GET_ONE) )
1101 {
1102 return errno;
1103 }
1104 // wait for and read back size specifier
1105 unsigned sizeNBO;
1106 // The value transmitted is binary, in network byte order
1107 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1108 if ( ret != sizeof(sizeNBO) )
1109 {
1110 return errno;
1111 }
1112 // Convert back to host byte order
1113 source.fDataSize = ntohl( sizeNBO );
1114 source.fData = new homer_uint8[ source.fDataSize ];
1115 unsigned long dataRead=0, toRead;
1116 if ( !source.fData )
1117 {
1118 char buffer[1024];
1119 // Read in data into buffer in order not to block connection
1120 while ( dataRead < source.fDataSize )
1121 {
1122 if ( source.fDataSize-dataRead > 1024 )
1123 toRead = 1024;
1124 else
1125 toRead = source.fDataSize-dataRead;
1126 ret = read( source.fTCPConnection, buffer, toRead );
1127 if ( ret > 0 )
1128 dataRead += ret;
1129 else
1130 return errno;
1131 }
1132 return ENOMEM;
1133 }
1134 while ( dataRead < source.fDataSize )
1135 {
1136 toRead = source.fDataSize-dataRead;
1137 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1138 if ( ret > 0 )
1139 dataRead += ret;
1140 else if ( ret == 0 && useTimeout )
1141 {
1142 struct timeval tv;
1143 tv.tv_sec = timeout / 1000000;
1144 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1145 fd_set conns;
1146 FD_ZERO( &conns );
1147 FD_SET( source.fTCPConnection, &conns );
1148 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1149 if ( ret <=0 )
1150 return errno;
1151 }
1152 else if ( ret == 0 )
1153 {
1154 if ( errno == EOK )
1155 return ECONNRESET;
1156 else
1157 return errno;
1158 }
1159 else
1160 {
1161 return errno;
1162 }
1163 }
1164 return 0;
1165 }
1166*/
1167
1168/*
fe1edbae 1169int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
2be16a33 1170 {
1171
1172 }
1173*/
1174
fe1edbae 1175int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
2be16a33 1176 {
04a939f7 1177// see header file for class documentation
2be16a33 1178 struct timeval tv1, tv2;
1179 bool found=false;
1180 bool all=true;
1181 if ( useTimeout )
1182 gettimeofday( &tv1, NULL );
1183 do
1184 {
1185 found = false;
1186 all = true;
1187 for ( unsigned n = 0; n < sourceCnt; n++ )
1188 {
1189 if ( !sources[n].fDataSize )
1190 all = false;
1191 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1192 {
1193 found = true;
1194 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
8c617325 1195 if (sources[n].fType==kBuf)
1196 {
1197 // the data buffer is already set to fData, just need to set fDataSize member
1198 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1199 TriggerShmSource( sources[n], 0, 0 );
1200 } else
1201 {
1202 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1203 }
2be16a33 1204 }
1205 }
1206 if ( found && useTimeout )
1207 gettimeofday( &tv1, NULL );
1208 if ( !all && useTimeout )
1209 {
1210 gettimeofday( &tv2, NULL );
1211 unsigned long long tdiff;
1212 tdiff = tv2.tv_sec-tv1.tv_sec;
1213 tdiff *= 1000000;
1214 tdiff += tv2.tv_usec-tv1.tv_usec;
1215 if ( tdiff > timeout )
1216 return ETIMEDOUT;
1217 }
1218 if ( !all )
1219 usleep( 0 );
1220 }
1221 while ( !all );
1222 return 0;
1223 }
1224
fe1edbae 1225int AliHLTHOMERReader::ParseSourceData( DataSource& source )
2be16a33 1226 {
04a939f7 1227// see header file for class documentation
2be16a33 1228 if ( source.fData )
1229 {
1230 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
aa5178e8 1231 if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
2be16a33 1232 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
aa5178e8 1233 // block count is not related to size of the data in the way the
1234 // following condition implies. But we can at least limit the block
1235 // count for the case the data is corrupted
1236 if (blockCnt>source.fDataSize) return EBADMSG;
2be16a33 1237 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1238 if ( ret )
1239 return ret;
1240 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1241 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1242 {
aa5178e8 1243 if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
2be16a33 1244 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1245 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
aa5178e8 1246 if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
31774e81 1247 if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
2be16a33 1248 fBlocks[fBlockCnt].fSource = source.fNdx;
1249 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1250 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1251 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1252 struct in_addr tmpA;
1253 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1254 char* addr = inet_ntoa( tmpA );
c9926985 1255 unsigned straddrlen=strlen(addr);
1256 char* tmpchar = new char[ straddrlen+1 ];
2be16a33 1257 if ( !tmpchar )
1258 return ENOMEM;
c9926985 1259 strncpy( tmpchar, addr, straddrlen );
1260 tmpchar[straddrlen]=0;
2be16a33 1261 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1262 descrOffset += descrLen;
1263 }
1264 return 0;
1265 }
1266 return EFAULT;
1267 }
1268
fe1edbae 1269int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
2be16a33 1270 {
04a939f7 1271// see header file for class documentation
2be16a33 1272 DataBlock* newBlocks;
1273 newBlocks = new DataBlock[ newCnt ];
1274 if ( !newBlocks )
1275 return ENOMEM;
1276 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
c9926985 1277 if ( fBlocks ) {
2be16a33 1278 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
c9926985 1279 } else {
1280 fMaxBlockCnt=0;
1281 }
2be16a33 1282 if ( newCnt > fMaxBlockCnt )
1283 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1284 if ( fBlocks )
1285 delete [] fBlocks;
1286 fBlocks = newBlocks;
1287 fMaxBlockCnt = newCnt;
1288 return 0;
1289 }
1290
fe1edbae 1291homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
2be16a33 1292 {
04a939f7 1293// see header file for class documentation
2be16a33 1294 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1295 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1296 }
1297
fe1edbae 1298homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
2be16a33 1299 {
04a939f7 1300// see header file for class documentation
2be16a33 1301 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1302 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1303 }
1304
fe1edbae 1305homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
3a7c0444 1306 {
1307// see header file for class documentation
1308 if ( destFormat == sourceFormat )
1309 return source;
1310 else
1311 return ((source & 0xFFULL) << 56) |
1312 ((source & 0xFF00ULL) << 40) |
1313 ((source & 0xFF0000ULL) << 24) |
1314 ((source & 0xFF000000ULL) << 8) |
1315 ((source & 0xFF00000000ULL) >> 8) |
1316 ((source & 0xFF0000000000ULL) >> 24) |
1317 ((source & 0xFF000000000000ULL) >> 40) |
1318 ((source & 0xFF00000000000000ULL) >> 56);
1319 }
2be16a33 1320
fe1edbae 1321homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
3a7c0444 1322 {
1323// see header file for class documentation
1324 if ( destFormat == sourceFormat )
1325 return source;
1326 else
1327 return ((source & 0xFFUL) << 24) |
1328 ((source & 0xFF00UL) << 8) |
1329 ((source & 0xFF0000UL) >> 8) |
1330 ((source & 0xFF000000UL) >> 24);
1331 }
fe1edbae 1332
a183f221 1333AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
fe1edbae 1334 {
a183f221 1335 // see header file for function documentation
1336 return new AliHLTHOMERReader(hostname, port);
1337 }
1338
1339AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1340 {
1341 // see header file for function documentation
1342 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1343 }
1344
1345AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1346 {
1347 // see header file for function documentation
8c617325 1348 return new AliHLTHOMERReader(pBuffer, size);
fe1edbae 1349 }
1350
1351void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1352 {
a183f221 1353 // see header file for function documentation
fe1edbae 1354 if (pInstance) delete pInstance;
1355 }
1356
2be16a33 1357/*
1358***************************************************************************
1359**
1360** $Author$ - Initial Version by Timm Morten Steinbeck
1361**
1362** $Id$
1363**
1364***************************************************************************
1365*/