import pandas as pd
import numpy as np
import plotly.plotly as py
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot,iplot
import cufflinks as cf
For this example, we will use four assets: A, B, C and D. All of them share the same constraints: no minimum investment and a maximum investment of 35% of the total portfolio.
# Universe of assets used throughout the example.
asset_names = ["Asset A", "Asset B", "Asset C", "Asset D"]
# Every asset gets the same bounds, so the scalars are broadcast over the index:
# no minimum weight and at most 35% of the portfolio.
asset_constraints = pd.DataFrame(
    {'min_weight': 0.0, 'max_weight': 0.35},
    index=asset_names,
)
asset_constraints
| min_weight | max_weight |
---|---|---|
Asset A | 0.0 | 0.35 |
Asset B | 0.0 | 0.35 |
Asset C | 0.0 | 0.35 |
Asset D | 0.0 | 0.35 |
We will add one more constraint that applies to the whole portfolio: the maximum turnover, which limits how much the weights may change in total with respect to the current portfolio.
# Portfolio-level limit handed to Distribution: the sum of absolute weight
# changes versus the initial weights is compared against this value.
max_turnover = 0.2
We will use an arbitrary estimation for the assets. To keep the example simple, the estimations are assigned in decreasing order. In practice, this can be any estimate of return calculated for each component of the portfolio.
# Return estimate per asset, given in the same order as asset_names.
estimation = pd.Series([0.006, 0.005, 0.002, 0.001], index=asset_names)
estimation
Asset A 0.006 Asset B 0.005 Asset C 0.002 Asset D 0.001 dtype: float64
Finally, we need to start from the current weights of the portfolio. As we are creating one from scratch, we will assign the same weight to every asset.
# Begin from an equally weighted portfolio: each asset holds 1/N of the total.
initial_weights = pd.Series([1 / len(asset_names)] * len(asset_names), index=asset_names)
initial_weights
Asset A 0.25 Asset B 0.25 Asset C 0.25 Asset D 0.25 dtype: float64
class Distribution:
    """Assign portfolio weights with a backtracking search.

    The class accepts constraints attached to every asset (minimum and maximum
    weight) and to the whole portfolio (maximum turnover). The search starts
    from the given initial weights and favours the assets with the highest
    estimation: assets are ranked by estimation (descending) and, for each
    asset, candidate weights are tried from the largest to the smallest.

    Candidate weights live on a 0.01 grid (0.00, 0.01, ..., 1.00).

    Attributes set by ``__init__``:
        asset_names: asset labels ranked by estimation, best first.
        asset_constraints: the constraints frame reindexed to that ranking.
        max_turnover: limit on the sum of absolute weight changes.
        initial_weights: starting weights reindexed to that ranking.
        available_weights: per-asset arrays of feasible grid weights.
        weight_evolution: DataFrame with one row per visited state
            (``None`` until ``run`` is called).
        selected_weights: Series with the weights of the current state
            (``None`` until ``run`` is called).
    """

    # Comparisons between float weights use this absolute tolerance; it is far
    # below the 0.01 grid step, so it only absorbs rounding noise.
    TOLERANCE = 1e-9

    def __init__(self, asset_constraints, max_turnover, initial_weights, estimation):
        """Store the inputs ranked by estimation and pre-compute the candidates.

        Args:
            asset_constraints: DataFrame indexed by asset with the columns
                ``min_weight`` and ``max_weight``.
            max_turnover: maximum allowed sum of absolute weight changes.
            initial_weights: Series indexed by asset with the current weights.
            estimation: Series indexed by asset; higher means more preferred.
        """
        # The ranking drives the search order, so every input is aligned to it.
        ranked_asset_names = estimation.sort_values(ascending=False).index
        self.asset_names = ranked_asset_names
        self.asset_constraints = asset_constraints.reindex(self.asset_names)
        self.max_turnover = max_turnover
        self.initial_weights = initial_weights.reindex(self.asset_names)
        self.available_weights = self.get_available_weights()
        self.weight_evolution = None
        self.selected_weights = None

    def run(self):
        """Run the algorithm starting in the first level.

        Resets the search state, so it can be called repeatedly.

        Returns:
            The Series of selected weights for the first valid distribution
            found, or ``None`` when no combination fulfils the constraints
            (including the case where some asset has no feasible weight).
        """
        self.weight_evolution = pd.DataFrame(columns=self.asset_names)
        no_assets = len(self.available_weights) == 0
        if no_assets or any(len(row) == 0 for row in self.available_weights):
            # Nothing to search: at least one asset cannot take any weight.
            self.selected_weights = None
            return None
        # Start every asset at its smallest feasible weight; the search then
        # raises the weights of the best-ranked assets first.
        self.selected_weights = pd.Series(
            index=self.asset_names,
            data=[min(row) for row in self.available_weights],
            dtype=float)
        return self.backtracking(0)

    def get_available_weights(self):
        """Pre-step before calling the backtracking algorithm.

        Reduces the number of candidates by discarding every grid weight that
        breaks a per-asset constraint (min/max weight) or that, on its own,
        would already exceed the maximum turnover.

        Returns:
            One array of feasible weights per asset (ranked order), each sorted
            in descending order. A 2-D float array when all assets have the
            same number of candidates, otherwise a 1-D object array of arrays.
        """
        grid = np.arange(100, -1, -1) / 100
        columns = self.asset_constraints.index
        candidates = pd.DataFrame(
            index=grid, columns=columns,
            data=np.tile(grid, reps=(len(columns), 1)).T)
        tol = self.TOLERANCE
        # The tolerance keeps boundary values whose limit suffers from float
        # rounding (e.g. 0.4 - 0.1 evaluates to 0.30000000000000004).
        # Minimum weight
        candidates[candidates < self.asset_constraints.min_weight - tol] = np.nan
        # Maximum weight
        candidates[candidates > self.asset_constraints.max_weight + tol] = np.nan
        # Total turnover
        candidates[candidates < (self.initial_weights - self.max_turnover) - tol] = np.nan
        candidates[candidates > (self.initial_weights + self.max_turnover) + tol] = np.nan
        rows = [row[~np.isnan(row)] for row in candidates.T.values]
        if len({len(row) for row in rows}) <= 1:
            return np.array(rows)
        # Assets with different constraints yield rows of different lengths;
        # np.array() rejects ragged input, so fill an object array by hand.
        ragged = np.empty(len(rows), dtype=object)
        for position, row in enumerate(rows):
            ragged[position] = row
        return ragged

    def _record_state(self):
        """Append the current selected weights as a new row of weight_evolution."""
        snapshot = self.selected_weights.to_frame().T
        if self.weight_evolution is None or self.weight_evolution.empty:
            self.weight_evolution = snapshot.reset_index(drop=True)
        else:
            self.weight_evolution = pd.concat(
                [self.weight_evolution, snapshot], ignore_index=True)

    def backtracking(self, level):
        """Backtracking algorithm.

        It searches recursively through every possible solution, starting with
        the asset with the highest estimation and going deeper into the tree to
        look for alternatives in assets with lower estimation. It is important
        to have the assets sorted by estimation and the available weights
        sorted by weight, both descending, so that the search starts by
        assigning as much weight as possible to the best asset.

        Args:
            level: position (in ranked order) of the asset being assigned.

        Returns:
            The selected weights of the first valid state found from this
            level downwards, or ``None`` if there is none.
        """
        # Are we in the last level (leaves)?
        is_leaf = level >= len(self.available_weights) - 1
        for candidate in self.available_weights[level]:
            # Consider current option and all its sub-levels
            previous_selected_weights = self.selected_weights.copy()
            # Levels are positions, not labels, hence .iloc.
            self.selected_weights.iloc[level] = candidate
            self._record_state()
            if self.is_valid():
                return self.selected_weights
            if not is_leaf:
                # Look for more candidates if we are not in the last level.
                # Going one level deeper assigns some weight to the next
                # asset in the ranking.
                result = self.backtracking(level + 1)
                if result is not None:
                    # It found a solution
                    return result
            # Return to previous weights (Backtracking)
            self.selected_weights = previous_selected_weights
        return None

    def is_valid(self):
        """Check that the current distribution of weights fulfils all the constraints.

        Returns:
            ``True`` when the weights add up to 1 and the total turnover does
            not exceed ``max_turnover`` (both within ``TOLERANCE``).
        """
        tol = self.TOLERANCE
        # Sum of all weights = 1
        fully_invested = abs(self.selected_weights.sum() - 1) <= tol
        # Total turnover: the limit itself is allowed, otherwise acceptance of
        # a boundary solution would depend on float rounding.
        turnover = (self.selected_weights - self.initial_weights).abs().sum()
        within_turnover = turnover <= self.max_turnover + tol
        return bool(fully_invested and within_turnover)

    def plot (self, asset_names, notebook=True):
        """Plot the weight of the assets in each visited state as an area plot.

        Args:
            asset_names: columns of ``weight_evolution`` to draw.
            notebook: when true, initialise plotly/cufflinks offline notebook mode.
        """
        if notebook:
            init_notebook_mode(connected=True)
            cf.go_offline()
        weights = self.weight_evolution[asset_names]
        if weights.shape[0] > 1000:
            # Keep one state out of every 100 for long searches.
            weights = weights.loc[::100]
        weights.iplot(kind='area', fill=True, filename='backtracking_distribution/first_approach',
                      yTitle='Weight', xTitle='States visited', title='Weight distribution')
