diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs index 18ed5c4b5..bc8043732 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/MarketBoardItemRequest.cs @@ -20,6 +20,16 @@ internal class MarketBoardItemRequest /// public uint CatalogId { get; private set; } + /// + /// Gets the request status. Nonzero statuses are errors. + /// + public uint Status { get; private set; } + + /// + /// Gets a value indicating whether or not this request was successful. + /// + public bool Ok => this.Status == 0; + /// /// Gets the amount to arrive. /// @@ -58,7 +68,8 @@ internal class MarketBoardItemRequest var output = new MarketBoardItemRequest(); output.CatalogId = reader.ReadUInt32(); - stream.Position += 0x7; + output.Status = reader.ReadUInt32(); + stream.Position += 0x3; output.AmountToArrive = reader.ReadByte(); return output; diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs similarity index 72% rename from Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs rename to Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs index ba5e4ad47..d3032c160 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisHistoryUploadRequest.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/Types/UniversalisItemUploadRequest.cs @@ -7,7 +7,7 @@ namespace Dalamud.Game.Network.Internal.MarketBoardUploaders.Universalis.Types; /// /// A Universalis API structure. /// -internal class UniversalisHistoryUploadRequest +internal class UniversalisItemUploadRequest { /// /// Gets or sets the world ID. @@ -21,11 +21,17 @@ internal class UniversalisHistoryUploadRequest [JsonProperty("itemID")] public uint ItemId { get; set; } + /// + /// Gets or sets the list of available items. + /// + [JsonProperty("listings")] + public List Listings { get; set; } + /// /// Gets or sets the list of available entries. /// [JsonProperty("entries")] - public List Entries { get; set; } + public List Sales { get; set; } /// /// Gets or sets the uploader ID. diff --git a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs index 790493129..b31c4d217 100644 --- a/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs +++ b/Dalamud/Game/Network/Internal/MarketBoardUploaders/Universalis/UniversalisMarketBoardUploader.cs @@ -41,12 +41,13 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader // ==================================================================================== - var listingsUploadObject = new UniversalisItemListingsUploadRequest + var uploadObject = new UniversalisItemUploadRequest { WorldId = clientState.LocalPlayer?.CurrentWorld.Id ?? 0, UploaderId = uploader.ToString(), ItemId = request.CatalogId, Listings = new List(), + Sales = new List(), }; foreach (var marketBoardItemListing in request.Listings) @@ -77,27 +78,12 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader }); } - listingsUploadObject.Listings.Add(universalisListing); + uploadObject.Listings.Add(universalisListing); } - var listingPath = "/upload"; - var listingUpload = JsonConvert.SerializeObject(listingsUploadObject); - Log.Verbose("{ListingPath}: {ListingUpload}", listingPath, listingUpload); - await Util.HttpClient.PostAsync($"{ApiBase}{listingPath}/{ApiKey}", new StringContent(listingUpload, Encoding.UTF8, "application/json")); - - // ==================================================================================== - - var historyUploadObject = new UniversalisHistoryUploadRequest - { - WorldId = clientState.LocalPlayer?.CurrentWorld.Id ?? 0, - UploaderId = uploader.ToString(), - ItemId = request.CatalogId, - Entries = new List(), - }; - foreach (var marketBoardHistoryListing in request.History) { - historyUploadObject.Entries.Add(new UniversalisHistoryEntry + uploadObject.Sales.Add(new UniversalisHistoryEntry { BuyerName = marketBoardHistoryListing.BuyerName, Hq = marketBoardHistoryListing.IsHq, @@ -108,10 +94,10 @@ internal class UniversalisMarketBoardUploader : IMarketBoardUploader }); } - var historyPath = "/upload"; - var historyUpload = JsonConvert.SerializeObject(historyUploadObject); - Log.Verbose("{HistoryPath}: {HistoryUpload}", historyPath, historyUpload); - await Util.HttpClient.PostAsync($"{ApiBase}{historyPath}/{ApiKey}", new StringContent(historyUpload, Encoding.UTF8, "application/json")); + var uploadPath = "/upload"; + var uploadData = JsonConvert.SerializeObject(uploadObject); + Log.Verbose("{ListingPath}: {ListingUpload}", uploadPath, uploadData); + await Util.HttpClient.PostAsync($"{ApiBase}{uploadPath}/{ApiKey}", new StringContent(uploadData, Encoding.UTF8, "application/json")); // ==================================================================================== diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index 27d5407ef..4092e0f40 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; -using System.Linq; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Runtime.InteropServices; @@ -28,12 +27,9 @@ internal class NetworkHandlers : IDisposable, IServiceType { private readonly IMarketBoardUploader uploader; - private readonly List marketBoardRequests; private readonly ISubject messages; private readonly IDisposable handleMarketBoardItemRequest; - private readonly IDisposable handleMarketBoardOfferings; - private readonly IDisposable handleMarketBoardHistory; private readonly IDisposable handleMarketTaxRates; private readonly IDisposable handleMarketBoardPurchaseHandler; private readonly IDisposable handleCfPop; @@ -47,14 +43,11 @@ internal class NetworkHandlers : IDisposable, IServiceType private NetworkHandlers(GameNetwork gameNetwork) { this.uploader = new UniversalisMarketBoardUploader(); - this.marketBoardRequests = new List(); this.CfPop = (_, _) => { }; this.messages = new Subject(); this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest(); - this.handleMarketBoardOfferings = this.HandleMarketBoardOfferings(); - this.handleMarketBoardHistory = this.HandleMarketBoardHistory(); this.handleMarketTaxRates = this.HandleMarketTaxRates(); this.handleMarketBoardPurchaseHandler = this.HandleMarketBoardPurchaseHandler(); this.handleCfPop = this.HandleCfPop(); @@ -86,8 +79,6 @@ internal class NetworkHandlers : IDisposable, IServiceType return; this.handleMarketBoardItemRequest.Dispose(); - this.handleMarketBoardOfferings.Dispose(); - this.handleMarketBoardHistory.Dispose(); this.handleMarketTaxRates.Dispose(); this.handleMarketBoardPurchaseHandler.Dispose(); this.handleCfPop.Dispose(); @@ -113,7 +104,7 @@ internal class NetworkHandlers : IDisposable, IServiceType return this.messages.Where(message => message.DataManager?.IsDataReady == true); } - private IObservable OnMarketBoardItemRequest() + private IObservable OnMarketBoardItemRequestStart() { return this.OnNetworkMessage() .Where(message => message.Direction == NetworkMessageDirection.ZoneDown) @@ -176,129 +167,100 @@ internal class NetworkHandlers : IDisposable, IServiceType .Where(message => message.Opcode == message.DataManager?.ServerOpCodes["CfNotifyPop"]); } + private IObservable> OnMarketBoardListingsBatch( + IObservable start) + { + var startShared = start.Publish().RefCount(); + var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount(); + + void LogStartObserved(MarketBoardItemRequest request) + { + Log.Verbose( + "Observed start of request for item#{CatalogId} with {NumListings} expected listings", + request.CatalogId, + request.AmountToArrive); + } + + void LogEndObserved(MarketBoardCurrentOfferings offerings) + { + Log.Verbose( + "Observed end of request {RequestId}", + offerings.RequestId); + } + + void LogOfferingsObserved(MarketBoardCurrentOfferings offerings) + { + Log.Verbose( + "Observed element of request {RequestId} with {NumListings} listings", + offerings.RequestId, + offerings.ItemListings.Count); + } + + IObservable UntilBatchEnd(MarketBoardItemRequest request) + { + var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10)); + return offeringsObservable + .Skip(totalPackets - 1) + .Do(LogEndObserved); + } + + // When a start packet is observed, begin observing a window of listings packets + // according to the count described by the start packet. Aggregate the listings + // packets, and then flatten them to the listings themselves. + return offeringsObservable + .Do(LogOfferingsObserved) + .Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd) + .SelectMany( + o => o.Aggregate( + new List(), + (agg, next) => + { + agg.AddRange(next.ItemListings); + return agg; + })); + } + + private IObservable> OnMarketBoardSalesBatch() + { + return this.OnMarketBoardHistory().Select(history => history.HistoryListings); + } + private IDisposable HandleMarketBoardItemRequest() { - return this.OnMarketBoardItemRequest() - .Where(this.ShouldUpload) - .Subscribe( - request => - { - this.marketBoardRequests.Add(request); - Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}"); - }, - ex => Log.Error(ex, "Failed to handle Market Board item request event")); + var startObservable = this.OnMarketBoardItemRequestStart(); + return Observable.When( + startObservable + .And(this.OnMarketBoardSalesBatch()) + .And(this.OnMarketBoardListingsBatch(startObservable)) + .Then((request, sales, listings) => (request, sales, listings))) + .Where(this.ShouldUpload) + .Subscribe( + data => + { + var (request, sales, listings) = data; + this.UploadMarketBoardData(request, sales, listings); + }, + ex => Log.Error(ex, "Failed to handle Market Board item request event")); } - private IDisposable HandleMarketBoardOfferings() + private void UploadMarketBoardData( + MarketBoardItemRequest request, + ICollection sales, + ICollection listings) { - return this.OnMarketBoardOfferings() - .Where(this.ShouldUpload) - .Subscribe( - listing => - { - var request = - this.marketBoardRequests.LastOrDefault( - r => r.CatalogId == listing.ItemListings[0].CatalogId && !r.IsDone); + Log.Verbose( + "Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}", + request.CatalogId, + listings.Count, + sales.Count); - if (request == default) - { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.ItemListings[0].CatalogId}"); - return; - } + request.Listings.AddRange(listings); + request.History.AddRange(sales); - if (request.Listings.Count + listing.ItemListings.Count > request.AmountToArrive) - { - Log.Error( - $"Too many Market Board listings received for request: {request.Listings.Count + listing.ItemListings.Count} > {request.AmountToArrive} item#{listing.ItemListings[0].CatalogId}"); - return; - } - - if (request.ListingsRequestId != -1 && request.ListingsRequestId != listing.RequestId) - { - Log.Error( - $"Non-matching RequestIds for Market Board data request: {request.ListingsRequestId}, {listing.RequestId}"); - return; - } - - if (request.ListingsRequestId == -1 && request.Listings.Count > 0) - { - Log.Error( - $"Market Board data request sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } - - if (request.ListingsRequestId == -1) - { - request.ListingsRequestId = listing.RequestId; - Log.Verbose($"First Market Board packet in sequence: {listing.RequestId}"); - } - - request.Listings.AddRange(listing.ItemListings); - - Log.Verbose( - "Added {0} ItemListings to request#{1}, now {2}/{3}, item#{4}", - listing.ItemListings.Count, - request.ListingsRequestId, - request.Listings.Count, - request.AmountToArrive, - request.CatalogId); - - if (request.IsDone) - { - Log.Verbose( - "Market Board request finished, starting upload: request#{0} item#{1} amount#{2}", - request.ListingsRequestId, - request.CatalogId, - request.AmountToArrive); - - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board offerings data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - } - }, - ex => Log.Error(ex, "Failed to handle Market Board offerings data event")); - } - - private IDisposable HandleMarketBoardHistory() - { - return this.OnMarketBoardHistory() - .Where(this.ShouldUpload) - .Subscribe( - listing => - { - var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId); - - if (request == default) - { - Log.Error( - $"Market Board data arrived without a corresponding request: item#{listing.CatalogId}"); - return; - } - - if (request.ListingsRequestId != -1) - { - Log.Error( - $"Market Board data history sequence break: {request.ListingsRequestId}, {request.Listings.Count}"); - return; - } - - request.History.AddRange(listing.HistoryListings); - - Log.Verbose("Added history for item#{0}", listing.CatalogId); - - if (request.AmountToArrive == 0) - { - Log.Verbose("Request had 0 amount, uploading now"); - - Task.Run(() => this.uploader.Upload(request)) - .ContinueWith( - (task) => Log.Error(task.Exception, "Market Board history data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - } - }, - ex => Log.Error(ex, "Failed to handle Market Board history data event")); + Task.Run(() => this.uploader.Upload(request)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board offerings data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); } private IDisposable HandleMarketTaxRates() @@ -307,22 +269,22 @@ internal class NetworkHandlers : IDisposable, IServiceType .Where(this.ShouldUpload) .Subscribe( taxes => - { - Log.Verbose( - "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", - taxes.LimsaLominsaTax, - taxes.GridaniaTax, - taxes.UldahTax, - taxes.IshgardTax, - taxes.KuganeTax, - taxes.CrystariumTax, - taxes.SharlayanTax); + { + Log.Verbose( + "MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}", + taxes.LimsaLominsaTax, + taxes.GridaniaTax, + taxes.UldahTax, + taxes.IshgardTax, + taxes.KuganeTax, + taxes.CrystariumTax, + taxes.SharlayanTax); - Task.Run(() => this.uploader.UploadTax(taxes)) - .ContinueWith( - task => Log.Error(task.Exception, "Market Board tax data upload failed"), - TaskContinuationOptions.OnlyOnFaulted); - }, + Task.Run(() => this.uploader.UploadTax(taxes)) + .ContinueWith( + task => Log.Error(task.Exception, "Market Board tax data upload failed"), + TaskContinuationOptions.OnlyOnFaulted); + }, ex => Log.Error(ex, "Failed to handle Market Board tax data event")); }