using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Dalamud.Utility;
/// Base class for loading resources in dynamic order.
internal class DynamicPriorityQueueLoader : IDisposable
{
private readonly CancellationTokenSource disposeCancellationTokenSource = new();
private readonly Task adderTask;
private readonly Task[] workerTasks;
private readonly Channel newItemChannel;
private readonly Channel workTokenChannel;
private readonly List workItemPending = new();
private bool disposing;
/// Initializes a new instance of the class.
/// Maximum number of concurrent load tasks.
public DynamicPriorityQueueLoader(int concurrency)
{
this.newItemChannel = Channel.CreateUnbounded(new() { SingleReader = true });
this.workTokenChannel = Channel.CreateUnbounded(new() { SingleWriter = true });
this.adderTask = Task.Run(this.LoopAddWorkItemAsync);
this.workerTasks = new Task[concurrency];
foreach (ref var task in this.workerTasks.AsSpan())
task = Task.Run(this.LoopProcessWorkItemAsync);
}
/// Provider for priority metrics.
internal interface IThrottleBasisProvider
{
/// Gets a value indicating whether the resource is requested in an opportunistic way.
bool IsOpportunistic { get; }
/// Gets the first requested tick count from .
long FirstRequestedTick { get; }
/// Gets the latest requested tick count from .
long LatestRequestedTick { get; }
}
///
public void Dispose()
{
if (this.disposing)
return;
this.disposing = true;
this.newItemChannel.Writer.Complete();
this.workTokenChannel.Writer.Complete();
this.disposeCancellationTokenSource.Cancel();
this.adderTask.Wait();
Task.WaitAll(this.workerTasks);
_ = this.adderTask.Exception;
foreach (var t in this.workerTasks)
_ = t.Exception;
}
/// Loads a resource according to some order.
/// The type of resource.
/// The throttle basis. null may be used to create a new instance of
/// that is not opportunistic with time values of now.
/// The immediate load function.
/// The cancellation token.
/// Disposables to dispose when the task completes.
/// The task.
///
/// may throw immediately without returning anything, or the returned
/// may complete in failure.
///
public Task LoadAsync(
IThrottleBasisProvider? basis,
Func> immediateLoadFunction,
CancellationToken cancellationToken,
params IDisposable?[] disposables)
{
basis ??= new ReadOnlyThrottleBasisProvider();
var work = new WorkItem(basis, immediateLoadFunction, cancellationToken, disposables);
if (this.newItemChannel.Writer.TryWrite(work))
return work.Task;
work.Dispose();
return Task.FromException(new ObjectDisposedException(this.GetType().Name));
}
/// Continuously transfers work items added from to
/// , until all items are transferred and is called.
private async Task LoopAddWorkItemAsync()
{
const int batchAddSize = 64;
var newWorks = new List(batchAddSize);
var reader = this.newItemChannel.Reader;
while (await reader.WaitToReadAsync())
{
while (newWorks.Count < batchAddSize && reader.TryRead(out var newWork))
newWorks.Add(newWork);
lock (this.workItemPending)
this.workItemPending.AddRange(newWorks);
for (var i = newWorks.Count; i > 0; i--)
this.workTokenChannel.Writer.TryWrite(null);
newWorks.Clear();
}
}
/// Continuously processes work items in , until all items are processed and
/// is called.
private async Task LoopProcessWorkItemAsync()
{
var reader = this.workTokenChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (!reader.TryRead(out _))
continue;
if (this.ExtractHighestPriorityWorkItem() is not { } work)
continue;
await work.Process(this.disposeCancellationTokenSource.Token);
work.Dispose();
}
}
/// Extracts the work item with the highest priority from ,
/// and removes cancelled items, if any.
/// The order of items of is undefined after this function.
private WorkItem? ExtractHighestPriorityWorkItem()
{
lock (this.workItemPending)
{
for (var startIndex = 0; startIndex < this.workItemPending.Count - 1;)
{
var span = CollectionsMarshal.AsSpan(this.workItemPending)[startIndex..];
ref var lastRef = ref span[^1];
foreach (ref var itemRef in span[..^1])
{
if (itemRef.CancelAsRequested())
{
itemRef.Dispose();
itemRef = lastRef;
this.workItemPending.RemoveAt(this.workItemPending.Count - 1);
break;
}
if (itemRef.CompareTo(lastRef) < 0)
(itemRef, lastRef) = (lastRef, itemRef);
startIndex++;
}
}
if (this.workItemPending.Count == 0)
return null;
var last = this.workItemPending[^1];
this.workItemPending.RemoveAt(this.workItemPending.Count - 1);
if (last.CancelAsRequested())
{
last.Dispose();
return null;
}
return last;
}
}
/// A read-only implementation of .
private class ReadOnlyThrottleBasisProvider : IThrottleBasisProvider
{
///
public bool IsOpportunistic { get; init; } = false;
///
public long FirstRequestedTick { get; init; } = Environment.TickCount64;
///
public long LatestRequestedTick { get; init; } = Environment.TickCount64;
}
/// Represents a work item added from .
private abstract class WorkItem : IComparable, IDisposable
{
private readonly IThrottleBasisProvider basis;
private readonly IDisposable?[] disposables;
protected WorkItem(
IThrottleBasisProvider basis,
CancellationToken cancellationToken,
params IDisposable?[] disposables)
{
this.basis = basis;
this.CancellationToken = cancellationToken;
this.disposables = disposables;
}
protected CancellationToken CancellationToken { get; }
public void Dispose()
{
foreach (ref var d in this.disposables.AsSpan())
Interlocked.Exchange(ref d, null)?.Dispose();
}
public int CompareTo(WorkItem other)
{
if (this.basis.IsOpportunistic != other.basis.IsOpportunistic)
return this.basis.IsOpportunistic ? 1 : -1;
if (this.basis.IsOpportunistic)
return -this.basis.LatestRequestedTick.CompareTo(other.basis.LatestRequestedTick);
return this.basis.FirstRequestedTick.CompareTo(other.basis.FirstRequestedTick);
}
public abstract bool CancelAsRequested();
public abstract ValueTask Process(CancellationToken serviceDisposeToken);
}
/// Typed version of .
private sealed class WorkItem : WorkItem
{
private readonly TaskCompletionSource taskCompletionSource;
private readonly Func> immediateLoadFunction;
public WorkItem(
IThrottleBasisProvider basis,
Func> immediateLoadFunction,
CancellationToken cancellationToken,
params IDisposable?[] disposables)
: base(basis, cancellationToken, disposables)
{
this.taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
this.immediateLoadFunction = immediateLoadFunction;
}
public Task Task => this.taskCompletionSource.Task;
public override bool CancelAsRequested()
{
if (!this.CancellationToken.IsCancellationRequested)
return false;
// Cancel the load task and move on.
this.taskCompletionSource.TrySetCanceled(this.CancellationToken);
// Suppress the OperationCanceledException caused from the above.
_ = this.taskCompletionSource.Task.Exception;
return true;
}
public override async ValueTask Process(CancellationToken serviceDisposeToken)
{
try
{
T wrap;
if (this.CancellationToken.CanBeCanceled)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(
serviceDisposeToken,
this.CancellationToken);
wrap = await this.immediateLoadFunction(cts.Token);
}
else
{
wrap = await this.immediateLoadFunction(serviceDisposeToken);
}
if (!this.taskCompletionSource.TrySetResult(wrap))
(wrap as IDisposable)?.Dispose();
}
catch (Exception e)
{
this.taskCompletionSource.TrySetException(e);
_ = this.taskCompletionSource.Task.Exception;
}
}
}
}