This notebook is available as a public gist. If you like it, feel free to let me know on Twitter.
Recently my attention was drawn on Twitter towards a video from a 1984 episode of the Thames Television programme "Database" on sending e-mail via the phone line. What's most interesting is that the episode includes an experimental piece of "tele-software" which is broadcast as an audio signal over the end credits.
You can watch the video here:
from IPython.display import YouTubeVideo
YouTubeVideo('szdbKz5CyhA')
I wondered how hard it would be to load (and run) this software from 1984. I knew the software was for the BBC Micro which was a machine I had growing up. It'd be a nice nostalgia trip to try and load some software again.
The first thing to do is to download the video and snip out the audio section. Below I've got a little snippet of bash script (assuming you're running on a Unix-like machine) which will download and snip out the audio. If you're not running on a Unix machine, you can download the video and use a program like Audacity to save the audio segment. YOU MUST SAVE IT AS AN UNCOMPRESSED WAV FILE. Python cannot load compressed WAV files.
%%bash
# You must have yourtube-dl and ffmpeg installed to run this script
# Change to a suitable download directory
cd ~/Downloads
# Download video
[ -f video.mp4 ] || youtube-dl -o video.mp4 szdbKz5CyhA
# Tele-software starts at around 5:38. Use ffmpeg to snip out the relevent section and to
# convert the audio track to an uncompressed wav.
[ -f database-telesoftware.m4a ] || ffmpeg -i video.mp4 -ss 5:38.5 -vn -acodec copy database-telesoftware.m4a
[ -f database-telesoftware.wav ] || ffmpeg -i database-telesoftware.m4a -acodec pcm_s16le database-telesoftware.wav
Let's check that our audio segment was clipped out correctly. Here I'm embedding the compressed AAC audio into the notebook to keep the size of the notebook down but you should be able to load the WAV file in the same way:
import os
from IPython.display import Audio
Audio(os.path.expanduser('~/Downloads/database-telesoftware.m4a'))
Now we have the audio data, we can use the Python wave module from the standard library to load it.
import wave
import contextlib
import numpy as np
# Where is the audio file on disk?
audio_fn = os.path.expanduser('~/Downloads/database-telesoftware.wav')
# Use contextlib's closing wrapper on wave.open() since the returned object must
# have .close() called on it
with contextlib.closing(wave.open(audio_fn)) as wf:
# Record the sample rate, number of channels and load the raw samples as
# little-endian 16-bit values
sample_rate = wf.getframerate()
n_channels = wf.getnchannels()
samples = np.fromstring(wf.readframes(wf.getnframes()), dtype='<i2')
# Downmix the samples to mono
samples = np.mean(samples.reshape((-1, 2), order='C'), axis=-1)
# Construct an array giving times in seconds for each sample
ts = np.arange(samples.shape[0]) / sample_rate
Let's have a look at the samples from somewhere in the middle of the data section:
# Enable IPython's matplotlib integration
%matplotlib inline
# Import matplotlib's pyplot API. Configure matplotlib to have slightly larger
# figures by default.
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (14, 9)
example_slice = slice(600000, 601500)
plt.plot(ts[example_slice], samples[example_slice])
plt.xlabel('Time [s]')
plt.ylabel('Sample value')
<matplotlib.text.Text at 0x7f8584ddaeb8>
There's some documentation available on the Acorn cassette tape format. The physical layer is built around two tones, one at 1200Hz and one at 2400Hz. A physical "zero" bit is represented by one cycle of the 1200Hz tone and a physical "one" bit by two cycles of the 2400Hz one. We can see that in the samples above; there are high frequency and low frequency sections corresponding to "one" and "zero" bit regions.
Since one physical bit is transferred by one cycle of the 1200Hz tone, the physical data transfer rate is 1200 baud. The upper limit for our 10 second clip is therefore around 1KB.
# Set "zero" and "one" bit frequencies and the number of cycles per bit.
zero_freq = 1200 # Hz
zero_cycles_per_bit = 1
one_freq = 2 * zero_freq
one_cycles_per_bit = 2 * zero_cycles_per_bit
There are lots of ways to look for sinusoids of known frequencies in signals. I'm going to make use of a matched filter approach where we correlate the input signal with some version of our expected signal. The output will be large when our signal looks like our expected signal and low otherwise.
The filter here is essentially a windowed DFT filter for a specific frequency bin. We create a complex sinusoid for the "zero" and "one" freqencies. For the "zero" frequency we window the filter to cover one cycle. For the "one" frequency we window it to cover two cycles.
We expect the absolute value of the filter responses to be maximum when the signal matches. In the case of the "zero" filter, when the signal is a single-cycle of a 1200Hz sinusoid and in the case of the "one" filter, when the signal is two-cycles of a 2400Hz sinusoid.
filter_length = zero_cycles_per_bit * sample_rate / zero_freq # samples
filter_ts = np.arange(-int(np.ceil(filter_length*0.5)), int(np.ceil(filter_length*0.5))+1) / sample_rate
window = np.hanning(filter_ts.shape[0])
window /= np.sqrt(np.square(window).sum())
zero_filter = window * np.exp(1j * zero_freq*filter_ts*np.pi*2)
plt.plot(filter_ts, np.real(zero_filter), label='Real')
plt.plot(filter_ts, np.imag(zero_filter), label='Imaginary')
plt.plot(filter_ts, np.abs(zero_filter), label='Magnitude')
plt.legend(loc='best')
plt.xlabel('Time [s]')
plt.ylabel('Filter')
plt.grid(True)
filter_length = one_cycles_per_bit * sample_rate / one_freq # samples
filter_ts = np.arange(-int(np.ceil(filter_length*0.5)), int(np.ceil(filter_length*0.5))+1) / sample_rate
window = np.hanning(filter_ts.shape[0])
window /= np.sqrt(np.square(window).sum())
one_filter = window * np.exp(1j * one_freq*filter_ts*np.pi*2)
plt.plot(filter_ts, np.real(one_filter), label='Real')
plt.plot(filter_ts, np.imag(one_filter), label='Imaginary')
plt.plot(filter_ts, np.abs(one_filter), label='Magnitude')
plt.legend(loc='best')
plt.xlabel('Time [s]')
plt.ylabel('Filter')
plt.grid(True)
Computing the "zero" and "one" respose can be done via the correlate
function from scipy
:
from scipy.signal import correlate
zero_response = np.abs(correlate(samples, zero_filter, 'same'))
one_response = np.abs(correlate(samples, one_filter, 'same'))
Let's look at the "zero" and "one" responses for our example signal.
plt.plot(ts[example_slice], zero_response[example_slice], label="Zero response")
plt.plot(ts[example_slice], one_response[example_slice], label="One response")
plt.legend(loc='best')
plt.xlabel('Time [s]')
plt.grid(True)
Comparing this to the figure above, we can see that the "zero" response is greatest when we have a "zero" bit and the "one" response is greatest when we have a "one" bit. Let's look at the sum of the response:
plt.plot(ts[::100], (zero_response + one_response)[::100])
plt.xlabel('Time [s]')
plt.ylabel('Total response')
plt.grid(True)
We see that when there is no carrier, the total response from both filters is very small. We can use this to ignore "no data" regions of the signal. We'll create an array which assigns a bit to each input sample. This bit is "0" for a "zero" bit, "1" for a "one" bit and "-1" when there is no carrier. This corresponds to the description of the physical layer from the source linked above:
The signal may be in one of three states; zero, one or no carrier.
# Create an array to hold the decoded bit for each input sample. Represent a
# "zero" by 0, "one" by 1 and "no carrier" by -1.
bits = np.zeros(samples.shape[0], dtype=np.int8)
bits[one_response > zero_response] = 1
bits[np.maximum(zero_response, one_response) < 500] = -1
Let's look at the decoded bits for all of the signal.
plt.plot(ts[::100], bits[::100])
plt.ylim(-1.1, 1.1)
plt.grid(True)
plt.xlabel('Time [s]')
plt.ylabel('Detected bit')
<matplotlib.text.Text at 0x7f857c82f5c0>
We now want to find where in the signal the data actually starts. We know from the documentation that:
To allow the tape deck circuitry to settle, each data stream is preceded by 5.1 seconds of 2400 Hz leader tone.
So data starts at the first "zero" bit after around 5 seconds of "one" bits. We can write a quick loop to examine every "zero" bit and find the first one which is preceeded by 5 seconds of "one"s.
leader_len = int(5 * sample_rate)
print('Leader length (samples):', leader_len)
# look for leader start
zero_idxs = np.nonzero(bits == 0)[0]
found_leader = False
for idx in zero_idxs:
if idx < leader_len:
continue
if np.all(bits[idx-leader_len:idx] == 1):
found_leader = True
break
data_start = idx
print('Found leader:', found_leader)
print('Data start index:', data_start)
Leader length (samples): 220500 Found leader: True Data start index: 264987
We've now found the start of the data. We now need to conver from the on-tape data storage format to a list of bytes we can interpret. From the documentation:
Data is recorded asynchronously on the tape in 8-bit bytes. Each byte consists of one start bit (a zero), 8 data bits lowest first, and one stop bit (a one).
We can calculate the number of samples for one data bit. It will be the number of samples in one cycle of a 1200Hz sinusoid. We then loop through the per-sample bit labels one data bit's number of samples at a time looking for regions which match the description above. Then we extract the bytes and add them to a list. Finally, we form a Python bytes
object from them.
one_bit_len = int(sample_rate / 1200)
start_idx = data_start
idx = start_idx + (one_bit_len>>1)
bs = []
while idx < bits.shape[0]:
if bits[idx] != 0 or bits[idx + one_bit_len*9] != 1:
break
byte = 0
for j in range(8):
byte += int(bits[idx+(1+j)*one_bit_len]) * (1<<j)
bs.append(byte)
idx += one_bit_len*9
while bits[idx] == 1:
idx += 1
idx += one_bit_len>>1
bs = bytes(bs)
print("Parsed {} bytes".format(len(bs)))
Parsed 1039 bytes
Let's look at the parsed bytes:
print(repr(bs))
b'*THAMES\x00\x00\x19\xff\xff#\x80\xff\xff\x00\x00\x00\x01\x00\x00\x00\x00\x00Z\x8e\r\x00\n\x0e\xeb7:\xfb129:\xdb \r\x00\x14\x19\xf1"Thames Television"\r\x00\x1e\x1b \xf1"-------------------"\r\x00(\x1f\xf1:\xf1:\xf1"DATABASE COMPETITION"\r\x0023 \xf1:\xf1:\xf1:\xf1"Send solutions on a postcard only to:"\r\x00<\x15\xf1:\xf1" Database"\r\x00F\x15\xf1" THAMES TV"\r\x00P%\xf1" 149 TOTTENHAM COURT ROAD"\r\x00Z\x14\xf1" LONDON"\r\x00d\x16\xf1" \xe2\xe9*THAMES\x00\x00\x19\xff\xff#\x80\xff\xff\x01\x00\x00\x01\x00\x00\x00\x00\x00\xb1\xad W1P 9LL"\r\x00m( \xf1:\xf1:\xf1"PRESS SPACE BAR TO CONTINUE" \r\x00n\x19 A$=\xbe:\xe7A$="" \x8c \x8dDn@ \r\x00o\x12 \xe7A$<>" "\x8c\x8dDn@\r\x00x\x07 \xeb5\r\x00\x82\x0b\xfb129:\xdb \r\x00\x8c\x08\xf1:\xf1;\r\x00\x96\x1f\xe3I=1\xb88:\xf3A$:\xe7I=1 \x8c\xe5\x8dt`@\x8b\x8dtj@\r\x00\xa0\r \xfb4:\xe5\x8dtt@\r\x00\xaa\x10 \xe7I=5\x8c\xfb3\x8b \xfbI\r\x00\xb4\x0b\xf1A$;:\xed \r\x00\xbe\x16\xfb3:\xf1" COMPETITION"\r\x00\xc3\x1c\xf1"--------------------" \r\x00\xc8\x0e \xf1:\xf1:\xf1:\xf1:\x1b\xc3*THAMES\x00\x00\x19\xff\xff#\x80\xff\xff\x02\x00\x00\x01\x00\x00\x00\x00\x00\x9c\xe9\xf1\r\x00\xd2\x11\xfb2:\xf1\x89(10);"2"\r\x00\xdc\x11\xfb2:\xf1\x89(8);"4";\r\x00\xe6\x0c\xfb3:\xf1" *"\r\x00\xf0\x12\xfb3:\xf1\x89(8);"* *"\r\x00\xfa!\xfb2::\xf1\x89(6);"1";:\xfb3:\xf1"******" \r\x01\x04\x10\xf1\x89(8);"* *" \r\x01\x0e \xfb2::\xf1\x89(6);"3";:\xfb3:\xf1"******" \r\x01\x18\r\xf1\x89(8);"*"\r\x01"4\xf1:\xf1:\xf1:\xfb2:\xf1"1.";:\xfb4:\xf1" Arresting pop group"\r\x01,% \xf1:\xfb2:\xf1"2.";:\xfb4:\xf1"_ to me only" \r\x0164\xf1:\xfb2\'\x85*THAMES\x00\x00\x19\xff\xff#\x80\xff\xff\x03\x00\x9b\x00\x80\x00\x00\x00\x00\xc7\x17:\xf1"3.";:\xfb4:\xf1" Four star transport"\r\x01@=\xf1:\xfb2:\xf1"4.";:\xfb4:\xf1" Underwater computer language?" \r\x01E\n \xe5\x8dDEA\r\x01J& \xdc "D","A","T","A","B","A","S","E"\r\xff\xc0B'
There's definitely recognisable data there.
The Acorn Cassette Format consists of several data blocks. Each block records information on a block of data within each file. We can write a function based on the description on the wiki page to parse each block from raw bytes.
from collections import deque, namedtuple
import codecs
CassetteBlock = namedtuple('CassetteBlock', 'filename load_addr exec_addr block_num block_flag data')
def parse_acorn_cassette_block(block):
"""Parse one Acorn Cassette Format block from a sequence of bytes. Returns a
CassetteBlock, deque tuple containing the parsed block and the remaining
unparsed bytes.
Note: this function does not check CRCs. In this application, if we've received
the data incorrectly, there's not much we can do beyond flipping bits.
"""
# Look for synchronisation byte
block = deque(block)
while block.popleft() != 0x2a:
pass
# Parse filename
fn_chars = []
while True:
fn_char = block.popleft()
if fn_char == 0x00:
break
fn_chars.append(fn_char)
filename = codecs.decode(bytes(fn_chars), 'ascii')
def popint(n_bytes):
rv = 0
for idx in range(n_bytes):
rv += block.popleft() * (1<<(8*idx))
return rv
# Parse load address, execution address and block number
load_addr = popint(4)
exec_addr = popint(4)
block_num = popint(2)
# Parse data block length
data_len = popint(2)
# Extract block flag byte
block_flag = popint(1)
# Address of next file
addr_next_file = popint(4)
# Note: CRCs are stored *high byte first*
header_crc = (1<<8)*block.popleft() + block.popleft()
data = bytes(block.popleft() for _ in range(data_len))
if data_len > 0:
data_crc = (1<<8)*block.popleft() + block.popleft()
record = CassetteBlock(
filename=filename, load_addr=load_addr, exec_addr=exec_addr,
block_num=block_num, block_flag=block_flag, data=data,
)
return record, block
Now we iterate through each block and build up a dictionary mapping filenames to file contents.
# A dict which will map file names to bytes-objects with their contents.
file_data = {}
raw_data = bs
while len(raw_data) > 0:
block, raw_data = parse_acorn_cassette_block(raw_data)
file_data[block.filename] = file_data.get(block.filename, bytes()) + block.data
Which files have we in the tele-software download?
print(', '.join(file_data.keys()))
THAMES
Just one file named THAMES
. We can write all the files out to the filesystem.
for fn, data in file_data.items():
out_path = os.path.join(os.path.expanduser('~/Downloads/'), fn)
print('Writing length {} file {} to {}'.format(len(data), fn, out_path))
with open(out_path, 'wb') as fobj:
fobj.write(data)
Writing length 923 file THAMES to /users/rjw57/Downloads/THAMES
Our file is only 923 bytes long. A short BASIC program perhaps?
To run the program, I used BeebEm. This emulator has a nice feature where you can import a file from the host machine to an emulated disk. You can also save screenshots. Let's see if the THAMES
file loads into BASIC.
from IPython.display import Image
Image(os.path.expanduser('~/Downloads/beebem-cat.png'))
So far, so good. We can LIST
the program to make sure that it is BASIC.
Image(os.path.expanduser('~/Downloads/beebem-list-1.png'))
Image(os.path.expanduser('~/Downloads/beebem-list-2.png'))
Image(os.path.expanduser('~/Downloads/beebem-list-3.png'))
It's definitely a BASIC program. Let's try RUN
-ing it.
Image(os.path.expanduser('~/Downloads/beebem-scrn-1.png'))
A competition? After pressing the space bar, we get a crossword.
Image(os.path.expanduser('~/Downloads/beebem-scrn-2.png'))
After a bit of head-scratching, consulting with work colleagues and Googling song lyrics, I got the solution:
D
C R
POLICE
R N
TANKER
L
It turns out that if you Google these words, one finds another brave soul on the Internet who wrote their own demodulator. (Their frequency detection was far simpler than mine.) At least they got the same answers.