# Build the allocator; its constructor pre-computes the feasible weights per asset.
distribution = Distribution(asset_constraints, max_turnover, initial_weights, estimation)
# The rows of available_weights follow the estimation ranking stored in
# distribution.asset_names, so label them with that index instead of the
# input order (the two only coincide when the inputs are already ranked).
pd.DataFrame(index=distribution.asset_names, data=distribution.available_weights)
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Asset A | 0.35 | 0.34 | 0.33 | 0.32 | 0.31 | 0.3 | 0.29 | 0.28 | 0.27 | 0.26 | ... | 0.14 | 0.13 | 0.12 | 0.11 | 0.1 | 0.09 | 0.08 | 0.07 | 0.06 | 0.05 |
Asset B | 0.35 | 0.34 | 0.33 | 0.32 | 0.31 | 0.3 | 0.29 | 0.28 | 0.27 | 0.26 | ... | 0.14 | 0.13 | 0.12 | 0.11 | 0.1 | 0.09 | 0.08 | 0.07 | 0.06 | 0.05 |
Asset C | 0.35 | 0.34 | 0.33 | 0.32 | 0.31 | 0.3 | 0.29 | 0.28 | 0.27 | 0.26 | ... | 0.14 | 0.13 | 0.12 | 0.11 | 0.1 | 0.09 | 0.08 | 0.07 | 0.06 | 0.05 |
Asset D | 0.35 | 0.34 | 0.33 | 0.32 | 0.31 | 0.3 | 0.29 | 0.28 | 0.27 | 0.26 | ... | 0.14 | 0.13 | 0.12 | 0.11 | 0.1 | 0.09 | 0.08 | 0.07 | 0.06 | 0.05 |
4 rows × 31 columns
# Run the search with the class's original (unpruned) backtracking method.
# run() returns the selected weights of the first valid state, or None.
result = distribution.run()
result
Asset A 0.35 Asset B 0.25 Asset C 0.25 Asset D 0.15 dtype: float64
# Area plot of the weights recorded for each visited state.
distribution.plot(asset_names)
# weight_evolution holds one row per visited state, so its row count is the search effort.
"Number of states visited: {}".format(distribution.weight_evolution.shape[0])
'Number of states visited: 10274'
from types import MethodType
def backtracking(self, level):
    """Backtracking algorithm with pruning of over-invested branches.

    It searches recursively through every possible solution, starting with the
    asset with the highest estimation and going deeper into the tree to look
    for alternatives in assets with lower estimation. It is important to have
    the assets sorted by estimation and the available weights sorted by weight,
    both descending, so that the search starts by assigning as much weight as
    possible to the best asset.

    Pruning
    -------
    1. Candidate weights that would make the sum of weights greater than 1
       when assigned in the portfolio are skipped.

    Args:
        self: object exposing ``available_weights``, ``selected_weights``,
            ``weight_evolution``, ``is_valid()`` and ``backtracking()``.
        level: position of the asset being assigned.

    Returns:
        The selected weights of the first valid state found from this level
        downwards, or ``None`` if there is none.
    """
    # Absolute slack for float comparisons; far below the 0.01 weight grid.
    tolerance = 1e-9
    # Are we in the last level (leaves)?
    is_leaf = level >= len(self.available_weights) - 1
    # Get available weights for the asset that corresponds to the current level
    available_weights = self.available_weights[level]
    # Prune branches that are not feasible: portfolio sum if each candidate
    # replaced the weight currently held at this level (a position, hence .iloc).
    sum_candidate_weights = (
        self.selected_weights.sum()
        + (available_weights - self.selected_weights.iloc[level]))
    available_weights = available_weights[sum_candidate_weights <= 1 + tolerance]
    for candidate in available_weights:
        # Consider current option and all its sub-levels
        previous_selected_weights = self.selected_weights.copy()
        self.selected_weights.iloc[level] = candidate
        # Record the visited state as a new row (DataFrame.append no longer exists).
        snapshot = self.selected_weights.to_frame().T
        if self.weight_evolution.empty:
            self.weight_evolution = snapshot.reset_index(drop=True)
        else:
            self.weight_evolution = pd.concat(
                [self.weight_evolution, snapshot], ignore_index=True)
        if self.is_valid():
            return self.selected_weights
        if not is_leaf:
            # Look for more candidates if we are not in the last level.
            # Going one level deeper assigns some weight to the next asset
            # in the ranking.
            result = self.backtracking(level + 1)
            if result is not None:
                # It found a solution
                return result
        # Return to previous weights (Backtracking)
        self.selected_weights = previous_selected_weights
    return None
