diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index 9d552b519..ef8090d1c 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -2,8 +2,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Reactive.Linq; -using System.Reactive.Subjects; using System.Runtime.InteropServices; using System.Threading.Tasks; @@ -62,10 +62,7 @@ internal class NetworkHandlers : IDisposable, IServiceType } gameNetwork.NetworkMessage += Observe; - return () => - { - gameNetwork.NetworkMessage -= Observe; - }; + return () => { gameNetwork.NetworkMessage -= Observe; }; }); this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest(); @@ -174,17 +171,8 @@ internal class NetworkHandlers : IDisposable, IServiceType private IObservable> OnMarketBoardListingsBatch( IObservable start) { - var startShared = start.Publish().RefCount(); var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount(); - void LogStartObserved(MarketBoardItemRequest request) - { - Log.Verbose( - "Observed start of request for item#{CatalogId} with {NumListings} expected listings", - request.CatalogId, - request.AmountToArrive); - } - void LogEndObserved(MarketBoardCurrentOfferings offerings) { Log.Verbose( @@ -204,6 +192,7 @@ internal class NetworkHandlers : IDisposable, IServiceType { var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10)); return offeringsObservable + .Where(offerings => offerings.ItemListings.All(l => l.CatalogId == request.CatalogId)) .Skip(totalPackets - 1) .Do(LogEndObserved); } @@ -213,7 +202,7 @@ internal class NetworkHandlers : IDisposable, IServiceType // packets, and then flatten them to the listings themselves. return offeringsObservable .Do(LogOfferingsObserved) - .Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd) + .Window(start, UntilBatchEnd) .SelectMany( o => o.Aggregate( new List(), @@ -224,17 +213,59 @@ internal class NetworkHandlers : IDisposable, IServiceType })); } - private IObservable> OnMarketBoardSalesBatch() + private IObservable> OnMarketBoardSalesBatch( + IObservable start) { - return this.OnMarketBoardHistory().Select(history => history.HistoryListings); + var historyObservable = this.OnMarketBoardHistory().Publish().RefCount(); + + void LogHistoryObserved(MarketBoardHistory history) + { + Log.Verbose( + "Observed history for item {CatalogId} with {NumSales} sales", + history.CatalogId, + history.HistoryListings.Count); + } + + IObservable UntilBatchEnd(MarketBoardItemRequest request) + { + return historyObservable + .Where(history => history.CatalogId == request.CatalogId) + .Take(1); + } + + // When a start packet is observed, begin observing a window of history packets. + // We should only get one packet, which the window closing function ensures. + // This packet is flattened to its sale entries and emitted. + return historyObservable + .Do(LogHistoryObserved) + .Window(start, UntilBatchEnd) + .SelectMany( + o => o.Aggregate( + new List(), + (agg, next) => + { + agg.AddRange(next.HistoryListings); + return agg; + })); } private IDisposable HandleMarketBoardItemRequest() { - var startObservable = this.OnMarketBoardItemRequestStart(); + void LogStartObserved(MarketBoardItemRequest request) + { + Log.Verbose( + "Observed start of request for item#{CatalogId} with {NumListings} expected listings", + request.CatalogId, + request.AmountToArrive); + } + + var startObservable = this.OnMarketBoardItemRequestStart() + .Where(request => request.Ok).Do(LogStartObserved) + .Publish() + .RefCount(); return Observable.When( startObservable - .And(this.OnMarketBoardSalesBatch()) + .And(this.OnMarketBoardSalesBatch(startObservable)) .And(this.OnMarketBoardListingsBatch(startObservable)) .Then((request, sales, listings) => (request, sales, listings))) .Where(this.ShouldUpload) @@ -252,6 +283,18 @@ internal class NetworkHandlers : IDisposable, IServiceType ICollection sales, ICollection listings) { + if (listings.Count != request.AmountToArrive) + { + Log.Error("Wrong number of Market Board listings received for request: {ListingsCount} != {RequestAmountToArrive} item#{RequestCatalogId}", listings.Count, request.AmountToArrive, request.CatalogId); + return; + } + + if (listings.Any(listing => listing.CatalogId != request.CatalogId)) + { + Log.Error("Received listings with mismatched item IDs for item#{RequestCatalogId}", request.CatalogId); + return; + } + Log.Verbose( "Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}", request.CatalogId,