feat: Add unix sockets

- Unix sockets run parallel to Named Pipes
  - Named Pipes will only run on non-Wine
  - If the game crashes, the next run will clean up an orphaned socket.
- Restructure RPC to be a bit tidier
This commit is contained in:
Kaz Wolfe 2025-11-18 15:18:16 -08:00
parent 6a69a6e197
commit 71927a8bf6
No known key found for this signature in database
GPG key ID: 258813F53A16EBB4
14 changed files with 487 additions and 91 deletions

View file

@ -1,10 +1,11 @@
using System;
using System.Linq;
using Dalamud.Networking.Pipes;
using Dalamud.Networking.Rpc.Model;
using Xunit;
namespace Dalamud.Test.Pipes
namespace Dalamud.Test.Rpc
{
public class DalamudUriTests
{

View file

@ -1,49 +0,0 @@
using Dalamud.Logging.Internal;
namespace Dalamud.Networking.Pipes.Rpc;
/// <summary>
/// The Dalamud service repsonsible for hosting the RPC.
/// </summary>
[ServiceManager.EarlyLoadedService]
internal class RpcHostService : IServiceType, IInternalDisposableService
{
private readonly ModuleLog log = new("RPC");
private readonly PipeRpcHost host;
/// <summary>
/// Initializes a new instance of the <see cref="RpcHostService"/> class.
/// </summary>
[ServiceManager.ServiceConstructor]
public RpcHostService()
{
this.host = new PipeRpcHost();
this.host.Start();
this.log.Information("RpcHostService started on pipe {Pipe}", this.host.PipeName);
}
/// <summary>
/// Gets the RPC host to drill down.
/// </summary>
public PipeRpcHost Host => this.host;
/// <summary>
/// Add a new service Object to the RPC host.
/// </summary>
/// <param name="service">The object to add.</param>
public void AddService(object service) => this.host.AddService(service);
/// <summary>
/// Add a new standalone method to the RPC host.
/// </summary>
/// <param name="name">The method name to add.</param>
/// <param name="handler">The handler to add.</param>
public void AddMethod(string name, Delegate handler) => this.host.AddMethod(name, handler);
/// <inheritdoc/>
public void DisposeService()
{
this.host.Dispose();
}
}

View file

@ -3,12 +3,14 @@
using Dalamud.Console;
using Dalamud.IoC;
using Dalamud.IoC.Internal;
using Dalamud.Networking.Pipes.Internal;
using Dalamud.Networking.Rpc.Model;
using Dalamud.Networking.Rpc.Service;
using Dalamud.Plugin.Internal.Types;
using Dalamud.Plugin.Services;
#pragma warning disable DAL_RPC
namespace Dalamud.Networking.Pipes.Api;
namespace Dalamud.Networking.Rpc.Api;
/// <inheritdoc cref="IPluginLinkHandler" />
[PluginInterface]

View file

@ -2,7 +2,7 @@
using System.Collections.Specialized;
using System.Web;
namespace Dalamud.Networking.Pipes;
namespace Dalamud.Networking.Rpc.Model;
/// <summary>
/// A Dalamud Uri, in the format:

View file

@ -1,34 +1,37 @@
using System.IO.Pipes;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Dalamud.Networking.Rpc.Service;
using Serilog;
using StreamJsonRpc;
namespace Dalamud.Networking.Pipes.Rpc;
namespace Dalamud.Networking.Rpc;
/// <summary>
/// A single RPC client session connected via named pipe.
/// A single RPC client session connected via a stream (named pipe or Unix socket).
/// </summary>
internal class RpcConnection : IDisposable
{
private readonly NamedPipeServerStream pipe;
private readonly Stream stream;
private readonly RpcServiceRegistry registry;
private readonly CancellationTokenSource cts = new();
/// <summary>
/// Initializes a new instance of the <see cref="RpcConnection"/> class.
/// </summary>
/// <param name="pipe">The named pipe that this connection will handle.</param>
/// <param name="stream">The stream that this connection will handle.</param>
/// <param name="registry">A registry of RPC services.</param>
public RpcConnection(NamedPipeServerStream pipe, RpcServiceRegistry registry)
public RpcConnection(Stream stream, RpcServiceRegistry registry)
{
this.Id = Guid.CreateVersion7();
this.pipe = pipe;
this.stream = stream;
this.registry = registry;
var formatter = new JsonMessageFormatter();
var handler = new HeaderDelimitedMessageHandler(pipe, pipe, formatter);
var handler = new HeaderDelimitedMessageHandler(stream, stream, formatter);
this.Rpc = new JsonRpc(handler);
this.Rpc.AllowModificationWhileListening = true;
@ -72,11 +75,11 @@ internal class RpcConnection : IDisposable
try
{
this.pipe.Dispose();
this.stream.Dispose();
}
catch (Exception ex)
{
Log.Debug(ex, "Error disposing pipe for client {Id}", this.Id);
Log.Debug(ex, "Error disposing stream for client {Id}", this.Id);
}
this.cts.Dispose();

View file

@ -0,0 +1,105 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Networking.Rpc.Transport;
using Dalamud.Utility;
namespace Dalamud.Networking.Rpc;
/// <summary>
/// The Dalamud service repsonsible for hosting the RPC.
/// </summary>
[ServiceManager.EarlyLoadedService]
internal class RpcHostService : IServiceType, IInternalDisposableService
{
private readonly ModuleLog log = new("RPC");
private readonly RpcServiceRegistry registry = new();
private readonly List<IRpcTransport> transports = [];
/// <summary>
/// Initializes a new instance of the <see cref="RpcHostService"/> class.
/// </summary>
[ServiceManager.ServiceConstructor]
public RpcHostService()
{
this.StartUnixTransport();
this.StartPipeTransport();
if (this.transports.Count == 0)
{
this.log.Warning("No RPC hosts could be started on this platform");
}
}
/// <summary>
/// Gets all active RPC transports.
/// </summary>
public IReadOnlyList<IRpcTransport> Transports => this.transports;
/// <summary>
/// Add a new service Object to the RPC host.
/// </summary>
/// <param name="service">The object to add.</param>
public void AddService(object service) => this.registry.AddService(service);
/// <summary>
/// Add a new standalone method to the RPC host.
/// </summary>
/// <param name="name">The method name to add.</param>
/// <param name="handler">The handler to add.</param>
public void AddMethod(string name, Delegate handler) => this.registry.AddMethod(name, handler);
/// <inheritdoc/>
public void DisposeService()
{
foreach (var host in this.transports)
{
host.Dispose();
}
this.transports.Clear();
}
/// <inheritdoc cref="IRpcTransport.InvokeClientAsync"/>
public async Task<T> InvokeClientAsync<T>(Guid clientId, string method, params object[] arguments)
{
var clients = this.transports.SelectMany(t => t.Connections).ToImmutableDictionary();
if (!clients.TryGetValue(clientId, out var session))
throw new KeyNotFoundException($"No client {clientId}");
return await session.Rpc.InvokeAsync<T>(method, arguments).ConfigureAwait(false);
}
/// <inheritdoc cref="IRpcTransport.BroadcastNotifyAsync"/>
public async Task BroadcastNotifyAsync(string method, params object[] arguments)
{
await foreach (var transport in this.transports.ToAsyncEnumerable().ConfigureAwait(false))
{
await transport.BroadcastNotifyAsync(method, arguments).ConfigureAwait(false);
}
}
private void StartUnixTransport()
{
var transport = new UnixRpcTransport(this.registry);
this.transports.Add(transport);
transport.Start();
this.log.Information("RpcHostService started Unix socket host: {Socket}", transport.SocketPath);
}
private void StartPipeTransport()
{
// Wine doesn't support named pipes.
if (Util.IsWine())
return;
var transport = new PipeRpcTransport(this.registry);
this.transports.Add(transport);
transport.Start();
this.log.Information("RpcHostService started named pipe host: {Pipe}", transport.PipeName);
}
}

View file

@ -3,7 +3,7 @@ using System.Threading;
using StreamJsonRpc;
namespace Dalamud.Networking.Pipes.Rpc;
namespace Dalamud.Networking.Rpc;
/// <summary>
/// Thread-safe registry of local RPC target objects that are exposed to every connected JsonRpc session.

View file

@ -3,12 +3,11 @@
using Dalamud.Data;
using Dalamud.Game;
using Dalamud.Game.ClientState;
using Dalamud.Networking.Pipes.Rpc;
using Dalamud.Utility;
using Lumina.Excel.Sheets;
namespace Dalamud.Networking.Pipes.Internal;
namespace Dalamud.Networking.Rpc.Service;
/// <summary>
/// A minimal service to respond with information about this client.

View file

@ -2,10 +2,10 @@
using System.Collections.Generic;
using Dalamud.Logging.Internal;
using Dalamud.Networking.Pipes.Rpc;
using Dalamud.Networking.Rpc.Model;
using Dalamud.Utility;
namespace Dalamud.Networking.Pipes.Internal;
namespace Dalamud.Networking.Rpc.Service;
/// <summary>
/// A service responsible for handling Dalamud URIs and dispatching them accordingly.

View file

@ -0,0 +1,32 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Dalamud.Networking.Rpc.Transport;
/// <summary>
/// Interface for RPC host implementations (named pipes or Unix sockets).
/// </summary>
internal interface IRpcTransport : IDisposable
{
/// <summary>
/// Gets a list of active RPC connections.
/// </summary>
IReadOnlyDictionary<Guid, RpcConnection> Connections { get; }
/// <summary>Starts accepting client connections.</summary>
void Start();
/// <summary>Invoke an RPC request on a specific client expecting a result.</summary>
/// <param name="clientId">The client ID to invoke.</param>
/// <param name="method">The method to invoke.</param>
/// <param name="arguments">Any arguments to invoke.</param>
/// <returns>An optional return based on the specified RPC.</returns>
/// <typeparam name="T">The expected response type.</typeparam>
Task<T> InvokeClientAsync<T>(Guid clientId, string method, params object[] arguments);
/// <summary>Send a notification to all connected clients (no response expected).</summary>
/// <param name="method">The method name to broadcast.</param>
/// <param name="arguments">The arguments to broadcast.</param>
/// <returns>Returns a Task when completed.</returns>
Task BroadcastNotifyAsync(string method, params object[] arguments);
}

View file

@ -9,26 +9,28 @@ using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Utility;
namespace Dalamud.Networking.Pipes.Rpc;
namespace Dalamud.Networking.Rpc.Transport;
/// <summary>
/// Simple multi-client JSON-RPC named pipe host using StreamJsonRpc.
/// </summary>
internal class PipeRpcHost : IDisposable
internal class PipeRpcTransport : IRpcTransport
{
private readonly ModuleLog log = new("RPC/Host");
private readonly RpcServiceRegistry registry = new();
private readonly RpcServiceRegistry registry;
private readonly CancellationTokenSource cts = new();
private readonly ConcurrentDictionary<Guid, RpcConnection> sessions = new();
private Task? acceptLoopTask;
/// <summary>
/// Initializes a new instance of the <see cref="PipeRpcHost"/> class.
/// Initializes a new instance of the <see cref="PipeRpcTransport"/> class.
/// </summary>
/// <param name="registry">The RPC service registry to use.</param>
/// <param name="pipeName">The pipe name to create.</param>
public PipeRpcHost(string? pipeName = null)
public PipeRpcTransport(RpcServiceRegistry registry, string? pipeName = null)
{
this.registry = registry;
// Default pipe name based on current process ID for uniqueness per Dalamud instance.
this.PipeName = pipeName ?? $"DalamudRPC.{Environment.ProcessId}";
}
@ -38,16 +40,8 @@ internal class PipeRpcHost : IDisposable
/// </summary>
public string PipeName { get; }
/// <summary>Adds a local object exposing RPC methods callable by clients.</summary>
/// <param name="service">An arbitrary service object that will be introspected to add to RPC.</param>
public void AddService(object service) => this.registry.AddService(service);
/// <summary>
/// Adds a standalone JSON-RPC method callable by clients.
/// </summary>
/// <param name="name">The name to add.</param>
/// <param name="handler">The delegate that acts as the handler.</param>
public void AddMethod(string name, Delegate handler) => this.registry.AddMethod(name, handler);
/// <inheritdoc/>
public IReadOnlyDictionary<Guid, RpcConnection> Connections => this.sessions;
/// <summary>Starts accepting client connections.</summary>
public void Start()
@ -86,12 +80,6 @@ internal class PipeRpcHost : IDisposable
return Task.WhenAll(tasks);
}
/// <summary>
/// Gets a list of connected client IDs.
/// </summary>
/// <returns>Connected client IDs.</returns>
public IReadOnlyCollection<Guid> GetClientIds() => this.sessions.Keys.AsReadOnlyCollection();
/// <inheritdoc/>
public void Dispose()
{

View file

@ -0,0 +1,223 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Utility;
using TerraFX.Interop.Windows;
namespace Dalamud.Networking.Rpc.Transport;
/// <summary>
/// Simple multi-client JSON-RPC Unix socket host using StreamJsonRpc.
/// </summary>
internal class UnixRpcTransport : IRpcTransport
{
private readonly ModuleLog log = new("RPC/UnixHost");
private readonly RpcServiceRegistry registry;
private readonly CancellationTokenSource cts = new();
private readonly ConcurrentDictionary<Guid, RpcConnection> sessions = new();
private readonly string? cleanupSocketDirectory;
private Task? acceptLoopTask;
private Socket? listenSocket;
/// <summary>
/// Initializes a new instance of the <see cref="UnixRpcTransport"/> class.
/// </summary>
/// <param name="registry">The RPC service registry to use.</param>
/// <param name="socketPath">The Unix socket path to create. If null, defaults to a path based on process ID.</param>
public UnixRpcTransport(RpcServiceRegistry registry, string? socketPath = null)
{
this.registry = registry;
if (socketPath != null)
{
this.SocketPath = socketPath;
}
else
{
var dalamudConfigPath = Service<Dalamud>.Get().StartInfo.ConfigurationPath;
var dalamudHome = Path.GetDirectoryName(dalamudConfigPath);
var socketName = $"DalamudRPC.{Environment.ProcessId}.sock";
if (dalamudHome == null)
{
this.SocketPath = Path.Combine(Path.GetTempPath(), socketName);
this.log.Warning("Dalamud home is empty! UDS socket will be in temp.");
}
else
{
this.SocketPath = Path.Combine(dalamudHome, socketName);
this.cleanupSocketDirectory = dalamudHome;
}
}
}
/// <summary>
/// Gets the path of the Unix socket this RPC host is using.
/// </summary>
public string SocketPath { get; }
/// <inheritdoc/>
public IReadOnlyDictionary<Guid, RpcConnection> Connections => this.sessions;
/// <summary>Starts accepting client connections.</summary>
public void Start()
{
if (this.acceptLoopTask != null) return;
// Make the directory for the socket if it doesn't exist
var socketDir = Path.GetDirectoryName(this.SocketPath);
if (!string.IsNullOrEmpty(socketDir) && !Directory.Exists(socketDir))
{
try
{
Directory.CreateDirectory(socketDir);
}
catch (Exception ex)
{
this.log.Error(ex, "Failed to create socket directory: {Path}", socketDir);
return;
}
}
// Delete existing socket for this PID, if it exists.
if (File.Exists(this.SocketPath))
{
try
{
File.Delete(this.SocketPath);
}
catch (Exception ex)
{
this.log.Warning(ex, "Failed to delete existing socket file: {Path}", this.SocketPath);
}
}
this.acceptLoopTask = Task.Factory.StartNew(this.AcceptLoopAsync, TaskCreationOptions.LongRunning);
// note: needs to be run _after_ we're alive so that we don't delete our own socket.
if (this.cleanupSocketDirectory != null)
{
Task.Run(async () => await UnixSocketUtil.CleanStaleSockets(this.cleanupSocketDirectory));
}
}
/// <summary>Invoke an RPC request on a specific client expecting a result.</summary>
/// <param name="clientId">The client ID to invoke.</param>
/// <param name="method">The method to invoke.</param>
/// <param name="arguments">Any arguments to invoke.</param>
/// <returns>An optional return based on the specified RPC.</returns>
/// <typeparam name="T">The expected response type.</typeparam>
public Task<T> InvokeClientAsync<T>(Guid clientId, string method, params object[] arguments)
{
if (!this.sessions.TryGetValue(clientId, out var session))
throw new KeyNotFoundException($"No client {clientId}");
return session.Rpc.InvokeAsync<T>(method, arguments);
}
/// <summary>Send a notification to all connected clients (no response expected).</summary>
/// <param name="method">The method name to broadcast.</param>
/// <param name="arguments">The arguments to broadcast.</param>
/// <returns>Returns a Task when completed.</returns>
public Task BroadcastNotifyAsync(string method, params object[] arguments)
{
var list = this.sessions.Values;
var tasks = new List<Task>(list.Count);
foreach (var s in list)
{
tasks.Add(s.Rpc.NotifyAsync(method, arguments));
}
return Task.WhenAll(tasks);
}
/// <inheritdoc/>
public void Dispose()
{
this.cts.Cancel();
this.acceptLoopTask?.Wait(1000);
foreach (var kv in this.sessions)
{
kv.Value.Dispose();
}
this.sessions.Clear();
this.listenSocket?.Dispose();
if (File.Exists(this.SocketPath))
{
try
{
File.Delete(this.SocketPath);
}
catch (Exception ex)
{
this.log.Warning(ex, "Failed to delete socket file on dispose: {Path}", this.SocketPath);
}
}
this.cts.Dispose();
this.log.Information("UnixRpcHost disposed ({Socket})", this.SocketPath);
GC.SuppressFinalize(this);
}
private async Task AcceptLoopAsync()
{
this.log.Information("UnixRpcHost starting on socket {Socket}", this.SocketPath);
var token = this.cts.Token;
try
{
var endpoint = new UnixDomainSocketEndPoint(this.SocketPath);
this.listenSocket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
this.listenSocket.Bind(endpoint);
this.listenSocket.Listen(128);
while (!token.IsCancellationRequested)
{
Socket? clientSocket = null;
try
{
clientSocket = await this.listenSocket.AcceptAsync(token).ConfigureAwait(false);
var stream = new NetworkStream(clientSocket, ownsSocket: true);
var session = new RpcConnection(stream, this.registry);
this.sessions.TryAdd(session.Id, session);
this.log.Debug("RPC connection created: {Id}", session.Id);
_ = session.Completion.ContinueWith(t =>
{
this.sessions.TryRemove(session.Id, out _);
this.log.Debug("RPC connection removed: {Id}", session.Id);
}, TaskScheduler.Default);
}
catch (OperationCanceledException)
{
clientSocket?.Dispose();
break;
}
catch (Exception ex)
{
clientSocket?.Dispose();
this.log.Error(ex, "Error in socket accept loop");
await Task.Delay(500, token).ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
this.log.Error(ex, "Fatal error in Unix socket accept loop");
}
}
}

View file

@ -1,6 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using Dalamud.Networking.Pipes;
using Dalamud.Networking.Rpc.Model;
namespace Dalamud.Plugin.Services;

View file

@ -0,0 +1,92 @@
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;
using Serilog;
namespace Dalamud.Utility;
/// <summary>
/// A set of utilities to help manage Unix sockets.
/// </summary>
internal static class UnixSocketUtil
{
// Default probe timeout in milliseconds.
private const int DefaultProbeMs = 200;
/// <summary>
/// Test whether a Unix socket is alive/listening.
/// </summary>
/// <param name="path">The path to test.</param>
/// <param name="timeoutMs">How long to wait for a connection success.</param>
/// <returns>A task result representing if a socket is alive or not.</returns>
public static async Task<bool> IsSocketAlive(string path, int timeoutMs = DefaultProbeMs)
{
if (string.IsNullOrEmpty(path)) return false;
var endpoint = new UnixDomainSocketEndPoint(path);
using var client = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
var connectTask = client.ConnectAsync(endpoint);
var completed = await Task.WhenAny(connectTask, Task.Delay(timeoutMs)).ConfigureAwait(false);
if (completed == connectTask)
{
// Connected or failed very quickly. If the task is successful, the socket is alive.
if (connectTask.IsCompletedSuccessfully)
{
try
{
client.Shutdown(SocketShutdown.Both);
}
catch
{
// ignored
}
return true;
}
}
return false;
}
/// <summary>
/// Find and remove stale Dalamud RPC sockets.
/// </summary>
/// <param name="directory">The directory to scan for stale sockets.</param>
/// <param name="probeTimeoutMs">The timeout to wait for a connection attempt to succeed.</param>
/// <returns>A task that executes when sockets are purged.</returns>
public static async Task CleanStaleSockets(string directory, int probeTimeoutMs = DefaultProbeMs)
{
if (string.IsNullOrEmpty(directory) || !Directory.Exists(directory)) return;
foreach (var file in Directory.EnumerateFiles(directory, "DalamudRPC.*.sock", SearchOption.TopDirectoryOnly))
{
// we don't need to check ourselves.
if (file.Contains(Environment.ProcessId.ToString())) continue;
bool shouldDelete;
try
{
shouldDelete = !await IsSocketAlive(file, probeTimeoutMs);
}
catch
{
shouldDelete = true;
}
if (shouldDelete)
{
try
{
File.Delete(file);
}
catch (Exception ex)
{
Log.Error(ex, "Could not delete stale socket file: {File}", file);
}
}
}
}
}