Excerpts from RDMA (Remote DMA) multi-client content-based routing communication library for Windows. The kernel-level driver allocates one non- pageable buffer for raw input and maps this into the memory space of any applications that register to communicate. Messages for different applications are interleaved. Atomic application-level decrement of a message counter enables data coherency even with zero data copy, no ring transitions, and no semaphores. #define AtomicDec16At(P) { _asm mov eax,P _asm lock dec word ptr [eax] } #define AtomicOr16At(P,M) { _asm mov eax,P _asm lock or word ptr [eax],M } #define AtomicAnd16At(P,M) { _asm mov eax,P _asm lock and word ptr [eax],M } typedef struct { long xi; // Extraction index relative to rxBegin. USHORT mcount; // Number of unextracted messages in queue. USHORT flags; UCHAR id; // Registered receiver's ID. } Qctl; class CLASS_DECLSPEC RxClient { enum { EVENT_ABORT = 0, EVENT_RX = 1 }; // events indices HANDLE events[2]; // ABORT and RX DvrApi *dvr; Qctl *qp; public: RxClient( DvrApi *dvr, HANDLE abortEvent, UCHAR begin = 0, UCHAR end = 255 ); // Default filter accepts every message (0-255). ~RxClient( void ) { unregister(); } void unregister( void ); void setRxRange( UCHAR begin, UCHAR end, UINT acceptReject ); UCHAR *getMsg( UCHAR *prev, short mode = RCGM_NORMAL ); USHORT *msgCounter( void ) { return &qp->mcount; } void testMultiple( int msgCount ); }; DllExport UCHAR *RxClient::getMsg( UCHAR *prev, short mode ) { USHORT len; USHORT *pmcount; USHORT *flags; UCHAR *xp; xp = rxBegin + qp->xi; if( prev == xp + 1 ) { // Discard the previous message by advancing the extraction point. xp += ( len = xp[1] + 2 ); qp->xi = xp - rxBegin; pmcount = &qp->mcount; AtomicDec16At( pmcount ); if( dvrStat->ackMargin > 0 && ( dvrStat->ackMargin -= len ) <= 0 ) dvr->command( CDXDVR_ENDRXPAUSE ); } if( mode == RCGM_ONLY_DISCARD ) return 0; if( qp->mcount == 0 ) { if( mode == RCGM_NO_WAIT ) return 0; /* If message count is 0 and mode is to wait for the next input, we need to * tell the low level driver (procRxMsg in onrxmsg.c) that we are waiting so * that it knows to signal us. To avoid excessive signalling we don't want to * set QCTL_RXWAITING before determining that there are no messages, but * setting the flag after testing mcount leaves a time period during which a * message might be received and procRxMsg doesn't know that we have already * determined that we need to wait (because mcount is 0). Our setting of * QCTL_RXWAITING won't do any good in this case, because procRxMsg will have * already received the first message. To avoid this, we set the flag and then * retest mcount. If it is still 0 then we know that we will be signalled at * the first message. Otherwise, the first message arrived between the time of * our first and second tests. In this case, the event may or may not be * signalled. Either way, we can safely reset the event without waiting for it. * We also have to reset QCTL_RXWAITING in case procRxMsg executed only between * the first test of mcount and the setting of the flag, in which case, * procRxMsg doesn't signal the event or clear the flag. */ flags = &qp->flags; AtomicOr16At( flags, QCTL_RXWAITING ); if( qp->mcount == 0 ) { switch( WaitForMultipleObjects( 2, events, FALSE, INFINITE )) { case WAIT_FAILED: throw ThrowAbort( QM_GETFAIL ); case WAIT_OBJECT_0 + EVENT_ABORT : throw ThrowAbort( QM_GET ); } } else { AtomicAnd16At( flags, ~QCTL_RXWAITING ); ResetEvent( events[ EVENT_RX ]); } xp = rxBegin + qp->xi; } /* Skip over messages not for this receiver. This isn't needed if we had to * wait for a message, because the inserter (ring 0 driver-onRxMsg) updates the * registered receiver's xp if its mcount is 0. Note reload of xp from xi. */ else { for( xp = rxBegin + qp->xi ; *xp != qp->id ; ) { if( *xp == QCTL_WRAP ) xp = rxBegin; else xp += xp[1] + 2; } qp->xi = xp - rxBegin; } return xp + 1; }