Thema dieser Beitragsreihe sind Big Data Anwendungen mit Apache Spark und dessen Ökosystem – den Berkeley Data Analytics Stack (BDAS). Angefangen mit theoretischen Grundlagen zu den Themen Big Data, Stream-Processing (Verarbeitung von flüchtigen Datenströmen), Cloud Computing, Graph-Datenbanken und natürlich Machine Learning bis hin zu praktischen Anwendungsbeispielen zum Nachbauen, sowie Tipps und Tricks, wird diese Reihe ständig erweitert.
Daten gelten heute als einer der Rohstoffe der Zukunft. Ständig werden extreme Datenmengen von den unterschiedlichsten Quellen generiert. Sei es durch Social-Media, durch Suchmaschinen, durch Bewegungs- oder andere Sensordaten, durch Logdateien, etc. Die Menge der generierten Daten steigt von Jahr zu Jahr an – von ca 130 Exabyte im Jahre 2005 bis zu 8591 Exabyte für das Jahr 2015 und für 2020 werden Prognosen zufolge sogar über 40000 Exabyte Daten erzeugt. Diese Daten haben wenig miteinander gemeinsam. Es handelt sich um strukturierte oder unstrukturierte Datensätze, um persistente oder um flüchtige Daten aus einem Strom. Für all diese Daten sollen aber vergleichbare Werkzeuge zur Analyse und Verarbeitung verfügbar sein. Ein Ansatz für dieses Problem liefert Apache Spark mit dem dazugehörigen Berkeley Data Analytics Stack. Dieser beinhaltet Werkzeuge für Analyse massiver Datenmengen, die Verarbeitung von flüchtigen Streamingdaten (Spark Streaming), Batchverarbeitung großer Datenpakete (Spark Core), Graphenanwendungen (GraphX), SQL-Abfragen (Spark SQL) und verschiedenen Aufgaben aus dem Bereich des Machine Learning (MLLib).
In diesem ersten Teil der Beitragsreihe geht es darum, den Begriff “Big Data” zu definieren und die geschichtliche Entwicklung über Hadoop bis zu Apache Spark zu skizzieren. Des weiteren soll ein kurzer Überblick über die Bestandteile des BDAS rund um Spark gegeben werden.
Was ist überhaupt Big Data?
“Big Data is notable not because of its size, but because of its relationality to other data. Due to efforts to mine and aggregate data, Big Data is fundamentally networked. Its value comes from the patterns that can be derived by making connections between pieces of data, about an individual, about individuals in relation to others, about groups of people, or simply about the structure of information itself.” – Danah Boyd, Kate Crawford: Six Provocations for Big Data
Big Data ist insbesondere in den letzten Jahren immer stärker in den verschiedensten Zusammenhängen in den allgemeinen Sprachgebrauch vorgedrungen und ist hier einem ständigen Bedeutungswandel ausgesetzt. Besonders in letzter Zeit wird dieses Thema auch verstärkt kontrovers diskutiert.
Der Begriff Big Data wurde vermutlich zum ersten Mal Ende des 20. Jahrhunderts von John R. Marshey, damals Chefwissenschaftler bei Silicon Graphics, im Rahmen einer Usenix-Konferenz öffentlich erwähnt. Mittlerweile ziert dieser Begriff gefühlt jedes zweite Cover von IT- Zeitschriften mit Business-Fokus und auch Manager und Sales-Professionals werten Ihre Produktpräsentationen gerne mit diesem Buzzword auf. Aber dieser Begriff ist nicht nur positiv assoziiert. Besonders seit Bekanntwerden der Tätigkeiten des Amerikanischen Auslandsgeheimdienstes weckt die Vorstellung des Datensammelns in großen Dimensionen auch Misstrauen.
Wie lässt sich der Begriff Big Data abgrenzen? Es existiert keine abschließend eindeutige Definition, jedoch gibt es einige Attribute, die sich in einem Großteil der Fachliteratur etabliert haben.
“Big data is data that exceeds the processing capacity of conventional database systems. The data is too big, moves too fast, or doesn’t fit the structures of your database architectures.” – Edd Dumbhill: What is big data?
Neben der reinen Menge spielt also offensichtlich auch die mangelnde oder fehlende Strukturierung und unter Umständen die Flüchtigkeit der Daten eine nicht unerhebliche Rolle. Dies können beispielsweise Daten aus Social-Media-Quellen sein, die aus allen möglichen verschiedenen Einzeldaten bestehen, Daten von Sensoren, die permanent überwacht werden müssen, oder Datenströme (Video, Audio, Bilder, Text), die nach einheitlichen Kriterien gefiltert werden sollen, um hier nur einige Beispiele zu nennen. Auch die temporäre Komponente ist ein Attribut von Big Data und auch hier ist wieder das Beispiel der Datenströme heranzuziehen.
Bei der Definition von Big Data werden auch immer wieder die „Four V’s“ angeführt. Dies sind Volume, also die Datenmenge, Variety, die Datenvielfalt, Velocity, die Geschwindigkeit der Auswertung, sowie Veracity, die häufig stark schwankende Datenqualität.

