chore: remove named pipe transport, use startinfo for pathing

This commit is contained in:
Kaz Wolfe 2025-11-25 10:08:24 -08:00
parent 0d8f577576
commit 7b286c427c
No known key found for this signature in database
GPG key ID: 258813F53A16EBB4
9 changed files with 41 additions and 196 deletions

View file

@ -5,7 +5,6 @@ using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Networking.Rpc.Transport;
using Dalamud.Utility;
namespace Dalamud.Networking.Rpc;
@ -26,7 +25,6 @@ internal class RpcHostService : IServiceType, IInternalDisposableService
public RpcHostService()
{
this.StartUnixTransport();
this.StartPipeTransport();
if (this.transports.Count == 0)
{
@ -90,16 +88,4 @@ internal class RpcHostService : IServiceType, IInternalDisposableService
transport.Start();
this.log.Information("RpcHostService listening to UNIX socket: {Socket}", transport.SocketPath);
}
private void StartPipeTransport()
{
// Wine doesn't support named pipes.
if (Util.IsWine())
return;
var transport = new PipeRpcTransport(this.registry);
this.transports.Add(transport);
transport.Start();
this.log.Information("RpcHostService listening to named pipe: {Pipe}", transport.PipeName);
}
}

View file

@ -1,154 +0,0 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
using Dalamud.Logging.Internal;
using Dalamud.Utility;
namespace Dalamud.Networking.Rpc.Transport;
/// <summary>
/// Simple multi-client JSON-RPC named pipe host using StreamJsonRpc.
/// </summary>
internal class PipeRpcTransport : IRpcTransport
{
private readonly ModuleLog log = new("RPC/Transport/NamedPipe");
private readonly RpcServiceRegistry registry;
private readonly CancellationTokenSource cts = new();
private readonly ConcurrentDictionary<Guid, RpcConnection> sessions = new();
private Task? acceptLoopTask;
/// <summary>
/// Initializes a new instance of the <see cref="PipeRpcTransport"/> class.
/// </summary>
/// <param name="registry">The RPC service registry to use.</param>
/// <param name="pipeName">The pipe name to create.</param>
public PipeRpcTransport(RpcServiceRegistry registry, string? pipeName = null)
{
this.registry = registry;
// Default pipe name based on current process ID for uniqueness per Dalamud instance.
this.PipeName = pipeName ?? $"DalamudRPC.{Environment.ProcessId}";
}
/// <summary>
/// Gets the name of the named pipe this RPC host is using.
/// </summary>
public string PipeName { get; }
/// <inheritdoc/>
public IReadOnlyDictionary<Guid, RpcConnection> Connections => this.sessions;
/// <summary>Starts accepting client connections.</summary>
public void Start()
{
if (this.acceptLoopTask != null) return;
this.acceptLoopTask = Task.Factory.StartNew(this.AcceptLoopAsync, TaskCreationOptions.LongRunning);
}
/// <summary>Invoke an RPC request on a specific client expecting a result.</summary>
/// <param name="clientId">The client ID to invoke.</param>
/// <param name="method">The method to invoke.</param>
/// <param name="arguments">Any arguments to invoke.</param>
/// <returns>An optional return based on the specified RPC.</returns>
/// <typeparam name="T">The expected response type.</typeparam>
public Task<T> InvokeClientAsync<T>(Guid clientId, string method, params object[] arguments)
{
if (!this.sessions.TryGetValue(clientId, out var session))
throw new KeyNotFoundException($"No client {clientId}");
return session.Rpc.InvokeAsync<T>(method, arguments);
}
/// <summary>Send a notification to all connected clients (no response expected).</summary>
/// <param name="method">The method name to broadcast.</param>
/// <param name="arguments">The arguments to broadcast.</param>
/// <returns>Returns a Task when completed.</returns>
public Task BroadcastNotifyAsync(string method, params object[] arguments)
{
var list = this.sessions.Values;
var tasks = new List<Task>(list.Count);
foreach (var s in list)
{
tasks.Add(s.Rpc.NotifyAsync(method, arguments));
}
return Task.WhenAll(tasks);
}
/// <inheritdoc/>
public void Dispose()
{
this.cts.Cancel();
this.acceptLoopTask?.Wait(1000);
foreach (var kv in this.sessions)
{
kv.Value.Dispose();
}
this.sessions.Clear();
this.cts.Dispose();
this.log.Information("PipeRpcHost disposed ({Pipe})", this.PipeName);
GC.SuppressFinalize(this);
}
private PipeSecurity BuildPipeSecurity()
{
var ps = new PipeSecurity();
ps.AddAccessRule(new PipeAccessRule(WindowsIdentity.GetCurrent().User!, PipeAccessRights.FullControl, AccessControlType.Allow));
return ps;
}
private async Task AcceptLoopAsync()
{
var token = this.cts.Token;
var security = this.BuildPipeSecurity();
while (!token.IsCancellationRequested)
{
NamedPipeServerStream? server = null;
try
{
server = NamedPipeServerStreamAcl.Create(
this.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous,
65536,
65536,
security);
await server.WaitForConnectionAsync(token).ConfigureAwait(false);
var session = new RpcConnection(server, this.registry);
this.sessions.TryAdd(session.Id, session);
this.log.Debug("RPC connection created: {Id}", session.Id);
_ = session.Completion.ContinueWith(t =>
{
this.sessions.TryRemove(session.Id, out _);
this.log.Debug("RPC connection removed: {Id}", session.Id);
}, TaskScheduler.Default);
}
catch (OperationCanceledException)
{
server?.Dispose();
break;
}
catch (Exception ex)
{
server?.Dispose();
this.log.Error(ex, "Error in pipe accept loop");
await Task.Delay(500, token).ConfigureAwait(false);
}
}
}
}

