From 20b3a75fc65f44fb835684e3ff52c4f2247c1ca3 Mon Sep 17 00:00:00 2001 From: karashiiro <49822414+karashiiro@users.noreply.github.com> Date: Sun, 19 Feb 2023 08:12:23 -0800 Subject: [PATCH] fix(network): Ensure correct MB history packets are being bundled This injects the start packet into the history observable to use it for validating the item ID in the history packets. Also adds back some other validation as a safeguard. --- .../Game/Network/Internal/NetworkHandlers.cs | 81 ++++++++++++++----- 1 file changed, 62 insertions(+), 19 deletions(-) diff --git a/Dalamud/Game/Network/Internal/NetworkHandlers.cs b/Dalamud/Game/Network/Internal/NetworkHandlers.cs index 9d552b519..ef8090d1c 100644 --- a/Dalamud/Game/Network/Internal/NetworkHandlers.cs +++ b/Dalamud/Game/Network/Internal/NetworkHandlers.cs @@ -2,8 +2,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Reactive.Linq; -using System.Reactive.Subjects; using System.Runtime.InteropServices; using System.Threading.Tasks; @@ -62,10 +62,7 @@ internal class NetworkHandlers : IDisposable, IServiceType } gameNetwork.NetworkMessage += Observe; - return () => - { - gameNetwork.NetworkMessage -= Observe; - }; + return () => { gameNetwork.NetworkMessage -= Observe; }; }); this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest(); @@ -174,17 +171,8 @@ internal class NetworkHandlers : IDisposable, IServiceType private IObservable> OnMarketBoardListingsBatch( IObservable start) { - var startShared = start.Publish().RefCount(); var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount(); - void LogStartObserved(MarketBoardItemRequest request) - { - Log.Verbose( - "Observed start of request for item#{CatalogId} with {NumListings} expected listings", - request.CatalogId, - request.AmountToArrive); - } - void LogEndObserved(MarketBoardCurrentOfferings offerings) { Log.Verbose( @@ -204,6 +192,7 @@ internal class NetworkHandlers : IDisposable, IServiceType { var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10)); return offeringsObservable + .Where(offerings => offerings.ItemListings.All(l => l.CatalogId == request.CatalogId)) .Skip(totalPackets - 1) .Do(LogEndObserved); } @@ -213,7 +202,7 @@ internal class NetworkHandlers : IDisposable, IServiceType // packets, and then flatten them to the listings themselves. return offeringsObservable .Do(LogOfferingsObserved) - .Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd) + .Window(start, UntilBatchEnd) .SelectMany( o => o.Aggregate( new List(), @@ -224,17 +213,59 @@ internal class NetworkHandlers : IDisposable, IServiceType })); } - private IObservable> OnMarketBoardSalesBatch() + private IObservable> OnMarketBoardSalesBatch( + IObservable start) { - return this.OnMarketBoardHistory().Select(history => history.HistoryListings); + var historyObservable = this.OnMarketBoardHistory().Publish().RefCount(); + + void LogHistoryObserved(MarketBoardHistory history) + { + Log.Verbose( + "Observed history for item {CatalogId} with {NumSales} sales", + history.CatalogId, + history.HistoryListings.Count); + } + + IObservable UntilBatchEnd(MarketBoardItemRequest request) + { + return historyObservable + .Where(history => history.CatalogId == request.CatalogId) + .Take(1); + } + + // When a start packet is observed, begin observing a window of history packets. + // We should only get one packet, which the window closing function ensures. + // This packet is flattened to its sale entries and emitted. + return historyObservable + .Do(LogHistoryObserved) + .Window(start, UntilBatchEnd) + .SelectMany( + o => o.Aggregate( + new List(), + (agg, next) => + { + agg.AddRange(next.HistoryListings); + return agg; + })); } private IDisposable HandleMarketBoardItemRequest() { - var startObservable = this.OnMarketBoardItemRequestStart(); + void LogStartObserved(MarketBoardItemRequest request) + { + Log.Verbose( + "Observed start of request for item#{CatalogId} with {NumListings} expected listings", + request.CatalogId, + request.AmountToArrive); + } + + var startObservable = this.OnMarketBoardItemRequestStart() + .Where(request => request.Ok).Do(LogStartObserved) + .Publish() + .RefCount(); return Observable.When( startObservable - .And(this.OnMarketBoardSalesBatch()) + .And(this.OnMarketBoardSalesBatch(startObservable)) .And(this.OnMarketBoardListingsBatch(startObservable)) .Then((request, sales, listings) => (request, sales, listings))) .Where(this.ShouldUpload) @@ -252,6 +283,18 @@ internal class NetworkHandlers : IDisposable, IServiceType ICollection sales, ICollection listings) { + if (listings.Count != request.AmountToArrive) + { + Log.Error("Wrong number of Market Board listings received for request: {ListingsCount} != {RequestAmountToArrive} item#{RequestCatalogId}", listings.Count, request.AmountToArrive, request.CatalogId); + return; + } + + if (listings.Any(listing => listing.CatalogId != request.CatalogId)) + { + Log.Error("Received listings with mismatched item IDs for item#{RequestCatalogId}", request.CatalogId); + return; + } + Log.Verbose( "Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}", request.CatalogId,