This code pattern will guide you through StarCraft II replay analysis.
Note: These instructions assume the notebook is running in IBM Watson Studio.
In this code pattern, we parse a replay file with sc2reader, extract per-player statistics, and visualize them with control charts.
# We install the prerequisites using the `!pip install` syntax here.
# In some cases, running pip install from a notebook may require a one-time kernel restart. Check the output for messages.
!pip install --user cloudant==2.10.2
!pip install --user bokeh==0.12.10
!pip install --user seaborn==0.9.0
!pip install --user sc2reader==1.3.1
# @hidden_cell
credentials_1 = {
"apikey": "redacted",
"username": "redacted"
}
import sys
import types
import pandas as pd
from ibm_botocore.client import Config
import ibm_boto3
# Watson Studio's "Insert to code" helper patches this __iter__ onto the
# response body so pandas can treat it as an iterable file-like object.
def __iter__(self): return 0
# @hidden_cell
# The following code accesses a file in your IBM Cloud Object Storage. It includes your credentials.
# You might want to remove those credentials before you share your notebook.
# ... credential code redacted ...
# Your data file was loaded into a botocore.response.StreamingBody object.
# Please read the documentation of ibm_boto3 and pandas to learn more about your possibilities to load the data.
# ibm_boto3 documentation: https://ibm.github.io/ibm-cos-sdk-python/
# pandas documentation: http://pandas.pydata.org/
# The variable name generated by "Insert to code" may differ (here, streaming_body_2); alias it to the name used below.
streaming_body_1 = streaming_body_2
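sc2reader needs a seekable file-like object, while the inserted code yields a `botocore.response.StreamingBody`. One common pattern, sketched here with a hypothetical client and key (the real names come from the redacted credentials cell), is to read the body's bytes and wrap them in `io.BytesIO`:

```python
import io

# Hypothetical names for illustration only; the actual client, bucket, and
# key come from the cell that "Insert to code" generates.
# raw = cos_client.get_object(Bucket="sc2-replays", Key="game.SC2Replay")["Body"].read()
raw = b"placeholder replay bytes"  # stands in for the downloaded replay
streaming_body_1 = io.BytesIO(raw)  # seekable, so sc2reader can parse it
```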
Load the replay with sc2reader, using the bytes that the inserted code read from our IBM Cloud Object Storage bucket.
import sc2reader
from sc2reader.engine.plugins import APMTracker, ContextLoader, SelectionTracker
from sc2reader.events import PlayerStatsEvent, UnitBornEvent, UnitDiedEvent, UnitDoneEvent, UnitTypeChangeEvent, UpgradeCompleteEvent
# Some extra code here helps catch setup errors.
try:
replay_file = streaming_body_1
except NameError:
print('\n'
'SETUP ERROR: Please follow the directions to add a .SC2Replay file and use\n'
' "Insert to code" to set the streaming_body_1 variable to the resulting bytes.\n'
' You may need to rename the data_* variable.')
raise
replay = sc2reader.load_replay(
replay_file,
engine=sc2reader.engine.GameEngine(plugins=[ContextLoader(), APMTracker(), SelectionTracker()]))
print("Replay successfully loaded.")
With the replay added, we can now inspect the object in the notebook. We can easily get information like date, map name, participants, winner/loser, etc.
print("Date: %s" % replay.date)
print("Map Name: " + replay.map_name)
for player in replay.players:
print("%s: %s" % (player.result, player))
Date: 2016-07-29 14:20:32
Map Name: King Sejong Station LE
Win: Player 1 - Neeb (Protoss)
Loss: Player 2 - ShoWTimE (Protoss)
# Establish some unit and building groups
VESPENE_UNITS = ["Assimilator", "Extractor", "Refinery"]
SUPPLY_UNITS = ["Overlord", "Overseer", "Pylon", "SupplyDepot"]
WORKER_UNITS = ["Drone", "Probe", "SCV", "MULE"]
BASE_UNITS = ["CommandCenter", "Nexus", "Hatchery", "Lair", "Hive", "PlanetaryFortress", "OrbitalCommand"]
GROUND_UNITS = ["Barracks", "Factory", "GhostAcademy", "Armory", "RoboticsBay", "RoboticsFacility", "TemplarArchive",
"DarkShrine", "WarpGate", "SpawningPool", "RoachWarren", "HydraliskDen", "BanelingNest", "UltraliskCavern",
"LurkerDen", "InfestationPit"]
AIR_UNITS = ["Starport", "FusionCore", "RoboticsFacility", "Stargate", "FleetBeacon", "Spire", "GreaterSpire"]
TECH_UNITS = ["EngineeringBay", "Armory", "GhostAcademy", "TechLab", "FusionCore", "Forge", "CyberneticsCore",
"TwilightCouncil", "RoboticsFacility", "RoboticsBay", "FleetBeacon", "TemplarArchive", "DarkShrine",
"SpawningPool", "RoachWarren", "HydraliskDen", "BanelingNest", "UltraliskCavern", "LurkerDen", "Spire",
"GreaterSpire", "EvolutionChamber", "InfestationPit"]
ARMY_UNITS = ["Marine", "Colossus", "InfestorTerran", "Baneling", "Mothership", "MothershipCore", "Changeling", "SiegeTank", "Viking", "Reaper",
"Ghost", "Marauder", "Thor", "Hellion", "Hellbat", "Cyclone", "Liberator", "Medivac", "Banshee", "Raven", "Battlecruiser", "Nuke", "Zealot",
"Stalker", "HighTemplar", "Disruptor", "DarkTemplar", "Sentry", "Phoenix", "Carrier", "Oracle", "VoidRay", "Tempest", "WarpPrism", "Observer",
"Immortal", "Adept", "Zergling", "Overlord", "Hydralisk", "Mutalisk", "Ultralisk", "Roach", "Infestor", "Corruptor",
"BroodLord", "Queen", "Overseer", "Archon", "Broodling", "InfestedTerran", "Ravager", "Viper", "SwarmHost"]
ARMY_AIR = ["Mothership", "MothershipCore", "Viking", "Liberator", "Medivac", "Banshee", "Raven", "Battlecruiser",
            "Viper", "Mutalisk", "Phoenix", "Oracle", "Carrier", "VoidRay", "Tempest", "Observer", "WarpPrism", "BroodLord",
            "Corruptor", "Overseer"]
ARMY_GROUND = [k for k in ARMY_UNITS if k not in ARMY_AIR]
# Establish our event parsers
def handle_count(caller, event, key, add_value, start_val=0):
    if len(caller.players[event.unit.owner.pid][key]) == 0:
        caller.players[event.unit.owner.pid][key].append((0, start_val))
    # Get the last value and extend the running total
    last_val = caller.players[event.unit.owner.pid][key][-1][1]
    caller.players[event.unit.owner.pid][key].append((event.frame, last_val + add_value))
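To make the bookkeeping concrete, here is a minimal, self-contained sketch of the running-count idea behind `handle_count` (the `bump` helper and `counts` dict are illustrative, not part of the notebook): each statistic is a list of `(frame, value)` pairs, and each event appends the previous value plus a delta.

```python
from collections import defaultdict

counts = defaultdict(list)

def bump(key, frame, delta, start_val=0):
    # Seed the series at frame 0, then extend it with a running total.
    if not counts[key]:
        counts[key].append((0, start_val))
    last_val = counts[key][-1][1]
    counts[key].append((frame, last_val + delta))

bump("army_count", 100, 1)   # a unit is born
bump("army_count", 250, 1)   # another unit is born
bump("army_count", 400, -1)  # a unit dies
print(counts["army_count"])  # → [(0, 0), (100, 1), (250, 2), (400, 1)]
```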
def handle_expansion_events(caller, event):
if type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in BASE_UNITS:
caller.players[event.unit.owner.pid]["expansion_event"].append((event.frame, "+", unit))
handle_count(caller, event, "expansion_buildings", 1, start_val=1)
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in BASE_UNITS:
caller.players[event.unit.owner.pid]["expansion_event"].append((event.frame, "-", unit))
handle_count(caller, event, "expansion_buildings", -1, start_val=1)
elif type(event) is UnitTypeChangeEvent:
if event.unit.name in BASE_UNITS:
caller.players[event.unit.owner.pid]["expansion_event"].append((event.frame, "*", event.unit.name))
def handle_worker_events(caller, event):
if type(event) is PlayerStatsEvent:
caller.players[event.pid]["workers_active"].append((event.frame, event.workers_active_count))
elif type(event) is UnitBornEvent:
unit = str(event.unit).split()[0]
if unit in WORKER_UNITS:
caller.players[event.control_pid]["worker_event"].append((event.frame, "+", unit))
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in WORKER_UNITS:
caller.players[event.unit.owner.pid]["worker_event"].append((event.frame, "-", unit))
def handle_supply_events(caller, event):
if type(event) is PlayerStatsEvent:
caller.players[event.pid]["supply_available"].append((event.frame, int(event.food_made)))
caller.players[event.pid]["supply_consumed"].append((event.frame, int(event.food_used)))
utilization = 0 if event.food_made == 0 else event.food_used / event.food_made
caller.players[event.pid]["supply_utilization"].append((event.frame, utilization))
worker_ratio = 0 if event.food_used == 0 else event.workers_active_count / event.food_used
caller.players[event.pid]["worker_supply_ratio"].append((event.frame, worker_ratio))
elif type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in SUPPLY_UNITS:
caller.players[event.unit.owner.pid]["supply_event"].append((event.frame, "+", unit))
elif type(event) is UnitBornEvent:
# Specifically for Overlord
unit = str(event.unit).split()[0]
if unit == "Overlord":
caller.players[event.control_pid]["supply_event"].append((event.frame, "+", unit))
elif type(event) is UnitDiedEvent:
# Buildings/ Overlord/Overseer
unit = str(event.unit).split()[0]
if unit in SUPPLY_UNITS:
caller.players[event.unit.owner.pid]["supply_event"].append((event.frame, "-", unit))
elif type(event) is UnitTypeChangeEvent:
if event.unit_type_name == "Overseer":
caller.players[event.unit.owner.pid]["supply_event"].append((event.frame, "*", event.unit_type_name))
def handle_vespene_events(caller, event):
if type(event) is PlayerStatsEvent:
caller.players[event.pid]["vespene_available"].append((event.frame, event.vespene_current))
caller.players[event.pid]["vespene_collection_rate"].append((event.frame, event.vespene_collection_rate))
vesp_per_worker = 0 if event.workers_active_count == 0 else event.vespene_collection_rate / event.workers_active_count
caller.players[event.pid]["vespene_per_worker_rate"].append((event.frame, vesp_per_worker))
caller.players[event.pid]["vespene_cost_active_forces"].append((event.frame, event.vespene_used_active_forces))
caller.players[event.pid]["vespene_spend"].append((event.frame, event.vespene_used_current))
caller.players[event.pid]["vespene_value_current_technology"].append((event.frame, event.vespene_used_current_technology))
caller.players[event.pid]["vespene_value_current_army"].append((event.frame, event.vespene_used_current_army))
caller.players[event.pid]["vespene_value_current_economic"].append((event.frame, event.vespene_used_current_economy))
caller.players[event.pid]["vespene_queued"].append((event.frame, event.vespene_used_in_progress))
caller.players[event.pid]["vespene_queued_technology"].append((event.frame, event.vespene_used_in_progress_technology))
        caller.players[event.pid]["vespene_queued_army"].append((event.frame, event.vespene_used_in_progress_army))
caller.players[event.pid]["vespene_queued_economic"].append((event.frame, event.vespene_used_in_progress_economy))
elif type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in VESPENE_UNITS:
caller.players[event.unit.owner.pid]["vespene_event"].append((event.frame, "+", unit))
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in VESPENE_UNITS:
caller.players[event.unit.owner.pid]["vespene_event"].append((event.frame, "-", unit))
def handle_resources_events(caller, event):
if type(event) is PlayerStatsEvent:
caller.players[event.pid]["mineral_destruction"].append((event.frame, event.minerals_killed))
caller.players[event.pid]["mineral_destruction_army"].append((event.frame, event.minerals_killed_army))
caller.players[event.pid]["mineral_destruction_economic"].append((event.frame, event.minerals_killed_economy))
caller.players[event.pid]["mineral_destruction_technology"].append((event.frame, event.minerals_killed_technology))
caller.players[event.pid]["mineral_loss"].append((event.frame, event.minerals_lost))
caller.players[event.pid]["mineral_loss_army"].append((event.frame, event.minerals_lost_army))
caller.players[event.pid]["mineral_loss_economic"].append((event.frame, event.minerals_lost_economy))
caller.players[event.pid]["mineral_loss_technology"].append((event.frame, event.minerals_lost_technology))
caller.players[event.pid]["vespene_destruction"].append((event.frame, event.vespene_killed))
caller.players[event.pid]["vespene_destruction_army"].append((event.frame, event.vespene_killed_army))
caller.players[event.pid]["vespene_destruction_economic"].append((event.frame, event.vespene_killed_economy))
caller.players[event.pid]["vespene_destruction_technology"].append((event.frame, event.vespene_killed_technology))
caller.players[event.pid]["vespene_loss"].append((event.frame, event.vespene_lost))
caller.players[event.pid]["vespene_loss_army"].append((event.frame, event.vespene_lost_army))
caller.players[event.pid]["vespene_loss_economic"].append((event.frame, event.vespene_lost_economy))
caller.players[event.pid]["vespene_loss_technology"].append((event.frame, event.vespene_lost_technology))
def handle_ground_events(caller, event):
if type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in GROUND_UNITS:
count_name = "_".join(["building", unit, "count"])
caller.players[event.unit.owner.pid]["ground_building"].append((event.frame, "+", unit))
handle_count(caller, event, count_name, 1)
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in GROUND_UNITS:
count_name = "_".join(["building", unit, "count"])
caller.players[event.unit.owner.pid]["ground_building"].append((event.frame, "-", unit))
handle_count(caller, event, count_name, -1)
elif type(event) is UnitTypeChangeEvent:
if event.unit_type_name == "LurkerDen":
count_name = "_".join(["building", event.unit_type_name, "count"])
caller.players[event.unit.owner.pid]["ground_building"].append((event.frame, "*", event.unit_type_name))
handle_count(caller, event, count_name, 1)
def handle_air_events(caller, event):
if type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in AIR_UNITS:
count_name = "_".join(["building", unit, "count"])
caller.players[event.unit.owner.pid]["air_building"].append((event.frame, "+", unit))
handle_count(caller, event, count_name, 1)
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in AIR_UNITS:
count_name = "_".join(["building", unit, "count"])
caller.players[event.unit.owner.pid]["air_building"].append((event.frame, "-", unit))
handle_count(caller, event, count_name, -1)
elif type(event) is UnitTypeChangeEvent:
if event.unit_type_name == "GreaterSpire":
count_name = "_".join(["building", event.unit_type_name, "count"])
caller.players[event.unit.owner.pid]["air_building"].append((event.frame, "*", event.unit_type_name))
handle_count(caller, event, count_name, 1)
def handle_unit_events(caller, event):
if type(event) is UnitBornEvent:
unit = event.unit_type_name
if unit in ARMY_UNITS:
unit_count_name = "_".join(["unit", unit, "count"])
caller.players[event.control_pid]["army_event"].append((event.frame, "+", unit))
handle_count(caller, event, unit_count_name, 1)
if unit in ARMY_AIR:
handle_count(caller, event, "army_air", 1)
elif unit in ARMY_GROUND:
handle_count(caller, event, "army_ground", 1)
handle_count(caller, event, "army_count", 1)
elif type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in ARMY_UNITS:
unit_count_name = "_".join(["unit", unit, "count"])
caller.players[event.unit.owner.pid]["army_event"].append((event.frame, "+", unit))
handle_count(caller, event, unit_count_name, 1)
if unit in ARMY_AIR:
handle_count(caller, event, "army_air", 1)
elif unit in ARMY_GROUND:
                handle_count(caller, event, "army_ground", 1)
handle_count(caller, event, "army_count", 1)
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in ARMY_UNITS:
unit_count_name = "_".join(["unit", unit, "count"])
caller.players[event.unit.owner.pid]["army_event"].append((event.frame, "-", unit))
if unit in ARMY_AIR:
handle_count(caller, event, "army_air", -1)
elif unit in ARMY_GROUND:
handle_count(caller, event, "army_ground", -1)
handle_count(caller, event, unit_count_name, -1)
handle_count(caller, event, "army_count", -1)
elif type(event) is UnitTypeChangeEvent:
unit = str(event.unit).split()[0]
if event.unit_type_name in ARMY_UNITS:
unit_count_name = "_".join(["unit", event.unit_type_name, "count"])
caller.players[event.unit.owner.pid]["army_event"].append((event.frame, "*", unit))
handle_count(caller, event, unit_count_name, 1)
def handle_tech_events(caller, event):
if type(event) is UnitDoneEvent:
unit = str(event.unit).split()[0]
if unit in TECH_UNITS:
caller.players[event.unit.owner.pid]["tech_building"].append((event.frame, "+", unit))
elif type(event) is UnitDiedEvent:
unit = str(event.unit).split()[0]
if unit in TECH_UNITS:
caller.players[event.unit.owner.pid]["tech_building"].append((event.frame, "-", unit))
elif type(event) is UnitTypeChangeEvent:
if event.unit_type_name in ["GreaterSpire", "LurkerDen"]:
caller.players[event.unit.owner.pid]["tech_building"].append((event.frame, "*", event.unit_type_name))
def handle_upgrade_events(caller, event):
if type(event) is UpgradeCompleteEvent and event.frame > 0:
if not event.upgrade_type_name.startswith("Spray"):
caller.players[event.pid]["upgrades"].append((event.frame, event.upgrade_type_name))
def handle_mineral_events(caller, event):
if type(event) is PlayerStatsEvent:
caller.players[event.pid]["minerals_available"].append((event.frame, event.minerals_current))
caller.players[event.pid]["mineral_collection_rate"].append((event.frame, event.minerals_collection_rate,))
caller.players[event.pid]["mineral_cost_active_forces"].append((event.frame, event.minerals_used_active_forces))
mins_per_worker = 0 if event.workers_active_count == 0 else event.minerals_collection_rate / event.workers_active_count
caller.players[event.pid]["mineral_per_worker_rate"].append((event.frame, mins_per_worker))
caller.players[event.pid]["mineral_spend"].append((event.frame, event.minerals_used_current))
caller.players[event.pid]["mineral_value_current_technology"].append((event.frame, event.minerals_used_current_technology))
caller.players[event.pid]["mineral_value_current_army"].append((event.frame, event.minerals_used_current_army))
caller.players[event.pid]["mineral_value_current_economic"].append((event.frame, event.minerals_used_current_economy))
caller.players[event.pid]["mineral_queued"].append((event.frame, event.minerals_used_in_progress))
caller.players[event.pid]["mineral_queued_technology"].append((event.frame, event.minerals_used_in_progress_technology))
caller.players[event.pid]["mineral_queued_army"].append((event.frame, event.minerals_used_in_progress_army))
caller.players[event.pid]["mineral_queued_economic"].append((event.frame, event.minerals_used_in_progress_economy))
# Aggregate all of our event parsers for use by our ReplayData class
handlers = [handle_expansion_events, handle_worker_events, handle_supply_events, handle_mineral_events,
handle_vespene_events, handle_ground_events, handle_air_events, handle_tech_events, handle_upgrade_events,
handle_unit_events]
# Below we define our class ReplayData for helping us structure and process our replay files
class ReplayData:
__parsers__ = handlers
@classmethod
def parse_replay(cls, replay=None, replay_file=None, file_object=None):
replay_data = ReplayData(replay_file)
try:
# This is the engine that holds some required plugins for parsing
engine = sc2reader.engine.GameEngine(plugins=[ContextLoader(), APMTracker(), SelectionTracker()])
if replay:
pass
elif replay_file and not file_object:
# Then we are not using ObjectStorage for accessing replay files
replay = sc2reader.load_replay(replay_file, engine=engine)
elif file_object:
# We are using ObjectStorage to access replay files
replay = sc2reader.load_replay(file_object, engine=engine)
else:
pass
# Get the number of frames (one frame is 1/16 of a second)
replay_data.frames = replay.frames
# Gets the game mode (if available)
replay_data.game_mode = replay.real_type
# Gets the map hash (if we want to download the map, or do map-based analysis)
replay_data.map_hash = replay.map_hash
# Use the parsers to get data
for event in replay.events:
for parser in cls.__parsers__:
parser(replay_data, event)
# Check if there was a winner
if replay.winner is not None:
replay_data.winners = replay.winner.players
replay_data.losers = [p for p in replay.players if p not in replay.winner.players]
else:
replay_data.winners = []
replay_data.losers = []
            # Record the expansion (e.g. WoL, HotS, LotV) if it is available
            replay_data.expansion = replay.expansion
return replay_data
        except Exception:
            # Print the error and return None
            print_exc()
            return None
def as_dict(self):
return {
"processed_on": datetime.utcnow().isoformat(),
"replay_name": self.replay,
"expansion": self.expansion,
"frames": self.frames,
"mode": self.game_mode,
"map": self.map_hash,
"matchup": "v".join(sorted([s.detail_data["race"][0].upper() for s in self.winners + self.losers])),
"winners": [(s.pid, s.name, s.detail_data['race']) for s in self.winners],
"losers": [(s.pid, s.name, s.detail_data['race']) for s in self.losers],
"stats_names": [k for k in self.players[1].keys()],
"stats": {player: data for player, data in self.players.items()}
}
def __init__(self, replay):
self.players = defaultdict(lambda: defaultdict(list))
self.replay = replay
self.winners = []
self.losers = []
self.expansion = None
Next, we process the replay to extract event statistics, using pandas and a few helper functions, and visualize them as control charts.
from datetime import datetime
from collections import defaultdict
from traceback import print_exc
import numpy as np
from bokeh.io import output_notebook, reset_output
from bokeh.models import Span, Range1d, Legend, BoxAnnotation, HoverTool, Arrow, NormalHead
from bokeh.plotting import figure, show, gridplot, ColumnDataSource
output_notebook()
def merge(i, last_entry, sign=None, length=3):
if last_entry is not None:
if sign is not None:
# Check to see if this is a continuation
if last_entry[1] == i + length - 1 and last_entry[2] == sign:
return [(last_entry[0], i + length, sign)]
else:
return [last_entry, (i, i + length, sign)]
else:
# Check to see if this is a continuation
if last_entry[1] == i + length - 1:
return [(last_entry[0], i + length)]
else:
return [last_entry, (i, i + length)]
else:
if sign is not None:
return [(i, i + length, sign)]
else:
return [(i, i + length)]
def detect_nelson_bias(src_data, x_bar):
# Bias is defined as 9 or more consecutive points sitting above or below our x-bar line
bias_ranges = []
length = 9
for i in range(len(src_data) - length):
last_entry = bias_ranges.pop() if len(bias_ranges) > 0 else None
if all([src_data[k] > x_bar for k in range(i, i + length)]):
sign = "+"
bias = merge(i, last_entry, sign=sign, length=length)
bias_ranges.extend(bias)
elif all([src_data[k] < x_bar for k in range(i, i+length)]):
sign = "-"
bias = merge(i, last_entry, sign=sign, length=length)
bias_ranges.extend(bias)
else:
if last_entry:
bias_ranges.append(last_entry)
return bias_ranges
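As a sanity check on the rule itself, here is a standalone sketch of the bias detection (the helper name `bias_runs` is hypothetical, not part of the notebook): it flags any run of nine or more consecutive points that all sit on the same side of the mean.

```python
# Minimal sketch of the bias rule: scan once, tracking which side of x_bar
# each point falls on, and record runs of at least min_len points.
def bias_runs(data, x_bar, min_len=9):
    runs, start, side = [], None, 0
    for i, v in enumerate(data + [x_bar]):  # sentinel flushes the final run
        s = 1 if v > x_bar else -1 if v < x_bar else 0
        if s != side:
            if side != 0 and i - start >= min_len:
                runs.append((start, i - 1, "+" if side > 0 else "-"))
            start, side = i, s
    return runs

series = [5] * 10 + [0] + [-5] * 9
print(bias_runs(series, 0))  # → [(0, 9, '+'), (11, 19, '-')]
```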
def detect_nelson_trend(src_data, std):
    # A trend is 6 or more consecutive strictly increasing (or decreasing) points, or 6 or more
    # non-decreasing (or non-increasing) points whose endpoints differ by at least 1.5 standard deviations
trend_ranges = []
length = 6
for i in range(len(src_data) - length):
last_entry = trend_ranges.pop() if len(trend_ranges) > 0 else None
if (all(x<y for x, y in zip(src_data[i:i+length], src_data[i+1:i+length]))
or (all(x<=y for x,y in zip(src_data[i:i+length], src_data[i+1:i+length]))
and abs(src_data[i] - src_data[i+length]) >= 1.5*std)):
sign = "+"
trend_ranges.extend(merge(i, last_entry, sign=sign, length=length))
elif (all(x>y for x, y in zip(src_data[i:i+length], src_data[i+1:i+length]))
or (all(x>=y for x,y in zip(src_data[i:i+length], src_data[i+1:i+length]))
and abs(src_data[i] - src_data[i+length]) >= 1.5*std)):
sign = "-"
trend_ranges.extend(merge(i, last_entry, sign=sign, length=length))
else:
if last_entry:
trend_ranges.append(last_entry)
return trend_ranges
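The windowed comparison at the heart of the trend rule can be sketched on toy data (the helper name `strictly_monotone` is illustrative only):

```python
def strictly_monotone(window):
    # "+" if every step increases, "-" if every step decreases, else None.
    ups = all(a < b for a, b in zip(window, window[1:]))
    downs = all(a > b for a, b in zip(window, window[1:]))
    return "+" if ups else "-" if downs else None

data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]
hits = [(i, strictly_monotone(data[i:i + 6])) for i in range(len(data) - 5)]
print([(i, s) for i, s in hits if s])  # → [(0, '+'), (5, '-'), (6, '-')]
```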
def detect_nelson_oscillation(src_data):
# Oscillation is defined as 14 or more consecutive points, all alternating in direction
diff = lambda x, y: 1 if y - x > 0 else -1 if y - x < 0 else None
oscillation_ranges = []
length=14
deltas = []
for i in range(len(src_data) - length):
last_entry = oscillation_ranges.pop() if len(oscillation_ranges) > 0 else None
sign = None
is_oscillating = True
for curr in range(i, i + length - 1):
            if sign is None and curr == i:
sign = diff(src_data[curr], src_data[curr + 1])
elif sign is None and curr != i:
is_oscillating = False
break
else:
new_sign = diff(src_data[curr], src_data[curr + 1])
if new_sign is None or new_sign == sign:
is_oscillating = False
break
elif new_sign != sign and new_sign is not None:
sign = new_sign
if is_oscillating:
# check if this is a continuation of a previous oscillation
oscillation_ranges.extend(merge(i, last_entry, length=length))
else:
if last_entry:
oscillation_ranges.append(last_entry)
return oscillation_ranges
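The alternation test can likewise be sketched in isolation (hypothetical helper `is_oscillating`): a window of fourteen points oscillates when every successive difference is nonzero and flips sign.

```python
def is_oscillating(window):
    # Sign of each step: +1 up, -1 down, 0 flat.
    signs = [1 if b > a else -1 if b < a else 0
             for a, b in zip(window, window[1:])]
    return 0 not in signs and all(x == -y for x, y in zip(signs, signs[1:]))

zigzag = [i % 2 for i in range(14)]  # 0, 1, 0, 1, ... strictly alternating
flat = [1] * 14                      # no movement at all
print(is_oscillating(zigzag), is_oscillating(flat))  # → True False
```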
def avg_last_minute(process, pid, time, replay):
data = pd.DataFrame({
"Data": [k[1] for k in replay.stats[pid-1][process]]},
index=[int(k[0]/16) for k in replay.stats[pid - 1][process]])
rolling = data.rolling(6).mean()
pct_change = data.pct_change()
ndx = data.index.get_loc(time, method="ffill")
prev_ndx = max(ndx - 1, 0)
r_mean = rolling.get_value(rolling.index[ndx], "Data")
prev_mean = rolling.get_value(rolling.index[prev_ndx], "Data")
pcng = pct_change.get_value(rolling.index[ndx], "Data")
change = "⬆️" if r_mean > prev_mean else "⬇️" if r_mean < prev_mean else ""
    return r_mean if not pd.isnull(r_mean) else 0, change, pcng if not (pd.isnull(pcng) or np.isinf(pcng)) else 0
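Since `PlayerStatsEvent`s arrive roughly every ten seconds, the six-sample rolling mean above approximates "the last minute". A toy illustration of that window with pandas:

```python
import pandas as pd

# Seven hypothetical collection-rate samples, one every ~10 seconds.
data = pd.DataFrame({"Data": [100, 110, 120, 130, 140, 150, 160]})
rolling = data.rolling(6).mean()
print(rolling["Data"].iloc[-1])  # → 135.0 (mean of the last six samples)
```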
# Define Nelson Rules Chart Generator
def nelson_rules_chart_generator(src, timeseries, player, pid, process_name, unit_name, replay,
                                 plot_width=350, fill_color="blue", line_color="blue", line_width=2,
                                 annotations=None, fixed_lcl=None, fixed_ucl=None):
# We strip the first two data points (first data point is 0 and second data point should roughly be the same for all games)
x_bar = src[2:].mean()
std = src[2:].std()
ctrl_limits = [x_bar + (k*std) for k in range(-3, 4)]
ctrl_labels = ["LCL", "-2σ", "-1σ", "x-bar", "1σ", "2σ", "UCL"]
ctrl_colors = ["#55597F", "#5D6DFF","#A9B2FF","#000000", "#FF9E9F", "#FF5253","#7F2929"]
ctrl_dash = ["solid", "dashed", "dashed", "solid", "dashed", "dashed", "solid"]
ctrl_legend = ["{0} - {1:10.4f}".format(cl[0], cl[1]) for cl in zip(ctrl_labels, ctrl_limits)]
ctrl_width = [3, 2, 2, 3, 2, 2, 3]
significant = lambda x: x > ctrl_limits[5] or x < ctrl_limits[1]
hover = HoverTool(
tooltips=[
("time", "@x"),
("value", "@y")
])
p = figure(plot_width=plot_width, plot_height=300, x_axis_label="Game Time (in seconds)", y_axis_label=unit_name, tools=[hover], toolbar_location="above")
# Generate control lines
lines = []
source = ColumnDataSource(data=dict(x=[x/16 for x in timeseries],
y=src,
alpha=[1 if significant(y) and ndx > 2 else 0.7 for ndx, y in enumerate(src)],
radius=[6 if significant(y) and ndx > 2 else 4 for ndx, y in enumerate(src)],
))
for ndx, cl in enumerate(ctrl_limits):
limit = cl
lines.append(p.line([x/16 for x in timeseries],
[limit]*len(timeseries),
line_width=ctrl_width[ndx],
line_dash=ctrl_dash[ndx],
tags=[ctrl_labels[ndx] if k == 0 else None for k, _ in enumerate(timeseries)],
line_color=ctrl_colors[ndx]))
p.circle("x", "y",
source=source,
alpha="alpha",
radius="radius",
fill_color=fill_color,
line_width=line_width)
# Handle bias
bias_ranges = detect_nelson_bias(src, x_bar)
    for rng in bias_ranges:
        if rng[2] == "+":
            p.add_layout(BoxAnnotation(bottom=x_bar, top=ctrl_limits[-1], left=timeseries[rng[0]]/16, right=timeseries[rng[1]]/16, fill_color="green"))
        elif rng[2] == "-":
            p.add_layout(BoxAnnotation(top=x_bar, bottom=ctrl_limits[0], left=timeseries[rng[0]]/16, right=timeseries[rng[1]]/16, fill_color="red"))
# Handle trends
trend_ranges = detect_nelson_trend(src, std)
for rng in trend_ranges:
        if rng[2] == "+":
p.add_layout(Arrow(end=NormalHead(line_color="goldenrod",
fill_color="goldenrod"),
x_start=timeseries[rng[0]]/16,
y_start=src[rng[0]],
x_end=timeseries[rng[1]]/16,
y_end=src[rng[1]],
line_width=4,
line_alpha=0.6,
line_dash="solid"))
        elif rng[2] == "-":
p.add_layout(Arrow(end=NormalHead(line_color="#7F0000",
fill_color="#7F0000"),
x_start=timeseries[rng[0]]/16,
y_start=src[rng[0]],
x_end=timeseries[rng[1]]/16,
y_end=src[rng[1]],
line_width=4,
line_alpha=0.6,
line_dash="solid"))
p.title.text = "{0} for {1}".format(unit_name, player)
    p.y_range = Range1d(ctrl_limits[0] - 0.125 * ctrl_limits[0], 1.125 * ctrl_limits[-1])
legend = Legend(items=list(zip(ctrl_legend, [[l] for l in lines])), location=(10,-30))
p.add_layout(legend, "right")
return p, ctrl_limits, min(src[2:]), max(src[2:]), timeseries[-1]
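The `detect_nelson_bias` and `detect_nelson_trend` helpers consumed above are defined earlier in the notebook. As a reference for the shape of data they return, here is a minimal, hypothetical sketch of a Nelson rule 2 style detector (a run of consecutive points all on the same side of the mean), producing `(start_index, end_index, sign)` tuples like the charting code expects; the function name and `run_length` default are illustrative, not the notebook's actual implementation:

```python
def detect_bias_runs(values, mean, run_length=9):
    """Hypothetical sketch: flag runs of `run_length` or more consecutive
    samples that all fall on the same side of `mean`. Returns
    (start_index, end_index, sign) tuples."""
    def side(v):
        return "+" if v > mean else "-" if v < mean else "0"

    runs = []
    current, start = None, 0
    for i, v in enumerate(values):
        s = side(v)
        if s != current:
            # Close out the previous run if it was long enough
            if current in ("+", "-") and i - start >= run_length:
                runs.append((start, i - 1, current))
            current, start = s, i
    if current in ("+", "-") and len(values) - start >= run_length:
        runs.append((start, len(values) - 1, current))
    return runs

print(detect_bias_runs([5] * 10 + [1] * 3, mean=3))  # → [(0, 9, '+')]
```

The `green`/`red` `BoxAnnotation` shading above then only needs the start index, end index, and sign of each run.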
replay_object = ReplayData.parse_replay(replay=replay)
replay_dict = replay_object.as_dict()
players = {}
for player in replay_dict["winners"]:
    players[int(player[0])] = {"full": "Winning Player {num}: {name} ({race})".format(num=player[0], name=player[1], race=player[2]),
                               "short": "{name} ({race})".format(name=player[1], race=player[2])}
for player in replay_dict["losers"]:
    players[int(player[0])] = {"full": "Losing Player {num}: {name} ({race})".format(num=player[0], name=player[1], race=player[2]),
                               "short": "{name} ({race})".format(name=player[1], race=player[2])}
econ = ["mineral_collection_rate", "vespene_collection_rate", "workers_active", "supply_utilization", "worker_supply_ratio"]
units = ["Minerals per Minute (MPM)", "Vespene per Minute (VPM)", "Workers", "Supply Used / Supply Available", "Workers / Supply Used"]
player_charts = defaultdict(dict)
player_aggregate = defaultdict(dict)
for pid, player in players.items():
    for ndx, process in enumerate(econ):
        # Generate a control chart per player and per economic measure
        timeseries = [k[0] for k in replay_dict["stats"][pid][process]]
        proc_data = [j[1] for j in replay_dict["stats"][pid][process]]
        player_charts[pid][process], limits, v_min, v_max, game_length = nelson_rules_chart_generator(
            pd.Series(proc_data),
            timeseries,
            player["full"],
            pid,
            process,
            units[ndx],
            replay,
            fixed_lcl=0)
        player_aggregate[process][pid] = proc_data
grid = [[player_charts[k][measurement] for k in player_charts] for measurement in econ]
show(gridplot(grid, sizing_mode="scale_width"))
import seaborn
import matplotlib.pyplot as plt
%matplotlib inline
fig, axes = plt.subplots(nrows=len(grid), ncols=1, figsize=(15, 20))
for ndx in range(len(grid)):
    sample_max = min([len(player_aggregate[econ[ndx]][j]) for j in player_aggregate[econ[ndx]].keys()])
    # Remember, we drop the first 2 samples (the opening of a game is nearly identical for all players, so that data is uninformative)
    frame = pd.DataFrame({units[ndx]: [player_aggregate[econ[ndx]][1][k] for k in range(2, sample_max)] + [player_aggregate[econ[ndx]][2][l] for l in range(2, sample_max)],
                          "Player": [players[(i // (sample_max - 2)) + 1]["short"] for i in range(0, len(players.keys()) * (sample_max - 2))]})
    bp = seaborn.boxplot(x=units[ndx], orient="h", y='Player', data=frame, width=0.5, palette="muted", ax=axes[ndx])
    bp = seaborn.swarmplot(x=units[ndx], y='Player', data=frame, color='gold', alpha=0.5, ax=axes[ndx])
plt.show()
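The DataFrame construction above hard-codes player ids 1 and 2. A hypothetical generalization (not from the original notebook; the function name, `skip` parameter, and the player names in the example are fabricated for illustration) that builds the same tidy rows for any number of players:

```python
def tidy_rows(metric_by_player, players, unit, skip=2):
    """Hypothetical sketch: build tidy long-format rows for any number of
    players. `skip` drops the early-game samples, matching the notebook's
    range(2, sample_max)."""
    sample_max = min(len(series) for series in metric_by_player.values())
    rows = []
    for pid in sorted(metric_by_player):
        for value in metric_by_player[pid][skip:sample_max]:
            rows.append({unit: value, "Player": players[pid]["short"]})
    return rows

# Fabricated worker counts and player names, purely for illustration:
rows = tidy_rows({1: [0, 0, 12, 16], 2: [0, 0, 14, 18, 21]},
                 {1: {"short": "Neeb (Protoss)"}, 2: {"short": "Serral (Zerg)"}},
                 "Workers")
```

`pd.DataFrame(rows)` then drops straight into the `seaborn.boxplot` call shown above.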
Now that we have loaded and processed the replay, we can store it for future use and aggregation.
The `replay_object` has an `as_dict()` method that returns a JSON-serializable dictionary.
We'll store this in Cloudant for future use.
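Since Cloudant stores JSON documents, a quick way to sanity-check that a dictionary will upload cleanly is to confirm it survives a JSON round trip. This is a generic sketch (the helper name and sample data are illustrative, not part of the notebook's pipeline):

```python
import json

def is_json_storable(doc):
    """Return True if `doc` round-trips through JSON unchanged,
    i.e. a JSON document store like Cloudant can hold it as-is."""
    try:
        return json.loads(json.dumps(doc)) == doc
    except (TypeError, ValueError):
        return False

print(is_json_storable({"winners": [["1", "Neeb", "Protoss"]], "frames": 13000}))  # → True
print(is_json_storable({"played_at": object()}))  # → False
```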
# Want to peek at what is in the object?
# This produces a lot of output, but if you are curious, uncomment the following two lines:
# from pprint import pprint
# pprint(replay_object.as_dict())
from cloudant.client import Cloudant
# Some extra code here helps catch setup errors.
try:
    sc2replay_creds = credentials_1
    print("Cloudant credentials added for storing replay data as JSON.")
except NameError:
    print('\n'
          'SETUP ERROR: Please follow the directions to add Cloudant credentials to the notebook.\n'
          '  You may need to rename the credentials_* variable.')
    raise
# Now we need to send this data to two databases: "sc2replays" for aggregation and "sc2recents" for the most recent replay.
# Connect to cloudant service with IBM Cloud IAM credentials username and apikey.
esports = Cloudant.iam(sc2replay_creds["username"], sc2replay_creds["apikey"], connect=True)
print(esports.all_dbs())
session = esports.session()
if 'sc2replays' not in esports.all_dbs():
    esports.create_database('sc2replays')
sc2replays = esports['sc2replays']
document = sc2replays.create_document(replay_object.as_dict())
if document.exists():
    print("sc2replays entry saved. Latest id: {0}".format(document["_id"]))
if 'sc2recents' not in esports.all_dbs():
    esports.create_database('sc2recents')
sc2recents = esports['sc2recents']
# Clear out everything in the sc2recents database so it holds only the latest replay
for d in sc2recents:
    d.delete()
document = sc2recents.create_document(replay_object.as_dict())
if document.exists():
    print("sc2recents entry saved. Latest id: {0}".format(document["_id"]))
Cloudant credentials added for storing replay data as JSON. ['bogusdb', 'sc2recents', 'sc2replays'] sc2replays entry saved. Latest id: c600ffad339f5f4b451e3971583f28ca sc2recents entry saved. Latest id: 6234bb4ad4e7ee52c985aca97cff367f
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.