fix(network): Handle exceptions during observer streams

This commit is contained in:
karashiiro 2023-02-15 18:07:39 -08:00
parent c59dd59858
commit ecf039a200

View file

@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Reactive;
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
@ -185,19 +184,22 @@ internal class NetworkHandlers : IDisposable, IServiceType
private IDisposable HandleMarketBoardItemRequest() private IDisposable HandleMarketBoardItemRequest()
{ {
return this.OnMarketBoardItemRequest() return this.OnMarketBoardItemRequest()
.Where(_ => this.configuration.IsMbCollect) .Where(this.ShouldUpload)
.Subscribe(request => .Subscribe(
request =>
{ {
this.marketBoardRequests.Add(request); this.marketBoardRequests.Add(request);
Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}"); Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}");
}); },
ex => Log.Error(ex, "Failed to handle Market Board item request event"));
} }
private IDisposable HandleMarketBoardOfferings() private IDisposable HandleMarketBoardOfferings()
{ {
return this.OnMarketBoardOfferings() return this.OnMarketBoardOfferings()
.Where(_ => this.configuration.IsMbCollect) .Where(this.ShouldUpload)
.Subscribe(listing => .Subscribe(
listing =>
{ {
var request = var request =
this.marketBoardRequests.LastOrDefault( this.marketBoardRequests.LastOrDefault(
@ -257,17 +259,19 @@ internal class NetworkHandlers : IDisposable, IServiceType
Task.Run(() => this.uploader.Upload(request)) Task.Run(() => this.uploader.Upload(request))
.ContinueWith( .ContinueWith(
task => Log.Error(task.Exception, "Market Board offerings data upload failed."), task => Log.Error(task.Exception, "Market Board offerings data upload failed"),
TaskContinuationOptions.OnlyOnFaulted); TaskContinuationOptions.OnlyOnFaulted);
} }
}); },
ex => Log.Error(ex, "Failed to handle Market Board offerings data event"));
} }
private IDisposable HandleMarketBoardHistory() private IDisposable HandleMarketBoardHistory()
{ {
return this.OnMarketBoardHistory() return this.OnMarketBoardHistory()
.Where(_ => this.configuration.IsMbCollect) .Where(this.ShouldUpload)
.Subscribe(listing => .Subscribe(
listing =>
{ {
var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId); var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId);
@ -295,17 +299,19 @@ internal class NetworkHandlers : IDisposable, IServiceType
Task.Run(() => this.uploader.Upload(request)) Task.Run(() => this.uploader.Upload(request))
.ContinueWith( .ContinueWith(
(task) => Log.Error(task.Exception, "Market Board history data upload failed."), (task) => Log.Error(task.Exception, "Market Board history data upload failed"),
TaskContinuationOptions.OnlyOnFaulted); TaskContinuationOptions.OnlyOnFaulted);
} }
}); },
ex => Log.Error(ex, "Failed to handle Market Board history data event"));
} }
private IDisposable HandleMarketTaxRates() private IDisposable HandleMarketTaxRates()
{ {
return this.OnMarketTaxRates() return this.OnMarketTaxRates()
.Where(_ => this.configuration.IsMbCollect) .Where(this.ShouldUpload)
.Subscribe(taxes => .Subscribe(
taxes =>
{ {
Log.Verbose( Log.Verbose(
"MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}",
@ -319,23 +325,27 @@ internal class NetworkHandlers : IDisposable, IServiceType
Task.Run(() => this.uploader.UploadTax(taxes)) Task.Run(() => this.uploader.UploadTax(taxes))
.ContinueWith( .ContinueWith(
task => Log.Error(task.Exception, "Market Board tax data upload failed."), task => Log.Error(task.Exception, "Market Board tax data upload failed"),
TaskContinuationOptions.OnlyOnFaulted); TaskContinuationOptions.OnlyOnFaulted);
}); },
ex => Log.Error(ex, "Failed to handle Market Board tax data event"));
} }
private IDisposable HandleMarketBoardPurchaseHandler() private IDisposable HandleMarketBoardPurchaseHandler()
{ {
return this.OnMarketBoardPurchaseHandler() return this.OnMarketBoardPurchaseHandler()
.Where(_ => this.configuration.IsMbCollect) .Where(_ => this.configuration.IsMbCollect)
.Subscribe(handler => { this.marketBoardPurchaseHandler = handler; }); .Subscribe(
handler => { this.marketBoardPurchaseHandler = handler; },
ex => Log.Error(ex, "Failed to handle Market Board purchase handler event"));
} }
private IDisposable HandleMarketBoardPurchase() private IDisposable HandleMarketBoardPurchase()
{ {
return this.OnMarketBoardPurchase() return this.OnMarketBoardPurchase()
.Where(_ => this.configuration.IsMbCollect) .Where(_ => this.configuration.IsMbCollect)
.Subscribe(purchase => .Subscribe(
purchase =>
{ {
if (this.marketBoardPurchaseHandler == null) if (this.marketBoardPurchaseHandler == null)
return; return;
@ -360,13 +370,15 @@ internal class NetworkHandlers : IDisposable, IServiceType
} }
this.marketBoardPurchaseHandler = null; this.marketBoardPurchaseHandler = null;
}); },
ex => Log.Error(ex, "Failed to handle Market Board purchase event"));
} }
private unsafe IDisposable HandleCfPop() private unsafe IDisposable HandleCfPop()
{ {
return this.OnCfNotifyPop() return this.OnCfNotifyPop()
.Subscribe(message => .Subscribe(
message =>
{ {
using var stream = new UnmanagedMemoryStream((byte*)message.Data.ToPointer(), 64); using var stream = new UnmanagedMemoryStream((byte*)message.Data.ToPointer(), 64);
using var reader = new BinaryReader(stream); using var reader = new BinaryReader(stream);
@ -383,7 +395,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
if (cfCondition == null) if (cfCondition == null)
{ {
Log.Error($"CFC key {conditionId} not in Lumina data."); Log.Error("CFC key {ConditionId} not in Lumina data", conditionId);
return; return;
} }
@ -417,9 +429,15 @@ internal class NetworkHandlers : IDisposable, IServiceType
this.CfPop.InvokeSafely(this, cfCondition); this.CfPop.InvokeSafely(this, cfCondition);
}).ContinueWith( }).ContinueWith(
task => Log.Error(task.Exception, "CfPop.Invoke failed."), task => Log.Error(task.Exception, "CfPop.Invoke failed"),
TaskContinuationOptions.OnlyOnFaulted); TaskContinuationOptions.OnlyOnFaulted);
}); },
ex => Log.Error(ex, "Failed to handle Market Board purchase event"));
}
private bool ShouldUpload<T>(T any)
{
return this.configuration.IsMbCollect;
} }
private class NetworkMessage private class NetworkMessage