{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark tutorial\n", "\n", "### Install Spark\n", "\n", "1. Install Java\n", "\n", "```\n", "sudo apt install default-jdk\n", "java -version\n", "```\n", "\n", "2. Download Apache Spark and unpack it\n", "https://spark.apache.org/downloads.html\n", "\n", "```\n", "sudo mv spark-3.2.0-bin-hadoop3.2/ /opt/spark\n", "```\n", "\n", "3. Install PySpark\n", "\n", "```\n", "pip install pyspark\n", "```\n", "\n", "4. Set Spark environment\n", "\n", "```\n", "vi ~/.bashrc\n", "export SPARK_HOME=/opt/spark/spark-3.2.0-bin-hadoop3.2\n", "export PATH=$SPARK_HOME/bin:$PATH\n", "source ~/.bashrc\n", "```\n", "\n", "\n", "### Spark & Jupyter notebook\n", "\n", "To set up Spark in [Jupyter notebook](https://jupyter.org/), do the following:\n", "\n", "1. add the following lines into ~/.bashrc\n", " - local access\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook\"\n", "``` \n", " - remote access\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook --no-browser --port= --ip='*'\"\n", "``` \n", " - Windows subsystem for Linux\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook --no-browser\"\n", "``` \n", "\n", "2. run from terminal:\n", "```\n", "pyspark\n", "```\n", "\n", "Note that remote access to jupyter notebook requires a tunnel. On Windows machines, you can use [Putty](https://www.putty.org/) to set it up. In Linux environments, the following command can be used:\n", "\n", " ssh -N -L localhost::localhost: \n", "\n", "Finally, you can run the notebook in your browser:\n", "\n", " http://localhost:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random\n", "import re" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PySpark Python API \n", "\n", "PySpark can be used from standalone Python scripts by creating a `SparkContext`. 
You can set configuration properties by passing a `SparkConf` object to `SparkContext`.\n", "\n", "Documentation: [pyspark package](https://spark.apache.org/docs/latest/api/python/pyspark.html)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark import SparkContext, SparkConf" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# cannot run multiple SparkContexts at once (so stop one just in case)\n", "sc = SparkContext.getOrCreate()\n", "sc.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# spark conf\n", "conf = SparkConf()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a Spark context\n", "sc = SparkContext(conf=conf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelism demo" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "num_samples = 100000000" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def inside(p):\n", " x, y = random.random(), random.random()\n", " return x*x + y*y < 1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# watch jupyter notebook server output\n", "count = sc.parallelize(range(0, num_samples)).filter(inside).count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pi = 4 * count / num_samples" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(pi)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# stop Spark context\n", "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD - Resilient Distributed Datasets\n", "\n", "resilient:\n", "- (of a person or animal) able to withstand or recover quickly from difficult conditions\n", "- (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed\n", "\n", "Spark is RDD-centric!\n", "- RDDs are immutable\n", "- RDDs are computed lazily\n", "- RDDs can be cached\n", "- RDDs know who their parents are\n", "- RDDs that contain only tuples of two elements are “pair RDDs”" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Actions\n", "\n", "**RDD** - Resilient Distributed Datasets\n", "\n", "Some useful actions:\n", "- take(n) – return the first n elements in the RDD as an array.\n", "- collect() – return all elements of the RDD as an array. Use with caution.\n", "- count() – return the number of elements in the RDD as an int.\n", "- saveAsTextFile(‘path/to/dir’) – save the RDD to files in a directory. Will create the directory if it doesn’t exist and will fail if it does.\n", "- foreach(func) – execute the function against every element in the RDD, but don’t keep any results." 
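, "\n", "\n", "As a quick illustration (a minimal sketch, not part of the original demo; it assumes a running `SparkContext` named `sc`, and the output path is hypothetical):\n", "\n", "```\n", "rdd = sc.parallelize([3, 1, 4, 1, 5])\n", "rdd.take(2)                       # [3, 1]\n", "rdd.count()                       # 5\n", "rdd.collect()                     # [3, 1, 4, 1, 5] -- fine here, risky for large RDDs\n", "rdd.foreach(print)                # prints on the workers, returns nothing\n", "rdd.saveAsTextFile('demo_output') # hypothetical path; fails if the directory already exists\n", "```"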
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Demo files\n", "\n", "```\n", "file1.txt:\n", " Apple,Amy\n", " Butter,Bob\n", " Cheese,Chucky\n", " Dinkel,Dieter\n", " Egg,Edward\n", " Oxtail,Oscar\n", " Anchovie,Alex\n", " Avocado,Adam\n", " Apple,Alex\n", " Apple,Adam\n", " Dinkel,Dieter\n", " Doughboy,Pilsbury\n", " McDonald,Ronald\n", "\n", "file2.txt:\n", " Wendy,\n", " Doughboy,Pillsbury\n", " McDonald,Ronald\n", " Cheese,Chucky\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext(conf=conf)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# input files\n", "file1 = 'file1.txt'\n", "file2 = 'file2.txt'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# load data\n", "data1 = sc.textFile(file1)\n", "data2 = sc.textFile(file2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"file1: %d lines\" % data1.count())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"file2: %d lines\" % data2.count())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "data2.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: the following produces output on Jupyter notebook server!" 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# prints each element in the Jupyter notebook output\n", "data1.foreach(print)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# prints each element in the Jupyter notebook output\n", "data2.foreach(print)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### map()\n", "Return a new RDD by applying a function to each element of this RDD.\n", "- apply an operation to every element of an RDD\n", "- return a new RDD that contains the results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "data.map(lambda line: line.split(',')).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### flatMap()\n", "Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after dropping the outermost container" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.flatMap(lambda line: line.split(',')).take(7)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### mapValues()\n", "Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results\n", "\n", "Only works with pair RDDs." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.mapValues(lambda name: name.lower()).take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.mapValues(lambda name: name.upper()).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### flatMapValues()\n", "Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after removing the outermost container\n", "\n", "Only works with pair RDDs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.flatMapValues(lambda name: name.lower()).take(9)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.flatMapValues(lambda name: name.upper()).take(9)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### filter()\n", "Return a new RDD containing only the elements that satisfy a predicate.\n", "- return a new RDD that contains only the elements that pass a **filter operation**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.filter(lambda line: re.search(r'[x]$', line)).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### groupByKey()\n", "Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after removing the outermost container\n", "\n", "Only works with pair RDDs." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.groupByKey().take(1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "for pair in data.groupByKey().take(1):\n", " print(\"%s: %s\" % (pair[0], \",\".join([n for n in pair[1]])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### reduceByKey()\n", "Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.\n", "- combine elements of an RDD by key and then \n", "- apply a reduce operation to pairs of keys\n", "- until only a single key remains.\n", "- return the result in a new RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.reduceByKey(lambda v1, v2: v1 + \":\" + v2).take(6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### sortBy()\n", "Sorts this RDD by the given keyfunc.\n", "- sort an RDD according to a sorting function\n", "- return the results in a new RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.sortBy(lambda pair: pair[1]).take(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.sortBy(lambda pair: pair[1][1]).take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### sortByKey()\n", "Sorts this RDD, which is assumed to consist of (key, value) pairs.\n", "- sort an RDD according to the natural ordering of the keys\n", "- return the results in a new RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] 
}, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data.sortByKey().take(6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### subtract()\n", "Return each value in self that is not contained in other.\n", "- return a new RDD that contains all the elements from the original RDD \n", "- that do not appear in a target RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1 = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2 = sc.textFile(file2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.subtract(data2).collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.subtract(data2).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### join()\n", "Return an RDD containing all pairs of elements with matching keys in self and other. 
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.\n", "- return a new RDD that contains all the elements from the original RDD\n", "- joined (inner join) with elements from the target RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data2.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.join(data2).collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.join(data2).count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# a full outer join also keeps keys that appear in only one of the two RDDs\n", "data1.fullOuterJoin(data2).take(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data1.fullOuterJoin(data2).collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# stop Spark context\n", "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## MapReduce demo\n", "\n", "We will now count the occurrences of each word. Word count is the typical \"Hello, world!\" app for Spark applications. The map/reduce model is particularly well suited to applications like counting words in a document." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a Spark context\n", "sc = SparkContext(conf=conf)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# read the target file into an RDD\n", "lines = sc.textFile(file1)\n", "lines.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `flatMap()` operation first converts each line into an array of words, and then makes\n", "each of the words an element in the new RDD." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# split the lines into individual words\n", "words = lines.flatMap(lambda l: re.split(r'[^\\w]+', l))\n", "words.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `map()` operation replaces each word with a tuple of that word and the number 1. The\n", "resulting `pairs` RDD is a pair RDD where the word is the key, and all of the values are the number 1." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# replace each word with a tuple of that word and the number 1\n", "pairs = words.map(lambda w: (w, 1))\n", "pairs.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `reduceByKey()` operation keeps adding elements' values together until there are no\n", "more to add for each key (word)."
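, " For example (an illustrative trace, not actual cell output): the three pairs `('Apple', 1)`, `('Apple', 1)`, `('Apple', 1)` are reduced as `1 + 1 = 2` and then `2 + 1 = 3`, leaving a single `('Apple', 3)` for that key."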
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# group the elements of the RDD by key (word) and add up their values\n", "counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)\n", "counts.take(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sort the elements by values in descending order\n", "counts.sortBy(lambda pair: pair[1], ascending=False).take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Simplify chained transformations\n", "\n", "It is good to know that the code above can also be written in the following way:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\\w]+', l)) # words\n", " .map(lambda w: (w, 1)) # pairs\n", " .reduceByKey(lambda n1, n2: n1 + n2) # counts\n", " .sortBy(lambda pair: pair[1], ascending=False)) # sorted counts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sorted_counts.take(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# stop Spark context\n", "sc.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 2 }