EDGAR classifies the reporting companies by SIC (Standard Industrial Classification) code. We can use this information to calculate the total sales per sector and then each company's percentage share of that total.
This helps us to identify the companies with a big market share.
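The share calculation itself is just each company's sales divided by its sector total. A minimal sketch in plain Scala, with made-up figures rather than EDGAR data:

```scala
// Hypothetical (company, sicCode, sales) tuples, for illustration only
val sales = Seq(("A", 1000, 80.0), ("B", 1000, 20.0), ("C", 2000, 50.0))

// Total sales per SIC code
val totals = sales.groupBy(_._2).map { case (sic, rows) => sic -> rows.map(_._3).sum }

// Market share of each company within its sector, in percent
val shares = sales.map { case (name, sic, s) => (name, sic, s / totals(sic) * 100.0) }
// company "A" holds 80% of sector 1000, "C" holds 100% of sector 2000
```

Below we do the same thing at scale: the sector totals come from a Spark `groupBy`, and the share from a join back to the company rows.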
We install the necessary libraries with the help of Maven...
%classpath config resolver maven-public http://software.pschatzmann.ch/repository/maven-public/
%%classpath add mvn
ch.pschatzmann:smart-edgar:LATEST
org.apache.spark:spark-sql_2.11:2.3.2
Added new repo: maven-public
... and we start a Spark Session
%%spark --start
val spark = SparkSession.builder()
.appName("Edgar")
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.ui.enabled", "false")
We get the "Revenues" by company and year. If there are no "Revenues" reported, we use "SalesRevenueNet" instead. For the time being we limit the data to the years 2016 and 2017.
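The "priority alternatives" setting below amounts to taking the first reported parameter in priority order. A plain-Scala sketch of that fallback logic (hypothetical company data; the real lookup is done inside smart-edgar):

```scala
// For each company, prefer "Revenues" and fall back to "SalesRevenueNet" if absent
val reported = Map(
  "ACME"        -> Map("SalesRevenueNet" -> 5.0),
  "Widgets Inc" -> Map("Revenues" -> 9.0, "SalesRevenueNet" -> 8.0))
val priority = Seq("Revenues", "SalesRevenueNet")

val sales = reported.map { case (company, values) =>
  company -> priority.collectFirst { case p if values.contains(p) => values(p) }
}
// ACME -> Some(5.0) (fallback used), Widgets Inc -> Some(9.0) (primary used)
```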
import ch.pschatzmann.edgar.reporting.EdgarModel;
import ch.pschatzmann.edgar.reporting.Table;
val model = new EdgarModel();
model.create();
model.setParameterAsPriorityAlternatives(true);
model.getNavigationField("values", "unitref").setFilterValues("USD");
model.getNavigationField("values", "segment").setFilterValues("");
model.getNavigationField("values", "segmentdimension").setFilterValues("");
model.getNavigationField("values", "form").setFilterValues("10-K");
model.getNavigationField("values", "parameterName").setFilterValues("Revenues","SalesRevenueNet");
model.getNavigationField("values", "numberOfMonths").setFilterValues("12");
val salesTable = new Table();
salesTable.setValueField(model.getTable("values").getValueField());
salesTable.addColumn(model.getNavigationField("values", "year").setFilterValues("2016","2017"));
salesTable.addRow(model.getNavigationField("company", "companyName"));
salesTable.addRow(model.getNavigationField("values", "identifier"));
salesTable.addRow(model.getNavigationField("company", "sicCode"));
salesTable.addRow(model.getNavigationField("company", "sicDescription"));
salesTable.addRow(model.getNavigationField("company", "tradingSymbol"));
salesTable.execute(model);
null
new java.util.ArrayList(salesTable.toList)
Next we write the data to a CSV file...
import ch.pschatzmann.edgar.utils.Utils
import ch.pschatzmann.common.table.TableFormatterCSV
val file = Utils.createTempFile(new TableFormatterCSV().format(salesTable))
/tmp/edgar5857024251653943782.tmp
... and we import the data into Spark
import spark.implicits._
import org.apache.spark.sql.types._
var df = spark.read.format("csv")
.option("delimiter", ";")
.option("header", "true")
.option("inferSchema","true")
.load(file.getAbsolutePath).toDF()
df.printSchema()
root
 |-- companyName: string (nullable = true)
 |-- identifier: integer (nullable = true)
 |-- sicCode: integer (nullable = true)
 |-- sicDescription: string (nullable = true)
 |-- tradingSymbol: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
Unfortunately the sales figures are imported as text. We therefore create a new numeric column. Then we calculate the totals by SIC code.
val df1 = df.withColumn("Sales",df("`2017`").cast(DoubleType))
val df2 = df1.withColumn("Sales2016",df1("`2016`").cast(DoubleType))
val sectorTotals = df1.groupBy("sicCode").sum("Sales")
sectorTotals.display(1000)
We join the totals with the company data
val joined = df2.withColumnRenamed("sicCode", "sicCode1").join(sectorTotals, $"sicCode1" === $"sicCode")
joined.display(1000)
Finally we can calculate the MarketShare by dividing the Sales by the total sales.
We display the top 100 companies with the biggest shares.
import org.apache.spark.sql.functions._
val result = joined
.withColumn("MarketShare",expr("Sales / `sum(Sales)` * 100.0"))
.withColumn("MarketShare2016",expr("Sales2016 / `sum(Sales)` * 100.0"))
.withColumn("MarketShareInc",expr("(MarketShare - MarketShare2016) / MarketShare2016 * 100.0"))
.orderBy(col("MarketShare").desc)
result.display(100)
null
Hmm, this is strange and needs to be investigated further: at first sight it seems that the US consists mainly of monopolies and that the market economy, where competition should reign, is no longer working.
Now we try to find the biggest company for each SIC code
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(result("sicCode")).orderBy(result("MarketShare").desc)
val topBySectorsDF = result
.withColumn("identifierWithHigestSales", first(result("identifier"))
.over(windowSpec))
.filter("identifier = identifierWithHigestSales")
.orderBy($"MarketShare".desc)
topBySectorsDF.display(1000)
null
In 179 of 383 sectors (46% of all sectors) we have companies with a market share of >= 75%!
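The counting behind that figure is simple; a plain-Scala sanity check with hypothetical top-company shares (the notebook's actual numbers come from `topBySectorsDF`, e.g. via `topBySectorsDF.filter(col("MarketShare") >= 75).count`):

```scala
// Hypothetical top-company market share per sector, for illustration only
val topShares = Seq(90.0, 80.0, 76.0, 40.0, 10.0)

// Sectors dominated by a single company (share of at least 75%)
val dominated = topShares.count(_ >= 75.0)
val pct = dominated.toDouble / topShares.length * 100
// here: 3 of 5 sectors (60%); the EDGAR data gives 179 of 383 (46%)
```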
As a last step we visualize the result in a chart.
val list = topBySectorsDF.collect
topBySectorsDF.printSchema
root
 |-- companyName: string (nullable = true)
 |-- identifier: integer (nullable = true)
 |-- sicCode1: integer (nullable = true)
 |-- sicDescription: string (nullable = true)
 |-- tradingSymbol: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Sales2016: double (nullable = true)
 |-- sicCode: integer (nullable = true)
 |-- sum(Sales): double (nullable = true)
 |-- MarketShare: double (nullable = true)
 |-- MarketShare2016: double (nullable = true)
 |-- MarketShareInc: double (nullable = true)
 |-- identifierWithHigestSales: integer (nullable = true)
null
val plot = new Plot()
plot.add(new Bars {
displayName = "MarketShare%"
y = list.map(r => r(11).asInstanceOf[Double]).toSeq
toolTip = list.map(r => r(0).toString+"<br>"+r(3)+"<br>("+r(11)+"%)").toSeq
})
We plot the companies in a Market Share vs. Market Share Growth portfolio:
val plot = new Plot() {
yLabel = "MarketShare Growth"
xLabel = "MarketShare%"
logY=true
yLogBase = 10
}
plot.add(new Points {
y = list.map(r => r(13).asInstanceOf[Double]).toSeq
x = list.map(r => r(11).asInstanceOf[Double]).toSeq
toolTip = list.map(r => r(0).toString+"<br>"+r(3)+"<br>Share: "+r(11)+"%"+"<br>Growth: "+r(13)+"%").toSeq
})
We limit our entries to the companies with a ticker symbol
val traded = list.filter(r => r(4).isInstanceOf[String])
val plot = new Plot() {
yLabel = "MarketShare Growth"
xLabel = "MarketShare%"
logY=true
yLogBase = 10
}
plot.add(new Points {
y = traded.map(r => r(13).asInstanceOf[Double]).toSeq
x = traded.map(r => r(11).asInstanceOf[Double]).toSeq
toolTip = traded.map(r => r(0).toString+"<br>"+r(3)+"<br>Share: "+r(11)+"%"+"<br>Growth: "+r(13)+"%<br>"+r(4)).toSeq
})