# Bind the backtracking function defined above to this instance so it
# overrides the class method for the existing `distribution` object only.
distribution.backtracking = MethodType(backtracking, distribution)
# run() resets weight_evolution and selected_weights before searching again.
result = distribution.run()
result
Asset A 0.35 Asset B 0.25 Asset C 0.25 Asset D 0.15 dtype: float64
# Area plot of the states visited by the pruned search.
distribution.plot(asset_names)
# One row per visited state: compare this count with the unpruned run.
"Number of states visited: {}".format(distribution.weight_evolution.shape[0])
'Number of states visited: 3753'
from types import MethodType
def backtracking(self, level):
    """Backtracking algorithm with pruning at inner nodes and at leaves.

    It searches recursively through every possible solution, starting with the
    asset with the highest estimation and going deeper into the tree to look
    for alternatives in assets with lower estimation. It is important to have
    the assets sorted by estimation and the available weights sorted by weight,
    both descending, so that the search starts by assigning as much weight as
    possible to the best asset.

    Pruning
    -------
    1. If the node is not a leaf: candidate weights that would make the sum of
       weights greater than 1 are skipped.
    2. If the node is a leaf: candidate weights that would make the sum of
       weights different from 1 are skipped.

    Args:
        self: object exposing ``available_weights``, ``selected_weights``,
            ``weight_evolution``, ``is_valid()`` and ``backtracking()``.
        level: position of the asset being assigned.

    Returns:
        The selected weights of the first valid state found from this level
        downwards, or ``None`` if there is none.
    """
    # Absolute slack for float comparisons; far below the 0.01 weight grid.
    # Exact equality on float sums would discard leaves that add up to 1 only
    # up to rounding error.
    tolerance = 1e-9
    # Are we in the last level (leaves)?
    is_leaf = level >= len(self.available_weights) - 1
    # Prune branches that are not feasible by reducing the amount of available
    # weights (if possible): portfolio sum if each candidate replaced the
    # weight currently held at this level (a position, hence .iloc).
    available_weights = self.available_weights[level]
    sum_candidate_weights = (
        self.selected_weights.sum()
        + available_weights
        - self.selected_weights.iloc[level])
    if is_leaf:
        mask = np.abs(sum_candidate_weights - 1) <= tolerance
    else:
        mask = sum_candidate_weights <= 1 + tolerance
    available_weights = available_weights[mask]
    for candidate in available_weights:
        # Consider current option and all its sub-levels
        previous_selected_weights = self.selected_weights.copy()
        self.selected_weights.iloc[level] = candidate
        # Record the visited state as a new row (DataFrame.append no longer exists).
        snapshot = self.selected_weights.to_frame().T
        if self.weight_evolution.empty:
            self.weight_evolution = snapshot.reset_index(drop=True)
        else:
            self.weight_evolution = pd.concat(
                [self.weight_evolution, snapshot], ignore_index=True)
        if self.is_valid():
            return self.selected_weights
        if not is_leaf:
            # Look for more candidates if we are not in the last level.
            # Going one level deeper assigns some weight to the next asset
            # in the ranking.
            result = self.backtracking(level + 1)
            if result is not None:
                # It found a solution
                return result
        # Return to previous weights (Backtracking)
        self.selected_weights = previous_selected_weights
    return None
# Bind the backtracking function defined above (inner-node and leaf pruning)
# to this instance, overriding whichever version was bound before.
distribution.backtracking = MethodType(backtracking, distribution)
# run() resets weight_evolution and selected_weights before searching again.
result = distribution.run()
result
Asset A 0.35 Asset B 0.25 Asset C 0.25 Asset D 0.15 dtype: float64
# Area plot of the states visited with both pruning rules active.
distribution.plot(asset_names)
# One row per visited state: compare this count with the previous runs.
"Number of states visited: {}".format(distribution.weight_evolution.shape[0])
'Number of states visited: 501'