Merge pull request #1189 from KazWolfe/happy-eyeballs2

Happy Eyeballs Rev 2: The DNS-ening
This commit is contained in:
goat 2023-05-06 14:54:52 +02:00 committed by GitHub
commit 0d8967c462
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 144 additions and 69 deletions

View file

@ -1,12 +1,14 @@
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq;
using System.Net; using System.Net;
using System.Net.Http; using System.Net.Http;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Utility; using Dalamud.Utility;
namespace Dalamud.Networking.Http; namespace Dalamud.Networking.Http;
@ -14,34 +16,36 @@ namespace Dalamud.Networking.Http;
// Inspired by and adapted from https://github.com/jellyfin/jellyfin/pull/8598 // Inspired by and adapted from https://github.com/jellyfin/jellyfin/pull/8598
/// <summary> /// <summary>
/// A class to provide a <see cref="SocketsHttpHandler.ConnectCallback"/> method (and tracked state) to implement a /// A class to provide a <see cref="SocketsHttpHandler.ConnectCallback"/> method to implement a variant of the Happy
/// variant of the Happy Eyeballs algorithm for HTTP connections to dual-stack servers. /// Eyeballs algorithm for HTTP connections to dual-stack servers.
///
/// Each instance of this class tracks its own state.
/// </summary> /// </summary>
public class HappyEyeballsCallback : IDisposable public class HappyEyeballsCallback : IDisposable
{ {
private readonly ConcurrentDictionary<DnsEndPoint, AddressFamily> addressFamilyCache = new(); private static readonly ModuleLog Log = new("HTTP");
private readonly AddressFamily? forcedAddressFamily; /*
private readonly int ipv6GracePeriod; * ToDo: Eventually add in some kind of state management to cache DNS and IP Family.
* For now, this is ignored as the HTTPClient will keep connections alive, but there are benefits to sharing
* cached lookups between different clients. We just need to be able to easily expire those lookups first.
*/
private readonly AddressFamily forcedAddressFamily;
private readonly int connectionBackoff;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="HappyEyeballsCallback"/> class. /// Initializes a new instance of the <see cref="HappyEyeballsCallback"/> class.
/// </summary> /// </summary>
/// <param name="forcedAddressFamily">Optional override to force a specific AddressFamily.</param> /// <param name="forcedAddressFamily">Optional override to force a specific AddressFamily.</param>
/// <param name="ipv6GracePeriod">Grace period for IPv6 connectivity before starting IPv4 attempt.</param> /// <param name="connectionBackoff">Backoff time between concurrent connection attempts.</param>
public HappyEyeballsCallback(AddressFamily? forcedAddressFamily = null, int ipv6GracePeriod = 100) public HappyEyeballsCallback(AddressFamily? forcedAddressFamily = null, int connectionBackoff = 75)
{ {
this.forcedAddressFamily = forcedAddressFamily; this.forcedAddressFamily = forcedAddressFamily ?? AddressFamily.Unspecified;
this.ipv6GracePeriod = ipv6GracePeriod; this.connectionBackoff = connectionBackoff;
} }
/// <inheritdoc/> /// <inheritdoc/>
public void Dispose() public void Dispose()
{ {
this.addressFamilyCache.Clear();
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
} }
@ -53,75 +57,54 @@ public class HappyEyeballsCallback : IDisposable
/// <returns>Returns a Stream for consumption by HttpClient.</returns> /// <returns>Returns a Stream for consumption by HttpClient.</returns>
public async ValueTask<Stream> ConnectCallback(SocketsHttpConnectionContext context, CancellationToken token) public async ValueTask<Stream> ConnectCallback(SocketsHttpConnectionContext context, CancellationToken token)
{ {
var addressFamilyOverride = this.GetAddressFamilyOverride(context); var sortedRecords = await this.GetSortedAddresses(context.DnsEndPoint.Host, token);
if (addressFamilyOverride.HasValue) var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(token);
var tasks = new List<Task<NetworkStream>>();
var delayCts = CancellationTokenSource.CreateLinkedTokenSource(linkedToken.Token);
for (var i = 0; i < sortedRecords.Count; i++)
{ {
return this.AttemptConnection(addressFamilyOverride.Value, context, token).GetAwaiter().GetResult(); var record = sortedRecords[i];
delayCts.CancelAfter(this.connectionBackoff * i);
var task = this.AttemptConnection(record, context.DnsEndPoint.Port, linkedToken.Token, delayCts.Token);
tasks.Add(task);
var nextDelayCts = CancellationTokenSource.CreateLinkedTokenSource(linkedToken.Token);
_ = task.ContinueWith(_ => { nextDelayCts.Cancel(); }, TaskContinuationOptions.OnlyOnFaulted);
delayCts = nextDelayCts;
} }
using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(token); var stream = await AsyncUtils.FirstSuccessfulTask(tasks).ConfigureAwait(false);
Log.Verbose($"Established connection to {context.DnsEndPoint.Host} at {stream.Socket.RemoteEndPoint}");
NetworkStream stream; // If we're here, it means we have a successful connection. A failure to connect would have caused the above
// line to explode, so we're safe to clean everything up.
linkedToken.Cancel();
tasks.ForEach(task => { task.ContinueWith(this.CleanupConnectionTask); });
// Give IPv6 a chance to connect first.
// However, only give it ipv4WaitMillis to connect before throwing IPv4 into the mix.
var tryConnectIPv6 = this.AttemptConnection(AddressFamily.InterNetworkV6, context, linkedToken.Token);
var timedV6Attempt = Task.WhenAny(tryConnectIPv6, Task.Delay(this.ipv6GracePeriod, linkedToken.Token));
if (await timedV6Attempt == tryConnectIPv6 && tryConnectIPv6.IsCompletedSuccessfully)
{
stream = tryConnectIPv6.GetAwaiter().GetResult();
}
else
{
var race = AsyncUtils.FirstSuccessfulTask(new List<Task<NetworkStream>>
{
tryConnectIPv6,
this.AttemptConnection(AddressFamily.InterNetwork, context, linkedToken.Token),
});
// If our connections all fail, this will explode with an exception.
stream = race.GetAwaiter().GetResult();
}
this.addressFamilyCache[context.DnsEndPoint] = stream.Socket.AddressFamily;
return stream; return stream;
} }
private AddressFamily? GetAddressFamilyOverride(SocketsHttpConnectionContext context) private async Task<NetworkStream> AttemptConnection(IPAddress address, int port, CancellationToken token, CancellationToken delayToken)
{ {
if (this.forcedAddressFamily.HasValue) await AsyncUtils.CancellableDelay(-1, delayToken).ConfigureAwait(false);
if (token.IsCancellationRequested)
{ {
return this.forcedAddressFamily.Value; return Task.FromCanceled<NetworkStream>(token).Result;
} }
// Force IPv4 if IPv6 support isn't detected to avoid the resolution delay. var socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
if (!Socket.OSSupportsIPv6)
{
return AddressFamily.InterNetwork;
}
if (this.addressFamilyCache.TryGetValue(context.DnsEndPoint, out var cachedValue))
{
// TODO: Find some way to delete this after a while.
return cachedValue;
}
return null;
}
private async Task<NetworkStream> AttemptConnection(
AddressFamily family, SocketsHttpConnectionContext context, CancellationToken token)
{
var socket = new Socket(family, SocketType.Stream, ProtocolType.Tcp)
{ {
NoDelay = true, NoDelay = true,
}; };
try try
{ {
await socket.ConnectAsync(context.DnsEndPoint, token).ConfigureAwait(false); await socket.ConnectAsync(address, port, token).ConfigureAwait(false);
return new NetworkStream(socket, ownsSocket: true); return new NetworkStream(socket, ownsSocket: true);
} }
catch catch
@ -130,4 +113,31 @@ public class HappyEyeballsCallback : IDisposable
throw; throw;
} }
} }
private async Task<List<IPAddress>> GetSortedAddresses(string hostname, CancellationToken token)
{
// This method abuses DNS ordering and LINQ a bit. We can normally assume that "addresses" will be provided in
// the order the system wants to use. GroupBy will return its groups *in the order they're discovered*. Meaning,
// the first group created will always be the "preferred" group, and all other groups are in preference order.
// This means a straight zipper merge is nice and clean and gives us most -> least preferred, repeating.
var dnsRecords = await Dns.GetHostAddressesAsync(hostname, this.forcedAddressFamily, token);
var groups = dnsRecords
.GroupBy(a => a.AddressFamily)
.Select(g => g.Select(v => v)).ToArray();
return Util.ZipperMerge(groups).ToList();
}
private void CleanupConnectionTask(Task task)
{
// marks the exception as handled as well, nifty!
// will also handle canceled cases, which aren't explicitly faulted.
var exception = task.Exception;
if (task.IsFaulted)
{
Log.Verbose(exception!, "A HappyEyeballs connection task failed. Are there network issues?");
}
}
} }

