#region File Header
//
// FiniteQueueStream.cs - A generic finite queue (ring/circular buffer) with index [] operator.
// Derived from Stream class (byte oriented)
//
// Copyright (C) Javier Valcarce. BSD License
#endregion

#region Using Statements
using System;
using System.IO;
using System.Threading;
using System.Diagnostics;
using System.Collections;
using System.Collections.Generic;
#endregion

namespace TestArq
{
    /// <summary>
    /// FiniteQueueStream is a blocking-API finite queue of bytes derived from
    /// <see cref="Stream"/>. It implements a finite queue (also called circular buffer,
    /// ring buffer or bounded buffer) with *blocking* Enqueue()/Dequeue() operations,
    /// all of which synchronize on a single private monitor. It also has an index []
    /// operator to examine and change the data inside the queue.
    /// </summary>
    /// <remarks>
    /// A multi-byte request that fits in the queue blocks until it can be satisfied in
    /// one piece. A request larger than <see cref="Capacity"/> is transferred in slices
    /// of at most <see cref="Capacity"/> bytes, so it needs a counterpart on another
    /// thread to drain (or fill) the queue between slices.
    /// </remarks>
    public class FiniteQueueStream : Stream
    {
        #region Private Members
        private int head;                    // index in buff of the oldest queued byte
        private int tail;                    // index in buff of the next free slot
        private readonly byte[] buff;        // underlying circular storage
        private int count;                   // number of queued bytes
        private int space;                   // number of free slots (buff.Length - count)
        private readonly object sync = new object();
        #endregion

        #region Constructor
        /// <summary>
        /// Creates a ring buffer of n bytes.
        /// </summary>
        /// <param name="n">Queue's capacity</param>
        /// <exception cref="ArgumentOutOfRangeException">n is less than 1.</exception>
        public FiniteQueueStream(int n)
        {
            if (n < 1)
            {
                throw new ArgumentOutOfRangeException("n", n, "Queue capacity must be at least 1.");
            }

            buff = new byte[n];
            Clear();
        }

        /// <summary>
        /// Creates a ring buffer using a byte array as the underlying storage.
        /// The array is used directly, it is not copied.
        /// </summary>
        /// <param name="v">Array of bytes used as storage</param>
        /// <exception cref="ArgumentNullException">v is null.</exception>
        /// <exception cref="ArgumentException">v has no elements.</exception>
        public FiniteQueueStream(byte[] v)
        {
            if (v == null)
            {
                throw new ArgumentNullException("v");
            }
            if (v.Length < 1)
            {
                throw new ArgumentException("The storage array must hold at least one element.", "v");
            }

            buff = v;
            Clear();
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Removes all bytes from the FiniteQueue
        /// </summary>
        public void Clear()
        {
            lock (sync)
            {
                head = 0;
                tail = 0;
                count = 0;
                space = buff.Length;

                // The queue just became empty: let any blocked producer re-check its condition.
                Monitor.PulseAll(sync);
            }
        }

        /// <summary>
        /// Enqueue a single byte to the end of the FiniteQueue. Blocks while the queue is full.
        /// </summary>
        /// <param name="token">Byte to enqueue</param>
        /// <returns>The number of tokens enqueued, always 1</returns>
        public int Enqueue(byte token)
        {
            lock (sync)
            {
                // Loop, not "if": a wake-up does not guarantee that space is available.
                while (space == 0)
                {
                    Monitor.Wait(sync);
                }

                buff[tail] = token;
                tail = Advance(tail, 1);
                count++;
                space--;

                // Producers and consumers wait on the same monitor, so wake them all.
                Monitor.PulseAll(sync);
                return 1;
            }
        }

        /// <summary>
        /// Enqueue an array of bytes to the end of the FiniteQueue. Blocks until all
        /// the bytes have been stored.
        /// </summary>
        /// <param name="src">Source array holding the tokens to enqueue</param>
        /// <param name="offset">Offset in the src array</param>
        /// <param name="count">Number of tokens to be enqueued</param>
        /// <returns>The number of tokens enqueued, always count</returns>
        /// <exception cref="ArgumentNullException">src is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
        /// <exception cref="ArgumentException">offset + count exceeds the length of src.</exception>
        public int Enqueue(byte[] src, int offset, int count)
        {
            ValidateRange(src, "src", offset, count);
            return EnqueueCore(src, offset, count);
        }

        /// <summary>
        /// Removes and returns the token at the beginning of the FiniteQueue.
        /// Blocks while the queue is empty.
        /// </summary>
        /// <param name="token">Receives the dequeued byte</param>
        /// <returns>The number of tokens dequeued, always 1</returns>
        public int Dequeue(out byte token)
        {
            lock (sync)
            {
                // Loop, not "if": a wake-up does not guarantee that data is available.
                while (count == 0)
                {
                    Monitor.Wait(sync);
                }

                token = buff[head];
                head = Advance(head, 1);
                count--;
                space++;

                Monitor.PulseAll(sync);
                return 1;
            }
        }

        /// <summary>
        /// Removes and returns the first count tokens at the beginning of the FiniteQueue.
        /// Blocks until count tokens have been delivered.
        /// </summary>
        /// <param name="dst">Destination array to store the dequeued tokens</param>
        /// <param name="offset">Offset in the dst array</param>
        /// <param name="count">Number of tokens to be dequeued</param>
        /// <returns>The number of tokens dequeued, always count</returns>
        /// <exception cref="ArgumentNullException">dst is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
        /// <exception cref="ArgumentException">offset + count exceeds the length of dst.</exception>
        public int Dequeue(byte[] dst, int offset, int count)
        {
            ValidateRange(dst, "dst", offset, count);
            return DequeueCore(dst, offset, count);
        }
        #endregion

        #region Properties
        /// <summary>
        /// Index operator. Index 0 is the oldest byte in the queue, index Count - 1 the newest.
        /// </summary>
        /// <param name="index">Index in the queue</param>
        /// <returns>The indexed element</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// index is negative or not less than <see cref="Count"/>.
        /// </exception>
        public byte this[int index]
        {
            get
            {
                lock (sync)
                {
                    return buff[Locate(index)];
                }
            }
            set
            {
                lock (sync)
                {
                    buff[Locate(index)] = value;
                }
            }
        }

        /// <summary>
        /// Total number of tokens the queue can hold
        /// </summary>
        public int Capacity
        {
            get { return buff.Length; }
        }

        /// <summary>
        /// Number of tokens in the queue
        /// </summary>
        public int Count
        {
            get
            {
                lock (sync)
                {
                    return count;
                }
            }
        }

        /// <summary>
        /// Available free space in the queue
        /// </summary>
        public int Space
        {
            get
            {
                lock (sync)
                {
                    return space;
                }
            }
        }
        #endregion

        #region STREAM INTERFACE
        /// <summary>
        /// Dequeues count bytes into buffer. Unlike a typical stream this blocks until
        /// all count bytes are available, so the return value is always count.
        /// </summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            ValidateRange(buffer, "buffer", offset, count);
            return DequeueCore(buffer, offset, count);
        }

