al17nichols

how to flush buffers on Indy components


What is the best way to flush the buffers on both the Indy client (TIdTCPClient) and server (TIdTCPServer) components?  Also, is there an acknowledgement when the flush is complete?  This includes flushing the read and write buffers for both components.

 

Edited by al17nichols
clarity

18 hours ago, al17nichols said:

What is the best way to flush the buffers on both the Indy client (TIdTCPClient) and server (TIdTCPServer) components?

I don't really understand what you are asking for, but why would you ever need something like this?  Please provide a use-case example.

18 hours ago, al17nichols said:

Also, is there an acknowledgement when the flush is complete?

No (if such a flush were possible).

18 hours ago, al17nichols said:

This includes flushing read and write buffers for both components.

By default, there is no write buffering beyond the send buffer that is built in to the underlying socket at the OS layer.  You can't "flush" that buffer, but you can reduce its size with the SO_SNDBUF socket option, and you can stop the OS from holding back small writes by disabling Nagle's algorithm with the TCP_NODELAY socket option (Indy's UseNagle property).  On top of that, Indy does provide an application-level send buffer via the IOHandler.WriteBuffer...() methods.  That buffer is flushed to the socket when the specified threshold is reached, or when WriteBufferFlush() or WriteBufferClose() is called.  That buffer can be discarded with WriteBufferCancel().
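
A minimal sketch of the application-level write buffering described above. The unit name and the SendBatch helper are hypothetical; only the WriteBuffer...() calls, WriteLn() and Write() come from Indy's TIdIOHandler.

```pascal
unit BufferedSendSketch;

interface

uses
  IdGlobal, IdIOHandler;

// Hypothetical helper: queues a header line and a payload, then sends both together.
procedure SendBatch(AIOHandler: TIdIOHandler; const AHeader: string;
  const APayload: TIdBytes);

implementation

procedure SendBatch(AIOHandler: TIdIOHandler; const AHeader: string;
  const APayload: TIdBytes);
begin
  AIOHandler.WriteBufferOpen;        // start collecting writes in Indy's send buffer
  try
    AIOHandler.WriteLn(AHeader);     // buffered, not yet handed to the socket
    AIOHandler.Write(APayload);      // buffered as well
    AIOHandler.WriteBufferClose;     // flush everything to the socket, stop buffering
  except
    AIOHandler.WriteBufferCancel;    // discard whatever is still buffered, stop buffering
    raise;
  end;
end;

end.
```

When WriteBufferClose() returns, the bytes have only been handed to the OS socket. It is not an acknowledgement that the peer has received them.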

 

Read buffering is a bit trickier. There is the receive buffer that is built-in to the underlying socket at the OS layer. You can't "flush" that buffer, only reduce its size with the SO_RCVBUF socket option. On top of that is the application-level buffering that Indy implements via the IOHandler.InputBuffer property, where the IOHandler.RecvBufferSize property controls how many bytes Indy attempts to read from the socket's receive buffer into the InputBuffer at a time.  Once data is in the InputBuffer, you can "flush" that buffer by calling its Clear() or Remove() method (in the latter case, you could use the IOHandler.Discard() method instead).
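
A minimal sketch of the two ways of dropping data from Indy's read buffer mentioned above. The unit and helper names are hypothetical; InputBuffer.Size, InputBuffer.Clear() and IOHandler.Discard() are the Indy calls being illustrated.

```pascal
unit InputBufferSketch;

interface

uses
  IdBuffer, IdIOHandler;

// Hypothetical helper: drops only what Indy has already buffered and
// returns how many bytes were dropped. Nothing is read from the socket.
function DropBufferedInput(AIOHandler: TIdIOHandler): Integer;

// Hypothetical helper: skips exactly ACount bytes of the incoming stream,
// reading more from the socket if the InputBuffer does not hold enough yet.
procedure SkipBytes(AIOHandler: TIdIOHandler; ACount: Int64);

implementation

function DropBufferedInput(AIOHandler: TIdIOHandler): Integer;
begin
  Result := AIOHandler.InputBuffer.Size;
  AIOHandler.InputBuffer.Clear;
end;

procedure SkipBytes(AIOHandler: TIdIOHandler; ACount: Int64);
begin
  AIOHandler.Discard(ACount);
end;

end.
```

Neither helper touches bytes that are still sitting in the OS receive buffer or in flight on the network, so more data can arrive immediately after DropBufferedInput() returns.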

 

So, what is it that you really want to "flush", exactly? What is the scenario that you feel "flushing" is the solution for?


Thanks for your reply.  The scenario is that, while receiving data, an exception occurs or something else disrupts the flow of data.  I want to flush the remaining data that is being received so that I can reread the data.  I was doing a file transfer and the transfer was disrupted.  When I read from the socket, more data keeps coming from the buffer.  Currently I have to close the socket and reopen it to resync communication.  I hope this makes sense.

 


I don't know if it's the right way, but with Indy10 I use:
 

unit osCustomConnection;

interface

uses
  Windows,

  IdGlobal,
  IdBuffer,
  IdIOHandler,
  IdTCPClient,
  IdTCPConnection;

type
  TCustomConnection = class
  private
    FDevice: TIdTCPClient;
  private
    function GetConnected: Boolean;
    function GetHost: string;
    function GetPort: Integer;
    function GetInputBuffer: TIdBuffer;
  private
    procedure SetHost(const Value: string);
    procedure SetPort(Value: Integer);
  public
    procedure Connect(const ATimeout: Integer = IdTimeoutDefault);
    procedure Disconnect;
    function ReadFromStack(const ARaiseExceptionIfDisconnected: Boolean = True; ATimeout: Integer = IdTimeoutDefault; const ARaiseExceptionOnTimeout: Boolean = True): Integer;
    procedure WriteBuffer(const ABuffer; AByteCount: Integer; const AWriteNow: Boolean = False);
  public
    function FlushInputBuffer: Integer;
  public
    property Connected: Boolean read GetConnected;
    property Host: string read GetHost write SetHost;
    property Port: Integer read GetPort write SetPort;
    property InputBuffer: TIdBuffer read GetInputBuffer;
  public
    constructor Create;
    destructor Destroy; override;
  end;

  TIdIOHandlerHelper = class(TIdIOHandler)
  public
    function ReadFromSource(ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer; ARaiseExceptionOnTimeout: Boolean): Integer;
  end;

implementation

{ TCustomConnection }

procedure TCustomConnection.Connect(const ATimeout: Integer);
begin
  FDevice.ConnectTimeout := ATimeOut;
  FDevice.Connect;
end;

constructor TCustomConnection.Create;
begin
  inherited Create;
  // the wrapper owns the client and frees it in Destroy
  FDevice := TIdTCPClient.Create(nil);
end;

destructor TCustomConnection.Destroy;
begin
  FDevice.Free;

  inherited;
end;

procedure TCustomConnection.Disconnect;
begin
  FDevice.Disconnect;
end;

function TCustomConnection.GetConnected: Boolean;
begin
  Result := FDevice.Connected;
end;

function TCustomConnection.GetHost: string;
begin
  Result := FDevice.Host;
end;

function TCustomConnection.GetInputBuffer: TIdBuffer;
begin
  Result := FDevice.IOHandler.InputBuffer;
end;

function TCustomConnection.GetPort: Integer;
begin
  Result := FDevice.Port;
end;

function TCustomConnection.ReadFromStack(const ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer; const ARaiseExceptionOnTimeout: Boolean): Integer;
begin
  Result := TIdIOHandlerHelper(FDevice.IOHandler).ReadFromSource(ARaiseExceptionIfDisconnected, ATimeout, ARaiseExceptionOnTimeout);
end;

procedure TCustomConnection.SetHost(const Value: string);
begin
  FDevice.Host := Value;
end;

procedure TCustomConnection.SetPort(Value: Integer);
begin
  FDevice.Port := Value;
end;

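procedure TCustomConnection.WriteBuffer(const ABuffer; AByteCount: Integer; const AWriteNow: Boolean);
begin
  if AByteCount = 0 then Exit;
  // ABuffer is untyped, so copy it into a real TIdBytes; casting its address
  // to TIdBytes would not produce a valid dynamic array
  FDevice.IOHandler.WriteDirect(RawToBytes(ABuffer, AByteCount), AByteCount);
end;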

// flushes communications device input buffer
function TCustomConnection.FlushInputBuffer: Integer;
begin
  if not Connected then
    Result := 0
  else
  begin
    ReadFromStack(False, 1, False);
    Result := InputBuffer.Size;
    InputBuffer.Clear;
  end;
end;

{ TIdIOHandlerHelper }

function TIdIOHandlerHelper.ReadFromSource(ARaiseExceptionIfDisconnected: Boolean; ATimeout: Integer; ARaiseExceptionOnTimeout: Boolean): Integer;
begin
  // forward all three arguments to the protected TIdIOHandler.ReadFromSource
  Result := inherited ReadFromSource(ARaiseExceptionIfDisconnected, ATimeout, ARaiseExceptionOnTimeout);
end;

end.

It's part of a more complex class (cut down here to be easier to read), but it works.

I use Indy10 TCP to capture a continuous flow of encoded/encrypted/compressed data, so I exposed a public ReadFromSource, which calls the protected TIdIOHandler.ReadFromSource function, to better manage the flow in a capture/decode/decrypt/uncompress thread.

FlushInputBuffer calls ReadFromStack to capture any pending data and then clears the InputBuffer.  It returns how many bytes were flushed.

Edited by shineworld

8 hours ago, al17nichols said:

I was doing a file transfer and the transfer was disrupted.  I read from the socket and there is more data coming from the buffer.  I have to close the socket and reopen to resync communication.

I don't get it. If you close a socket, you automatically discard all the buffers.


The desire is not to close the socket, but to remove all of the pending bytes, like a resync, so that we can start communicating again.  It was not clear from my earlier post, but I do not want to close and reopen the socket; I want to restart the communication on the existing connection.

 

 

Edited by al17nichols
clarity


Can't you just read the remaining data into a junk buffer and then start a new conversation?

Also, what does "the transfer was disrupted" mean? Some trouble on the client side? Because if something goes wrong on the server side, there will be no "data coming".

On 8/2/2022 at 4:00 PM, al17nichols said:

A scenario can occur when receiving data that an exception occurs or something happens that disrupts the flow of data.  I want to flush the remaining data that is being received so that I can reread the data.

Unless the data is structured in such a way that you can definitively identify, from ANY point within the data, where the data ends and the next piece of communication begins, then "flushing" the existing data to resume communications is usually not a viable option.  Especially if the next piece of communication has already been received and buffered and waiting for your code to read it from the buffer.

Quote

I was doing a file transfer and the transfer was disrupted.

Are you using a standard protocol like FTP for the transfer, or are you using a custom protocol?

Quote

I read from the socket and there is more data coming from the buffer. I have to close the socket and reopen to resync communication.

Unless the exception is a socket error, then you don't know the actual state of the socket, or how the data was buffered by the socket or by Indy.  So, once the exception occurs, usually the only sane thing to do is to just disconnect and reconnect to restart communications fresh, yes.  This is true for any TCP library, it is not limited to just Indy.  Since you know how much data you transferred before the exception, you can resume the transfer from that position, if your system allows for resuming.

Edited by Remy Lebeau

On 8/2/2022 at 10:58 PM, shineworld said:

I don't know if it's the right way, but with Indy10 I use:

FYI, your use of the TIdIOHandler.ReadFromSource() method is just mirroring what the public TIdIOHandler.CheckForDataOnSource() method does internally, so you don't really need your TIdIOHandlerHelper class at all, just use CheckForDataOnSource(1) instead of ReadFromStack(False, 1, False).
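
A minimal sketch of the flush routine rewritten around CheckForDataOnSource(), as suggested above. The unit name, the FlushPendingInput function and its AQuietMs parameter are hypothetical. The loop is an addition that the original FlushInputBuffer did not have: it keeps flushing until one read attempt brings in nothing new.

```pascal
unit FlushInputSketch;

interface

uses
  IdBuffer, IdIOHandler;

// Hypothetical helper: pulls pending socket data into the InputBuffer and
// clears it, repeating until a read attempt of AQuietMs milliseconds yields
// no further bytes. Returns the total number of bytes thrown away.
function FlushPendingInput(AIOHandler: TIdIOHandler; AQuietMs: Integer = 1): Int64;

implementation

function FlushPendingInput(AIOHandler: TIdIOHandler; AQuietMs: Integer = 1): Int64;
var
  Flushed: Integer;
begin
  Result := 0;
  repeat
    // reads whatever the socket has ready, waiting at most AQuietMs
    AIOHandler.CheckForDataOnSource(AQuietMs);
    Flushed := AIOHandler.InputBuffer.Size;
    AIOHandler.InputBuffer.Clear;
    Inc(Result, Flushed);
  until Flushed = 0;
end;

end.
```

A short quiet period only shows that nothing arrived during that window. The sender may still be transmitting, so this alone does not prove that the stream is back at a message boundary.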

On 8/3/2022 at 7:08 AM, al17nichols said:

The desire is not to close the socket, but to remove all of the pending bytes, like a resync, so that we can start communicating again.  It was not clear from my earlier post, but I do not want to close and reopen the socket; I want to restart the communication on the existing connection.

Unless your communication protocol is specifically designed to allow resyncing communications without closing the socket, then this is usually not a viable option.  What does your protocol actually look like? How are your messages delimited? What does your "disrupted flow of data" look like? At the very least, it is possible to discard bytes only until the next message delimiter is reached.  But you won't know if you discarded a (partial) message that was important to your communications, so resyncing via a reconnect is usually the best option.

Edited by Remy Lebeau


Hello Remy.  Apologies for the time it has taken to get back to this post.  In this case I am doing a file transfer using this approach: IOHandler.ReadStream( XferStream, FileSize ,false ).  This works about 99.9% of the time.  When it fails, it is the result of some issue of mine, but I then need to recover the client and server applications.

 

12 hours ago, al17nichols said:

In this case I am doing a file transfer using this approach: IOHandler.ReadStream( XferStream, FileSize ,false ).  This works about 99.9% of the time.  When it fails, it is the result of some issue of mine, but I then need to recover the client and server applications.

Without knowing more about your actual protocol, I can't answer that.  Is the transfer being done on the same connection that initiates the transfer (ala HTTP)?  Or, is it on a separate connection (ala FTP)?  How does the receiver know what the FileSize is?  Does the receiver receive anything at the end of the transfer that it could look for if the transfer fails with a non-socket error?  I ask this, because if the transfer fails with a non-socket error, you will only know how many bytes were written to the XferStream, but not necessarily how many bytes were actually consumed from the socket itself.  So, the only way to recover without disconnecting/reconnecting is if you have a definitive way to identify where a new message begins after a transfer is finished.


The IOHandler referenced belongs to the TIdTCPClient, connected on a designated port.  The file is transferred using a file stream.  The server (TIdTCPServer) sends the file size a little earlier, in a message wrapped in XML that includes other information, before it sends the actual file.  At the current time, the receiver does not send any information at the end of the transfer (interesting idea).  The problem is that the transfer may get disrupted for some reason, and I want to continue to use the connection by flushing any remaining data.  FYI: I read the remainder of your message and see that I need to add a message or something to indicate that the transfer has ended.

3 hours ago, al17nichols said:

I read the remainder of your message and see that I need to add a message or something to indicate that the transfer has ended.

If the connection doesn't break on transfer failure, you'll either have to analyze the data stream for a special "transfer broke" message or handle a no-data timeout.  Alternatively, you can wrap transfers in protocol messages so that the client is always reading these messages, and then decides what's going on and what to do next: read data, or request the transfer again starting from some position.

I'd go this way:

CLI: <get filename from NNN>

SRV: <data filename chunksize> [...chunksize bytes of data...]

SRV: <data filename break> - failure indicator

CLI: <get filename from MMM>
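
A minimal receiver sketch for a chunked exchange like the one outlined above. The wire format here is an assumption made purely for illustration: each server message is a text line, either 'DATA <n>' followed by n raw bytes, 'BREAK' when the server gives up, or 'END' when the file is complete. The 'END' line, the unit name and the ReceiveChunks function are hypothetical and not part of the outline.

```pascal
unit ChunkedReceiveSketch;

interface

uses
  SysUtils, Classes, IdIOHandler;

// Hypothetical helper: returns True if the transfer completed, False if the
// server reported a break. ADest.Position then tells the caller where to resume.
function ReceiveChunks(AIOHandler: TIdIOHandler; ADest: TStream): Boolean;

implementation

function ReceiveChunks(AIOHandler: TIdIOHandler; ADest: TStream): Boolean;
var
  Line: string;
  ChunkSize: Int64;
begin
  Result := False;
  repeat
    Line := AIOHandler.ReadLn;            // every chunk starts with a header line
    if Line = 'END' then
    begin
      Result := True;                     // whole file received
      Exit;
    end;
    if Line = 'BREAK' then
      Exit;                               // server-side failure indicator
    if Copy(Line, 1, 5) <> 'DATA ' then
      raise Exception.Create('Unexpected message: ' + Line);
    ChunkSize := StrToInt64(Copy(Line, 6, MaxInt));
    if ChunkSize > 0 then
      AIOHandler.ReadStream(ADest, ChunkSize, False);  // read exactly ChunkSize bytes
  until False;
end;

end.
```

Because the client reads a header before every chunk, it always knows whether the next bytes are file data or a control message, which is what makes a re-request from a known position possible without reconnecting.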

Edited by Fr0sT.Brutal

On 8/18/2022 at 9:07 PM, al17nichols said:

The file is transferred using a file stream.  The server (TIdTCPServer) sends the file size a little earlier, in a message wrapped in XML that includes other information, before it sends the actual file.  At the current time, the receiver does not send any information at the end of the transfer (interesting idea).  The problem is that the transfer may get disrupted for some reason, and I want to continue to use the connection by flushing any remaining data.  FYI: I read the remainder of your message and see that I need to add a message or something to indicate that the transfer has ended.

I have asked you multiple times now to please provide an example of your actual communication in action, but you have not done so yet.  I CANNOT help you in implementing recovery logic without seeing a real example.  For all I know, your protocol simply CAN'T recover the way you want.  That is a likely possibility, many common protocols can't recover without disconnecting the socket and reconnecting.  If you do not provide a real example of a file transfer with surrounding messaging, then there is nothing I can do.


Below is the server send-file code and the client receive code.  Forgive me; this was done before I knew much about Indy.

Server side send file
TmpFileFS : TFileStream ;

  if FileExists( FilePath + FileName ) then
    begin
    // Ack="1": the file exists, and its raw bytes follow this line
    Connection.IOHandler.WriteLn( Format( XMLHead + XMLTail, ['SPS_AttachGetFileRS', 'ResponseType="[1]" MSG="1" Ack="1"', 'SPS_AttachGetFileRS'] )) ;
    TmpFileFS := TFileStream.Create( FilePath + FileName, fmOpenRead );
    Try
      // AContext.Connection.IOHandler.WriteLn( Format( XMLHead + XMLTail, ['SPS_AttachFileXferRS', 'ResponseType="[1]" MSG="1" ' + 'FileName="' + TmpFile + '"', 'SPS_AttachFileXferRS'] )) ;
      Connection.IOHandler.Write( TmpFileFS );
      //  AContext.Connection.IOHandler.WriteLn( Format( XMLHead + XMLTail, ['SPS_AttachFileXferRS', 'ResponseType="[1]" MSG="1" ' + 'Ack="1"', 'SPS_AttachFileXferRS'] )) ;
    Finally
      TmpFileFS.Free ;
    End ;
    end
  else
    // Ack="0": the file was not found, so no file bytes follow
    Connection.IOHandler.WriteLn( Format( XMLHead + XMLTail, ['SPS_AttachGetFileRS', 'ResponseType="[1]" MSG="1" Ack="0"', 'SPS_AttachGetFileRS'] )) ;

 


Client side receive file
// A plain message requesting the file by name has already been sent.
// The XML response is received at the ReadLn below.
MsgResp := frmTuner.CommMsgInt.IOHandler.ReadLn ;
if Length(MsgResp) = 0 then
  raise Exception.Create('File Not Located');
VarList := TList.Create ;
New( XMLVars ) ;
XMLVars.VarName := 'SPS_AttachGetFileRS' ;
XMLVars.AttNode := xxXMLNode ;
XMLVars.VarValue := '' ;
VarList.Add( XMLVars ) ;
New( XMLVars ) ;
XMLVars.VarName := 'Ack' ;
XMLVars.AttNode := xxXMLAttrib ;
XMLVars.VarValue := '' ;
VarList.Add( XMLVars ) ;
XMLMsgToTxt( MsgResp, VarList ) ;
MsgResp := PXMLInfo( VarList[1]).VarValue ;
if MsgResp = '1' then  // if the value is 1, the file was located
  begin
  // the filesize and other info was stored in the database
  XferStream := TFileStream.Create( TempDir + FileName, fmCreate ) ;
  try
    // the file bytes are read here
    frmTuner.CommMsgInt.IOHandler.ReadStream( XferStream, FileSize, False );
  finally
    XferStream.Free ;
  end ;
  end ;

10 hours ago, al17nichols said:

Below is the server send-file code and the client receive code.  Forgive me; this was done before I knew much about Indy.

I really wanted to see your actual raw messages, not your code.  But whatever.

 

From the code you have shown, it appears that your messages are formatted as XML.  In which case, you can recover from a non-socket error by reading bytes from the IOHandler until the next opening XML element is encountered.  For instance, you can call IOHandler.WaitFor('<', False, False) to ignore all bytes until a '<' character becomes available at the front of the InputBuffer, and then call IOHandler.CheckForDataOnSource()+IOHandler.InputBuffer.Peek() to see if subsequent byte(s) contain a valid message name.  If not, call IOHandler.Discard() to ignore the unwanted byte(s) and repeat the WaitFor()+Peek() until a valid message is detected.  Eventually, you should encounter the beginning of a valid message in the InputBuffer and can then resume your normal reading.
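
A minimal sketch of the WaitFor()/peek/Discard() loop described above. The unit name, the ResyncToMessage procedure and its AOpenTag parameter (for example '<SPS_') are hypothetical, and the sketch uses TIdBuffer.PeekByte() for the peek step. It also assumes the tag is plain ASCII.

```pascal
unit ResyncSketch;

interface

uses
  IdGlobal, IdBuffer, IdIOHandler;

// Hypothetical helper: discards incoming bytes until the InputBuffer begins
// with AOpenTag, leaving that tag in the buffer for normal reading.
procedure ResyncToMessage(AIOHandler: TIdIOHandler; const AOpenTag: string);

implementation

procedure ResyncToMessage(AIOHandler: TIdIOHandler; const AOpenTag: string);
var
  Tag: TIdBytes;
  I: Integer;
  Matches: Boolean;
begin
  Tag := ToBytes(AOpenTag);
  repeat
    // drop everything in front of the next '<', keeping the '<' itself
    AIOHandler.WaitFor('<', False, False);
    // make sure enough bytes are buffered to compare against the whole tag
    while AIOHandler.InputBuffer.Size < Length(Tag) do
    begin
      AIOHandler.CheckForDisconnect(True, True);  // raise if the peer has gone
      AIOHandler.CheckForDataOnSource(100);
    end;
    Matches := True;
    for I := 0 to High(Tag) do
      if AIOHandler.InputBuffer.PeekByte(I) <> Tag[I] then
      begin
        Matches := False;
        Break;
      end;
    if not Matches then
      AIOHandler.Discard(1);  // skip this '<' and look for the next one
  until Matches;
end;

end.
```

File contents can contain the same byte sequence as the tag, so a match is only a candidate message start. The message that follows still has to parse as valid XML before it can be trusted.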

Edited by Remy Lebeau