Die sinnvolle Analyse dieser Daten kann Unternehmen oder anderen Organisationen wichtige Informationen beispielsweise über Marktentwicklungen, bestimmte Kundenbedürfnisse, Epedemie-Ausbreitungen sowie unzählige weitere wichtige Sachverhalte liefern. Diese Analyse, inklusive der dazu verwendeten Werkzeuge, wird zusammenfassend als Big Data Analytics bezeichnet.
Welche Technologien werden für Big Data Analytics genutzt?
Zahlreiche Hersteller herkömmlicher relationaler Datenbanksysteme versuchen derzeit, ihre bestehenden Lösungen mit dem Label Big Data zu versehen und diese so weiterhin in diesen sich verändernden Marktsegmenten zu positionieren. Wenn Big Data jedoch jenseits der Datengröße definiert wird und auch unstrukturierte und temporäre Daten-Stacks oder –ströme zu verarbeiten oder zu analysieren sind, stoßen klassische RDBMS sehr schnell an ihre Grenzen. Doch auch hinsichtlich der Skalierbarkeit sind relationale Datenbanken meist nur unzureichend flexibel.
Für die Anforderungen an dedizierte Aufgaben im Bereich Big-Data-Analytics sind seit einigen Jahren diverse Frameworks auf dem Markt, die für die drei oben genannten Aspekte wesentlich besser geeignet sind, als RDBMS. Der Ansatz ist hier primär, die Verarbeitung zu dezentralisieren, also auf unabhängige Knoten in einem Rechner-Cluster zu verteilen und nur Referenzen auf die Clusterknoten zentral zu verwalten.
Es existieren mittlerweile Lösungen am Markt, die speziell für derartige Aufgaben entwickelt wurden. Hier wären unter anderem Hadoop, Spark, HPCC, GPMR, Mincmeat, Sphere, Bashreduce und R3 zu nennen. Bis auf HPCC setzen alle eben genannten Implementierungen generell oder in Teilen auf das Programmierparadigma MapReduce.
Der unangefochtene De-facto-Standard in diesen Bereichen ist bereits seit einiger Zeit das Open-Source-Framework Apache Hadoop. Auf Hadoop basierend existieren etliche Derivate. Unter anderem sind hier Cloudera, Amazon Elastic MapReduce, Apache BigTop, Datameer, Apache Mahout, MapR und IBM PureData System zu nennen.
Was ist Hadoop?
Anfang des 21. Jahrhunderts wurde das Bedürfnis nach Möglichkeiten, sehr große Datenmengen effizient verarbeiten zu können, stetig größer. Nicht zuletzt durch die zu dieser Zeit exponentiell steigende Menge von Inhalten im World Wide Web und deren Indexierung durch Suchmaschinen wie Google. Davon motiviert wurde 2002 das Projekt Nutch mit dem Ziel gestartet, ein geeignetes Such- und Crawlersystem frei verfügbar zu machen. Die ersten Versuche skalierten sehr schlecht, bis Google 2003 die Funktionsweise ihres verteilten Dateisystem GFS (Google File System) veröffentlichte. Somit konnten die sehr großen Dateien, die durch die Indexierung entstanden, effizient auf verschiedene Knoten verteilt gespeichert und die Verwaltung dieser Knoten und Dateien aus dem eigentlichen Indexierungs- und Suchprozess ausgelagert werden.
Im Jahre 2004 publizierte Google den MapReduce-Algorithmus, der unter anderem die Indexierungs- und Analysefunktionen parallelisieren, delegieren und sinnvoll bündeln kann. In Nutch wurden daraufhin sämtliche wichtigen Algorithmen auf MapReduce umgestellt, nachdem zuvor auch GFS unter dem Namen NDFS (Nutch Distributed File System) integriert wurde. Die möglichen Anwendungsgebiete von Nutch waren damit auch weit über das reine Suchen und Indexieren von Webseiten hinaus gewachsen. 2006 wurde aus Nutch ein Unterprojekt mit dem Namen Hadoop ausgegliedert, das im Jahre 2008 zum Apache Top-Level-Project ernannt wurde. Zu dieser Zeit nutzten bereits Firmen wie Yahoo!, Facebook oder die New York Times Hadoop. Ein exemplarischer Anwendungsfall bei der NY Times war, mit Hilfe der Hadoop-basierten EC2-Cloud von Amazon ca. vier Terabyte gescannter Archivdateien in PDF-Dateien umzuwandeln und dies in weniger als 24 Stunden verteilt auf 100 Knoten. Auch beim Sortieren von sehr großen Datenmengen stellten Hadoop-basierte Systeme nach und nach sämtliche Rekorde ein.
Hadoop und Hadoop-basierte System gelten mittlerweile als Industriestandard für Big-Data- Analytics-Anwendungen. Jedoch ist Hadoop nicht für alle Anwendungsgebiete gleichermaßen geeignet. Aufgrund der Charakterisierung der Paradigmen für Big Data Analytics im Paper „Frontiers in Massive Data Analysis“ der National Academic Press, lassen sich die Einsatzgebiete und Schwächen für Hadoop ermitteln:
- Mit Hadoop lassen sich einfachere statistische Aufgabenstellungen sehr gut umsetzen. Dazu gehören Mittelwert, Median, Varianz und allgemein abzählende sowie ordnende Statistikaufgaben. Dies sind in der Regel Anwendungen mit einer Laufzeitkomplexität von O(n) für n Betrachtungswerte. Sie sind meist auch sehr gut parallelisierbar und eignen sich somit bestens für eine Verarbeitung mit Hadoop.
- Bei Hadoop werden sämtliche Datenstrukturen und Zwischenergebnisse von Berechnungen und Transformationen im Dateisystem persistiert. Für linear-algebraische Berechnungen aus dem Bereich Machine-Learning (lineare Regression, Eigenwertproblem, Hauptkomponentenanalyse), generalisierte n-Körper-Probleme (mit einer Komplexität von O(n^2) oder O(n^3)), Probleme aus der Graphentheorie, Optimierungsprobleme (Verlust-, Kosten- oder Energiefunktionen, sowie Integrations- und Ausrichtungsfunktionen) ist Hadoop deshalb nur in jeweils einfacher Problemausprägung einsetzbar. Auch für Interaktive Abfragen und für Streaming-Anwendungen ist Hadoop nur bedingt geeignet, da dieses Framework ursprünglich für die Batch-Verarbeitung entwickelt wurde.
Aus diesem Grund wurde am AMPLab der University of California in Berkeley nach Alternativen geforscht, die besonders auch für iterative oder interaktive Verarbeitung geeignet ist. Das Ergebnis nennt sich Spark, mittlerweile Apache Top-Level-Projekt und dazu geeignet, die Nachfolge von Hadoop als DAS Big-Data-Analytics-Framework anzutreten.
Was sind Apache Spark und der Berkeley Data Analytics Stack?
Im Folgenden werden die Schichten des BDAS vorgestellt.
Apache Mesos:
Bei Apache Mesos handelt es sich um ein Cluster-Management-Framework für Anwendungen, die in verteilten Serverpools laufen sollen. Bestandteil von Mesos ist wiederum Apache ZooKeeper, das für Konfigurationsinformationen, Naming-Services und die Synchronisation von verteilten Anwendungen zuständig ist.
Mesos wird im BDAS eingesetzt, um die Prozesse von Hadoop/Spark effizient auf die einzelnen Knoten im Cluster zu verteilen. Besonders das Ressourcen-Management und –Monitoring innerhalb des Clusters ist ein wichtiger Faktor, um Jobs performant auf verteilten Systemen ausführen zu können. Auch das Fehlerhandling für Knoten, Prozesse und im Netzwerk wird im Berkeley-Stack von Mesos übernommen.
Ein besonderer Vorteil von Mesos gegenüber Yarn oder anderen Alternativen, wie dem Cloudera Cluster Manager oder Ambari von Hortonworks ist die Möglichkeit, verschiedene Frameworks gleichzeitig und isoliert in einem Cluster betreiben zu können. So kann beispielsweise Hadoop mit Spark in einer gemeinsamen Infrastruktur koexistieren.
Hadoop Distributed File System (HDFS) und Tachyon:
Das Hadoop Distributed File System basiert ideologisch auf dem GoogleFileSystem (GFS) und hat zum Zweck, zuverlässig und fehlertolerant sehr große Dateien über verschiedene Maschinen hinweg in verteilten Umgebungen zu speichern. In entsprechenden Veröffentlichungen von Hortonworks (http://hortonworks.com/hadoop/hdfs/) wird von Produktivsystemen berichtet, die bis zu 200 PetaByte an Datenvolumen in einem Cluster von 4500 Servern basierend auf HDFS verwalten.
HDFS wurde speziell für den Einsatz mit MapReduce entwickelt, ist also auf geringe Datenbewegungen ausgelegt, da MR die Berechnungsprozesse jeweils zu den physischen Datensätzen selbst bringt und nicht, wie herkömmlich, die Daten zu den Prozessen geliefert werden müssen. So wird massiv Netzwerkverkehr innerhalb des Clusters eingespart und letztlich werden nur Prozesse und Prozessergebnisse verschickt.
Im BDAS lässt sich wahlweise direkt das HDFS bzw. dessen Alternativen ansprechen oder eine Zwischenschicht nutzen, die insbesondere auf das In-Memory-Model von Spark zugeschnitten ist. Dies ist innerhalb des BDAS das verteilte Dateisystem Tachyon. Hier werden die zu verarbeitenden oder zu analysierenden Datensätze direkt in den Hauptspeicher des jeweiligen Knoten in Form eines Cache gehalten. Somit werden Lade- und Speicheroperationen auf Massenspeicher minimiert und eine massiv höhere Ausführungsgeschwindigkeit erreicht.
Apache Spark:
Spark ist das Herzstück des BDAS. Bei Spark handelt es sich um ein open-source Data-Analytics-Framework, das, wie Hadoop, speziell für die Bedürfnisse im Rechner-Cluster konzipiert ist.
Im Gegensatz zu Hadoop bietet Spark jedoch Funktionen für In-Memory-Cluster-Berechnungen und ist nicht zwingend an MapReduce gebunden. Besonders interaktive Analyse oder Verarbeitung der Daten, Abfragen über verteilte Dateien und iterative Lernalgorithmen erfahren so laut Aussagen des AMPLab eine bis zu hundertfache Ausführungsgeschwindigkeit gegenüber Hadoop. Auch die anfangs angesprochenen Schwächen von Hadoop bei Berechnungen von komplexen linear-algebraischen Problemen, generalisierten n-Körper-Problemen, diversen Optimierungsproblemen und anderen Aufgaben, treten bei Spark auf Grund der offenen Architektur und der Zerlegung von Datensätzen in die sogenannten Resilient Distributed Datasets (RDD) nicht mehr auf.
Spark wurde komplett in Scala entwickelt und bietet APIs für Scala, Java (inklusive Lambda-Expressions ab Java 8) und Python. Im Labor existieren bereits Spark-Installationen mit bis zu 2000 Knoten, in Produktivsystemen sind bisher Systeme mit bis zu 1000 Knoten im Einsatz. Durch die Möglichkeit, die Datensätze im Speicher für interaktive Analyseaufgaben zu cachen und iterativ abzufragen, ist eine direkte Kommandozeileninteraktion über das integrierte Scala REPL (alternativ auch in Python) möglich.
Spark Streaming:
Hierbei handelt es sich um eine Erweiterung, um die integrierte API von Spark für Anwendungen auf Datenströmen nutzen zu können. Das Programmiermodell unterscheidet nicht zwischen Batch- und Streaming-Anwendungen. So lassen sich beispielswei- se Datenströme zur Laufzeit mit Archivdaten vergleichen und direkt Ad-hoc-Abfragen auf die Ströme formulieren. Im Fehlerfall ermöglicht Streaming zahlreiche Wiederherstellungsoptionen, sowohl von verlorenen Datenströmen, als auch von Einstellungen. Ein Anwendungsbeispiel ist die Echtzeitanalyse von Twitter-Meldungen.
GraphX:
GraphX ist eine Erweiterung für Spark, die verteilte, flexible Graphen-Anwendungen in einem Spark-Cluster ermöglicht. Besonders in den Disziplinen „Machine Learning“ und „Data Mining“ ist die Anwendung komplexer Graphen unerlässlich. Graph-Datenbanken kommen immer dann zum Einsatz, wenn stark vernetzte Informationen und ihre Beziehungen zueinander interessant sind. Hier werden die Entitäten als Knoten behandelt, die Beziehungsart definiert die Kanten. Die Kanten können auch gewichtet sein. Ein konkretes Beispiel sind die Mitglieder eines sozialen Netzwerks mit ihrem jeweiligen Beziehungsgeflecht. Je nach Kontaktintensität können diese Beziehungen auch priorisiert werden, was dem Kantengewicht entspricht.
GraphX nutzt hier die Vorteile der darunterliegenden Spark-Infrastruktur, in dem durch eine tabellarische Anordnung der Datenstrukturen eine massive Parallelisierung möglich ist und auch der Verarbeitung in RDDs voll unterstützt wird. So sind auch interaktive Konsolen-Operationen auf den Graphen jederzeit über die REPL möglich.
MLbase/MLlib:
MLlib ist eine verteilte Machine-Learning-Bibliothek die für die Spark-Laufzeitumgebung entwickelt wurde und die bekannten Algorithmen für Probleme wie Klassifikation, Regression, Clustering und kollaboratives Filtern enthält. Sämtliche Algorithmen nutzen die Stärken von Spark wie das In-Memory-Computing und die verteilten RDDs voll aus.
Bei MLI handelt es sich um eine API, die es ermöglicht, selbst ML-Features zu entwickeln und in erster Linie für komplexere Problemstellungen geeignet ist. Mit MLI lassen sich die Funktionen direkt gegen Spark entwickeln, gegebenenfalls unter Zuhilfenahme der Bibliotheken der MLlib.
Der ML-Optimizer soll ML-Probleme für Endnutzer vereinfachen, in dem Modellauswahlen automatisiert werden. Hierzu werden Features aus der MLlib und der MLI extrahiert und zu Hilfe genommen.
In der Tabelle unten ist eine Übersicht aller bisher in MLlib enthaltenen Algorithmen zu finden.
Durch den modularen Aufbau des BDAS lassen sich die Algorithmen der Machine-Learning-Library auch direkt von der Konsole REPL aus nutzen, sowie von allen anderen Bestandteilen der Applikationsschicht des BDAS. So lassen sich mit Spark SQL Abfragen auf Ergebnisse von MLlib-Berechnungen formulieren oder Ergebnisse von Abfragen als Input für weitere ML- Berechnungen nutzen. MLlib, bzw. MLbase lässt sich auch in Verbindung mit GraphX auf Graphen-Datenstrukturen anwenden oder mittels Spark Streaming auf flüchtige gestreamte Datenquellen.
Spark SQL:
Im Ökosystem von Hadoop ist Hive eine SQL-Query-Engine, die sich großer Beliebtheit in der Community erfreut und trotz inzwischen zahlreicher Konkurrenzimplementierungen immer noch als Quasi-Standard für diesen Anwendungsbereich gilt.
Spark SQL ist eine Portierung dieser Engine für Spark, um alle Vorteile der BDAS-Architektur nutzen zu können und ist kompatibel mit sämtlichen Hive-Daten, -Metastores und –Queries. Im Gegensatz zu Hive, das aus Datensätzen zur Laufzeit Java-Objekte generiert, nutzt Spark SQL eine zeilenorientierte Speicherung mittels Arrays primitiver Datentypen und ist somit selbst in einer Hadoop-Infrastruktur im Mittel bis zu fünfmal schneller als Hive.
Eine Besonderheit von Spark SQL ist neben seinem SQL-Interface die Möglichkeit, auch Machine-Learning-Funktionen als Abfragen formulieren zu können.
Für die Anwendung von Spark SQL hat sich die Architektur von Spark mit seinen RDDs als sehr vorteilhaft erwiesen, da Abfragen auf fehlerhaften RDDs nach dem Neuaufbau des entsprechenden Datasets direkt erneut ausgeführt werden können.
Ein weiterer Unterschied zu Hive ist die sogenannte Partial-DAG-Execution (PDE). Dies bedeutet, dass logische Abfragepläne in Spark SQL aufgrund gesammelter Statistiken zur Laufzeit flexibel erstellt werden im Gegensatz zu Hive oder herkömmlichen relationalen Datenbanksystemen, wo bereits zur Kompilierungszeit starre physische Abfragepläne generiert werden. Besonders die Machine-Learning- und Failover-Funktionen wären mit einer Planerstellung zu Kompilierzeit nicht umsetzbar.
Dies war eine kurze Einführung in die Themengebiete Big Data Analytics und Apache Spark. Im nächsten Teil zeigen wir, wie man lokal eine Spark-Infrastruktur aufsetzt und erste Demos erstellt.
Leave a Comment