mirror of
https://github.com/goatcorp/Dalamud.git
synced 2025-12-12 18:27:23 +01:00
fix(network): Ensure correct MB history packets are being bundled
This injects the start packet into the history observable to use it for validating the item ID in the history packets. Also adds back some other validation as a safeguard.
This commit is contained in:
parent
be3fd993d1
commit
20b3a75fc6
1 changed files with 62 additions and 19 deletions
|
|
@ -2,8 +2,8 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
|
|
@ -62,10 +62,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
}
|
||||
|
||||
gameNetwork.NetworkMessage += Observe;
|
||||
return () =>
|
||||
{
|
||||
gameNetwork.NetworkMessage -= Observe;
|
||||
};
|
||||
return () => { gameNetwork.NetworkMessage -= Observe; };
|
||||
});
|
||||
|
||||
this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest();
|
||||
|
|
@ -174,17 +171,8 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
private IObservable<List<MarketBoardCurrentOfferings.MarketBoardItemListing>> OnMarketBoardListingsBatch(
|
||||
IObservable<MarketBoardItemRequest> start)
|
||||
{
|
||||
var startShared = start.Publish().RefCount();
|
||||
var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount();
|
||||
|
||||
void LogStartObserved(MarketBoardItemRequest request)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed start of request for item#{CatalogId} with {NumListings} expected listings",
|
||||
request.CatalogId,
|
||||
request.AmountToArrive);
|
||||
}
|
||||
|
||||
void LogEndObserved(MarketBoardCurrentOfferings offerings)
|
||||
{
|
||||
Log.Verbose(
|
||||
|
|
@ -204,6 +192,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
{
|
||||
var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10));
|
||||
return offeringsObservable
|
||||
.Where(offerings => offerings.ItemListings.All(l => l.CatalogId == request.CatalogId))
|
||||
.Skip(totalPackets - 1)
|
||||
.Do(LogEndObserved);
|
||||
}
|
||||
|
|
@ -213,7 +202,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
// packets, and then flatten them to the listings themselves.
|
||||
return offeringsObservable
|
||||
.Do(LogOfferingsObserved)
|
||||
.Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd)
|
||||
.Window(start, UntilBatchEnd)
|
||||
.SelectMany(
|
||||
o => o.Aggregate(
|
||||
new List<MarketBoardCurrentOfferings.MarketBoardItemListing>(),
|
||||
|
|
@ -224,17 +213,59 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
}));
|
||||
}
|
||||
|
||||
private IObservable<List<MarketBoardHistory.MarketBoardHistoryListing>> OnMarketBoardSalesBatch()
|
||||
private IObservable<List<MarketBoardHistory.MarketBoardHistoryListing>> OnMarketBoardSalesBatch(
|
||||
IObservable<MarketBoardItemRequest> start)
|
||||
{
|
||||
return this.OnMarketBoardHistory().Select(history => history.HistoryListings);
|
||||
var historyObservable = this.OnMarketBoardHistory().Publish().RefCount();
|
||||
|
||||
void LogHistoryObserved(MarketBoardHistory history)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed history for item {CatalogId} with {NumSales} sales",
|
||||
history.CatalogId,
|
||||
history.HistoryListings.Count);
|
||||
}
|
||||
|
||||
IObservable<MarketBoardHistory> UntilBatchEnd(MarketBoardItemRequest request)
|
||||
{
|
||||
return historyObservable
|
||||
.Where(history => history.CatalogId == request.CatalogId)
|
||||
.Take(1);
|
||||
}
|
||||
|
||||
// When a start packet is observed, begin observing a window of history packets.
|
||||
// We should only get one packet, which the window closing function ensures.
|
||||
// This packet is flattened to its sale entries and emitted.
|
||||
return historyObservable
|
||||
.Do(LogHistoryObserved)
|
||||
.Window(start, UntilBatchEnd)
|
||||
.SelectMany(
|
||||
o => o.Aggregate(
|
||||
new List<MarketBoardHistory.MarketBoardHistoryListing>(),
|
||||
(agg, next) =>
|
||||
{
|
||||
agg.AddRange(next.HistoryListings);
|
||||
return agg;
|
||||
}));
|
||||
}
|
||||
|
||||
private IDisposable HandleMarketBoardItemRequest()
|
||||
{
|
||||
var startObservable = this.OnMarketBoardItemRequestStart();
|
||||
void LogStartObserved(MarketBoardItemRequest request)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed start of request for item#{CatalogId} with {NumListings} expected listings",
|
||||
request.CatalogId,
|
||||
request.AmountToArrive);
|
||||
}
|
||||
|
||||
var startObservable = this.OnMarketBoardItemRequestStart()
|
||||
.Where(request => request.Ok).Do(LogStartObserved)
|
||||
.Publish()
|
||||
.RefCount();
|
||||
return Observable.When(
|
||||
startObservable
|
||||
.And(this.OnMarketBoardSalesBatch())
|
||||
.And(this.OnMarketBoardSalesBatch(startObservable))
|
||||
.And(this.OnMarketBoardListingsBatch(startObservable))
|
||||
.Then((request, sales, listings) => (request, sales, listings)))
|
||||
.Where(this.ShouldUpload)
|
||||
|
|
@ -252,6 +283,18 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
ICollection<MarketBoardHistory.MarketBoardHistoryListing> sales,
|
||||
ICollection<MarketBoardCurrentOfferings.MarketBoardItemListing> listings)
|
||||
{
|
||||
if (listings.Count != request.AmountToArrive)
|
||||
{
|
||||
Log.Error("Wrong number of Market Board listings received for request: {ListingsCount} != {RequestAmountToArrive} item#{RequestCatalogId}", listings.Count, request.AmountToArrive, request.CatalogId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (listings.Any(listing => listing.CatalogId != request.CatalogId))
|
||||
{
|
||||
Log.Error("Received listings with mismatched item IDs for item#{RequestCatalogId}", request.CatalogId);
|
||||
return;
|
||||
}
|
||||
|
||||
Log.Verbose(
|
||||
"Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}",
|
||||
request.CatalogId,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue