mirror of
https://github.com/goatcorp/Dalamud.git
synced 2025-12-12 18:27:23 +01:00
Merge pull request #1124 from karashiiro/fix/consistent-mb-sales
This commit is contained in:
commit
0012563593
1 changed files with 62 additions and 19 deletions
|
|
@ -2,8 +2,8 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
|
|
@ -62,10 +62,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
}
|
||||
|
||||
gameNetwork.NetworkMessage += Observe;
|
||||
return () =>
|
||||
{
|
||||
gameNetwork.NetworkMessage -= Observe;
|
||||
};
|
||||
return () => { gameNetwork.NetworkMessage -= Observe; };
|
||||
});
|
||||
|
||||
this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest();
|
||||
|
|
@ -174,17 +171,8 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
private IObservable<List<MarketBoardCurrentOfferings.MarketBoardItemListing>> OnMarketBoardListingsBatch(
|
||||
IObservable<MarketBoardItemRequest> start)
|
||||
{
|
||||
var startShared = start.Publish().RefCount();
|
||||
var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount();
|
||||
|
||||
void LogStartObserved(MarketBoardItemRequest request)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed start of request for item#{CatalogId} with {NumListings} expected listings",
|
||||
request.CatalogId,
|
||||
request.AmountToArrive);
|
||||
}
|
||||
|
||||
void LogEndObserved(MarketBoardCurrentOfferings offerings)
|
||||
{
|
||||
Log.Verbose(
|
||||
|
|
@ -204,6 +192,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
{
|
||||
var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10));
|
||||
return offeringsObservable
|
||||
.Where(offerings => offerings.ItemListings.All(l => l.CatalogId == request.CatalogId))
|
||||
.Skip(totalPackets - 1)
|
||||
.Do(LogEndObserved);
|
||||
}
|
||||
|
|
@ -213,7 +202,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
// packets, and then flatten them to the listings themselves.
|
||||
return offeringsObservable
|
||||
.Do(LogOfferingsObserved)
|
||||
.Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd)
|
||||
.Window(start, UntilBatchEnd)
|
||||
.SelectMany(
|
||||
o => o.Aggregate(
|
||||
new List<MarketBoardCurrentOfferings.MarketBoardItemListing>(),
|
||||
|
|
@ -224,17 +213,59 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
}));
|
||||
}
|
||||
|
||||
private IObservable<List<MarketBoardHistory.MarketBoardHistoryListing>> OnMarketBoardSalesBatch()
|
||||
private IObservable<List<MarketBoardHistory.MarketBoardHistoryListing>> OnMarketBoardSalesBatch(
|
||||
IObservable<MarketBoardItemRequest> start)
|
||||
{
|
||||
return this.OnMarketBoardHistory().Select(history => history.HistoryListings);
|
||||
var historyObservable = this.OnMarketBoardHistory().Publish().RefCount();
|
||||
|
||||
void LogHistoryObserved(MarketBoardHistory history)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed history for item {CatalogId} with {NumSales} sales",
|
||||
history.CatalogId,
|
||||
history.HistoryListings.Count);
|
||||
}
|
||||
|
||||
IObservable<MarketBoardHistory> UntilBatchEnd(MarketBoardItemRequest request)
|
||||
{
|
||||
return historyObservable
|
||||
.Where(history => history.CatalogId == request.CatalogId)
|
||||
.Take(1);
|
||||
}
|
||||
|
||||
// When a start packet is observed, begin observing a window of history packets.
|
||||
// We should only get one packet, which the window closing function ensures.
|
||||
// This packet is flattened to its sale entries and emitted.
|
||||
return historyObservable
|
||||
.Do(LogHistoryObserved)
|
||||
.Window(start, UntilBatchEnd)
|
||||
.SelectMany(
|
||||
o => o.Aggregate(
|
||||
new List<MarketBoardHistory.MarketBoardHistoryListing>(),
|
||||
(agg, next) =>
|
||||
{
|
||||
agg.AddRange(next.HistoryListings);
|
||||
return agg;
|
||||
}));
|
||||
}
|
||||
|
||||
private IDisposable HandleMarketBoardItemRequest()
|
||||
{
|
||||
var startObservable = this.OnMarketBoardItemRequestStart();
|
||||
void LogStartObserved(MarketBoardItemRequest request)
|
||||
{
|
||||
Log.Verbose(
|
||||
"Observed start of request for item#{CatalogId} with {NumListings} expected listings",
|
||||
request.CatalogId,
|
||||
request.AmountToArrive);
|
||||
}
|
||||
|
||||
var startObservable = this.OnMarketBoardItemRequestStart()
|
||||
.Where(request => request.Ok).Do(LogStartObserved)
|
||||
.Publish()
|
||||
.RefCount();
|
||||
return Observable.When(
|
||||
startObservable
|
||||
.And(this.OnMarketBoardSalesBatch())
|
||||
.And(this.OnMarketBoardSalesBatch(startObservable))
|
||||
.And(this.OnMarketBoardListingsBatch(startObservable))
|
||||
.Then((request, sales, listings) => (request, sales, listings)))
|
||||
.Where(this.ShouldUpload)
|
||||
|
|
@ -252,6 +283,18 @@ internal class NetworkHandlers : IDisposable, IServiceType
|
|||
ICollection<MarketBoardHistory.MarketBoardHistoryListing> sales,
|
||||
ICollection<MarketBoardCurrentOfferings.MarketBoardItemListing> listings)
|
||||
{
|
||||
if (listings.Count != request.AmountToArrive)
|
||||
{
|
||||
Log.Error("Wrong number of Market Board listings received for request: {ListingsCount} != {RequestAmountToArrive} item#{RequestCatalogId}", listings.Count, request.AmountToArrive, request.CatalogId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (listings.Any(listing => listing.CatalogId != request.CatalogId))
|
||||
{
|
||||
Log.Error("Received listings with mismatched item IDs for item#{RequestCatalogId}", request.CatalogId);
|
||||
return;
|
||||
}
|
||||
|
||||
Log.Verbose(
|
||||
"Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}",
|
||||
request.CatalogId,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue