Creating a message port and a message
The threads of a data streaming application communicate using messages, so the application has to create a message port and a message. This section first looks at sample code for creating a message port and a message, then discusses using messages in more detail.
For a full discussion of messages, see the 3DO Portfolio System Programmer's Guide.
Example 1: Creating and removing a message port and a message.
/* InitCPakPlayerFromStreamHeader() creates a message port and two
 * message items to communicate with the data streamer.
 */
    ctx->messagePort = NewMsgPort( NULL );
    if ( ctx->messagePort < 0 )
        goto CLEANUP;

    ctx->messageItem = CreateMsgItem( ctx->messagePort );
    if ( ctx->messageItem < 0 )
        goto CLEANUP;

    ctx->endOfStreamMessageItem =
        CreateMsgItem( ctx->messagePort );
    if ( ctx->endOfStreamMessageItem < 0 )
        goto CLEANUP;

/* Here's the code from DismantlePlayer(),
 * which is called by CLEANUP,
 * that removes the items created above.
 * A valid item number is positive; a negative value is an error code.
 */
    if ( ctx->messageItem > 0 )
        RemoveMsgItem( ctx->messageItem );
    if ( ctx->endOfStreamMessageItem > 0 )
        RemoveMsgItem( ctx->endOfStreamMessageItem );
    if ( ctx->messagePort > 0 )
        RemoveMsgPort( ctx->messagePort );
Example 2: Message header.
The code below defines the common message header. Table 1 describes its fields.
/* The following preamble is used for all types of messages sent
* by the streamer.
*/
#define DS_MSG_HEADER \
    long   whatToDo;    /* opcode determining msg contents */      \
    Item   msgItem;     /* msg item for sending this buffer */     \
    void * privatePtr;  /* ptr to sender's private data */         \
    void * link;        /* user-defined, for linking into lists */
Table 1: Message header fields.
--------------------------------------------------------
Field      |Description
--------------------------------------------------------
whatToDo   |Specifies the type of request being made by
           |the message sender. Its value typically
           |appears in a switch() statement and is used
           |to access various union fields in the
           |remainder of the message structure.
--------------------------------------------------------
msgItem    |System item number that sends message data
           |to a receiver. It is also used by message
           |receiver when calling ReplyMsg() to signal
           |that the requested operation is complete.
--------------------------------------------------------
privatePtr |Stores a pointer to a private data
           |structure that performs post-processing
           |after the receiver calls ReplyMsg().
--------------------------------------------------------
link       |General-purpose linking field to form
           |queues of messages received from the
           |streamer thread. Other threads can use it
           |to queue messages.
--------------------------------------------------------
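The role of whatToDo can be illustrated with a minimal, self-contained sketch. The opcode names (kOpFillBuffer, kOpDataArrived), the SketchMsg union layout, and the DispatchMsg() helper are hypothetical and are not taken from the DataStreamer headers. Item is replaced by a plain long so that the fragment compiles outside Portfolio.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the Portfolio Item type (assumption, for portability). */
typedef long Item;

/* Common header, with the same four fields as in Example 2. */
#define DS_MSG_HEADER \
    long  whatToDo;    /* opcode determining msg contents */   \
    Item  msgItem;     /* msg item for sending this buffer */  \
    void *privatePtr;  /* ptr to sender's private data */      \
    void *link;        /* user-defined, for linking into lists */

/* Hypothetical opcodes, for illustration only. */
enum { kOpFillBuffer = 1, kOpDataArrived = 2 };

/* Hypothetical message: the common header followed by a union whose
 * active member is selected by whatToDo.
 */
typedef struct SketchMsg {
    DS_MSG_HEADER
    union {
        struct { void *bufferPtr; long bufferSize; } fill;
        struct { void *chunkPtr;  long chunkSize;  } data;
    } msg;
} SketchMsg;

/* Return the byte count carried by the message,
 * or -1 if the opcode is not recognized.
 */
long DispatchMsg( const SketchMsg *m )
{
    assert( m != NULL );

    switch ( m->whatToDo )
    {
        case kOpFillBuffer:
            return m->msg.fill.bufferSize;
        case kOpDataArrived:
            return m->msg.data.chunkSize;
        default:
            return -1;
    }
}
```

Because every message starts with the same header, a receiver can read whatToDo before it knows which union member is valid.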
The receiver of a message must always signal completion of the requested operation by calling ReplyMsg(). This is necessary because such completions can trigger other actions. For example, when the data acquisition thread replies to a request to fill a buffer, the reply adds the filled buffer to a list of filled buffers that will subsequently be parsed and their data distributed to the appropriate subscriber thread. When a subscriber thread replies to a message that announces the availability of a new data chunk, the reply signals that the subscriber has finished using that data. Once all subscribers using data in a buffer have signaled that they are finished with their portions of the buffer, the buffer can be reused.
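The buffer reuse rule amounts to reference counting. The following is a minimal sketch under assumed names: SketchBuf, its useCount field, and the two helpers are hypothetical, and the streamer keeps this state in its own structures.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical buffer record. useCount is the number of subscribers
 * that still hold data chunks located in this buffer.
 */
typedef struct SketchBuf {
    struct SketchBuf *next;      /* link in the free list */
    int               useCount;  /* outstanding subscriber messages */
} SketchBuf;

/* Called when a chunk in the buffer is handed to a subscriber. */
void ChunkDelivered( SketchBuf *buf )
{
    assert( buf != NULL );
    buf->useCount++;
}

/* Called when a subscriber replies to the message that announced a chunk.
 * Returns 1 if this reply put the buffer on the free list, else 0.
 */
int ChunkReplied( SketchBuf *buf, SketchBuf **freeList )
{
    assert( buf != NULL && freeList != NULL && buf->useCount > 0 );

    if ( --buf->useCount > 0 )
        return 0;                /* other subscribers are still reading */

    buf->next = *freeList;       /* last reply: buffer may be refilled */
    *freeList = buf;
    return 1;
}
```

A subscriber that never replies keeps useCount above zero, so the buffer never returns to the free list and the stream eventually runs out of buffers.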
Allocating data buffers
This section looks at sample code that creates data buffers used by an application using the DataStreamer. It also briefly looks at some heuristics for deciding on buffer size.
Example 3: Creating a buffer list.
/***********************************************************************************
 * Routine to allocate and initialize a buffer list for use with the streamer
 ***********************************************************************************/
DSDataBufPtr CreateBufferList( long numBuffers, long bufferSize )
{
#   define ROUND_TO_LONG(x)  (((x) + 3) & ~3)
    DSDataBufPtr bp;
    DSDataBufPtr firstBp;
    long         totalBufferSpace;
    long         actualEntrySize;
    long         bufferNum;

    /* Each entry is a DSDataBuf header followed by a data area whose
     * size is rounded up to a multiple of 4 bytes.
     */
    actualEntrySize  = sizeof(DSDataBuf) + ROUND_TO_LONG( bufferSize );
    totalBufferSpace = numBuffers * actualEntrySize;

    /* Try to allocate the needed memory */
    firstBp = (DSDataBufPtr) NewPtr( totalBufferSpace, MEMTYPE_ANY );
    if ( firstBp == NULL )
        return NULL;

    /* Loop to take the big buffer and split it into "N" buffers
     * of size "bufferSize", linked together.
     */
    for ( bp = firstBp, bufferNum = 0; bufferNum < (numBuffers - 1); bufferNum++ )
    {
        bp->next          = (DSDataBufPtr) ( ((char *) bp) + actualEntrySize );
        bp->permanentNext = bp->next;

        /* Advance to point to the next entry */
        bp = bp->next;
    }

    /* Make last entry's forward link point to nil */
    bp->next          = NULL;
    bp->permanentNext = NULL;

    /* Return a pointer to the first buffer in the list */
    return firstBp;
}
Example 4: Using stream header information with CreateBufferList().
    /* Allocate the stream buffers and build the linked list of
     * buffers that are input to the streamer.
     */
    ctx->bufferList =
        CreateBufferList( ctx->hdr.streamBuffers, ctx->hdr.streamBlockSize );
    if ( ctx->bufferList == NULL )
        return kPSMemFullErr;
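The memory cost of a buffer list follows directly from the arithmetic in CreateBufferList(). The sketch below reproduces that arithmetic in portable C. SketchDataBuf is a hypothetical stand-in that models only the two link fields, so its size differs from the real DSDataBuf, and EntrySize(), TotalBufferSpace(), and CountBuffers() are illustrative helpers, not library calls.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for DSDataBuf: only the two link fields used
 * by CreateBufferList() are modeled.
 */
typedef struct SketchDataBuf {
    struct SketchDataBuf *next;
    struct SketchDataBuf *permanentNext;
} SketchDataBuf;

/* Round a byte count up to the next multiple of 4. */
#define ROUND_TO_LONG(x)  (((x) + 3) & ~3L)

/* Bytes occupied by one list entry: header plus rounded data area. */
long EntrySize( long bufferSize )
{
    assert( bufferSize > 0 );
    return (long) sizeof(SketchDataBuf) + ROUND_TO_LONG( bufferSize );
}

/* Bytes that a single allocation for the whole list must provide. */
long TotalBufferSpace( long numBuffers, long bufferSize )
{
    assert( numBuffers > 0 );
    return numBuffers * EntrySize( bufferSize );
}

/* Count the entries in a list by following the next links. */
long CountBuffers( const SketchDataBuf *bp )
{
    long n = 0;

    for ( ; bp != NULL; bp = bp->next )
        n++;
    return n;
}
```

As an illustration with assumed values, four buffers of 32768 bytes each need 131072 bytes for data alone, plus one header per entry. A requested size of 10 bytes is rounded up to 12.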
Launching the required threads
Before a program can use the DataStreamer, it has to initialize the data acquisition, streamer, and subscriber threads, which are discussed in detail in "DataStreamer threads and data flow." This section looks at sample code for initializing the threads.
Call InitDataAcq() and InitDataStreaming() to initialize the memory pools for the threads.
Call NewDataAcq() and NewDataStream() to create the specific thread. The prototypes show the information you have to specify:
int32 NewDataStream( DSStreamCBPtr *pCtx, void *bufferListPtr, long bufferSize, long deltaPriority, long numSubsMsgs )
int32 NewDataAcq( AcqContextPtr *pCtx, char *fileName, long deltaPriority )
When the threads are created, information about them, such as buffer size or stream block size, can either be supplied directly or be extracted from the stream header, as in the example below.
Call DSConnect() to connect the streamer and the data acquisition thread.
    /* Initialize data acquisition for use with 1 file */
    status = InitDataAcq( 1 );
    if ( status != 0 )
        goto CLEANUP;

    /* Initialize data acquisition for the specified file */
    status = NewDataAcq( &ctx->acqContext, streamFileName,
                         ctx->hdr.dataAcqDeltaPri );
    if ( status != 0 )
        goto CLEANUP;

    /* Initialize for 1 streamer thread */
    status = InitDataStreaming( 1 );
    if ( status != 0 )
        goto CLEANUP;

    status = NewDataStream( &ctx->streamCBPtr,         /* output: stream control block ptr */
                            ctx->bufferList,           /* pointer to buffer list */
                            ctx->hdr.streamBlockSize,  /* size of each buffer */
                            ctx->hdr.streamerDeltaPri, /* streamer thread rel. priority */
                            ctx->hdr.numSubsMsgs );    /* number of subscriber messages */
    if ( status != 0 )
        goto CLEANUP;

    /* Connect the stream to its data supplier */
    status = DSConnect( ctx->messageItem, NULL, ctx->streamCBPtr,
                        ctx->acqContext->requestPort );
    CHECK_DS_RESULT( "DSConnect()", status );
    if ( status != 0 )
        goto CLEANUP;
Call DSSubscribe() to connect the subscriber thread and the streamer thread.
Example 6: Preparing subscriber threads.
    /* Loop through the subscriber descriptor table and initialize all subscribers */
    for ( subscriberIndex = 0;
          ctx->hdr.subscriberList[ subscriberIndex ].subscriberType != 0;
          subscriberIndex++ )
    {
        subsPtr = ctx->hdr.subscriberList + subscriberIndex;

        switch ( subsPtr->subscriberType )
        {
            case FILM_CHUNK_TYPE:
                status = InitCPakSubscriber();
                CHECK_DS_RESULT( "InitCPakSubscriber", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = NewCPakSubscriber( &ctx->cpakContextPtr, 1,
                                            subsPtr->deltaPriority );
                CHECK_DS_RESULT( "NewCPakSubscriber", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = DSSubscribe( ctx->messageItem,             // control msg item
                                      NULL,                         // synchronous call
                                      ctx->streamCBPtr,             // stream context block
                                      (DSDataType) FILM_CHUNK_TYPE, // subscriber data type
                                      ctx->cpakContextPtr->requestPort ); // subscriber message port
                CHECK_DS_RESULT( "DSSubscribe for cinepak", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = InitCPakCel( ctx->streamCBPtr,      // Needed for DSGetClock in sub
                                      ctx->cpakContextPtr,   // The subscriber's context
                                      &ctx->cpakChannelPtr,  // The channel's context
                                      0,                     // The channel number
                                      true );                // true = flush on DS synch msg
                CHECK_DS_RESULT( "InitCPakCel", status );
                if ( status != 0 )
                    goto CLEANUP;
                break;