From ecf039a200910e2982c889855230ceb9fab99323 Mon Sep 17 00:00:00 2001 From: karashiiro <49822414+karashiiro@users.noreply.github.com> Date: Wed, 15 Feb 2023 18:07:39 -0800 Subject: [PATCH 1/4] fix(network): Handle exceptions during observer streams --- .../Game/Network/Internal/NetworkHandlers.cs | 370 +++++++++--------- 1 file changed, 194 insertions(+), 176 deletions(-) diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index 9523b08d4..f6ab30293 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -3,7 +3,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; -using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Runtime.InteropServices; @@ -185,241 +184,260 @@ internal class NetworkHandlers : IDisposable, IServiceType private IDisposable HandleMarketBoardItemRequest() { return this.OnMarketBoardItemRequest() - .Where(_ => this.configuration.IsMbCollect) - .Subscribe(request => - { - this.marketBoardRequests.Add(request); - Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}"); - }); + .Where(this.ShouldUpload) + .Subscribe( + request => + { + this.marketBoardRequests.Add(request); + Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}"); + }, + ex => Log.Error(ex, "Failed to handle Market Board item request event")); } private IDisposable HandleMarketBoardOfferings() { return this.OnMarketBoardOfferings() - .Where(_ => this.configuration.IsMbCollect) - .Subscribe(listing => - { - var request = - this.marketBoardRequests.LastOrDefault( - r => r.CatalogId == listing.ItemListings[0].CatalogId && !r.IsDone); - - if (request == default) + .Where(this.ShouldUpload) + .Subscribe( + listing => { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.ItemListings[0].CatalogId}"); - return; - } + var request = + this.marketBoardRequests.LastOrDefault( + r => r.CatalogId == listing.ItemListings[0].CatalogId && !r.IsDone); - if (request.Listings.Count + listing.ItemListings.Count > request.AmountToArrive) - { - Log.Error( - $"Too many Market Board listings received for request: {request.Listings.Count + listing.ItemListings.Count} > {request.AmountToArrive} item#{listing.ItemListings[0].CatalogId}"); - return; - } + if (request == default) + { + Log.Error( + $"Market Board data arrived without a corresponding request: item#{listing.ItemListings[0].CatalogId}"); + return; + } - if (request.ListingsRequestId != -1 && request.ListingsRequestId != listing.RequestId) - { - Log.Error( - $"Non-matching RequestIds for Market Board data request: {request.ListingsRequestId}, {listing.RequestId}"); - return; - } + if (request.Listings.Count + listing.ItemListings.Count > request.AmountToArrive) + { + Log.Error( + $"Too many Market Board listings received for request: {request.Listings.Count + listing.ItemListings.Count} > {request.AmountToArrive} item#{listing.ItemListings[0].CatalogId}"); + return; + } - if (request.ListingsRequestId == -1 && request.Listings.Count > 0) - { - Log.Error( - $"Market Board data request sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } + if (request.ListingsRequestId != -1 && request.ListingsRequestId != listing.RequestId) + { + Log.Error( + $"Non-matching RequestIds for Market Board data request: {request.ListingsRequestId}, {listing.RequestId}"); + return; + } - if (request.ListingsRequestId == -1) - { - request.ListingsRequestId = listing.RequestId; - Log.Verbose($"First Market Board packet in sequence: {listing.RequestId}"); - } + if (request.ListingsRequestId == -1 && request.Listings.Count > 0) + { + Log.Error( + $"Market Board data request sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); + return; + } - request.Listings.AddRange(listing.ItemListings); + if (request.ListingsRequestId == -1) + { + request.ListingsRequestId = listing.RequestId; + Log.Verbose($"First Market Board packet in sequence: {listing.RequestId}"); + } - Log.Verbose( - "Added {0} ItemListings to request#{1}, now {2}/{3}, item#{4}", - listing.ItemListings.Count, - request.ListingsRequestId, - request.Listings.Count, - request.AmountToArrive, - request.CatalogId); + request.Listings.AddRange(listing.ItemListings); - if (request.IsDone) - { Log.Verbose( - "Market Board request finished, starting upload: request#{0} item#{1} amount#{2}", + "Added {0} ItemListings to request#{1}, now {2}/{3}, item#{4}", + listing.ItemListings.Count, request.ListingsRequestId, - request.CatalogId, - request.AmountToArrive); + request.Listings.Count, + request.AmountToArrive, + request.CatalogId); - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board offerings data upload failed."), - TaskContinuationOptions.OnlyOnFaulted); - } - }); + if (request.IsDone) + { + Log.Verbose( + "Market Board request finished, starting upload: request#{0} item#{1} amount#{2}", + request.ListingsRequestId, + request.CatalogId, + request.AmountToArrive); + + Task.Run(() => this.uploader.Upload(request)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board offerings data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); + } + }, + ex => Log.Error(ex, "Failed to handle Market Board offerings data event")); } private IDisposable HandleMarketBoardHistory() { return this.OnMarketBoardHistory() - .Where(_ => this.configuration.IsMbCollect) - .Subscribe(listing => - { - var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId); - - if (request == default) + .Where(this.ShouldUpload) + .Subscribe( + listing => { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.CatalogId}"); - return; - } + var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId); - if (request.ListingsRequestId != -1) - { - Log.Error( - $"Market Board data history sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } + if (request == default) + { + Log.Error( + $"Market Board data arrived without a corresponding request: item#{listing.CatalogId}"); + return; + } - request.History.AddRange(listing.HistoryListings); + if (request.ListingsRequestId != -1) + { + Log.Error( + $"Market Board data history sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); + return; + } - Log.Verbose("Added history for item#{0}", listing.CatalogId); + request.History.AddRange(listing.HistoryListings); - if (request.AmountToArrive == 0) - { - Log.Verbose("Request had 0 amount, uploading now"); + Log.Verbose("Added history for item#{0}", listing.CatalogId); - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - (task) => Log.Error(task.Exception, "Market Board history data upload failed."), - TaskContinuationOptions.OnlyOnFaulted); - } - }); + if (request.AmountToArrive == 0) + { + Log.Verbose("Request had 0 amount, uploading now"); + + Task.Run(() => this.uploader.Upload(request)) + .ContinueWith( + (task) => Log.Error(task.Exception, "Market Board history data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); + } + }, + ex => Log.Error(ex, "Failed to handle Market Board history data event")); } private IDisposable HandleMarketTaxRates() { return this.OnMarketTaxRates() - .Where(_ => this.configuration.IsMbCollect) - .Subscribe(taxes => - { - Log.Verbose( - "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", - taxes.LimsaLominsaTax, - taxes.GridaniaTax, - taxes.UldahTax, - taxes.IshgardTax, - taxes.KuganeTax, - taxes.CrystariumTax, - taxes.SharlayanTax); + .Where(this.ShouldUpload) + .Subscribe( + taxes => + { + Log.Verbose( + "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", + taxes.LimsaLominsaTax, + taxes.GridaniaTax, + taxes.UldahTax, + taxes.IshgardTax, + taxes.KuganeTax, + taxes.CrystariumTax, + taxes.SharlayanTax); - Task.Run(() => this.uploader.UploadTax(taxes)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board tax data upload failed."), - TaskContinuationOptions.OnlyOnFaulted); - }); + Task.Run(() => this.uploader.UploadTax(taxes)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board tax data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); + }, + ex => Log.Error(ex, "Failed to handle Market Board tax data event")); } private IDisposable HandleMarketBoardPurchaseHandler() { return this.OnMarketBoardPurchaseHandler() .Where(_ => this.configuration.IsMbCollect) - .Subscribe(handler => { this.marketBoardPurchaseHandler = handler; }); + .Subscribe( + handler => { this.marketBoardPurchaseHandler = handler; }, + ex => Log.Error(ex, "Failed to handle Market Board purchase handler event")); } private IDisposable HandleMarketBoardPurchase() { return this.OnMarketBoardPurchase() .Where(_ => this.configuration.IsMbCollect) - .Subscribe(purchase => - { - if (this.marketBoardPurchaseHandler == null) - return; - - var sameQty = purchase.ItemQuantity == this.marketBoardPurchaseHandler.ItemQuantity; - var itemMatch = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId; - var itemMatchHq = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId + 1_000_000; - - // Transaction succeeded - if (sameQty && (itemMatch || itemMatchHq)) + .Subscribe( + purchase => { - Log.Verbose( - $"Bought {purchase.ItemQuantity}x {this.marketBoardPurchaseHandler.CatalogId} for {this.marketBoardPurchaseHandler.PricePerUnit * purchase.ItemQuantity} gils, listing id is {this.marketBoardPurchaseHandler.ListingId}"); + if (this.marketBoardPurchaseHandler == null) + return; - var handler = - this.marketBoardPurchaseHandler; // Capture the object so that we don't pass in a null one when the task starts. + var sameQty = purchase.ItemQuantity == this.marketBoardPurchaseHandler.ItemQuantity; + var itemMatch = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId; + var itemMatchHq = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId + 1_000_000; - Task.Run(() => this.uploader.UploadPurchase(handler)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board purchase data upload failed."), - TaskContinuationOptions.OnlyOnFaulted); - } + // Transaction succeeded + if (sameQty && (itemMatch || itemMatchHq)) + { + Log.Verbose( + $"Bought {purchase.ItemQuantity}x {this.marketBoardPurchaseHandler.CatalogId} for {this.marketBoardPurchaseHandler.PricePerUnit * purchase.ItemQuantity} gils, listing id is {this.marketBoardPurchaseHandler.ListingId}"); - this.marketBoardPurchaseHandler = null; - }); + var handler = + this.marketBoardPurchaseHandler; // Capture the object so that we don't pass in a null one when the task starts. + + Task.Run(() => this.uploader.UploadPurchase(handler)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board purchase data upload failed."), + TaskContinuationOptions.OnlyOnFaulted); + } + + this.marketBoardPurchaseHandler = null; + }, + ex => Log.Error(ex, "Failed to handle Market Board purchase event")); } private unsafe IDisposable HandleCfPop() { return this.OnCfNotifyPop() - .Subscribe(message => - { - using var stream = new UnmanagedMemoryStream((byte*)message.Data.ToPointer(), 64); - using var reader = new BinaryReader(stream); - - var notifyType = reader.ReadByte(); - stream.Position += 0x1B; - var conditionId = reader.ReadUInt16(); - - if (notifyType != 3) - return; - - var cfConditionSheet = message.DataManager!.GetExcelSheet()!; - var cfCondition = cfConditionSheet.GetRow(conditionId); - - if (cfCondition == null) + .Subscribe( + message => { - Log.Error($"CFC key {conditionId} not in Lumina data."); - return; - } + using var stream = new UnmanagedMemoryStream((byte*)message.Data.ToPointer(), 64); + using var reader = new BinaryReader(stream); - var cfcName = cfCondition.Name.ToString(); - if (cfcName.IsNullOrEmpty()) - { - cfcName = "Duty Roulette"; - cfCondition.Image = 112324; - } + var notifyType = reader.ReadByte(); + stream.Position += 0x1B; + var conditionId = reader.ReadUInt16(); - // Flash window - if (this.configuration.DutyFinderTaskbarFlash && !NativeFunctions.ApplicationIsActivated()) - { - var flashInfo = new NativeFunctions.FlashWindowInfo + if (notifyType != 3) + return; + + var cfConditionSheet = message.DataManager!.GetExcelSheet()!; + var cfCondition = cfConditionSheet.GetRow(conditionId); + + if (cfCondition == null) { - Size = (uint)Marshal.SizeOf(), - Count = uint.MaxValue, - Timeout = 0, - Flags = NativeFunctions.FlashWindow.All | NativeFunctions.FlashWindow.TimerNoFG, - Hwnd = Process.GetCurrentProcess().MainWindowHandle, - }; - NativeFunctions.FlashWindowEx(ref flashInfo); - } - - Task.Run(() => - { - if (this.configuration.DutyFinderChatMessage) - { - Service.GetNullable()?.Print($"Duty pop: {cfcName}"); + Log.Error("CFC key {ConditionId} not in Lumina data", conditionId); + return; } - this.CfPop.InvokeSafely(this, cfCondition); - }).ContinueWith( - task => Log.Error(task.Exception, "CfPop.Invoke failed."), - TaskContinuationOptions.OnlyOnFaulted); - }); + var cfcName = cfCondition.Name.ToString(); + if (cfcName.IsNullOrEmpty()) + { + cfcName = "Duty Roulette"; + cfCondition.Image = 112324; + } + + // Flash window + if (this.configuration.DutyFinderTaskbarFlash && !NativeFunctions.ApplicationIsActivated()) + { + var flashInfo = new NativeFunctions.FlashWindowInfo + { + Size = (uint)Marshal.SizeOf(), + Count = uint.MaxValue, + Timeout = 0, + Flags = NativeFunctions.FlashWindow.All | NativeFunctions.FlashWindow.TimerNoFG, + Hwnd = Process.GetCurrentProcess().MainWindowHandle, + }; + NativeFunctions.FlashWindowEx(ref flashInfo); + } + + Task.Run(() => + { + if (this.configuration.DutyFinderChatMessage) + { + Service.GetNullable()?.Print($"Duty pop: {cfcName}"); + } + + this.CfPop.InvokeSafely(this, cfCondition); + }).ContinueWith( + task => Log.Error(task.Exception, "CfPop.Invoke failed"), + TaskContinuationOptions.OnlyOnFaulted); + }, + ex => Log.Error(ex, "Failed to handle Market Board purchase event")); + } + + private bool ShouldUpload(T any) + { + return this.configuration.IsMbCollect; } private class NetworkMessage From 0f3a63420b4627f9e40f4c5afa4493e642435339 Mon Sep 17 00:00:00 2001 From: karashiiro <49822414+karashiiro@users.noreply.github.com> Date: Wed, 15 Feb 2023 18:30:24 -0800 Subject: [PATCH 2/4] feat(network): Combine purchase request and response event streams --- .../Game/Network/Internal/NetworkHandlers.cs | 42 ++++++------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index f6ab30293..c87246cf2 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -36,14 +36,11 @@ internal class NetworkHandlers : IDisposable, IServiceType private readonly IDisposable handleMarketBoardHistory; private readonly IDisposable handleMarketTaxRates; private readonly IDisposable handleMarketBoardPurchaseHandler; - private readonly IDisposable handleMarketBoardPurchase; private readonly IDisposable handleCfPop; [ServiceManager.ServiceDependency] private readonly DalamudConfiguration configuration = Service.Get(); - private MarketBoardPurchaseHandler? marketBoardPurchaseHandler; - private bool disposing; [ServiceManager.ServiceConstructor] @@ -60,7 +57,6 @@ internal class NetworkHandlers : IDisposable, IServiceType this.handleMarketBoardHistory = this.HandleMarketBoardHistory(); this.handleMarketTaxRates = this.HandleMarketTaxRates(); this.handleMarketBoardPurchaseHandler = this.HandleMarketBoardPurchaseHandler(); - this.handleMarketBoardPurchase = this.HandleMarketBoardPurchase(); this.handleCfPop = this.HandleCfPop(); gameNetwork.NetworkMessage += this.ObserveNetworkMessage; @@ -94,7 +90,6 @@ internal class NetworkHandlers : IDisposable, IServiceType this.handleMarketBoardHistory.Dispose(); this.handleMarketTaxRates.Dispose(); this.handleMarketBoardPurchaseHandler.Dispose(); - this.handleMarketBoardPurchase.Dispose(); this.handleCfPop.Dispose(); } @@ -334,42 +329,31 @@ internal class NetworkHandlers : IDisposable, IServiceType private IDisposable HandleMarketBoardPurchaseHandler() { return this.OnMarketBoardPurchaseHandler() - .Where(_ => this.configuration.IsMbCollect) + .Where(this.ShouldUpload) + .Zip(this.OnMarketBoardPurchase().Where(this.ShouldUpload)) .Subscribe( - handler => { this.marketBoardPurchaseHandler = handler; }, - ex => Log.Error(ex, "Failed to handle Market Board purchase handler event")); - } - - private IDisposable HandleMarketBoardPurchase() - { - return this.OnMarketBoardPurchase() - .Where(_ => this.configuration.IsMbCollect) - .Subscribe( - purchase => + data => { - if (this.marketBoardPurchaseHandler == null) - return; + var (handler, purchase) = data; - var sameQty = purchase.ItemQuantity == this.marketBoardPurchaseHandler.ItemQuantity; - var itemMatch = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId; - var itemMatchHq = purchase.CatalogId == this.marketBoardPurchaseHandler.CatalogId + 1_000_000; + var sameQty = purchase.ItemQuantity == handler.ItemQuantity; + var itemMatch = purchase.CatalogId == handler.CatalogId; + var itemMatchHq = purchase.CatalogId == handler.CatalogId + 1_000_000; // Transaction succeeded if (sameQty && (itemMatch || itemMatchHq)) { Log.Verbose( - $"Bought {purchase.ItemQuantity}x {this.marketBoardPurchaseHandler.CatalogId} for {this.marketBoardPurchaseHandler.PricePerUnit * purchase.ItemQuantity} gils, listing id is {this.marketBoardPurchaseHandler.ListingId}"); - - var handler = - this.marketBoardPurchaseHandler; // Capture the object so that we don't pass in a null one when the task starts. - + "Bought {PurchaseItemQuantity}x {HandlerCatalogId} for {HandlerPricePerUnit} gils, listing id is {HandlerListingId}", + purchase.ItemQuantity, + handler.CatalogId, + handler.PricePerUnit * purchase.ItemQuantity, + handler.ListingId); Task.Run(() => this.uploader.UploadPurchase(handler)) .ContinueWith( - task => Log.Error(task.Exception, "Market Board purchase data upload failed."), + task => Log.Error(task.Exception, "Market Board purchase data upload failed"), TaskContinuationOptions.OnlyOnFaulted); } - - this.marketBoardPurchaseHandler = null; }, ex => Log.Error(ex, "Failed to handle Market Board purchase event")); } From 6c8ed5c8d4f8bd27a30f89e82ee0e4c4d852f138 Mon Sep 17 00:00:00 2001 From: karashiiro <49822414+karashiiro@users.noreply.github.com> Date: Wed, 15 Feb 2023 22:48:46 -0800 Subject: [PATCH 3/4] chore(network): Remove redundant ShouldUpload check --- Dalamud/Game/Network/Internal/NetworkHandlers.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index c87246cf2..27d5407ef 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -329,8 +329,8 @@ internal class NetworkHandlers : IDisposable, IServiceType private IDisposable HandleMarketBoardPurchaseHandler() { return this.OnMarketBoardPurchaseHandler() + .Zip(this.OnMarketBoardPurchase()) .Where(this.ShouldUpload) - .Zip(this.OnMarketBoardPurchase().Where(this.ShouldUpload)) .Subscribe( data => { From 93fbd69530b512f4980809544642f4f85558db43 Mon Sep 17 00:00:00 2001 From: karashiiro <49822414+karashiiro@users.noreply.github.com> Date: Thu, 16 Feb 2023 19:51:00 -0800 Subject: [PATCH 4/4] feat(network): Send market board listings/sales in one request This also removes a ton of state-management code that used to make it difficult to express this logic cleanly. --- .../MarketBoardItemRequest.cs | 13 +- ...est.cs => UniversalisItemUploadRequest.cs} | 10 +- .../UniversalisMarketBoardUploader.cs | 30 +-- .../Game/Network/Internal/NetworkHandlers.cs | 244 ++++++++---------- 4 files changed, 131 insertions(+), 166 deletions(-) rename Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/{UniversalisHistoryUploadRequest.cs => UniversalisItemUploadRequest.cs} (72%) diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs index 18ed5c4b5..bc8043732 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs @@ -20,6 +20,16 @@ internal class MarketBoardItemRequest /// public uint CatalogId { get; private set; } + /// + /// Gets the request status. Nonzero statuses are errors. + /// + public uint Status { get; private set; } + + /// + /// Gets a value indicating whether or not this request was successful. + /// + public bool Ok => this.Status == 0; + /// /// Gets the amount to arrive. /// @@ -58,7 +68,8 @@ internal class MarketBoardItemRequest var output = new MarketBoardItemRequest(); output.CatalogId = reader.ReadUInt32(); - stream.Position += 0x7; + output.Status = reader.ReadUInt32(); + stream.Position += 0x3; output.AmountToArrive = reader.ReadByte(); return output; diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs similarity index 72% rename from Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs rename to Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs index ba5e4ad47..d3032c160 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs @@ -7,7 +7,7 @@ namespace Dalamud.Game.Network.Internal.MarketBoardUploaders.Universalis.Types; /// /// A Universalis API structure. /// -internal class UniversalisHistoryUploadRequest +internal class UniversalisItemUploadRequest { /// /// Gets or sets the world ID. @@ -21,11 +21,17 @@ internal class UniversalisHistoryUploadRequest [JsonProperty("itemID")] public uint ItemId { get; set; } + /// + /// Gets or sets the list of available items. + /// + [JsonProperty("listings")] + public List Listings { get; set; } + /// /// Gets or sets the list of available entries. /// [JsonProperty("entries")] - public List Entries { get; set; } + public List Sales { get; set; } /// /// Gets or sets the uploader ID. diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs index 790493129..b31c4d217 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs @@ -41,12 +41,13 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader // ==================================================================================== - var listingsUploadObject = new UniversalisItemListingsUploadRequest + var uploadObject = new UniversalisItemUploadRequest { WorldId = clientState.LocalPlayer?.CurrentWorld.Id ?? 0, UploaderId = uploader.ToString(), ItemId = request.CatalogId, Listings = new List(), + Sales = new List(), }; foreach (var marketBoardItemListing in request.Listings) @@ -77,27 +78,12 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader }); } - listingsUploadObject.Listings.Add(universalisListing); + uploadObject.Listings.Add(universalisListing); } - var listingPath = "/upload"; - var listingUpload = JsonConvert.SerializeObject(listingsUploadObject); - Log.Verbose("{ListingPath}: {ListingUpload}", listingPath, listingUpload); - await Util.HttpClient.PostAsync($"{ApiBase}{listingPath}/{ApiKey}", new StringContent(listingUpload, Encoding.UTF8, "application/json")); - - // ==================================================================================== - - var historyUploadObject = new UniversalisHistoryUploadRequest - { - WorldId = clientState.LocalPlayer?.CurrentWorld.Id ?? 0, - UploaderId = uploader.ToString(), - ItemId = request.CatalogId, - Entries = new List(), - }; - foreach (var marketBoardHistoryListing in request.History) { - historyUploadObject.Entries.Add(new UniversalisHistoryEntry + uploadObject.Sales.Add(new UniversalisHistoryEntry { BuyerName = marketBoardHistoryListing.BuyerName, Hq = marketBoardHistoryListing.IsHq, @@ -108,10 +94,10 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader }); } - var historyPath = "/upload"; - var historyUpload = JsonConvert.SerializeObject(historyUploadObject); - Log.Verbose("{HistoryPath}: {HistoryUpload}", historyPath, historyUpload); - await Util.HttpClient.PostAsync($"{ApiBase}{historyPath}/{ApiKey}", new StringContent(historyUpload, Encoding.UTF8, "application/json")); + var uploadPath = "/upload"; + var uploadData = JsonConvert.SerializeObject(uploadObject); + Log.Verbose("{ListingPath}: {ListingUpload}", uploadPath, uploadData); + await Util.HttpClient.PostAsync($"{ApiBase}{uploadPath}/{ApiKey}", new StringContent(uploadData, Encoding.UTF8, "application/json")); // ==================================================================================== diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index 27d5407ef..4092e0f40 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; -using System.Linq; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Runtime.InteropServices; @@ -28,12 +27,9 @@ internal class NetworkHandlers : IDisposable, IServiceType { private readonly IMarketBoardUploader uploader; - private readonly List marketBoardRequests; private readonly ISubject messages; private readonly IDisposable handleMarketBoardItemRequest; - private readonly IDisposable handleMarketBoardOfferings; - private readonly IDisposable handleMarketBoardHistory; private readonly IDisposable handleMarketTaxRates; private readonly IDisposable handleMarketBoardPurchaseHandler; private readonly IDisposable handleCfPop; @@ -47,14 +43,11 @@ internal class NetworkHandlers : IDisposable, IServiceType private NetworkHandlers(GameNetwork gameNetwork) { this.uploader = new UniversalisMarketBoardUploader(); - this.marketBoardRequests = new List(); this.CfPop = (_, _) => { }; this.messages = new Subject(); this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest(); - this.handleMarketBoardOfferings = this.HandleMarketBoardOfferings(); - this.handleMarketBoardHistory = this.HandleMarketBoardHistory(); this.handleMarketTaxRates = this.HandleMarketTaxRates(); this.handleMarketBoardPurchaseHandler = this.HandleMarketBoardPurchaseHandler(); this.handleCfPop = this.HandleCfPop(); @@ -86,8 +79,6 @@ internal class NetworkHandlers : IDisposable, IServiceType return; this.handleMarketBoardItemRequest.Dispose(); - this.handleMarketBoardOfferings.Dispose(); - this.handleMarketBoardHistory.Dispose(); this.handleMarketTaxRates.Dispose(); this.handleMarketBoardPurchaseHandler.Dispose(); this.handleCfPop.Dispose(); @@ -113,7 +104,7 @@ internal class NetworkHandlers : IDisposable, IServiceType return this.messages.Where(message => message.DataManager?.IsDataReady == true); } - private IObservable OnMarketBoardItemRequest() + private IObservable OnMarketBoardItemRequestStart() { return this.OnNetworkMessage() .Where(message => message.Direction == NetworkMessageDirection.ZoneDown) @@ -176,129 +167,100 @@ internal class NetworkHandlers : IDisposable, IServiceType .Where(message => message.Opcode == message.DataManager?.ServerOpCodes["CfNotifyPop"]); } + private IObservable> OnMarketBoardListingsBatch( + IObservable start) + { + var startShared = start.Publish().RefCount(); + var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount(); + + void LogStartObserved(MarketBoardItemRequest request) + { + Log.Verbose( + "Observed start of request for item#{CatalogId} with {NumListings} expected listings", + request.CatalogId, + request.AmountToArrive); + } + + void LogEndObserved(MarketBoardCurrentOfferings offerings) + { + Log.Verbose( + "Observed end of request {RequestId}", + offerings.RequestId); + } + + void LogOfferingsObserved(MarketBoardCurrentOfferings offerings) + { + Log.Verbose( + "Observed element of request {RequestId} with {NumListings} listings", + offerings.RequestId, + offerings.ItemListings.Count); + } + + IObservable UntilBatchEnd(MarketBoardItemRequest request) + { + var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10)); + return offeringsObservable + .Skip(totalPackets - 1) + .Do(LogEndObserved); + } + + // When a start packet is observed, begin observing a window of listings packets + // according to the count described by the start packet. Aggregate the listings + // packets, and then flatten them to the listings themselves. + return offeringsObservable + .Do(LogOfferingsObserved) + .Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd) + .SelectMany( + o => o.Aggregate( + new List(), + (agg, next) => + { + agg.AddRange(next.ItemListings); + return agg; + })); + } + + private IObservable> OnMarketBoardSalesBatch() + { + return this.OnMarketBoardHistory().Select(history => history.HistoryListings); + } + private IDisposable HandleMarketBoardItemRequest() { - return this.OnMarketBoardItemRequest() - .Where(this.ShouldUpload) - .Subscribe( - request => - { - this.marketBoardRequests.Add(request); - Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}"); - }, - ex => Log.Error(ex, "Failed to handle Market Board item request event")); + var startObservable = this.OnMarketBoardItemRequestStart(); + return Observable.When( + startObservable + .And(this.OnMarketBoardSalesBatch()) + .And(this.OnMarketBoardListingsBatch(startObservable)) + .Then((request, sales, listings) => (request, sales, listings))) + .Where(this.ShouldUpload) + .Subscribe( + data => + { + var (request, sales, listings) = data; + this.UploadMarketBoardData(request, sales, listings); + }, + ex => Log.Error(ex, "Failed to handle Market Board item request event")); } - private IDisposable HandleMarketBoardOfferings() + private void UploadMarketBoardData( + MarketBoardItemRequest request, + ICollection sales, + ICollection listings) { - return this.OnMarketBoardOfferings() - .Where(this.ShouldUpload) - .Subscribe( - listing => - { - var request = - this.marketBoardRequests.LastOrDefault( - r => r.CatalogId == listing.ItemListings[0].CatalogId && !r.IsDone); + Log.Verbose( + "Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}", + request.CatalogId, + listings.Count, + sales.Count); - if (request == default) - { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.ItemListings[0].CatalogId}"); - return; - } + request.Listings.AddRange(listings); + request.History.AddRange(sales); - if (request.Listings.Count + listing.ItemListings.Count > request.AmountToArrive) - { - Log.Error( - $"Too many Market Board listings received for request: {request.Listings.Count + listing.ItemListings.Count} > {request.AmountToArrive} item#{listing.ItemListings[0].CatalogId}"); - return; - } - - if (request.ListingsRequestId != -1 && request.ListingsRequestId != listing.RequestId) - { - Log.Error( - $"Non-matching RequestIds for Market Board data request: {request.ListingsRequestId}, {listing.RequestId}"); - return; - } - - if (request.ListingsRequestId == -1 && request.Listings.Count > 0) - { - Log.Error( - $"Market Board data request sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } - - if (request.ListingsRequestId == -1) - { - request.ListingsRequestId = listing.RequestId; - Log.Verbose($"First Market Board packet in sequence: {listing.RequestId}"); - } - - request.Listings.AddRange(listing.ItemListings); - - Log.Verbose( - "Added {0} ItemListings to request#{1}, now {2}/{3}, item#{4}", - listing.ItemListings.Count, - request.ListingsRequestId, - request.Listings.Count, - request.AmountToArrive, - request.CatalogId); - - if (request.IsDone) - { - Log.Verbose( - "Market Board request finished, starting upload: request#{0} item#{1} amount#{2}", - request.ListingsRequestId, - request.CatalogId, - request.AmountToArrive); - - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board offerings data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - } - }, - ex => Log.Error(ex, "Failed to handle Market Board offerings data event")); - } - - private IDisposable HandleMarketBoardHistory() - { - return this.OnMarketBoardHistory() - .Where(this.ShouldUpload) - .Subscribe( - listing => - { - var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId); - - if (request == default) - { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.CatalogId}"); - return; - } - - if (request.ListingsRequestId != -1) - { - Log.Error( - $"Market Board data history sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } - - request.History.AddRange(listing.HistoryListings); - - Log.Verbose("Added history for item#{0}", listing.CatalogId); - - if (request.AmountToArrive == 0) - { - Log.Verbose("Request had 0 amount, uploading now"); - - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - (task) => Log.Error(task.Exception, "Market Board history data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - } - }, - ex => Log.Error(ex, "Failed to handle Market Board history data event")); + Task.Run(() => this.uploader.Upload(request)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board offerings data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); } private IDisposable HandleMarketTaxRates() @@ -307,22 +269,22 @@ internal class NetworkHandlers : IDisposable, IServiceType .Where(this.ShouldUpload) .Subscribe( taxes => - { - Log.Verbose( - "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", - taxes.LimsaLominsaTax, - taxes.GridaniaTax, - taxes.UldahTax, - taxes.IshgardTax, - taxes.KuganeTax, - taxes.CrystariumTax, - taxes.SharlayanTax); + { + Log.Verbose( + "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", + taxes.LimsaLominsaTax, + taxes.GridaniaTax, + taxes.UldahTax, + taxes.IshgardTax, + taxes.KuganeTax, + taxes.CrystariumTax, + taxes.SharlayanTax); - Task.Run(() => this.uploader.UploadTax(taxes)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board tax data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - }, + Task.Run(() => this.uploader.UploadTax(taxes)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board tax data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); + }, ex => Log.Error(ex, "Failed to handle Market Board tax data event")); }