TextureLoadThrottler: make CreateLoader never block

This commit is contained in:
Soreepeong 2024-02-28 19:59:36 +09:00
parent f8492dc06b
commit 55b5c5094a

View file

@ -1,6 +1,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using System.Threading; using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Dalamud.Interface.Internal; namespace Dalamud.Interface.Internal;
@ -9,15 +10,25 @@ namespace Dalamud.Interface.Internal;
/// Service for managing texture loads. /// Service for managing texture loads.
/// </summary> /// </summary>
[ServiceManager.EarlyLoadedService] [ServiceManager.EarlyLoadedService]
internal class TextureLoadThrottler : IServiceType internal class TextureLoadThrottler : IServiceType, IDisposable
{ {
private readonly CancellationTokenSource disposeCancellationTokenSource = new();
private readonly Task adderTask;
private readonly Task[] workerTasks;
private readonly object workListLock = new(); private readonly object workListLock = new();
private readonly List<WorkItem> pendingWorkList = new(); private readonly Channel<WorkItem> newItemChannel = Channel.CreateUnbounded<WorkItem>();
private readonly List<WorkItem> activeWorkList = new(); private readonly Channel<object?> workTokenChannel = Channel.CreateUnbounded<object?>();
private readonly List<WorkItem> workItemPending = new();
[ServiceManager.ServiceConstructor] [ServiceManager.ServiceConstructor]
private TextureLoadThrottler() => private TextureLoadThrottler()
this.MaxActiveWorkItems = Math.Min(64, Environment.ProcessorCount); {
this.adderTask = Task.Run(this.LoopAddWorkItemAsync);
this.workerTasks = new Task[Math.Min(64, Environment.ProcessorCount)];
foreach (ref var task in this.workerTasks.AsSpan())
task = Task.Run(this.LoopProcessWorkItemAsync);
}
/// <summary> /// <summary>
/// Basis for throttling. /// Basis for throttling.
@ -40,7 +51,20 @@ internal class TextureLoadThrottler : IServiceType
long LatestRequestedTick { get; } long LatestRequestedTick { get; }
} }
private int MaxActiveWorkItems { get; } /// <inheritdoc/>
public void Dispose()
{
this.newItemChannel.Writer.Complete();
this.workTokenChannel.Writer.Complete();
this.disposeCancellationTokenSource.Cancel();
this.adderTask.Wait();
Task.WaitAll(this.workerTasks);
_ = this.adderTask.Exception;
foreach (var t in this.workerTasks)
_ = t.Exception;
}
/// <summary> /// <summary>
/// Creates a texture loader. /// Creates a texture loader.
@ -62,84 +86,113 @@ internal class TextureLoadThrottler : IServiceType
ImmediateLoadFunction = immediateLoadFunction, ImmediateLoadFunction = immediateLoadFunction,
}; };
_ = Task.Run(() => this.ContinueWork(work), default); return
this.newItemChannel.Writer.TryWrite(work)
return work.TaskCompletionSource.Task; ? work.TaskCompletionSource.Task
: Task.FromException<IDalamudTextureWrap>(new ObjectDisposedException(nameof(TextureLoadThrottler)));
} }
private async Task ContinueWork(WorkItem? newItem) private async Task LoopAddWorkItemAsync()
{ {
while (true) var newWorkTemp = new List<WorkItem>();
var reader = this.newItemChannel.Reader;
while (!reader.Completion.IsCompleted)
{ {
WorkItem? minWork = null; await reader.WaitToReadAsync();
newWorkTemp.EnsureCapacity(reader.Count);
while (newWorkTemp.Count < newWorkTemp.Capacity && reader.TryRead(out var newWork))
newWorkTemp.Add(newWork);
lock (this.workListLock) lock (this.workListLock)
{ this.workItemPending.AddRange(newWorkTemp);
if (newItem is not null) for (var i = newWorkTemp.Count; i > 0; i--)
{ this.workTokenChannel.Writer.TryWrite(null);
this.pendingWorkList.Add(newItem); newWorkTemp.Clear();
newItem = null; }
} }
if (this.activeWorkList.Count >= this.MaxActiveWorkItems) private async Task LoopProcessWorkItemAsync()
return; {
var reader = this.workTokenChannel.Reader;
while (!reader.Completion.IsCompleted)
{
_ = await reader.ReadAsync();
var minIndex = -1; if (this.ExtractHighestPriorityWorkItem() is not { } work)
for (var i = 0; i < this.pendingWorkList.Count; i++) continue;
{
var work = this.pendingWorkList[i];
if (work.CancellationToken.IsCancellationRequested)
{
work.TaskCompletionSource.SetCanceled(work.CancellationToken);
_ = work.TaskCompletionSource.Task.Exception;
this.RelocatePendingWorkItemToEndAndEraseUnsafe(i--);
continue;
}
if (minIndex == -1 || work.CompareTo(this.pendingWorkList[minIndex]) < 0)
{
minIndex = i;
minWork = work;
}
}
if (minWork is null)
return;
this.RelocatePendingWorkItemToEndAndEraseUnsafe(minIndex);
this.activeWorkList.Add(minWork);
}
try try
{ {
var r = await minWork.ImmediateLoadFunction(minWork.CancellationToken); IDalamudTextureWrap wrap;
minWork.TaskCompletionSource.SetResult(r); if (work.CancellationToken.CanBeCanceled)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(
this.disposeCancellationTokenSource.Token,
work.CancellationToken);
wrap = await work.ImmediateLoadFunction(cts.Token);
}
else
{
wrap = await work.ImmediateLoadFunction(this.disposeCancellationTokenSource.Token);
}
work.TaskCompletionSource.SetResult(wrap);
} }
catch (Exception e) catch (Exception e)
{ {
minWork.TaskCompletionSource.SetException(e); work.TaskCompletionSource.SetException(e);
_ = minWork.TaskCompletionSource.Task.Exception; _ = work.TaskCompletionSource.Task.Exception;
}
}
}
private WorkItem? ExtractHighestPriorityWorkItem()
{
lock (this.workListLock)
{
WorkItem? highestPriorityWork = null;
var highestPriorityIndex = -1;
for (var i = 0; i < this.workItemPending.Count; i++)
{
var work = this.workItemPending[i];
if (work.CancellationToken.IsCancellationRequested)
{
work.TaskCompletionSource.SetCanceled(work.CancellationToken);
_ = work.TaskCompletionSource.Task.Exception;
this.RelocatePendingWorkItemToEndAndEraseUnsafe(i--);
continue;
}
if (highestPriorityIndex == -1 ||
work.CompareTo(this.workItemPending[highestPriorityIndex]) < 0)
{
highestPriorityIndex = i;
highestPriorityWork = work;
}
} }
lock (this.workListLock) if (highestPriorityWork is null)
this.activeWorkList.Remove(minWork); return null;
this.RelocatePendingWorkItemToEndAndEraseUnsafe(highestPriorityIndex);
return highestPriorityWork;
} }
} }
/// <summary> /// <summary>
/// Remove an item in <see cref="pendingWorkList"/>, avoiding shifting. /// Remove an item in <see cref="workItemPending"/>, avoiding shifting.
/// </summary> /// </summary>
/// <param name="index">Index of the item to remove.</param> /// <param name="index">Index of the item to remove.</param>
private void RelocatePendingWorkItemToEndAndEraseUnsafe(int index) private void RelocatePendingWorkItemToEndAndEraseUnsafe(int index)
{ {
// Relocate the element to remove to the last. // Relocate the element to remove to the last.
if (index != this.pendingWorkList.Count - 1) if (index != this.workItemPending.Count - 1)
{ {
(this.pendingWorkList[^1], this.pendingWorkList[index]) = (this.workItemPending[^1], this.workItemPending[index]) =
(this.pendingWorkList[index], this.pendingWorkList[^1]); (this.workItemPending[index], this.workItemPending[^1]);
} }
this.pendingWorkList.RemoveAt(this.pendingWorkList.Count - 1); this.workItemPending.RemoveAt(this.workItemPending.Count - 1);
} }
/// <summary> /// <summary>