mirror of
https://github.com/goatcorp/Dalamud.git
synced 2025-12-12 18:27:23 +01:00
Happy Eyeballs Rev 2: The DNS-ening
Instead of simply trying a V6 connection and following it with a plain V4 connection, this method relies on DNS to make some smarter decisions on resolution. This improves performance and reliability for users (and servers) without IPv6 connectivity at all. - Switch to a DNS-driven strategy for Happy Eyeballs - Add a Zipper Merge and CancellableDelay utility to the corups - Remove state from HappyEyeballsCallback as it largely went unused - Fix a couple small implementation bugs - Suppress innocent (unhandled) exceptions.
This commit is contained in:
parent
66f955c87c
commit
76190d7e59
4 changed files with 136 additions and 68 deletions
|
|
@ -1,12 +1,14 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Dalamud.Logging.Internal;
|
||||
using Dalamud.Utility;
|
||||
|
||||
namespace Dalamud.Networking.Http;
|
||||
|
|
@ -14,34 +16,36 @@ namespace Dalamud.Networking.Http;
|
|||
// Inspired by and adapted from https://github.com/jellyfin/jellyfin/pull/8598
|
||||
|
||||
/// <summary>
|
||||
/// A class to provide a <see cref="SocketsHttpHandler.ConnectCallback"/> method (and tracked state) to implement a
|
||||
/// variant of the Happy Eyeballs algorithm for HTTP connections to dual-stack servers.
|
||||
///
|
||||
/// Each instance of this class tracks its own state.
|
||||
/// A class to provide a <see cref="SocketsHttpHandler.ConnectCallback"/> method to implement a variant of the Happy
|
||||
/// Eyeballs algorithm for HTTP connections to dual-stack servers.
|
||||
/// </summary>
|
||||
public class HappyEyeballsCallback : IDisposable
|
||||
{
|
||||
private readonly ConcurrentDictionary<DnsEndPoint, AddressFamily> addressFamilyCache = new();
|
||||
private static readonly ModuleLog Log = new("HTTP");
|
||||
|
||||
private readonly AddressFamily? forcedAddressFamily;
|
||||
private readonly int ipv6GracePeriod;
|
||||
/*
|
||||
* ToDo: Eventually add in some kind of state management to cache DNS and IP Family.
|
||||
* For now, this is ignored as the HTTPClient will keep connections alive, but there are benefits to sharing
|
||||
* cached lookups between different clients. We just need to be able to easily expire those lookups first.
|
||||
*/
|
||||
|
||||
private readonly AddressFamily forcedAddressFamily;
|
||||
private readonly int connectionBackoff;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="HappyEyeballsCallback"/> class.
|
||||
/// </summary>
|
||||
/// <param name="forcedAddressFamily">Optional override to force a specific AddressFamily.</param>
|
||||
/// <param name="ipv6GracePeriod">Grace period for IPv6 connectivity before starting IPv4 attempt.</param>
|
||||
public HappyEyeballsCallback(AddressFamily? forcedAddressFamily = null, int ipv6GracePeriod = 100)
|
||||
/// <param name="connectionBackoff">Backoff time between concurrent connection attempts.</param>
|
||||
public HappyEyeballsCallback(AddressFamily? forcedAddressFamily = null, int connectionBackoff = 75)
|
||||
{
|
||||
this.forcedAddressFamily = forcedAddressFamily;
|
||||
this.ipv6GracePeriod = ipv6GracePeriod;
|
||||
this.forcedAddressFamily = forcedAddressFamily ?? AddressFamily.Unspecified;
|
||||
this.connectionBackoff = connectionBackoff;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Dispose()
|
||||
{
|
||||
this.addressFamilyCache.Clear();
|
||||
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
|
|
@ -53,75 +57,59 @@ public class HappyEyeballsCallback : IDisposable
|
|||
/// <returns>Returns a Stream for consumption by HttpClient.</returns>
|
||||
public async ValueTask<Stream> ConnectCallback(SocketsHttpConnectionContext context, CancellationToken token)
|
||||
{
|
||||
var addressFamilyOverride = this.GetAddressFamilyOverride(context);
|
||||
var sortedRecords = await this.GetSortedAddresses(context.DnsEndPoint.Host, token);
|
||||
|
||||
if (addressFamilyOverride.HasValue)
|
||||
var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
var tasks = new List<Task<NetworkStream>>();
|
||||
|
||||
var delayCts = CancellationTokenSource.CreateLinkedTokenSource(linkedToken.Token);
|
||||
for (var i = 0; i < sortedRecords.Count; i++)
|
||||
{
|
||||
return this.AttemptConnection(addressFamilyOverride.Value, context, token).GetAwaiter().GetResult();
|
||||
var record = sortedRecords[i];
|
||||
|
||||
delayCts.CancelAfter(this.connectionBackoff * i);
|
||||
|
||||
var task = this.AttemptConnection(record, context.DnsEndPoint.Port, linkedToken.Token, delayCts.Token);
|
||||
tasks.Add(task);
|
||||
|
||||
var nextDelayCts = CancellationTokenSource.CreateLinkedTokenSource(linkedToken.Token);
|
||||
_ = task.ContinueWith(_ => { nextDelayCts.Cancel(); }, TaskContinuationOptions.OnlyOnFaulted);
|
||||
delayCts = nextDelayCts;
|
||||
}
|
||||
|
||||
using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
var stream = await AsyncUtils.FirstSuccessfulTask(tasks).ConfigureAwait(false);
|
||||
Log.Verbose($"Established connection to {context.DnsEndPoint.Host} at {stream.Socket.RemoteEndPoint}");
|
||||
|
||||
NetworkStream stream;
|
||||
|
||||
// Give IPv6 a chance to connect first.
|
||||
// However, only give it ipv4WaitMillis to connect before throwing IPv4 into the mix.
|
||||
var tryConnectIPv6 = this.AttemptConnection(AddressFamily.InterNetworkV6, context, linkedToken.Token);
|
||||
var timedV6Attempt = Task.WhenAny(tryConnectIPv6, Task.Delay(this.ipv6GracePeriod, linkedToken.Token));
|
||||
|
||||
if (await timedV6Attempt == tryConnectIPv6 && tryConnectIPv6.IsCompletedSuccessfully)
|
||||
// If we're here, it means we have a successful connection. A failure to connect would have caused the above
|
||||
// line to explode, so we're safe to clean everything up.
|
||||
linkedToken.Cancel();
|
||||
tasks.ForEach(task =>
|
||||
{
|
||||
stream = tryConnectIPv6.GetAwaiter().GetResult();
|
||||
}
|
||||
else
|
||||
{
|
||||
var race = AsyncUtils.FirstSuccessfulTask(new List<Task<NetworkStream>>
|
||||
{
|
||||
tryConnectIPv6,
|
||||
this.AttemptConnection(AddressFamily.InterNetwork, context, linkedToken.Token),
|
||||
});
|
||||
// Suppress any other exceptions that come down the pipe. We've already thrown an exception we cared
|
||||
// about above.
|
||||
task.ContinueWith(inner => { _ = inner.Exception; });
|
||||
});
|
||||
|
||||
// If our connections all fail, this will explode with an exception.
|
||||
stream = race.GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
this.addressFamilyCache[context.DnsEndPoint] = stream.Socket.AddressFamily;
|
||||
return stream;
|
||||
}
|
||||
|
||||
private AddressFamily? GetAddressFamilyOverride(SocketsHttpConnectionContext context)
|
||||
private async Task<NetworkStream> AttemptConnection(IPAddress address, int port, CancellationToken token, CancellationToken delayToken)
|
||||
{
|
||||
if (this.forcedAddressFamily.HasValue)
|
||||
await AsyncUtils.CancellableDelay(-1, delayToken).ConfigureAwait(false);
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return this.forcedAddressFamily.Value;
|
||||
return Task.FromCanceled<NetworkStream>(token).Result;
|
||||
}
|
||||
|
||||
// Force IPv4 if IPv6 support isn't detected to avoid the resolution delay.
|
||||
if (!Socket.OSSupportsIPv6)
|
||||
{
|
||||
return AddressFamily.InterNetwork;
|
||||
}
|
||||
|
||||
if (this.addressFamilyCache.TryGetValue(context.DnsEndPoint, out var cachedValue))
|
||||
{
|
||||
// TODO: Find some way to delete this after a while.
|
||||
return cachedValue;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private async Task<NetworkStream> AttemptConnection(
|
||||
AddressFamily family, SocketsHttpConnectionContext context, CancellationToken token)
|
||||
{
|
||||
var socket = new Socket(family, SocketType.Stream, ProtocolType.Tcp)
|
||||
var socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
|
||||
{
|
||||
NoDelay = true,
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await socket.ConnectAsync(context.DnsEndPoint, token).ConfigureAwait(false);
|
||||
await socket.ConnectAsync(address, port, token).ConfigureAwait(false);
|
||||
return new NetworkStream(socket, ownsSocket: true);
|
||||
}
|
||||
catch
|
||||
|
|
@ -130,4 +118,19 @@ public class HappyEyeballsCallback : IDisposable
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<List<IPAddress>> GetSortedAddresses(string hostname, CancellationToken token)
|
||||
{
|
||||
// This method abuses DNS ordering and LINQ a bit. We can normally assume that "addresses" will be provided in
|
||||
// the order the system wants to use. GroupBy will return its groups *in the order they're discovered*. Meaning,
|
||||
// the first group created will always be the "preferred" group, and all other groups are in preference order.
|
||||
// This means a straight zipper merge is nice and clean and gives us most -> least preferred, repeating.
|
||||
var dnsRecords = await Dns.GetHostAddressesAsync(hostname, this.forcedAddressFamily, token);
|
||||
|
||||
var groups = dnsRecords
|
||||
.GroupBy(a => a.AddressFamily)
|
||||
.Select(g => g.Select(v => v)).ToArray();
|
||||
|
||||
return Util.ZipperMerge(groups).ToList();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ internal class HappyHttpClient : IDisposable, IServiceType
|
|||
this.SharedHttpClient = new HttpClient(new SocketsHttpHandler
|
||||
{
|
||||
AutomaticDecompression = DecompressionMethods.All,
|
||||
ConnectCallback = new HappyEyeballsCallback().ConnectCallback,
|
||||
ConnectCallback = this.SharedHappyEyeballsCallback.ConnectCallback,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -40,8 +40,8 @@ internal class HappyHttpClient : IDisposable, IServiceType
|
|||
/// Gets a <see cref="HappyEyeballsCallback"/> meant to be shared across any custom <see cref="HttpClient"/>s that
|
||||
/// need to be made in other parts of the application.
|
||||
///
|
||||
/// This should be used when shared callback/IPv6 cache state is desired across multiple clients, as sharing the
|
||||
/// SocketsHandler may lead to GC issues.
|
||||
/// This should be used when shared callback state is desired across multiple clients, as sharing the SocketsHandler
|
||||
/// may lead to GC issues.
|
||||
/// </summary>
|
||||
public HappyEyeballsCallback SharedHappyEyeballsCallback { get; }
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ public static class AsyncUtils
|
|||
{
|
||||
/// <summary>
|
||||
/// Race a set of tasks, returning either the first to succeed or an aggregate of all exceptions. This helper does
|
||||
/// not perform any automatic cancellation of losing tasks.
|
||||
/// not perform any automatic cancellation of losing tasks, nor does it handle exceptions of losing tasks.
|
||||
/// </summary>
|
||||
/// <remarks>Derived from <a href="https://stackoverflow.com/a/37529395">this StackOverflow post</a>.</remarks>
|
||||
/// <param name="tasks">A list of tasks to race.</param>
|
||||
|
|
@ -29,7 +29,7 @@ public static class AsyncUtils
|
|||
{
|
||||
task.ContinueWith(t =>
|
||||
{
|
||||
if (task.IsCompletedSuccessfully)
|
||||
if (t.IsCompletedSuccessfully)
|
||||
{
|
||||
tcs.TrySetResult(t.Result);
|
||||
}
|
||||
|
|
@ -42,4 +42,19 @@ public static class AsyncUtils
|
|||
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Provide a <see cref="Task.Delay(int, CancellationToken)"/> that won't throw an exception when it's canceled.
|
||||
/// </summary>
|
||||
/// <inheritdoc cref="Task.Delay(int, CancellationToken)"/>
|
||||
public static async Task CancellableDelay(int millisecondsDelay, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(millisecondsDelay, cancellationToken);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -556,6 +556,56 @@ public static class Util
|
|||
Process.Start(process);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Perform a "zipper merge" (A, 1, B, 2, C, 3) of multiple enumerables, allowing for lists to end early.
|
||||
/// </summary>
|
||||
/// <param name="sources">A set of enumerable sources to combine.</param>
|
||||
/// <typeparam name="TSource">The resulting type of the merged list to return.</typeparam>
|
||||
/// <returns>A new enumerable, consisting of the final merge of all lists.</returns>
|
||||
public static IEnumerable<TSource> ZipperMerge<TSource>(params IEnumerable<TSource>[] sources)
|
||||
{
|
||||
// Borrowed from https://codereview.stackexchange.com/a/263451, thank you!
|
||||
var enumerators = new IEnumerator<TSource>[sources.Length];
|
||||
try
|
||||
{
|
||||
for (var i = 0; i < sources.Length; i++)
|
||||
{
|
||||
enumerators[i] = sources[i].GetEnumerator();
|
||||
}
|
||||
|
||||
var hasNext = new bool[enumerators.Length];
|
||||
|
||||
bool MoveNext()
|
||||
{
|
||||
var anyHasNext = false;
|
||||
for (var i = 0; i < enumerators.Length; i++)
|
||||
{
|
||||
anyHasNext |= hasNext[i] = enumerators[i].MoveNext();
|
||||
}
|
||||
|
||||
return anyHasNext;
|
||||
}
|
||||
|
||||
while (MoveNext())
|
||||
{
|
||||
for (var i = 0; i < enumerators.Length; i++)
|
||||
{
|
||||
if (hasNext[i])
|
||||
{
|
||||
yield return enumerators[i].Current;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
foreach (var enumerator in enumerators)
|
||||
{
|
||||
enumerator?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dispose this object.
|
||||
/// </summary>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue