
Extracting sentiment from thousands of financial reports in just minutes: a code explanation

A simple introduction to Spark, User-Defined Functions (UDFs) and Joins.

This post is targeted at anyone who understands the concepts of Hadoop, HDFS and the Spark engine, and has a basic knowledge of Python and the Linux terminal. For a general audience-friendly version, click here.

In this post, I will explain the code that helped me leverage the power of distributed processing, bringing processing times down from 8 hours to under 8 minutes!

Want to see the code? Click here for the GitHub repository.

This post is split up into 5 parts:

  1. A brief technical point on the difference between Spark and PySpark, and the key point to emphasise.
  2. A quick overview of the original (non-Spark) project, the goal of the PySpark version and a summary of the key result.
  3. How I converted parts of it into a Spark project, including code explanations and justifications of the approach.
  4. Graphs showing the processing time differences between a locally run program, a single-node Hadoop cluster and a 6-node Hadoop cluster.
  5. Suggested future improvements.

The difference between PySpark and Spark

Let’s assume you know what Hadoop, HDFS and Spark are. If you don’t, click here and read the ‘Distributed computing’ section.

We are using Spark via PySpark. Let me explain.

Spark is the engine

It was written in Scala and is the underlying logic that does the heavy lifting when it comes to distributed processing.

You can actually write code in Scala, Python, Java or R, and there is a way to run it on the Spark engine.

PySpark is a Python library

If you want to write code in Python that leverages the power of the Spark engine, you need to use the pyspark library.

The point to emphasise

All the code below is written in Python using the pyspark library. But it must be executed on a Hadoop cluster with Spark installed to leverage the power of distributed processing and crush those processing times.

Project brief

My team and I were asked to analyse a specific type of financial report, the 10-K report, for all S&P 100 companies, extract sentiment (i.e. how positive or negative the wording was) and try to use it as a predictor of short-term stock price changes.

Crucially, this project was a mix of different challenges:

  • Downloading all the reports (all in .html format) by web scraping the US SEC website.
  • Cleaning up the text using BeautifulSoup.
  • Extracting sentiment by comparing document text to the Loughran-McDonald sentiment word list.
  • Downloading the relevant stock price data using yahoofinancials.
  • Analysing stock prices and sentiment counts, and visualising any relationships.

The problem

However, run locally, this project led to huge processing times of around 8 hours. We were working with:

  • 1,200 HTML files, totalling 4 GB.
  • A total of 7.5 million words.
  • 7 sentiments and a total of 4,000 sentiment words to look for.

The solution

I converted the cleaning and extracting sentiment parts into a PySpark program and ran them on a cloud-based Hadoop cluster with 6 nodes and Spark installed.

The key result

Originally, this part of the program took 8 hours to run. With my PySpark version, it took just under 8 minutes!

The code, explained

Load data into distributed storage

The raw data were a series of raw .html files, which were put into a directory in HDFS using the hdfs dfs -put command. Additionally, a JSON file of all sentiments and their word lists was needed, so it was added to HDFS too.

Spark DataFrames of the data

To use the data in the script, I had to load it into Spark’s preferred format, the Spark DataFrame: essentially a tabular structure similar to a Pandas DataFrame, but heavily optimised for distributed processing.

For the documents, I decided to use one DataFrame, where each row was a different document. The *.html wildcard just means “every file ending in .html”, and input_file_name() returns the name of each file as it is read in.

from pyspark.sql.functions import input_file_name

df = spark.read.text('/edgar/raw_full/*.html', wholetext=True).withColumn('raw file name', input_file_name())

The DataFrame had two columns: the name of the source file, and the whole body of text (i.e. thousands of words per cell). This seems inefficient, but actually worked really well; recall, Spark is used to handling millions or billions of rows of data.
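
If you want to sanity-check that structure (this isn’t part of the original pipeline, just an illustrative check), a couple of standard DataFrame calls will do it:

df.printSchema()                                    # two columns: value (string), raw file name (string)
df.select('raw file name').show(5, truncate=False)  # the first few file names read in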

For the sentiment word list, I opted to use 7 DataFrames — one for each sentiment.

sentiment_words = spark.read.json('/edgar/sentiment_words.json').toPandas().transpose().to_dict()[0]

dfs = [spark.createDataFrame(data=[(i, ) for i in sentiment_words[k]], schema=[k]) for k in sentiment_words.keys()]

The first line was just a very convoluted way to read in the JSON file as a dictionary (in hindsight, I could’ve just used the json library).

The second line creates the 7 DataFrames: each has 1 column labelled with the sentiment, and its rows are the words related to that sentiment. This makes the joins much easier to do (see later).
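
For what it’s worth, here is a minimal sketch of that simpler json-library route, assuming the JSON maps each sentiment to its word list and that a local copy of the file is available (the HDFS path used above would first need to be copied down):

import json

# Hindsight alternative: load the sentiment word lists as a plain dictionary
with open('sentiment_words.json') as f:    # local copy of /edgar/sentiment_words.json
    sentiment_words = json.load(f)

dfs = [spark.createDataFrame(data=[(w,) for w in sentiment_words[k]], schema=[k]) for k in sentiment_words]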

UDF to clean the documents

Our original program had a simple function that took HTML text as input and returned clean text as output: text with the HTML tags stripped and all non-alphanumeric characters removed.

import re
from bs4 import BeautifulSoup

def clean_line(raw_html_text):
    # Parse the raw HTML and keep only the visible text
    soup = BeautifulSoup(raw_html_text.strip(), "html.parser")
    html_text = soup.text
    # Remove every character that isn't a letter, digit or space
    cleaned_html_text = re.sub(r'[^a-zA-Z0-9 ]', '', html_text)
    return cleaned_html_text

The simplest way to keep this logic was to register it as a user-defined function (UDF). When you do this, Spark is able to share this function with all nodes in such a way that it can leverage distributed processing.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

clean_line_UDF = udf(clean_line, StringType())
spark.udf.register("clean_line_UDF", clean_line, StringType())

Two simple lines of code and now you can use it anywhere you want. The first line wraps the Python function as a Spark UDF, which is what allows Spark to ship it to the worker nodes and apply it to DataFrame columns. The second registers it under a name, so it can also be called from Spark SQL. You now call clean_line_UDF everywhere you would have used the original function.
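
To make that second line concrete (this snippet isn’t from the original pipeline, and the view name docs is made up): the wrapped clean_line_UDF object is what you use in the DataFrame API, while the registered name is what you’d call from Spark SQL.

# Spark SQL route: call the registered name (assumes df is exposed as a temp view)
df.createOrReplaceTempView('docs')
spark.sql("SELECT `raw file name`, clean_line_UDF(value) AS cleaned FROM docs")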

All of Spark’s built-in functions are heavily optimised for distributed processing. UDFs are nowhere near as efficient, but they are a good compromise when the logic is particularly complex. Using a plain Python function directly on a DataFrame column simply wouldn’t work.
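
As a taste of what “using built-in functions” could look like here, this is a rough, UDF-free sketch of the cleaning step using regexp_replace. It is only an approximation of my approach: a regex is a much cruder way to strip HTML tags than BeautifulSoup, which is why I stuck with the UDF.

from pyspark.sql.functions import regexp_replace

# Approximate, UDF-free cleaning: strip tags with a regex, then drop non-alphanumerics
df_cleaned_builtin = (df
    .withColumn('no_tags', regexp_replace('value', r'<[^>]+>', ' '))
    .withColumn('cleaned', regexp_replace('no_tags', r'[^a-zA-Z0-9 ]', ''))
    .drop('value', 'no_tags'))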

So, to clean the documents, you simply apply this function to the column holding the document text:

df_cleaned = df.withColumn('cleaned', clean_line_UDF(df['value'])).drop('value')

There you have it — a DataFrame of cleaned document text, ready for the next stage.

Sentiment extraction with joins and counting

In the original program, a function would take each document’s clean text and iterate over all sentiment words, counting each time there was a match between the word list and document words.

Unfortunately, turning this into a UDF is definitely not the best solution, since Spark isn’t built for optimising iterative, row-by-row processes. What it is optimised for, though, is joins.

The approach? “Explode” each document’s text: create a DataFrame where each row holds a single word and the document it came from. This results in a DataFrame of about 7 million rows (one for each word across all documents). Recall, Spark is made to handle this situation really well.

from pyspark.sql.functions import split, lower, col, explode

df_with_split = df_cleaned.select('*', split(lower(col('cleaned')), ' ').alias('cleaned_split'))
df_exploded = df_with_split.withColumn('exploded', explode('cleaned_split'))

Before the explosion, the document text has to be “tokenised” into a list of words; that’s what the split(lower(...)) call in the first line does.
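
If the split-then-explode idea is new to you, here’s a toy illustration (the file name and text are made up):

from pyspark.sql.functions import split, lower, col, explode

toy = spark.createDataFrame([('a.html', 'Net loss widened')], ['raw file name', 'cleaned'])
toy.select('raw file name', explode(split(lower(col('cleaned')), ' ')).alias('exploded')).show()
# -> three rows: ('a.html', 'net'), ('a.html', 'loss'), ('a.html', 'widened')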

Now, take those DataFrames of sentiment word lists from earlier and join each one, one by one, to this mahoosive document-text DataFrame. Count all the matches, group by document name, and there you have it: sentiment word counts for every single document.

from pyspark.sql.functions import count

dfs_joined = [df_exploded.join(dfs[i], df_exploded['exploded'] == dfs[i][dfs[i].columns[0]], 'inner') for i in range(len(dfs))]

dfs_counts = [dfs_joined[i].groupBy('raw file name').agg(count('*').alias(dfs_joined[i].columns[-1])) for i in range(len(dfs_joined))]

Seems fishy, right, that joining such large tables together could ever be efficient? I kid you not: this process took just 38 seconds. You’ll see in the charts later on how much faster this was than the original program.
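
Part of the reason the joins are so cheap is that the sentiment DataFrames are tiny, so Spark will usually broadcast them to every node rather than shuffling the 7-million-row table. If you wanted to make that explicit (an optional tweak, not what I originally ran), you could wrap them in broadcast():

from pyspark.sql.functions import broadcast

# Explicitly broadcast the small sentiment word lists to all nodes before joining
dfs_joined = [df_exploded.join(broadcast(dfs[i]), df_exploded['exploded'] == dfs[i][dfs[i].columns[0]], 'inner') for i in range(len(dfs))]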

Now, we don’t want 7 DataFrames of results but just 1. So I joined all of these “results” DataFrames together.

df_counts = df_with_split.select('raw file name')
for sentiment in dfs_counts:
    df_counts = df_counts.join(sentiment, 'raw file name', 'left')

Finally, I turned all NULL values into 0.

df_counts = df_counts.na.fill(0)

Execute the plan

Up until this point, nothing intensive has happened. We’ve only set up the transformations, which are lazily evaluated (i.e. nothing has actually been computed yet). Only once we perform an action will there be any real processing.

In my case, the action is to write all results to a csv file in HDFS.

df_counts.write.option("header","true").option("sep",",").mode("overwrite").csv('/edgar/sentiment_counts/')

And there you have it, documents cleaned and sentiment extracted.

The question is — was this all worth it? How much time did this actually save?

Time saving, visualised

The answer is — a lot of time!

I ran the program in 3 different environments:

  1. Local — 1 computer, no Hadoop. This is the original program, the one we made pre-distributed computing.
  2. Databricks (1 node) — this is a platform that gives you free access to a cloud-hosted Hadoop environment, but you only have access to 1 node.
  3. AWS EMR (6 nodes) — EMR is Amazon’s cloud-based Hadoop service. I had the fortune of having access to a cluster of 6 nodes, thanks to Kubrick.

Result 1 — all 3 on a sample of the data

For each environment, I timed how long the whole program took on a 12% sample of the data. Here were the results:

No surprise that EMR obliterates the original program, reducing the processing time from 56 minutes to just under 3 minutes!

Furthermore, the 6-node EMR environment (unsurprisingly) outperformed the 1-node Databricks environment. From now on, I will only make comparisons between the local and EMR environments.

Extrapolation to full sample

Before I actually ran the program on the full data, I predicted what the processing times would be.

We can safely extrapolate the local environment to a total of 8 hours, because it is a linear process (rows of data are processed one after the other).

For the EMR environment, I assumed linear extrapolation and predicted a total processing time of 25 minutes.
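
The arithmetic behind both predictions is just scaling the sample timings up by the sample fraction (a rough back-of-the-envelope calculation using the 56-minute and 3-minute figures above):

# Linear extrapolation from the 12% sample
sample_fraction = 0.12
local_sample_minutes = 56
emr_sample_minutes = 3

print(local_sample_minutes / sample_fraction / 60)  # ~7.8 hours locally
print(emr_sample_minutes / sample_fraction)         # ~25 minutes on EMR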

Then, I actually ran the program on the full sample on the EMR to see just how off my prediction was.

Result 2 — Actual processing time of EMR on full data

This is the result you’ve all been waiting for:

Clearly, this project was a huge success.

The EMR environment actually took just 7.5 minutes to do what previously took 8 hours!

Furthermore, the prediction for the EMR was completely off.

The actual processing time was more than 3x shorter than the prediction!

Safe to say, I was absolutely gobsmacked by this result.

Improvements

There are many ways in which this project could’ve been improved. Here are a few:

  • Incorporate the downloading element (before the cleaning stage) into the program. Distributed downloading sounds exciting, but I’m yet to try it out.
  • Try to factor out the use of UDFs. Although they are acceptable, they’re not ideal; wherever possible, I should use built-in Spark functions for maximum efficiency.
  • Generalise the program: accept user input for company names, report types and time periods.

Final remarks

Distributed computing is an exciting technology today, and it’s only going to get bigger. I am grateful to have the skills to at least navigate this world, and I hope to gain even more in the near future to fully tame it.

I welcome any and all feedback — this is my first techy post, it’s clearly not perfect, and I’d like to hear your thoughts.

Additionally, if you have any questions, do let me know! Send me a message on LinkedIn.