        /// <summary>
        /// Enqueues count bytes from buffer, blocking until all of them are stored.
        /// </summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            ValidateRange(buffer, "buffer", offset, count);
            EnqueueCore(buffer, offset, count);
        }

        /// <summary>
        /// Does nothing: written bytes are stored in the queue immediately.
        /// </summary>
        public override void Flush()
        {
        }

        //
        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
        #endregion

        #region Private Helpers
        // Checks that [offset, offset + count) is a valid window of array.
        private static void ValidateRange(byte[] array, string arrayName, int offset, int count)
        {
            if (array == null)
            {
                throw new ArgumentNullException(arrayName);
            }
            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException("offset", offset, "Offset must not be negative.");
            }
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count", count, "Count must not be negative.");
            }
            // Written as a subtraction so that offset + count cannot overflow.
            if (array.Length - offset < count)
            {
                throw new ArgumentException("Offset and count exceed the length of the array.", arrayName);
            }
        }

        // Returns position moved forward by delta slots (0 <= delta <= buff.Length), wrapping at the end.
        private int Advance(int position, int delta)
        {
            int untilEnd = buff.Length - position;
            return delta >= untilEnd ? delta - untilEnd : position + delta;
        }

        // Maps a queue-relative index to an index in buff. Caller must hold the lock.
        private int Locate(int index)
        {
            if (index < 0 || index >= count)
            {
                throw new ArgumentOutOfRangeException("index", index, "Index must be in the range [0, Count).");
            }
            return Advance(head, index);
        }

        // Copies count bytes from src into the queue; arguments are already validated.
        private int EnqueueCore(byte[] src, int offset, int count)
        {
            lock (sync)
            {
                int done = 0;
                while (done < count)
                {
                    // A slice never exceeds the capacity, so the wait below can always end.
                    int slice = Math.Min(count - done, buff.Length);
                    while (space < slice)
                    {
                        Monitor.Wait(sync);
                    }

                    // Copy up to the physical end of buff, then wrap to the start for the rest.
                    int firstRun = Math.Min(slice, buff.Length - tail);
                    Array.Copy(src, offset + done, buff, tail, firstRun);
                    if (slice > firstRun)
                    {
                        Array.Copy(src, offset + done + firstRun, buff, 0, slice - firstRun);
                    }

                    tail = Advance(tail, slice);
                    this.count += slice;
                    space -= slice;
                    done += slice;

                    Monitor.PulseAll(sync);
                }
                return count;
            }
        }

        // Copies count bytes out of the queue into dst; arguments are already validated.
        private int DequeueCore(byte[] dst, int offset, int count)
        {
            lock (sync)
            {
                int done = 0;
                while (done < count)
                {
                    // A slice never exceeds the capacity, so the wait below can always end.
                    int slice = Math.Min(count - done, buff.Length);
                    while (this.count < slice)
                    {
                        Monitor.Wait(sync);
                    }

                    // Copy up to the physical end of buff, then wrap to the start for the rest.
                    int firstRun = Math.Min(slice, buff.Length - head);
                    Array.Copy(buff, head, dst, offset + done, firstRun);
                    if (slice > firstRun)
                    {
                        Array.Copy(buff, 0, dst, offset + done + firstRun, slice - firstRun);
                    }

                    head = Advance(head, slice);
                    this.count -= slice;
                    space += slice;
                    done += slice;

                    Monitor.PulseAll(sync);
                }
                return count;
            }
        }
        #endregion
    }
}