View file

@ -31,30 +31,30 @@ internal class UnixRpcTransport : IRpcTransport
/// Initializes a new instance of the <see cref="UnixRpcTransport"/> class.
/// </summary>
/// <param name="registry">The RPC service registry to use.</param>
/// <param name="socketPath">The Unix socket path to create. If null, defaults to a path based on process ID.</param>
public UnixRpcTransport(RpcServiceRegistry registry, string? socketPath = null)
/// <param name="socketDirectory">The Unix socket directory to use. If null, defaults to Dalamud home directory.</param>
/// <param name="socketName">The name of the socket to create.</param>
public UnixRpcTransport(RpcServiceRegistry registry, string? socketDirectory = null, string? socketName = null)
{
this.registry = registry;
socketName ??= $"DalamudRPC.{Environment.ProcessId}.sock";
if (socketPath != null)
if (!socketDirectory.IsNullOrEmpty())
{
this.SocketPath = socketPath;
this.SocketPath = Path.Combine(socketDirectory, socketName);
}
else
{
var dalamudConfigPath = Service<Dalamud>.Get().StartInfo.ConfigurationPath;
var dalamudHome = Path.GetDirectoryName(dalamudConfigPath);
var socketName = $"DalamudRPC.{Environment.ProcessId}.sock";
socketDirectory = Service<Dalamud>.Get().StartInfo.TempDirectory;
if (dalamudHome == null)
if (socketDirectory == null)
{
this.SocketPath = Path.Combine(Path.GetTempPath(), socketName);
this.log.Warning("Dalamud home is empty! UDS socket will be in temp.");
this.log.Warning("Temp dir was not set in StartInfo; using system temp for unix socket.");
}
else
{
this.SocketPath = Path.Combine(dalamudHome, socketName);
this.cleanupSocketDirectory = dalamudHome;
this.SocketPath = Path.Combine(socketDirectory, socketName);
this.cleanupSocketDirectory = socketDirectory;
}
}
}
@ -76,15 +76,8 @@ internal class UnixRpcTransport : IRpcTransport
var socketDir = Path.GetDirectoryName(this.SocketPath);
if (!string.IsNullOrEmpty(socketDir) && !Directory.Exists(socketDir))
{
try
{
Directory.CreateDirectory(socketDir);
}
catch (Exception ex)
{
this.log.Error(ex, "Failed to create socket directory: {Path}", socketDir);
return;
}
this.log.Error("Directory for unix socket does not exist: {Path}", socketDir);
return;
}
// Delete existing socket for this PID, if it exists.
@ -103,6 +96,7 @@ internal class UnixRpcTransport : IRpcTransport
this.acceptLoopTask = Task.Factory.StartNew(this.AcceptLoopAsync, TaskCreationOptions.LongRunning);
// note: needs to be run _after_ we're alive so that we don't delete our own socket.
// TODO: This should *probably* be handed by the launcher instead.
if (this.cleanupSocketDirectory != null)
{
Task.Run(async () => await UnixSocketUtil.CleanStaleSockets(this.cleanupSocketDirectory));

View file

@ -4,7 +4,7 @@ using System.Threading.Tasks;
using Serilog;
namespace Dalamud.Utility;
namespace Dalamud.Networking.Rpc;
/// <summary>
/// A set of utilities to help manage Unix sockets.