# Imports
from gerrychain import Graph, GeographicPartition, Partition, Election
from gerrychain.updaters import Tally, cut_edges
import geopandas as gpd
import numpy as np
from gerrychain.random import random
from gerrychain import MarkovChain
from gerrychain.constraints import single_flip_contiguous, contiguous
from gerrychain.proposals import propose_random_flip, propose_chunk_flip
from gerrychain.accept import always_accept
from gerrychain.metrics import polsby_popper
from gerrychain import constraints
from gerrychain.metrics import partisan_gini
from gerrychain.tree import recursive_tree_part
# read in data
# set to "True" for North Carolina, "False for Pennsylvania"
nc = True
if nc:
df = gpd.read_file("https://github.com/mggg-states/NC-shapefiles/raw/master/NC_VTD.zip")
num_dist = 13
sen_elec = Election("SENElec", {"Democratic": "EL14G_US_1", "Republican": "EL14G_USS_"})
def_assn = "newplan"
else:
df = gpd.read_file("https://github.com/mggg-states/PA-shapefiles/raw/master/PA_VTDs.zip")
num_dist = 18
sen_elec = Election("SENElec", {"Republican": "T16SENR", "Democratic": "T16SEND"})
def_assn = "TS"
graph = Graph.from_geodataframe(df, reproject=False,ignore_errors=True)
graph.add_data(df,list(df))
pop = sum(df["TOTPOP"])
updaters = {
"population": Tally("TOTPOP", alias="population"),
"cut_edges": cut_edges,
"polsby_popper": polsby_popper,
sen_elec.name : sen_elec
}
initial_partition = GeographicPartition(graph, assignment=def_assn, updaters = updaters)
# function definitions
def no_worse_pg(partition):
if random.random() < .001: return True
return abs(partisan_gini(partition["SENElec"])) <= abs(partisan_gini(partition.parent["SENElec"]))
def no_worse_pp(partition):
if random.random() < .001: return True
return np.mean(list(partition["polsby_popper"].values())) >= np.mean(list(partition.parent["polsby_popper"].values()))
def step2temp(step,maxsteps):
if step/maxsteps < .33:
return 0
else:
return 4* (((step/maxsteps) - .33)/(1-.33))
class MH_Annealer:
def __init__(self, temp, maxsteps):
self.counter = 0 # counts number of steps in the chain, increment on accept
self.temp = temp # function which eats a count and gives the current temp
self.maxsteps = maxsteps
def __call__(self, partition):
if self.temp(self.counter, self.maxsteps) == 0 :
self.counter +=1
return True
else:
prob = get_score(partition) / (get_score(partition.parent) * self.temp(self.counter, self.maxsteps))
if prob > random.random():
self.counter += 1
return True
else:
return False
def sometimes_chunk(partition):
r = random.random()
if r < .65: return propose_random_flip(partition)
else: return propose_chunk_flip(partition)
# run the chain
pps = []
pgs = []
samples = []
# increase these for larger sample, will take much longer to run!
steps = 3
#chainsteps == 150000 in the paper, can be adjusted if desired
chainsteps = 20000
for i in range(steps):
samp = []
accept = MH_Annealer(temp = step2temp, maxsteps = chainsteps)
new_plan = recursive_tree_part(graph, range(num_dist), pop/num_dist, "TOTPOP", .02, 1)
new_plan = GeographicPartition(graph, assignment=new_plan, updaters = updaters)
def pareto_improves(partition):
if count > 9.5*chainsteps/10: return (no_worse_pg(partition) and no_worse_pp(partition))
return no_worse_pp(partition)
t = 1- (i+1)/(steps)
if random.random()**16 <= t:
return no_worse_pp(partition)
else:
return no_worse_pg(partition)
chain = MarkovChain(
proposal=sometimes_chunk,
constraints=[
constraints.within_percent_of_ideal_population(initial_partition, 0.05),
contiguous
],
accept=pareto_improves,
initial_state=new_plan,
total_steps=chainsteps,
)
samp.append(new_plan)
count = 0
for p in chain:
count+=1
if count %10000 ==0:
samp.append(p)
pgs.append(partisan_gini(p["SENElec"]))
pps.append(np.mean(list(p["polsby_popper"].values())))
samples.append(samp)