View file

@ -24,7 +24,7 @@ internal class HappyHttpClient : IDisposable, IServiceType
this.SharedHttpClient = new HttpClient(new SocketsHttpHandler this.SharedHttpClient = new HttpClient(new SocketsHttpHandler
{ {
AutomaticDecompression = DecompressionMethods.All, AutomaticDecompression = DecompressionMethods.All,
ConnectCallback = new HappyEyeballsCallback().ConnectCallback, ConnectCallback = this.SharedHappyEyeballsCallback.ConnectCallback,
}); });
} }
@ -40,8 +40,8 @@ internal class HappyHttpClient : IDisposable, IServiceType
/// Gets a <see cref="HappyEyeballsCallback"/> meant to be shared across any custom <see cref="HttpClient"/>s that /// Gets a <see cref="HappyEyeballsCallback"/> meant to be shared across any custom <see cref="HttpClient"/>s that
/// need to be made in other parts of the application. /// need to be made in other parts of the application.
/// ///
/// This should be used when shared callback/IPv6 cache state is desired across multiple clients, as sharing the /// This should be used when shared callback state is desired across multiple clients, as sharing the SocketsHandler
/// SocketsHandler may lead to GC issues. /// may lead to GC issues.
/// </summary> /// </summary>
public HappyEyeballsCallback SharedHappyEyeballsCallback { get; } public HappyEyeballsCallback SharedHappyEyeballsCallback { get; }

View file

@ -13,7 +13,7 @@ public static class AsyncUtils
{ {
/// <summary> /// <summary>
/// Race a set of tasks, returning either the first to succeed or an aggregate of all exceptions. This helper does /// Race a set of tasks, returning either the first to succeed or an aggregate of all exceptions. This helper does
/// not perform any automatic cancellation of losing tasks. /// not perform any automatic cancellation of losing tasks, nor does it handle exceptions of losing tasks.
/// </summary> /// </summary>
/// <remarks>Derived from <a href="https://stackoverflow.com/a/37529395">this StackOverflow post</a>.</remarks> /// <remarks>Derived from <a href="https://stackoverflow.com/a/37529395">this StackOverflow post</a>.</remarks>
/// <param name="tasks">A list of tasks to race.</param> /// <param name="tasks">A list of tasks to race.</param>
@ -29,7 +29,7 @@ public static class AsyncUtils
{ {
task.ContinueWith(t => task.ContinueWith(t =>
{ {
if (task.IsCompletedSuccessfully) if (t.IsCompletedSuccessfully)
{ {
tcs.TrySetResult(t.Result); tcs.TrySetResult(t.Result);
} }
@ -42,4 +42,19 @@ public static class AsyncUtils
return tcs.Task; return tcs.Task;
} }
/// <summary>
/// Provide a <see cref="Task.Delay(int, CancellationToken)"/> that won't throw an exception when it's canceled.
/// </summary>
/// <inheritdoc cref="Task.Delay(int, CancellationToken)"/>
public static async Task CancellableDelay(int millisecondsDelay, CancellationToken cancellationToken)
{
try
{
await Task.Delay(millisecondsDelay, cancellationToken);
}
catch (TaskCanceledException)
{
}
}
} }

View file

@ -556,6 +556,56 @@ public static class Util
Process.Start(process); Process.Start(process);
} }
/// <summary>
/// Perform a "zipper merge" (A, 1, B, 2, C, 3) of multiple enumerables, allowing for lists to end early.
/// </summary>
/// <param name="sources">A set of enumerable sources to combine.</param>
/// <typeparam name="TSource">The resulting type of the merged list to return.</typeparam>
/// <returns>A new enumerable, consisting of the final merge of all lists.</returns>
public static IEnumerable<TSource> ZipperMerge<TSource>(params IEnumerable<TSource>[] sources)
{
// Borrowed from https://codereview.stackexchange.com/a/263451, thank you!
var enumerators = new IEnumerator<TSource>[sources.Length];
try
{
for (var i = 0; i < sources.Length; i++)
{
enumerators[i] = sources[i].GetEnumerator();
}
var hasNext = new bool[enumerators.Length];
bool MoveNext()
{
var anyHasNext = false;
for (var i = 0; i < enumerators.Length; i++)
{
anyHasNext |= hasNext[i] = enumerators[i].MoveNext();
}
return anyHasNext;
}
while (MoveNext())
{
for (var i = 0; i < enumerators.Length; i++)
{
if (hasNext[i])
{
yield return enumerators[i].Current;
}
}
}
}
finally
{
foreach (var enumerator in enumerators)
{
enumerator?.Dispose();
}
}
}
/// <summary> /// <summary>
/// Dispose this object. /// Dispose this object.
/// </summary> /// </summary>