Refactored to improve performance by less locking
This commit is contained in:
parent
9c7e4664ce
commit
69e8715a4a
@ -1,47 +1,70 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Channels;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using FastRng.Double.Distributions;
|
using FastRng.Double.Distributions;
|
||||||
|
|
||||||
namespace FastRng.Double
|
namespace FastRng.Double
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// A fast multi-threaded pseudo random number generator.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Please note, that Math.NET's (https://www.mathdotnet.com/) random number generator is in some situations faster.
|
||||||
|
/// Unlike Math.NET, MultiThreadedRng is multi-threaded and async. Consumers can await the next number without
|
||||||
|
/// blocking resources. Additionally, consumers can use a token to cancel e.g. timeout an operation as well.<br/><br/>
|
||||||
|
///
|
||||||
|
/// MultiThreadedRng using a shape fitter (a rejection sampler) to enforce arbitrary shapes of probabilities for
|
||||||
|
/// desired distributions. By using the shape fitter, it is even easy to define discontinuous, arbitrary functions
|
||||||
|
/// as shapes. Any consumer can define and use own distributions.<br/><br/>
|
||||||
|
///
|
||||||
/// This class uses the George Marsaglia's MWC algorithm. The algorithm's implementation based loosely on John D.
|
/// This class uses the George Marsaglia's MWC algorithm. The algorithm's implementation based loosely on John D.
|
||||||
/// Cook's (johndcook.com) implementation (https://www.codeproject.com/Articles/25172/Simple-Random-Number-Generation).
|
/// Cook's (johndcook.com) implementation (https://www.codeproject.com/Articles/25172/Simple-Random-Number-Generation).
|
||||||
/// Thanks John for the inspiration.
|
/// Thanks John for the inspiration.<br/><br/>
|
||||||
/// </summary>
|
///
|
||||||
public sealed class MultiThreadedRng : IRandom
|
/// Please notice: When using the debug environment, MultiThreadedRng uses a smaller buffer size. Please ensure,
|
||||||
|
/// that the production environment uses a release build, though.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class MultiThreadedRng : IRandom, IDisposable
|
||||||
{
|
{
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
private const int CAPACITY_RANDOM_NUMBERS_4_SOURCE = 10_000;
|
private const int BUFFER_SIZE = 10_000;
|
||||||
#else
|
#else
|
||||||
private const int CAPACITY_RANDOM_NUMBERS_4_SOURCE = 16_000_000;
|
private const int BUFFER_SIZE = 1_000_000;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
private readonly CancellationTokenSource producerTokenSource = new CancellationTokenSource();
|
// The queue size means, how many buffer we store in a queue at the same time:
|
||||||
private readonly object syncUintGenerators = new object();
|
private const int QUEUE_SIZE = 2;
|
||||||
private readonly object syncUniformDistributedDoubleGenerators = new object();
|
|
||||||
private readonly Thread[] producerRandomUint = new Thread[2];
|
|
||||||
private readonly Thread[] producerRandomUniformDistributedDouble = new Thread[2];
|
|
||||||
|
|
||||||
|
// Gets used to stop the producer threads:
|
||||||
|
private readonly CancellationTokenSource producerTokenSource = new CancellationTokenSource();
|
||||||
|
|
||||||
|
// The time a thread waits e.g. to check if the queue needs a new buffer:
|
||||||
|
private readonly TimeSpan waiter = TimeSpan.FromMilliseconds(10);
|
||||||
|
|
||||||
|
// The first queue, where to store buffers of random uint numbers:
|
||||||
|
private readonly ConcurrentQueue<uint[]> queueIntegers = new ConcurrentQueue<uint[]>();
|
||||||
|
|
||||||
|
// The second queue, where to store buffers of uniform random double numbers:
|
||||||
|
private readonly ConcurrentQueue<double[]> queueDoubles = new ConcurrentQueue<double[]>();
|
||||||
|
|
||||||
|
// The uint producer thread:
|
||||||
|
private Thread producerRandomUint;
|
||||||
|
|
||||||
|
// The uniform double producer thread:
|
||||||
|
private Thread producerRandomUniformDistributedDouble;
|
||||||
|
|
||||||
|
// Variable w and z for the uint generator. Both get used
|
||||||
|
// as seeding variable as well (cf. constructors)
|
||||||
private uint mW;
|
private uint mW;
|
||||||
private uint mZ;
|
private uint mZ;
|
||||||
|
|
||||||
private readonly Channel<uint> channelRandomUint = Channel.CreateBounded<uint>(new BoundedChannelOptions(CAPACITY_RANDOM_NUMBERS_4_SOURCE)
|
// This is the current buffer for the consumer side i.e. the public interfaces:
|
||||||
{
|
private double[] currentBuffer = Array.Empty<double>();
|
||||||
FullMode = BoundedChannelFullMode.Wait,
|
|
||||||
SingleReader = false,
|
|
||||||
SingleWriter = false,
|
|
||||||
});
|
|
||||||
|
|
||||||
private readonly Channel<double> channelRandomUniformDistributedDouble = Channel.CreateBounded<double>(new BoundedChannelOptions(CAPACITY_RANDOM_NUMBERS_4_SOURCE)
|
// The current pointer to the next current buffer's address to read from:
|
||||||
{
|
private int currentBufferPointer = BUFFER_SIZE;
|
||||||
FullMode = BoundedChannelFullMode.Wait,
|
|
||||||
SingleReader = false,
|
|
||||||
SingleWriter = false,
|
|
||||||
});
|
|
||||||
|
|
||||||
#region Constructors
|
#region Constructors
|
||||||
|
|
||||||
@ -54,39 +77,30 @@ namespace FastRng.Double
|
|||||||
var now = DateTime.Now;
|
var now = DateTime.Now;
|
||||||
var ticks = now.Ticks;
|
var ticks = now.Ticks;
|
||||||
this.mW = (uint) (ticks >> 16);
|
this.mW = (uint) (ticks >> 16);
|
||||||
this.mZ = (uint) (ticks % 4294967296);
|
this.mZ = (uint) (ticks % 4_294_967_296);
|
||||||
this.StartProducerThreads(deterministic: false);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiThreadedRng(uint seedU)
|
public MultiThreadedRng(uint seedU)
|
||||||
{
|
{
|
||||||
this.mW = seedU;
|
this.mW = seedU;
|
||||||
this.mZ = 362436069;
|
this.mZ = 362_436_069;
|
||||||
this.StartProducerThreads(deterministic: true);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiThreadedRng(uint seedU, uint seedV)
|
public MultiThreadedRng(uint seedU, uint seedV)
|
||||||
{
|
{
|
||||||
this.mW = seedU;
|
this.mW = seedU;
|
||||||
this.mZ = seedV;
|
this.mZ = seedV;
|
||||||
this.StartProducerThreads(deterministic: true);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void StartProducerThreads(bool deterministic = false)
|
private void StartProducerThreads()
|
||||||
{
|
{
|
||||||
this.producerRandomUint[0] = new Thread(() => this.RandomProducerUint(this.channelRandomUint.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
this.producerRandomUint = new Thread(() => this.RandomProducerUint(this.producerTokenSource.Token)) {IsBackground = true};
|
||||||
this.producerRandomUint[1] = new Thread(() => this.RandomProducerUint(this.channelRandomUint.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
this.producerRandomUint.Start();
|
||||||
this.producerRandomUint[0].Start();
|
this.producerRandomUniformDistributedDouble = new Thread(() => this.RandomProducerUniformDistributedDouble(this.producerTokenSource.Token)) {IsBackground = true};
|
||||||
|
this.producerRandomUniformDistributedDouble.Start();
|
||||||
if(!deterministic)
|
|
||||||
this.producerRandomUint[1].Start();
|
|
||||||
|
|
||||||
this.producerRandomUniformDistributedDouble[0] = new Thread(() => this.RandomProducerUniformDistributedDouble(this.channelRandomUint.Reader, channelRandomUniformDistributedDouble.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
|
||||||
this.producerRandomUniformDistributedDouble[1] = new Thread(() => this.RandomProducerUniformDistributedDouble(this.channelRandomUint.Reader, channelRandomUniformDistributedDouble.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
|
||||||
this.producerRandomUniformDistributedDouble[0].Start();
|
|
||||||
|
|
||||||
if(!deterministic)
|
|
||||||
this.producerRandomUniformDistributedDouble[1].Start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
@ -94,25 +108,44 @@ namespace FastRng.Double
|
|||||||
#region Producers
|
#region Producers
|
||||||
|
|
||||||
[ExcludeFromCodeCoverage]
|
[ExcludeFromCodeCoverage]
|
||||||
private async void RandomProducerUint(ChannelWriter<uint> channelWriter, CancellationToken cancellationToken)
|
private async void RandomProducerUint(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var buffer = new uint[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
lock (syncUintGenerators)
|
// A local next buffer, which gets filled next:
|
||||||
{
|
var nextBuffer = new uint[BUFFER_SIZE];
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
|
||||||
|
// Produce the necessary number of random uints:
|
||||||
|
for (var n = 0; n < nextBuffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
||||||
{
|
{
|
||||||
this.mZ = 36_969 * (this.mZ & 65_535) + (this.mZ >> 16);
|
this.mZ = 36_969 * (this.mZ & 65_535) + (this.mZ >> 16);
|
||||||
this.mW = 18_000 * (this.mW & 65_535) + (this.mW >> 16);
|
this.mW = 18_000 * (this.mW & 65_535) + (this.mW >> 16);
|
||||||
buffer[n] = (this.mZ << 16) + this.mW;
|
nextBuffer[n] = (this.mZ << 16) + this.mW;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
// Inside this loop, we try to enqueue the produced buffer:
|
||||||
await channelWriter.WriteAsync(buffer[n], cancellationToken);
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Ensure, that we do not produce more buffers, as configured:
|
||||||
|
if (this.queueIntegers.Count < QUEUE_SIZE)
|
||||||
|
{
|
||||||
|
this.queueIntegers.Enqueue(nextBuffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queue was full. Wait a moment and try it again:
|
||||||
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
// The producers should be stopped:
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@ -121,23 +154,50 @@ namespace FastRng.Double
|
|||||||
}
|
}
|
||||||
|
|
||||||
[ExcludeFromCodeCoverage]
|
[ExcludeFromCodeCoverage]
|
||||||
private async void RandomProducerUniformDistributedDouble(ChannelReader<uint> channelReaderUint, ChannelWriter<double> channelWriter, CancellationToken cancellationToken)
|
private async void RandomProducerUniformDistributedDouble(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var buffer = new double[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
var randomUint = new uint[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
for (var n = 0; n < randomUint.Length; n++)
|
// A local source buffer of uints:
|
||||||
randomUint[n] = await channelReaderUint.ReadAsync(cancellationToken);
|
uint[] bufferSource = null;
|
||||||
|
|
||||||
lock (syncUniformDistributedDoubleGenerators)
|
// Try to get the next source buffer:
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
while (!this.queueIntegers.TryDequeue(out bufferSource) && !cancellationToken.IsCancellationRequested)
|
||||||
buffer[n] = (randomUint[n] + 1.0) * 2.328306435454494e-10; // 2.328 => 1/(2^32 + 2)
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
// Case: The producers should be stopped:
|
||||||
await channelWriter.WriteAsync(buffer[n], cancellationToken);
|
if(bufferSource == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// A local buffer to fill with uniform doubles:
|
||||||
|
var nextBuffer = new double[BUFFER_SIZE];
|
||||||
|
|
||||||
|
// Generate the necessary number of doubles:
|
||||||
|
for (var n = 0; n < nextBuffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
||||||
|
nextBuffer[n] = (bufferSource[n] + 1.0) * 2.328306435454494e-10;
|
||||||
|
|
||||||
|
// Inside this loop, we try to enqueue the generated buffer:
|
||||||
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Ensure, that the queue contains only the configured number of buffers:
|
||||||
|
if (this.queueDoubles.Count < QUEUE_SIZE)
|
||||||
|
{
|
||||||
|
this.queueDoubles.Enqueue(nextBuffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queue was full. Wait a moment and try it again:
|
||||||
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@ -151,16 +211,94 @@ namespace FastRng.Double
|
|||||||
|
|
||||||
public async ValueTask<double> GetUniform(CancellationToken cancel = default)
|
public async ValueTask<double> GetUniform(CancellationToken cancel = default)
|
||||||
{
|
{
|
||||||
|
while (!cancel.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Check, if we need a new buffer to read from:
|
||||||
|
if (this.currentBufferPointer >= BUFFER_SIZE)
|
||||||
|
{
|
||||||
|
// Create a local copy of the current buffer's pointer:
|
||||||
|
var currentBufferReference = this.currentBuffer;
|
||||||
|
|
||||||
|
// Here, we store the next buffer until we implement it:
|
||||||
|
var nextBuffer = Array.Empty<double>();
|
||||||
|
|
||||||
|
// Try to get the next buffer from the queue:
|
||||||
|
while (this.currentBufferPointer >= BUFFER_SIZE && currentBufferReference == this.currentBuffer && !this.queueDoubles.TryDequeue(out nextBuffer))
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: There is no next buffer available.
|
||||||
|
// Must wait for producer(s) to provide next.
|
||||||
|
//
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return await this.channelRandomUniformDistributedDouble.Reader.ReadAsync(cancel);
|
await Task.Delay(this.waiter, cancel);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (TaskCanceledException)
|
||||||
{
|
{
|
||||||
|
//
|
||||||
|
// Case: The consumer cancelled the request.
|
||||||
|
//
|
||||||
return double.NaN;
|
return double.NaN;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Note: In general, it does not matter if the following compare-exchange is successful.
|
||||||
|
// 1st case: It was successful -- everything is fine. But we are responsible to re-set the currentBufferPointer.
|
||||||
|
// 2nd case: It was not successful. This means, that another thread was successful, though.
|
||||||
|
// That case is fine as well. But we would loose one buffer of work. Thus, we
|
||||||
|
// check for this case and preserve the buffer full of work.
|
||||||
|
//
|
||||||
|
|
||||||
|
// Try to implement the dequeued buffer without locking other threads:
|
||||||
|
if (Interlocked.CompareExchange(ref this.currentBuffer, nextBuffer, currentBufferReference) != currentBufferReference)
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: Another thread updated the buffer already.
|
||||||
|
// Thus, we enqueue our copy of the next buffer to preserve it.
|
||||||
|
//
|
||||||
|
this.queueDoubles.Enqueue(nextBuffer);
|
||||||
|
|
||||||
|
// Next? We can go ahead and yield a random number...
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: We updated the buffer.
|
||||||
|
//
|
||||||
|
this.currentBufferPointer = 0;
|
||||||
|
|
||||||
|
// Next? We can go ahead and yield a random number...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Made a local copy of the current pointer:
|
||||||
|
var myPointer = this.currentBufferPointer;
|
||||||
|
|
||||||
|
// Increment the pointer for the next thread or call:
|
||||||
|
var nextPointer = myPointer + 1;
|
||||||
|
|
||||||
|
// Try to update the pointer without locking other threads:
|
||||||
|
if (Interlocked.CompareExchange(ref this.currentBufferPointer, nextPointer, myPointer) == myPointer)
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: Success. We updated the pointer and, thus, can use the pointer to read a number.
|
||||||
|
//
|
||||||
|
return this.currentBuffer[myPointer];
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Case: Another thread updated the pointer already. Must restart the process
|
||||||
|
// to get a random number.
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Case: The consumer cancelled the request.
|
||||||
|
//
|
||||||
|
return double.NaN;
|
||||||
|
}
|
||||||
|
|
||||||
public async ValueTask<uint> NextNumber(uint rangeStart, uint rangeEnd, IDistribution distribution, CancellationToken cancel = default)
|
public async ValueTask<uint> NextNumber(uint rangeStart, uint rangeEnd, IDistribution distribution, CancellationToken cancel = default)
|
||||||
{
|
{
|
||||||
if (rangeStart > rangeEnd)
|
if (rangeStart > rangeEnd)
|
||||||
@ -213,6 +351,8 @@ namespace FastRng.Double
|
|||||||
|
|
||||||
public void StopProducer() => this.producerTokenSource.Cancel();
|
public void StopProducer() => this.producerTokenSource.Cancel();
|
||||||
|
|
||||||
|
public void Dispose() => this.StopProducer();
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,47 +1,70 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Channels;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using FastRng.Float.Distributions;
|
using FastRng.Float.Distributions;
|
||||||
|
|
||||||
namespace FastRng.Float
|
namespace FastRng.Float
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
/// A fast multi-threaded pseudo random number generator.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Please note, that Math.NET's (https://www.mathdotnet.com/) random number generator is in some situations faster.
|
||||||
|
/// Unlike Math.NET, MultiThreadedRng is multi-threaded and async. Consumers can await the next number without
|
||||||
|
/// blocking resources. Additionally, consumers can use a token to cancel e.g. timeout an operation as well.<br/><br/>
|
||||||
|
///
|
||||||
|
/// MultiThreadedRng using a shape fitter (a rejection sampler) to enforce arbitrary shapes of probabilities for
|
||||||
|
/// desired distributions. By using the shape fitter, it is even easy to define discontinuous, arbitrary functions
|
||||||
|
/// as shapes. Any consumer can define and use own distributions.<br/><br/>
|
||||||
|
///
|
||||||
/// This class uses the George Marsaglia's MWC algorithm. The algorithm's implementation based loosely on John D.
|
/// This class uses the George Marsaglia's MWC algorithm. The algorithm's implementation based loosely on John D.
|
||||||
/// Cook's (johndcook.com) implementation (https://www.codeproject.com/Articles/25172/Simple-Random-Number-Generation).
|
/// Cook's (johndcook.com) implementation (https://www.codeproject.com/Articles/25172/Simple-Random-Number-Generation).
|
||||||
/// Thanks John for the inspiration.
|
/// Thanks John for the inspiration.<br/><br/>
|
||||||
/// </summary>
|
///
|
||||||
public sealed class MultiThreadedRng : IRandom
|
/// Please notice: When using the debug environment, MultiThreadedRng uses a smaller buffer size. Please ensure,
|
||||||
|
/// that the production environment uses a release build, though.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class MultiThreadedRng : IRandom, IDisposable
|
||||||
{
|
{
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
private const int CAPACITY_RANDOM_NUMBERS_4_SOURCE = 10_000;
|
private const int BUFFER_SIZE = 10_000;
|
||||||
#else
|
#else
|
||||||
private const int CAPACITY_RANDOM_NUMBERS_4_SOURCE = 16_000_000;
|
private const int BUFFER_SIZE = 1_000_000;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
private readonly CancellationTokenSource producerTokenSource = new CancellationTokenSource();
|
// The queue size means, how many buffer we store in a queue at the same time:
|
||||||
private readonly object syncUintGenerators = new object();
|
private const int QUEUE_SIZE = 2;
|
||||||
private readonly object syncUniformDistributedFloatGenerators = new object();
|
|
||||||
private readonly Thread[] producerRandomUint = new Thread[2];
|
|
||||||
private readonly Thread[] producerRandomUniformDistributedFloat = new Thread[2];
|
|
||||||
|
|
||||||
|
// Gets used to stop the producer threads:
|
||||||
|
private readonly CancellationTokenSource producerTokenSource = new CancellationTokenSource();
|
||||||
|
|
||||||
|
// The time a thread waits e.g. to check if the queue needs a new buffer:
|
||||||
|
private readonly TimeSpan waiter = TimeSpan.FromMilliseconds(10);
|
||||||
|
|
||||||
|
// The first queue, where to store buffers of random uint numbers:
|
||||||
|
private readonly ConcurrentQueue<uint[]> queueIntegers = new ConcurrentQueue<uint[]>();
|
||||||
|
|
||||||
|
// The second queue, where to store buffers of uniform random floating point numbers:
|
||||||
|
private readonly ConcurrentQueue<float[]> queueFloats = new ConcurrentQueue<float[]>();
|
||||||
|
|
||||||
|
// The uint producer thread:
|
||||||
|
private Thread producerRandomUint;
|
||||||
|
|
||||||
|
// The uniform float producer thread:
|
||||||
|
private Thread producerRandomUniformDistributedFloat;
|
||||||
|
|
||||||
|
// Variable w and z for the uint generator. Both get used
|
||||||
|
// as seeding variable as well (cf. constructors)
|
||||||
private uint mW;
|
private uint mW;
|
||||||
private uint mZ;
|
private uint mZ;
|
||||||
|
|
||||||
private readonly Channel<uint> channelRandomUint = Channel.CreateBounded<uint>(new BoundedChannelOptions(CAPACITY_RANDOM_NUMBERS_4_SOURCE)
|
// This is the current buffer for the consumer side i.e. the public interfaces:
|
||||||
{
|
private float[] currentBuffer = Array.Empty<float>();
|
||||||
FullMode = BoundedChannelFullMode.Wait,
|
|
||||||
SingleReader = false,
|
|
||||||
SingleWriter = false,
|
|
||||||
});
|
|
||||||
|
|
||||||
private readonly Channel<float> channelRandomUniformDistributedFloat = Channel.CreateBounded<float>(new BoundedChannelOptions(CAPACITY_RANDOM_NUMBERS_4_SOURCE)
|
// The current pointer to the next current buffer's address to read from:
|
||||||
{
|
private int currentBufferPointer = BUFFER_SIZE;
|
||||||
FullMode = BoundedChannelFullMode.Wait,
|
|
||||||
SingleReader = false,
|
|
||||||
SingleWriter = false,
|
|
||||||
});
|
|
||||||
|
|
||||||
#region Constructors
|
#region Constructors
|
||||||
|
|
||||||
@ -54,39 +77,30 @@ namespace FastRng.Float
|
|||||||
var now = DateTime.Now;
|
var now = DateTime.Now;
|
||||||
var ticks = now.Ticks;
|
var ticks = now.Ticks;
|
||||||
this.mW = (uint) (ticks >> 16);
|
this.mW = (uint) (ticks >> 16);
|
||||||
this.mZ = (uint) (ticks % 4294967296);
|
this.mZ = (uint) (ticks % 4_294_967_296);
|
||||||
this.StartProducerThreads(deterministic: false);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiThreadedRng(uint seedU)
|
public MultiThreadedRng(uint seedU)
|
||||||
{
|
{
|
||||||
this.mW = seedU;
|
this.mW = seedU;
|
||||||
this.mZ = 362436069;
|
this.mZ = 362_436_069;
|
||||||
this.StartProducerThreads(deterministic: true);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiThreadedRng(uint seedU, uint seedV)
|
public MultiThreadedRng(uint seedU, uint seedV)
|
||||||
{
|
{
|
||||||
this.mW = seedU;
|
this.mW = seedU;
|
||||||
this.mZ = seedV;
|
this.mZ = seedV;
|
||||||
this.StartProducerThreads(deterministic: true);
|
this.StartProducerThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void StartProducerThreads(bool deterministic = false)
|
private void StartProducerThreads()
|
||||||
{
|
{
|
||||||
this.producerRandomUint[0] = new Thread(() => this.RandomProducerUint(this.channelRandomUint.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
this.producerRandomUint = new Thread(() => this.RandomProducerUint(this.producerTokenSource.Token)) {IsBackground = true};
|
||||||
this.producerRandomUint[1] = new Thread(() => this.RandomProducerUint(this.channelRandomUint.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
this.producerRandomUint.Start();
|
||||||
this.producerRandomUint[0].Start();
|
this.producerRandomUniformDistributedFloat = new Thread(() => this.RandomProducerUniformDistributedFloat(this.producerTokenSource.Token)) {IsBackground = true};
|
||||||
|
this.producerRandomUniformDistributedFloat.Start();
|
||||||
if(!deterministic)
|
|
||||||
this.producerRandomUint[1].Start();
|
|
||||||
|
|
||||||
this.producerRandomUniformDistributedFloat[0] = new Thread(() => this.RandomProducerUniformDistributedFloat(this.channelRandomUint.Reader, channelRandomUniformDistributedFloat.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
|
||||||
this.producerRandomUniformDistributedFloat[1] = new Thread(() => this.RandomProducerUniformDistributedFloat(this.channelRandomUint.Reader, channelRandomUniformDistributedFloat.Writer, this.producerTokenSource.Token)) {IsBackground = true};
|
|
||||||
this.producerRandomUniformDistributedFloat[0].Start();
|
|
||||||
|
|
||||||
if(!deterministic)
|
|
||||||
this.producerRandomUniformDistributedFloat[1].Start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
@ -94,25 +108,44 @@ namespace FastRng.Float
|
|||||||
#region Producers
|
#region Producers
|
||||||
|
|
||||||
[ExcludeFromCodeCoverage]
|
[ExcludeFromCodeCoverage]
|
||||||
private async void RandomProducerUint(ChannelWriter<uint> channelWriter, CancellationToken cancellationToken)
|
private async void RandomProducerUint(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var buffer = new uint[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
lock (syncUintGenerators)
|
// A local next buffer, which gets filled next:
|
||||||
{
|
var nextBuffer = new uint[BUFFER_SIZE];
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
|
||||||
|
// Produce the necessary number of random uints:
|
||||||
|
for (var n = 0; n < nextBuffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
||||||
{
|
{
|
||||||
this.mZ = 36_969 * (this.mZ & 65_535) + (this.mZ >> 16);
|
this.mZ = 36_969 * (this.mZ & 65_535) + (this.mZ >> 16);
|
||||||
this.mW = 18_000 * (this.mW & 65_535) + (this.mW >> 16);
|
this.mW = 18_000 * (this.mW & 65_535) + (this.mW >> 16);
|
||||||
buffer[n] = (this.mZ << 16) + this.mW;
|
nextBuffer[n] = (this.mZ << 16) + this.mW;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
// Inside this loop, we try to enqueue the produced buffer:
|
||||||
await channelWriter.WriteAsync(buffer[n], cancellationToken);
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Ensure, that we do not produce more buffers, as configured:
|
||||||
|
if (this.queueIntegers.Count < QUEUE_SIZE)
|
||||||
|
{
|
||||||
|
this.queueIntegers.Enqueue(nextBuffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queue was full. Wait a moment and try it again:
|
||||||
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
// The producers should be stopped:
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@ -121,23 +154,50 @@ namespace FastRng.Float
|
|||||||
}
|
}
|
||||||
|
|
||||||
[ExcludeFromCodeCoverage]
|
[ExcludeFromCodeCoverage]
|
||||||
private async void RandomProducerUniformDistributedFloat(ChannelReader<uint> channelReaderUint, ChannelWriter<float> channelWriter, CancellationToken cancellationToken)
|
private async void RandomProducerUniformDistributedFloat(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var buffer = new float[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
var randomUint = new uint[CAPACITY_RANDOM_NUMBERS_4_SOURCE];
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
for (var n = 0; n < randomUint.Length; n++)
|
// A local source buffer of uints:
|
||||||
randomUint[n] = await channelReaderUint.ReadAsync(cancellationToken);
|
uint[] bufferSource = null;
|
||||||
|
|
||||||
lock (syncUniformDistributedFloatGenerators)
|
// Try to get the next source buffer:
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
while (!this.queueIntegers.TryDequeue(out bufferSource) && !cancellationToken.IsCancellationRequested)
|
||||||
buffer[n] = (randomUint[n] + 1.0f) * 2.328306435454494e-10f; // 2.328 => 1/(2^32 + 2)
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
|
||||||
for (var n = 0; n < buffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
// Case: The producers should be stopped:
|
||||||
await channelWriter.WriteAsync(buffer[n], cancellationToken);
|
if(bufferSource == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// A local buffer to fill with uniform floats:
|
||||||
|
var nextBuffer = new float[BUFFER_SIZE];
|
||||||
|
|
||||||
|
// Generate the necessary number of floats:
|
||||||
|
for (var n = 0; n < nextBuffer.Length && !cancellationToken.IsCancellationRequested; n++)
|
||||||
|
nextBuffer[n] = (bufferSource[n] + 1.0f) * 2.328306435454494e-10f;
|
||||||
|
|
||||||
|
// Inside this loop, we try to enqueue the generated buffer:
|
||||||
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Ensure, that the queue contains only the configured number of buffers:
|
||||||
|
if (this.queueFloats.Count < QUEUE_SIZE)
|
||||||
|
{
|
||||||
|
this.queueFloats.Enqueue(nextBuffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queue was full. Wait a moment and try it again:
|
||||||
|
await Task.Delay(this.waiter, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@ -151,16 +211,94 @@ namespace FastRng.Float
|
|||||||
|
|
||||||
public async ValueTask<float> GetUniform(CancellationToken cancel = default)
|
public async ValueTask<float> GetUniform(CancellationToken cancel = default)
|
||||||
{
|
{
|
||||||
|
while (!cancel.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Check, if we need a new buffer to read from:
|
||||||
|
if (this.currentBufferPointer >= BUFFER_SIZE)
|
||||||
|
{
|
||||||
|
// Create a local copy of the current buffer's pointer:
|
||||||
|
var currentBufferReference = this.currentBuffer;
|
||||||
|
|
||||||
|
// Here, we store the next buffer until we implement it:
|
||||||
|
var nextBuffer = Array.Empty<float>();
|
||||||
|
|
||||||
|
// Try to get the next buffer from the queue:
|
||||||
|
while (this.currentBufferPointer >= BUFFER_SIZE && currentBufferReference == this.currentBuffer && !this.queueFloats.TryDequeue(out nextBuffer))
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: There is no next buffer available.
|
||||||
|
// Must wait for producer(s) to provide next.
|
||||||
|
//
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return await this.channelRandomUniformDistributedFloat.Reader.ReadAsync(cancel);
|
await Task.Delay(this.waiter, cancel);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (TaskCanceledException)
|
||||||
{
|
{
|
||||||
|
//
|
||||||
|
// Case: The consumer cancelled the request.
|
||||||
|
//
|
||||||
return float.NaN;
|
return float.NaN;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Note: In general, it does not matter if the following compare-exchange is successful.
|
||||||
|
// 1st case: It was successful -- everything is fine. But we are responsible to re-set the currentBufferPointer.
|
||||||
|
// 2nd case: It was not successful. This means, that another thread was successful, though.
|
||||||
|
// That case is fine as well. But we would loose one buffer of work. Thus, we
|
||||||
|
// check for this case and preserve the buffer full of work.
|
||||||
|
//
|
||||||
|
|
||||||
|
// Try to implement the dequeued buffer without locking other threads:
|
||||||
|
if (Interlocked.CompareExchange(ref this.currentBuffer, nextBuffer, currentBufferReference) != currentBufferReference)
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: Another thread updated the buffer already.
|
||||||
|
// Thus, we enqueue our copy of the next buffer to preserve it.
|
||||||
|
//
|
||||||
|
this.queueFloats.Enqueue(nextBuffer);
|
||||||
|
|
||||||
|
// Next? We can go ahead and yield a random number...
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: We updated the buffer.
|
||||||
|
//
|
||||||
|
this.currentBufferPointer = 0;
|
||||||
|
|
||||||
|
// Next? We can go ahead and yield a random number...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Made a local copy of the current pointer:
|
||||||
|
var myPointer = this.currentBufferPointer;
|
||||||
|
|
||||||
|
// Increment the pointer for the next thread or call:
|
||||||
|
var nextPointer = myPointer + 1;
|
||||||
|
|
||||||
|
// Try to update the pointer without locking other threads:
|
||||||
|
if (Interlocked.CompareExchange(ref this.currentBufferPointer, nextPointer, myPointer) == myPointer)
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// Case: Success. We updated the pointer and, thus, can use the pointer to read a number.
|
||||||
|
//
|
||||||
|
return this.currentBuffer[myPointer];
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Case: Another thread updated the pointer already. Must restart the process
|
||||||
|
// to get a random number.
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Case: The consumer cancelled the request.
|
||||||
|
//
|
||||||
|
return float.NaN;
|
||||||
|
}
|
||||||
|
|
||||||
public async ValueTask<uint> NextNumber(uint rangeStart, uint rangeEnd, IDistribution distribution, CancellationToken cancel = default)
|
public async ValueTask<uint> NextNumber(uint rangeStart, uint rangeEnd, IDistribution distribution, CancellationToken cancel = default)
|
||||||
{
|
{
|
||||||
if (rangeStart > rangeEnd)
|
if (rangeStart > rangeEnd)
|
||||||
@ -213,6 +351,8 @@ namespace FastRng.Float
|
|||||||
|
|
||||||
public void StopProducer() => this.producerTokenSource.Cancel();
|
public void StopProducer() => this.producerTokenSource.Cancel();
|
||||||
|
|
||||||
|
public void Dispose() => this.StopProducer();
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user