Missed UDP packets


2008-07-27 12:38:08 AM
cppbuilder24
Hello,
I am using the Indy UDP server component in an application, and am
experiencing quite a few missed packets in circumstances where the
UDPRead method isn't called quickly enough after a packet arrives. For
example, if I pause the program briefly in the debugger, packets are
invariably lost. This also happens in normal operation if there is any
delay in responding to incoming data. Is this normal behavior for the
UDP server? Is data lost when the Read method is not called before more
data arrives? How can the problem be resolved, or can it be?
Thanks
Dave
 
 

Re:Missed UDP packets

Dave, what I do is create a circular buffer,
then take the entire UDP packet and place it in this
circular buffer.
This is the job of the Indy UDP read event when a packet arrives.
That is the ONLY thing that the UDP packet availability
event does.
The buffer is a FIFO, with synchronization when
reading and writing.
A second thread then consumes the buffer contents. While it consumes
the contents, more packets can be written to the buffer at the same
time by the read event handler.
Will post code on demand....
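
The scheme described above can be sketched in portable C++. This is only an illustration, not the code from this thread: it assumes std::mutex and std::thread in place of a Win32 critical section and the Indy read event, and the names PacketFifo, push, pop and run_demo are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity FIFO of packets; not part of Indy or the VCL.
class PacketFifo {
public:
    // Called from the "packet arrived" handler: copy the packet in and return.
    // Returns false (packet dropped) when the FIFO is full.
    bool push(const std::string& packet) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == kCapacity) return false;
        slots_[rear_] = packet;
        rear_ = (rear_ + 1) % kCapacity;
        ++count_;
        assert(count_ <= kCapacity);
        return true;
    }

    // Called from the consumer thread. Returns false when nothing is queued.
    bool pop(std::string& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        out = std::move(slots_[front_]);
        front_ = (front_ + 1) % kCapacity;
        --count_;
        return true;
    }

private:
    static constexpr std::size_t kCapacity = 512;
    std::array<std::string, kCapacity> slots_;
    std::size_t front_ = 0, rear_ = 0, count_ = 0;
    std::mutex mutex_;
};

// Stand-in for the network side: one thread plays the read event handler,
// the calling thread plays the consumer. Returns packets in arrival order.
std::vector<std::string> run_demo(int packet_count) {
    PacketFifo fifo;
    std::thread producer([&] {
        for (int i = 0; i < packet_count; ++i) {
            // Retry when full so the demo never drops a packet;
            // a real receive handler would drop instead of spinning.
            while (!fifo.push("packet " + std::to_string(i)))
                std::this_thread::yield();
        }
    });
    std::vector<std::string> received;
    std::string packet;
    while (static_cast<int>(received.size()) < packet_count) {
        if (fifo.pop(packet)) received.push_back(packet);
        else std::this_thread::yield();
    }
    producer.join();
    return received;
}
```

The point of the design is that push does nothing but copy and return, so the receive handler is never held up by whatever the consumer does with the data.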
 

Re:Missed UDP packets

Colin, thanks for the suggestion. If you could post your code I would
greatly appreciate it.
Thanks
Dave
 


Re:Missed UDP packets

How to use...(actual code is below)...
#include "CircBuff.h"
TCircBuff *Buff1 = new TCircBuff; // Declaration....
//---------------------------------------------------------------------------
// In Server Code - UDP Reader
//---------------------------------------------------------------------------
void __fastcall TDataModule1::UDPServer1UDPRead(TObject *Sender,
TStream *AData, TIdSocketHandle *ABinding)
{
if (Buff1) // does our object exist?
{
int len = (int)AData->Size;
if (len > PACKETSIZE) len = PACKETSIZE; // truncate rather than overrun tmps
Buff1->tmpl = len;
AData->ReadBuffer(Buff1->tmps, len);
Buff1->AddTmp();
}
}
//---------------------------------------------------------------------------
// In Consumer Code, e.g. A Form with a Timer and a Memo
//---------------------------------------------------------------------------
void __fastcall TForm1::Timer1Timer(TObject *Sender)
{
unsigned char s[PACKETSIZE]; // you may want to allocate this on the heap...
int len = 0;
Timer1->Enabled = false;
if (Buff1->Count>0)
{
Memo1->Lines->Add("----------------------");
Buff1->GetElement(s, len);
// the packet is not null-terminated, so pass its length explicitly
AnsiString B((const char *)s, len);
Memo1->Lines->Add(B); // place in a memo or something...
}
Timer1->Enabled = true;
}
==================================CIRCBUFF.H================================
//--------------------------------------------------------------------------
#ifndef CircBuffH
#define CircBuffH
// You may need to change these accordingly
#define TOTALPACKETS 512
#define PACKETSIZE 4096
struct packetstruct
{
unsigned char *s;
int l;
};
class TCircBuff
{
private:
volatile int front, rear, count;
packetstruct * buffer;
CRITICAL_SECTION GlobalCriticalSection;
void memcpy2(unsigned char *d, unsigned char *s, int l);
public:
__fastcall TCircBuff();
__fastcall ~TCircBuff();
unsigned char *tmps; // must match the unsigned char buffers used everywhere else
int tmpl;
void GetElement(unsigned char *ss, int & ll);
void SetElement(unsigned char *ss, int ll);
int FGetCount(void) { return count; };
void AddTmp();
__property int Count = { read=FGetCount };
};
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
#endif
=============================end-of-CIRCBUFF.H===============================
================================CIRCBUFF.CPP=================================
//---------------------------------------------------------------------------
#include <vcl.h>
#pragma hdrstop
#include "CircBuff.h"
//---------------------------------------------------------------------------
#pragma package(smart_init)
//---------------------------------------------------------------------------
__fastcall TCircBuff::TCircBuff()
{
buffer = new packetstruct [TOTALPACKETS];
tmps = new unsigned char [PACKETSIZE];
for (int i=0; i<TOTALPACKETS; i++)
{
buffer[i].s = new unsigned char [PACKETSIZE];
buffer[i].l = 0;
}
front=0;
rear=0;
count=0;
InitializeCriticalSection(&GlobalCriticalSection);
}
//---------------------------------------------------------------------------
__fastcall TCircBuff::~TCircBuff()
{
for (int i=0; i<TOTALPACKETS; i++)
{
delete []buffer[i].s;
}
delete []buffer;
delete []tmps;
DeleteCriticalSection(&GlobalCriticalSection);
}
//---------------------------------------------------------------------------
void TCircBuff::GetElement(unsigned char *ss, int & ll)
{
EnterCriticalSection(&GlobalCriticalSection);
if (count>0)
{
memcpy2(tmps, buffer[front].s, PACKETSIZE);
tmpl = buffer[front].l;
count--;
front++;
if (front == TOTALPACKETS) front=0;
}
else
{
ZeroMemory(tmps, PACKETSIZE);
tmpl=0;
}
memcpy2(ss, tmps, PACKETSIZE);
ll = tmpl;
LeaveCriticalSection(&GlobalCriticalSection);
}
//---------------------------------------------------------------------------
void TCircBuff::SetElement(unsigned char *ss, int ll)
{
EnterCriticalSection(&GlobalCriticalSection);
if (count < TOTALPACKETS)
{
// copy only when a slot is free: when the buffer is full, rear == front
// and buffer[rear] still holds the oldest unread packet
memcpy2(buffer[rear].s, ss,PACKETSIZE);
buffer[rear].l = ll;
count++;
rear++;
if (rear == TOTALPACKETS)
{
rear = 0;
}
}
LeaveCriticalSection(&GlobalCriticalSection);
}
//---------------------------------------------------------------------------
void TCircBuff::AddTmp()
{
SetElement(tmps, tmpl);
}
//---------------------------------------------------------------------------
void TCircBuff::memcpy2(unsigned char *d, unsigned char *s, int l)
{
if (!d) return;
if (!s) return;
while (l--) *d++ = *s++;
}
//---------------------------------------------------------------------------
==============================end-of-CIRCBUFF.CPP============================
 

Re:Missed UDP packets

One more thing.
This buffer does not grow dynamically.
This was intentional in my design: if the consumer
thread stops, a growing buffer would keep eating
more and more memory.
The packet size is 4096 (adjustable)
and the number of packets is 512 (adjustable).
If you are receiving large UDP datagrams you may want to
change the packet size to 16384 (a single datagram can
carry up to 65,507 bytes of payload over IPv4).
Later - good luck....
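
To make the fixed-size policy concrete, here is a small single-threaded sketch of a ring that never allocates after construction and counts what it has to drop. It is an illustration under assumptions, not the CircBuff class: FixedRing, put, get, dropped and footprint_bytes are hypothetical names, and the constants simply mirror TOTALPACKETS and PACKETSIZE. Locking is left out to keep the drop policy visible.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical names throughout; sizes mirror TOTALPACKETS and PACKETSIZE.
const std::size_t kTotalPackets = 512;
const std::size_t kPacketSize   = 4096;

// Memory held by the slots: fixed at construction, it never grows.
constexpr std::size_t footprint_bytes(std::size_t packets, std::size_t packet_size) {
    return packets * packet_size;
}

class FixedRing {
public:
    FixedRing()
        : storage_(footprint_bytes(kTotalPackets, kPacketSize)),
          lengths_(kTotalPackets, 0) {}

    // Returns false and counts a drop if the ring is full or the packet is too big.
    bool put(const unsigned char* data, std::size_t len) {
        if (count_ == kTotalPackets || len > kPacketSize) {
            ++dropped_;
            return false;
        }
        std::memcpy(&storage_[rear_ * kPacketSize], data, len);
        lengths_[rear_] = len;
        rear_ = (rear_ + 1) % kTotalPackets;
        ++count_;
        assert(count_ <= kTotalPackets);
        return true;
    }

    // Copies the oldest packet into out (which must hold kPacketSize bytes).
    // Returns the packet length, or 0 when the ring is empty.
    std::size_t get(unsigned char* out) {
        if (count_ == 0) return 0;
        const std::size_t len = lengths_[front_];
        std::memcpy(out, &storage_[front_ * kPacketSize], len);
        front_ = (front_ + 1) % kTotalPackets;
        --count_;
        return len;
    }

    std::size_t count() const { return count_; }
    std::size_t dropped() const { return dropped_; }

private:
    std::vector<unsigned char> storage_;  // one contiguous block for all slots
    std::vector<std::size_t> lengths_;    // payload length per slot
    std::size_t front_ = 0, rear_ = 0, count_ = 0, dropped_ = 0;
};
```

With the defaults the slots occupy 512 * 4096 = 2,097,152 bytes; raising the packet size to 16384 makes that 8,388,608 bytes. Watching the dropped counter is one way to tell whether the consumer is keeping up.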

 

Re:Missed UDP packets

Colin B Maharaj wrote:
Quote
How to use...(actual code is below)...
Nice code.
I'd be very tempted to replace tmps with
buffer[rear].s, which would eliminate the
memcpy2, speeding up the critical section.
It would also eliminate what appears to be a bug:
Thread 1 does GetElement(), which copies
data into tmps and then copies tmps into
the target buffer. While Thread 1 is
copying tmps twice (in a critical section!),
Thread 2 fills tmps (outside a critical section)
to issue a SetElement().
I think "read_buffer" and "write_buffer"
property wrappers would help.
(The following is without properties.
An alternative would be read_buffer and write_buffer
pointers which are set to point into buffer[].)
void __fastcall TDataModule1::UDPServer1UDPRead(TObject *Sender,
TStream *AData, TIdSocketHandle *ABinding)
{
if (Buff1) // does our object exist?
{
int len = AData->Size; // in case Size changes
// Will len>PACKETSIZE ever happen ?
*Buff1->write_size() = 0;// existing data is obsolete
AData->ReadBuffer(Buff1->write_buffer(), len);
*Buff1->write_size() = len;// data is available
Buff1->Add();
}
}
// returns unsigned char* to match packetstruct::s
unsigned char* TCircBuff::write_buffer()
{ return buffer[rear].s;
}
int* TCircBuff::write_size()
{ return &buffer[rear].l;
}
// Single-writer version
//void TCircBuff::SetElement(unsigned char *ss, int ll)
void TCircBuff::Add()
{
// two fewer buffers available this way
// [front] is being consumed, [rear] is being written
// if we use the multi-consumer below, then -1 is fine
if (count < TOTALPACKETS - 2 )
{
EnterCriticalSection(&GlobalCriticalSection);
// count needs to be volatile (the posted header already declares it so)
count++;
LeaveCriticalSection(&GlobalCriticalSection);
rear++;
if (rear == TOTALPACKETS)
{
rear = 0;
}
//write_buffer = buffer[rear].s;
}
}
// Single-consumer version
// Note difference in ss parameter
void TCircBuff::GetElement(unsigned char **ss, int & ll)
{
if( count>0 )
{
//read_buffer = buffer[front].s;
*ss = buffer[front].s;
ll = buffer[front].l;
EnterCriticalSection(&GlobalCriticalSection);
count--;
LeaveCriticalSection(&GlobalCriticalSection);
front++;
if (front == TOTALPACKETS) front=0;
}
else
{
*ss = 0;
ll=0;
}
}
One of the goals of circular buffers is to eliminate
temporary buffers. The data is entered and consumed
directly to/from the internal buffers.
It is also a goal that buffer[front] != buffer[rear]
so that there is no collision requiring a critical
section on the buffers in the first place.
Critical sections should be short.
Since tmps is available to the application outside
a critical section, placing the tmps copy code inside
a critical section is meaningless.
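
For the single-writer/single-consumer case, the "front never collides with rear" goal can be taken one step further. The sketch below is a different technique from the critical-section code in this thread: it assumes C++11 std::atomic and keeps one slot permanently empty, so the writer only ever touches rear and the reader only ever touches front. SpscRing, push and pop are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical lock-free ring for exactly ONE producer thread and ONE
// consumer thread. One slot always stays empty, so capacity is N - 1.
template <typename T, std::size_t N>
class SpscRing {
public:
    // Producer thread only.
    bool push(const T& value) {
        const std::size_t r = rear_.load(std::memory_order_relaxed);
        const std::size_t next = (r + 1) % N;
        if (next == front_.load(std::memory_order_acquire))
            return false;                       // full: caller decides to drop
        slots_[r] = value;                      // write the slot first...
        rear_.store(next, std::memory_order_release);  // ...then publish it
        return true;
    }

    // Consumer thread only.
    bool pop(T& out) {
        const std::size_t f = front_.load(std::memory_order_relaxed);
        if (f == rear_.load(std::memory_order_acquire))
            return false;                       // empty
        out = slots_[f];                        // read the slot first...
        front_.store((f + 1) % N, std::memory_order_release);  // ...then free it
        return true;
    }

private:
    static_assert(N >= 2, "need at least one usable slot");
    T slots_[N];
    std::atomic<std::size_t> front_{0};
    std::atomic<std::size_t> rear_{0};
};
```

This only holds for one writer and one reader. As soon as there are several of either, a lock (as in the multi-consumer and multi-writer versions) is the simpler and safer choice.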
Ok, so if you have multiple consumer threads, then
you need to take the data out of the buffer during
the "pop". Just don't use the vulnerable tmps.
// Multi-consumer version
void TCircBuff::GetElement(unsigned char *ss, int & ll)
{
EnterCriticalSection(&GlobalCriticalSection);
if (count>0)
{
memcpy2(ss, buffer[front].s, PACKETSIZE);
ll = buffer[front].l;
count--;
front++;
if (front == TOTALPACKETS) front=0;
LeaveCriticalSection(&GlobalCriticalSection);
}
else
{
LeaveCriticalSection(&GlobalCriticalSection);
ZeroMemory(ss, PACKETSIZE);
ll=0;
}
}
And just to be complete:
(AddTmp and tmps don't exist for multi-writer)
// Multi-writer version
void TCircBuff::SetElement(unsigned char *ss, int ll)
{
/* FudgeFactor
= 2 for single-single
= 1 for mixed
= 0 for multi/multi
*/
EnterCriticalSection(&GlobalCriticalSection);
if (count < TOTALPACKETS /* - FudgeFactor */ )
{
memcpy2(buffer[rear].s, ss,PACKETSIZE);
buffer[rear].l = ll;
count++;
rear++;
if (rear == TOTALPACKETS)
{
rear = 0;
}
}else{
//throw EDroppedPacket
}
LeaveCriticalSection(&GlobalCriticalSection);
}
In summary, the multi-writer/multi-reader version is the general-purpose,
cover-all-bases version, and as such is slower, with all those memcpy2 calls.
This overhead might be alleviated by giving each thread a buffer and just
swapping pointers.
// Multi-writer pointer version
// (again, **ss)
void TCircBuff::SetElement(unsigned char **ss, int ll)
{
EnterCriticalSection(&GlobalCriticalSection);
if (count < TOTALPACKETS )
{
std::swap( buffer[rear].s, *ss ); // needs #include <algorithm>
// places the thread's buffer into our array,
// and gives the thread our empty buffer
// (the thread's buffer must come from new unsigned char[PACKETSIZE],
// since the destructor delete[]s every slot)
buffer[rear].l = ll;
count++;
rear++;
if (rear == TOTALPACKETS)
{
rear = 0;
}
}else{
//throw EDroppedPacket
}
LeaveCriticalSection(&GlobalCriticalSection);
}
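
The pointer-swapping idea can also be sketched in standard C++ with owning pointers, which makes it explicit who frees which buffer. This is a sketch under assumptions rather than a drop-in replacement for TCircBuff: it uses std::unique_ptr and std::mutex, and the names Buffer, SwapRing, put and get are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch: every slot and every thread owns one packet buffer.
using Buffer = std::unique_ptr<unsigned char[]>;

class SwapRing {
public:
    SwapRing(std::size_t slots, std::size_t packet_size)
        : slots_(slots), lengths_(slots, 0) {
        assert(slots > 0 && packet_size > 0);
        for (Buffer& b : slots_) b.reset(new unsigned char[packet_size]());
    }

    // Writer hands over its filled buffer and receives a spare one back.
    // Only two pointers are exchanged inside the lock; no bytes are copied.
    bool put(Buffer& filled, std::size_t len) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == slots_.size()) return false;  // full: caller keeps its buffer
        std::swap(slots_[rear_], filled);
        lengths_[rear_] = len;
        rear_ = (rear_ + 1) % slots_.size();
        ++count_;
        return true;
    }

    // Reader hands over a spare buffer and receives the oldest packet.
    bool get(Buffer& spare, std::size_t& len) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        std::swap(slots_[front_], spare);
        len = lengths_[front_];
        front_ = (front_ + 1) % slots_.size();
        --count_;
        return true;
    }

private:
    std::vector<Buffer> slots_;
    std::vector<std::size_t> lengths_;
    std::size_t front_ = 0, rear_ = 0, count_ = 0;
    std::mutex mutex_;
};
```

Each caller must bring a buffer of the same size as the slots, because after the swap its buffer becomes a slot and a former slot becomes its working buffer.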
 

Re:Missed UDP packets

Hey Bob, thanks for the advice, will do some reading up later.....
:)