feat(network): Send market board listings/sales in one request

This also removes a ton of state-management code that used to
make it difficult to express this logic cleanly.
This commit is contained in:
karashiiro 2023-02-16 19:51:00 -08:00
parent d295fa1494
commit 93fbd69530
4 changed files with 131 additions and 166 deletions

View file

@ -2,7 +2,6 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
@ -28,12 +27,9 @@ internal class NetworkHandlers : IDisposable, IServiceType
{
private readonly IMarketBoardUploader uploader;
private readonly List<MarketBoardItemRequest> marketBoardRequests;
private readonly ISubject<NetworkMessage> messages;
private readonly IDisposable handleMarketBoardItemRequest;
private readonly IDisposable handleMarketBoardOfferings;
private readonly IDisposable handleMarketBoardHistory;
private readonly IDisposable handleMarketTaxRates;
private readonly IDisposable handleMarketBoardPurchaseHandler;
private readonly IDisposable handleCfPop;
@ -47,14 +43,11 @@ internal class NetworkHandlers : IDisposable, IServiceType
private NetworkHandlers(GameNetwork gameNetwork)
{
this.uploader = new UniversalisMarketBoardUploader();
this.marketBoardRequests = new List<MarketBoardItemRequest>();
this.CfPop = (_, _) => { };
this.messages = new Subject<NetworkMessage>();
this.handleMarketBoardItemRequest = this.HandleMarketBoardItemRequest();
this.handleMarketBoardOfferings = this.HandleMarketBoardOfferings();
this.handleMarketBoardHistory = this.HandleMarketBoardHistory();
this.handleMarketTaxRates = this.HandleMarketTaxRates();
this.handleMarketBoardPurchaseHandler = this.HandleMarketBoardPurchaseHandler();
this.handleCfPop = this.HandleCfPop();
@ -86,8 +79,6 @@ internal class NetworkHandlers : IDisposable, IServiceType
return;
this.handleMarketBoardItemRequest.Dispose();
this.handleMarketBoardOfferings.Dispose();
this.handleMarketBoardHistory.Dispose();
this.handleMarketTaxRates.Dispose();
this.handleMarketBoardPurchaseHandler.Dispose();
this.handleCfPop.Dispose();
@ -113,7 +104,7 @@ internal class NetworkHandlers : IDisposable, IServiceType
return this.messages.Where(message => message.DataManager?.IsDataReady == true);
}
private IObservable<MarketBoardItemRequest> OnMarketBoardItemRequest()
private IObservable<MarketBoardItemRequest> OnMarketBoardItemRequestStart()
{
return this.OnNetworkMessage()
.Where(message => message.Direction == NetworkMessageDirection.ZoneDown)
@ -176,129 +167,100 @@ internal class NetworkHandlers : IDisposable, IServiceType
.Where(message => message.Opcode == message.DataManager?.ServerOpCodes["CfNotifyPop"]);
}
private IObservable<List<MarketBoardCurrentOfferings.MarketBoardItemListing>> OnMarketBoardListingsBatch(
IObservable<MarketBoardItemRequest> start)
{
var startShared = start.Publish().RefCount();
var offeringsObservable = this.OnMarketBoardOfferings().Publish().RefCount();
void LogStartObserved(MarketBoardItemRequest request)
{
Log.Verbose(
"Observed start of request for item#{CatalogId} with {NumListings} expected listings",
request.CatalogId,
request.AmountToArrive);
}
void LogEndObserved(MarketBoardCurrentOfferings offerings)
{
Log.Verbose(
"Observed end of request {RequestId}",
offerings.RequestId);
}
void LogOfferingsObserved(MarketBoardCurrentOfferings offerings)
{
Log.Verbose(
"Observed element of request {RequestId} with {NumListings} listings",
offerings.RequestId,
offerings.ItemListings.Count);
}
IObservable<MarketBoardCurrentOfferings> UntilBatchEnd(MarketBoardItemRequest request)
{
var totalPackets = Convert.ToInt32(Math.Ceiling((double)request.AmountToArrive / 10));
return offeringsObservable
.Skip(totalPackets - 1)
.Do(LogEndObserved);
}
// When a start packet is observed, begin observing a window of listings packets
// according to the count described by the start packet. Aggregate the listings
// packets, and then flatten them to the listings themselves.
return offeringsObservable
.Do(LogOfferingsObserved)
.Window(startShared.Where(request => request.Ok).Do(LogStartObserved), UntilBatchEnd)
.SelectMany(
o => o.Aggregate(
new List<MarketBoardCurrentOfferings.MarketBoardItemListing>(),
(agg, next) =>
{
agg.AddRange(next.ItemListings);
return agg;
}));
}
private IObservable<List<MarketBoardHistory.MarketBoardHistoryListing>> OnMarketBoardSalesBatch()
{
return this.OnMarketBoardHistory().Select(history => history.HistoryListings);
}
private IDisposable HandleMarketBoardItemRequest()
{
return this.OnMarketBoardItemRequest()
.Where(this.ShouldUpload)
.Subscribe(
request =>
{
this.marketBoardRequests.Add(request);
Log.Verbose($"NEW MB REQUEST START: item#{request.CatalogId} amount#{request.AmountToArrive}");
},
ex => Log.Error(ex, "Failed to handle Market Board item request event"));
var startObservable = this.OnMarketBoardItemRequestStart();
return Observable.When(
startObservable
.And(this.OnMarketBoardSalesBatch())
.And(this.OnMarketBoardListingsBatch(startObservable))
.Then((request, sales, listings) => (request, sales, listings)))
.Where(this.ShouldUpload)
.Subscribe(
data =>
{
var (request, sales, listings) = data;
this.UploadMarketBoardData(request, sales, listings);
},
ex => Log.Error(ex, "Failed to handle Market Board item request event"));
}
private IDisposable HandleMarketBoardOfferings()
private void UploadMarketBoardData(
MarketBoardItemRequest request,
ICollection<MarketBoardHistory.MarketBoardHistoryListing> sales,
ICollection<MarketBoardCurrentOfferings.MarketBoardItemListing> listings)
{
return this.OnMarketBoardOfferings()
.Where(this.ShouldUpload)
.Subscribe(
listing =>
{
var request =
this.marketBoardRequests.LastOrDefault(
r => r.CatalogId == listing.ItemListings[0].CatalogId && !r.IsDone);
Log.Verbose(
"Market Board request resolved, starting upload: item#{CatalogId} listings#{ListingsObserved} sales#{SalesObserved}",
request.CatalogId,
listings.Count,
sales.Count);
if (request == default)
{
Log.Error(
$"Market Board data arrived without a corresponding request: item#{listing.ItemListings[0].CatalogId}");
return;
}
request.Listings.AddRange(listings);
request.History.AddRange(sales);
if (request.Listings.Count + listing.ItemListings.Count > request.AmountToArrive)
{
Log.Error(
$"Too many Market Board listings received for request: {request.Listings.Count + listing.ItemListings.Count} > {request.AmountToArrive} item#{listing.ItemListings[0].CatalogId}");
return;
}
if (request.ListingsRequestId != -1 && request.ListingsRequestId != listing.RequestId)
{
Log.Error(
$"Non-matching RequestIds for Market Board data request: {request.ListingsRequestId}, {listing.RequestId}");
return;
}
if (request.ListingsRequestId == -1 && request.Listings.Count > 0)
{
Log.Error(
$"Market Board data request sequence break: {request.ListingsRequestId}, {request.Listings.Count}");
return;
}
if (request.ListingsRequestId == -1)
{
request.ListingsRequestId = listing.RequestId;
Log.Verbose($"First Market Board packet in sequence: {listing.RequestId}");
}
request.Listings.AddRange(listing.ItemListings);
Log.Verbose(
"Added {0} ItemListings to request#{1}, now {2}/{3}, item#{4}",
listing.ItemListings.Count,
request.ListingsRequestId,
request.Listings.Count,
request.AmountToArrive,
request.CatalogId);
if (request.IsDone)
{
Log.Verbose(
"Market Board request finished, starting upload: request#{0} item#{1} amount#{2}",
request.ListingsRequestId,
request.CatalogId,
request.AmountToArrive);
Task.Run(() => this.uploader.Upload(request))
.ContinueWith(
task => Log.Error(task.Exception, "Market Board offerings data upload failed"),
TaskContinuationOptions.OnlyOnFaulted);
}
},
ex => Log.Error(ex, "Failed to handle Market Board offerings data event"));
}
private IDisposable HandleMarketBoardHistory()
{
return this.OnMarketBoardHistory()
.Where(this.ShouldUpload)
.Subscribe(
listing =>
{
var request = this.marketBoardRequests.LastOrDefault(r => r.CatalogId == listing.CatalogId);
if (request == default)
{
Log.Error(
$"Market Board data arrived without a corresponding request: item#{listing.CatalogId}");
return;
}
if (request.ListingsRequestId != -1)
{
Log.Error(
$"Market Board data history sequence break: {request.ListingsRequestId}, {request.Listings.Count}");
return;
}
request.History.AddRange(listing.HistoryListings);
Log.Verbose("Added history for item#{0}", listing.CatalogId);
if (request.AmountToArrive == 0)
{
Log.Verbose("Request had 0 amount, uploading now");
Task.Run(() => this.uploader.Upload(request))
.ContinueWith(
(task) => Log.Error(task.Exception, "Market Board history data upload failed"),
TaskContinuationOptions.OnlyOnFaulted);
}
},
ex => Log.Error(ex, "Failed to handle Market Board history data event"));
Task.Run(() => this.uploader.Upload(request))
.ContinueWith(
task => Log.Error(task.Exception, "Market Board offerings data upload failed"),
TaskContinuationOptions.OnlyOnFaulted);
}
private IDisposable HandleMarketTaxRates()
@ -307,22 +269,22 @@ internal class NetworkHandlers : IDisposable, IServiceType
.Where(this.ShouldUpload)
.Subscribe(
taxes =>
{
Log.Verbose(
"MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}",
taxes.LimsaLominsaTax,
taxes.GridaniaTax,
taxes.UldahTax,
taxes.IshgardTax,
taxes.KuganeTax,
taxes.CrystariumTax,
taxes.SharlayanTax);
{
Log.Verbose(
"MarketTaxRates: limsa#{0} grid#{1} uldah#{2} ish#{3} kugane#{4} cr#{5} sh#{6}",
taxes.LimsaLominsaTax,
taxes.GridaniaTax,
taxes.UldahTax,
taxes.IshgardTax,
taxes.KuganeTax,
taxes.CrystariumTax,
taxes.SharlayanTax);
Task.Run(() => this.uploader.UploadTax(taxes))
.ContinueWith(
task => Log.Error(task.Exception, "Market Board tax data upload failed"),
TaskContinuationOptions.OnlyOnFaulted);
},
Task.Run(() => this.uploader.UploadTax(taxes))
.ContinueWith(
task => Log.Error(task.Exception, "Market Board tax data upload failed"),
TaskContinuationOptions.OnlyOnFaulted);
},
ex => Log.Error(ex, "Failed to handle Market Board tax data event"));
}