DBII-11-freigeschaltet PDF
Document Details
Uploaded by FlatterForgetMeNot
FH Aachen University of Applied Sciences
Tags
Summary
These lecture notes provide an introduction to the use of Spark for distributed data processing..
Full Transcript
Cassandra zusammen mit Hive/Pig gibt es quasi nur noch kommerziell Die DataStax-Enterprise Edition bindet Cassandra auch an Hive an Hierdurch kann dann z.B. die Auswertung mittels DW-Werkzeugen geschehen Wir haben aber auch gesehen, dass MapReduce- Lösungen rückläu...
Cassandra zusammen mit Hive/Pig gibt es quasi nur noch kommerziell Die DataStax-Enterprise Edition bindet Cassandra auch an Hive an Hierdurch kann dann z.B. die Auswertung mittels DW-Werkzeugen geschehen Wir haben aber auch gesehen, dass MapReduce- Lösungen rückläufig sind Tatsächlich gibt es mittels Apache SPARK eine neue Möglichkeit, die auch über einen Opensource Connector für Cassandra verfügt https://github.com/datastax/spark-cassandra- connector © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 590 Wir erinnern uns an Big Data und die 3Vs..Hadoop fokussiert auf große Datensätze mittels Batch-Verarbeitung Application Data Processing Storage Infrastructure © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 591 Das bisherige Modell Wir transformieren Daten von dauerhaftem Speicher zu dauerhaftem Speicher Map Reduce Input Map Output Reduce Map © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 592 Wo liegt das Problem? Wir können zwar mit MapReduce-Workflows formulieren (azyklisches Abarbeitungsfolge von Daten), das permanente Zwischenspeichern stört aber – Iterative Algorithmen, wie diese oftmals beim maschinellen Lernen eingesetzt werden werden nicht optimal unterstützt – Interaktive Data Mining tools (R, Python) Apache Spark adressiert genau diesen Bedarf © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 593 Die Evolution © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 594 Apache SPARK © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 595 Die Ideen von Spark Verarbeitung möglichst im Speicher liegender Daten – Vermeide wenn möglich das Zwischenspeichern auf Platte – Cache die Daten um so immer wieder auf diese zugreifen zu können (Machine Learning) Kompatibilität mit Hadoop Map-Reduce (YARN) und die mögliche Anbindung an Cassandra Verallgemeinere Map&Reduce-Konzepte Ermögliche Echtzeit-nahe Anwendungen durch Micro-Batches (Streaming) Spark wurde in der Programmiersprache Scala geschrieben, eine Sprache die Java-Klassen sehr einfach integriert (tatsächlich sind Scala Klassen JVM Klassen) Tolle Integration in Python durch PySpark © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 596 Die Datenabstraktion: Resilient Distributed Datasets Operationen finden auf einer unveränderlichen, parallelen Datenstruktur statt Diese wird im Arbeitsspeicher gehalten Parallelität durch Partitionieren der Daten – Partitionierung auf der Basis eines Keys im Datensatz (Hash oder range partitioning) – key.hashCode() % numPartitions – rdd.partitions.size Ausfallsicherheit wird nicht durch physische Replikationen, sondern durch das mögliche Wiederholen der verwendeten Operationen realisiert – Lazy: RDDs werden nicht sofort erzeugt, sondern erst, wenn man damit etwas macht (eine Aktion darauf ausführt) – Faktisch ergibt sich ein Workflow von Transformationen der durch eine Aktion abgeschlossen wird © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 597 © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 598 SparkContext Einstieg in die Spark-Funktionalität In der Spark-Shell gibt es die Variable sc Auch in den Programmiersprachen Scala, Python, Java,... wird mit dem Context gearbeitet Mit dem Context wird die Ausführungsumgebung und deren Parameter spezifiziert from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark import SparkContext spark_master_ip = "spark://149.201.206.132:7077" app_name = “Wordcount" Spark SparkSession.builder.master("spark://149.201.206.132:7077").appName(app_name). config("spark.executor.memory", "4g").config("spark.executor.cores", "4"). config("spark.driver.memory", "16g"). config("spark.cores.max","16").getOrCreate() © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 599 SparkContext © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 600 Ein Beispiel-Job Optimierung Driver log = sc.textFile(“hdfs://...”) errors = log.filter(lambda line: “ERROR” in line) errors.cache() count errors.filter(lambda line:“I/O” in line).reduce(lambda x, y: x + y) Transformation Action! Transformationen auf einem cache-RDD werden einmal berechnet und die Ergebnisse werden im Worker Worker Worker Arbeitsspeicher zwischengespeichert. Nachfolgenden Aktionen, die auf das RDD Cache1 Cache2 Cache2 angewendet werden, verwenden die bereits zwischengespeicherten Daten Block1 Block2 Block3 Actions werden immer direkt ausgeführt und bedingen, dass der RDD auch wirklich erzeugt wird. Mittels Actions geben wir Resultate aus. Ohne Actions machen alle Transformationen keinen Sinn © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 601 Arbeiten mit Key-Value Paaren Spark-Transformationen arbeiten auf den verteilten RDDs mit key-value Paaren Python: pair = (a, b) pair # => a pair # => b Scala: val pair = (a, b) pair._1 // => a pair._2 // => b Java: Tuple2 pair = new Tuple2(a, b); pair._1 // => a pair._2 // => b © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 602 Transformationen Von RDDs Einfache Art einen parallelenRDD zu erzeugen > pets = sc.parallelize( [(“cat”, 1), (“dog”, 1), (“cat”, 2)]) > pets.reduceByKey(lambda x, y: x + y) # => {(cat, 3), (dog, 1)} > pets.groupByKey() # => {(cat, [1, 2]), (dog, )} > pets.sortByKey() # => {(cat, 1), (cat, 2), (dog, 1)} © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 603 RDD partition-level view Dataset-level view: Partition-level view: log: HadoopRDD path = hdfs://... FilteredRDD func = _.contains(…) shouldCache = true Task 1Task 2... source: https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 604 RDD-Ketten Jedes RDD-Objekt verwaltet auch Hinweise auf seine Historie, so dass auch verloren gegangene Partitionen durch Neuausführung rekonstruiert werden können cachedMsgs = textFile(...).filter(lambda line: “error” in line). map(lambda line: line.split('\t')).cache() HdfsRDD FilteredRDD MappedRDD func: CachedRDD path: hdfs://… func: split(…) contains(...) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 605 Beispiel: Logistische Regression Ziel: Finde die am besten geeigenet Trennlinie zwischen zwei Punktemengen Zufälllig gewählte Startlinie + + + + + – – + + + – – – + + – –– – – Ziel © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 606 Beispiel: Logistische Regression GIF: University of Toronto © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 607 Beispiel: Logistische Regression Ähnlich zu der linearen Regressionsanalyse wird versucht, eine Funktion zu finden, die möglichst gut zu den Daten passt. Diese Funktion ist jedoch im Gegensatz zur linearen Regressionsanalyse keine Gerade, sondern eine Sigmoid (logistische) Funktion, die Werte zwischen 0 und 1 annimmt und die Wahrscheinliche ausdrückt, dass die entsprechende Funktion den Wert 1 annimmt (Zuordnung zu der Menge) Linear Regression VS Logistic Regression Graph| Image: Data Camp © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 608 Beispiel: Logistische Regression Tatsächlich arbeitet man mit dem Logit, also dem Logarithmus: © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 609 Beispiel: Logistische Regression Odds: Verhältnis der Wahrscheinlichkeit, dass ein Ereignis eintritt zur Wahr- scheinlichkeit, dass es nicht eintritt (variiert zw. 0 und +∞ - ) Odds-Ratio: Verhältnis zweier Odds zueinander. OddsRatio >1: Odds in erster Gruppe (Zähler) sind größer. Beispiel: OddsRatio>1 bei Erkrankungen: Die Wahrscheinlichkeit zu erkranken ist in erster Gruppe größer als in zweiter Gruppe. In der logistischen Funktion wird nicht die Wahrscheinlichkeit des Zutreffens einer Beobachtung direkt berechnet/geschätzt, sondern die logit davon, da wir dann eine Abbildung von -∞ bis +∞ haben, was zu einer Regressionsgraden passt Sigmoid-Funktion als Verbindungsfunktion liefert 0-1 Bereich zur Klassifikation LogIt liefert lineares von © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 610 Beispiel: Logistische Regression nach dem Stochastic Gradient Descent (SGD) Verfahren val data = spark.textFile(...).map(readPoint).cache() var w = Vector.random(D) // Gewichtsvektoren initialisieren for (i Für jeden Datenpunkt p (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x).reduce(_ + _) p.y ist die abhängige Variable und p.x der Vektor der unabhängigen Variablen ist Unterschied zwischen der Vorhersage (Logistischen Funktion) und dem tatsächlichen Wert Das Produkt mit p.y und p.x skaliert diesen Unterschied entsprechend. Die berechneten Gradienten werden über alle Datenpunkte reduziert, indem sie summiert werden w -= gradient Der resultierende Wert wird dann von w subtrahiert. Dies ist der Schritt des Gradientenabstiegs, bei } dem die Gewichtsaktualisierung durchgeführt wird, um die Verlustfunktion zu minimieren println("Final w: " + w) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 611 Beispiel: Logistische Regression 4500 4000 127 s / iteration Running Time (s) 3500 3000 2500 Hadoop 2000 Spark 1500 1000 500 first iteration 174 s 0 further iterations 6 s 1 5 10 20 30 Number of Iterations https://www.usenix.org/legacy/event/hotcloud10/tech/full_papers/Zaharia.pdf © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 612 Hinweis: Es geht auch viel einfacher mit Spark PySpark sind DataFrames eine höhere Abstraktionsebene im Vergleich zu RDDs und bieten eine benutzerfreundliche Schnittstelle für die Verarbeitung von verteilten Daten in Spark. Ein PySpark DataFrame ist im Wesentlichen eine Tabelle mit benannten Spalten. Im Gegensatz zu RDDs enthält ein DataFrame somit Schemainformationen über die Daten from pyspark.ml.classification import LogisticRegression log_reg=LogisticRegression(labelCol='Status').fit(training_df) train_results=log_reg.evaluate(training_df).predictions train_results.filter(train_results['Status']==1). filter(train_results['prediction']==1).select(['Status‘, 'prediction','probability']).show(10,False) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 613 RDD Operationen Transformations Parallel operations (Definieren ein neues RDD) /Actions map (Starten eine Auswertung) filter reduce sample collect union count groupByKey save reduceByKey lookupKey join … cache … © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 614 RDD Transformationen map() transformiert jede Zeile des RDDs und liefert die transformierte Zeile. Dabei ist anschaulich das RDD eine Liste (iterable) und jede Zeile ist wieder ein iterable rdd.map(lambda line: line.split()) flatMap() arbeitet analog zu map, allerdings werden hier die Daten nicht eins zu eins returniert, vielmehr wird aus einer Zeile keine, eine oder eine Liste von Zeilen rdd.flatMap(lambda line: line.split()) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 615 RDD Transformationen in Java © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 616 RDD Transformations https://data-flair.training/blogs/apache-spark-map-vs-flatmap/ © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 617 RDD Transformations https://data-flair.training/blogs/apache-spark-map-vs-flatmap/ © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 618 RDD Transformations https://www.javatpoint.com/apache-spark-rdd-operations © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 619 RDD Actions https://www.javatpoint.com/apache-spark-rdd-operations © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 620 Beispiel: Reduce-Action # Summiere und Multipliziere die Zahlen von 1 bis 10 by adding spark = SparkSession.builder.appName("example").getOrCreate() # RDD erstellen: Zahlen von 1 bis 10 auf zwei Partitionen verteilen x = spark.sparkContext.parallelize(range(1, 11), 2) # Addition der Zahlen von 1 bis 10 mit reduce y_addition = x.reduce(lambda accum, n: accum + n) print("Addition result:", y_addition) # Kurze Syntax für die Addition y_addition_short = x.reduce(lambda accum, n: accum + n) print("Short addition result:", y_addition_short) # Multiplikation der Zahlen von 1 bis 10 mit reduce y_multiplication = x.reduce(lambda accum, n: accum * n) print("Multiplication result:", y_multiplication) # y: Int = 3628800 © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 621 Beispiel: Reduce-Action http://images.backtobazics.com/spark/apache-spark-reduce-example.gif 1 n © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 622 Beispiel: Reduce-Action http://images.backtobazics.com/spark/apache-spark-reduce-example.gif © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 623 Beispiel: ReduceByKey-Transformation https://stackoverflow.com/questions/30145329/reducebykey-how-does-it-work-internally reduceByKey nutzt bei Transformation eine assoziative und kommutative Funktion. Dadurch kann man parallel arbeiten © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 624 Erzeugen eines RDDs > visits = sc.parallelize([ (“index.html”, “1.2.3.4”), (“about.html”, “3.4.5.6”), (“index.html”, “1.3.3.1”) ]) > pageNames = sc.parallelize([ (“index.html”, “Home”), (“about.html”, “About”) ]) > visits.join(pageNames) # (“index.html”, (“1.2.3.4”, “Home”)) # (“index.html”, (“1.3.3.1”, “Home”)) # (“about.html”, (“3.4.5.6”, “About”)) > visits.cogroup(pageNames) # (“index.html”, ([“1.2.3.4”, “1.3.3.1”], [“Home”])) # (“about.html”, ([“3.4.5.6”], [“About”])) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 625 Einstellen der Parallelität Alle Key-Valye-RDD Operationen besitzen einen optionalen Parameter mit dem die Anzahl der Tasks angegeben warden kann > words.reduceByKey(lambda x, y: x + y, 5) > words.groupByKey(5) > visits.join(pageViews, 5) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 626 Verwendung lokaler Variablen Wann immer eine externe Variable in einer Closure/Lambda verwendet wird, so wird diese automatisch auf das Cluster verteilt : > query = sys.stdin.readline() > pages.filter(lambda x: query in x).count() Wissenswertes: Tasks erhalten eine Kopie, Änderungen gehen damit verloren Variablen müssen serialisierbar sein Nicht zu viel verschicken (komplexe Objekte) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 627 Beispiel: Log Mining (hier in Scala) Lade die Fehler-Logs in den Speicher und analysiere sie interaktiv nach Mustern Base Transformed RDD RDD Cache 1 lines = spark.sparkContext.textFile("hdfs://...") Worker errors = lines.filter(lambda line: line.startswith("ERROR")) results messages = errors.map(lambda line: line.split('\t')) tasks Block 1 cached_msgs = messages.cache() Driver Cached RDD Parallel operation cached_msgs.filter(lambda msg: "foo" in msg).count() cached_msgs.filter(lambda msg: ”bar" in msg).count() Cache 2 Worker... Cache 3 Worker Block 2 Block 3 © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 628 Job scheduling RDD Objects DAGScheduler TaskScheduler Worker Cluster Threads DAG TaskSet manager Task Block manager rdd1.join(rdd2) split graph into launch tasks via execute tasks.groupBy(…).filter(…) stages of tasks cluster manager submit each retry failed or store and serve build operator DAG stage as ready straggling tasks blocks source: https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 629 Beispiel: Word Count > lines = sc.textFile(“hamlet.txt”) > counts = lines.flatMap(lambda line: line.split(“ ”)).map(lambda word => (word, 1)).reduceByKey(lambda x, y: x + y) “to” (to, 1) (be, 2) “to be or” “be” (be, 1) (not, 1) “or” (or, 1) “not” (not, 1) (or, 1) “not to be” “to” (to, 1) (to, 2) “be” (be, 1) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 630 Einbettung in Sprachen Python Standalone Programs lines = sc.textFile(...) Python, Scala, & Java lines.filter(lambda s: “ERROR” in s).count() Scala Interactive Shells val lines = sc.textFile(...) Python & Scala lines.filter(x => x.contains(“ERROR”)).count() Performance Java Java & Scala sind JavaRDD lines = sc.textFile(...); lines.filter(new Function() { tatsächlich Boolean call(String s) { return s.contains(“error”); performannter } }).count(); © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 631 SparkConf sconf = new SparkConf(true).setMaster("spark://master:7077").setAppName("SanderTest").set("spark.submit.deployMode", "cluster").set("spark.executor.memory", "1g"); sconf.setJars(new String[] { "target/Spark/Job.jar" } ); JavaSparkContext sc = new JavaSparkContext(sconf); JavaRDD textFile = sc.textFile("hdfs://master:9000/input/bibel.txt"); JavaRDD words = textFile.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); JavaPairRDD counts = words.mapToPair((w -> new Tuple2((String)w, 1))).reduceByKey( (x, y) -> (Integer) x + (Integer) y); counts.saveAsTextFile("hdfs:... "); © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 632 Python import sys from pyspark import SparkContext conf = SparkConf().setAll([('spark.executor.memory', '3g'), ('spark.executor.cores', '8‘), ('spark.cores.max', '8‘), ('spark.driver.memory', '4g'), ( "spark.app.name", "Name"), ("spark.master", "spark://149.201.206.132:7077")]) sc = SparkContext(conf=conf) spark = SparkSession.builder.config(conf=conf).getOrCreate()] lines = sc.textFile(sys.argv) counts = lines.flatMap(lambda s: s.split(“ ”)) \.map(lambda word: (word, 1)) \.reduceByKey(lambda x, y: x + y) counts.saveAsTextFile(sys.argv) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 633 PySpark DataFrames Tabellenähnliche Struktur: DataFrames repräsentieren Daten als tabellenähnliche Struktur mit benannten Spalten. Dies erleichtert das Arbeiten mit Daten, insbesondere wenn sie strukturiert sind. Optimierte Verarbeitung: DataFrames ermöglichen Spark, Optimierungen wie Prädikatspushdown (Daten werden früh gefiltert) und Spaltenpruning (nur erforderliche Daten werden gelesen) durchzuführen, um die Leistung der Abfragen zu verbessern. APIs für relationale Operationen: PySpark DataFrames bieten APIs für relationale Operationen, ähnlich wie SQL- Abfragen. Benutzer können DataFrame-Transformationen und -Aktionen verwenden, um Daten zu filtern, zu gruppieren, zu aggregieren und zu analysieren. Integration mit Spark SQL: DataFrames können nahtlos mit Spark SQL integriert werden, was erweiterte SQL- Abfragemöglichkeiten ermöglicht © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 634 PySpark DataFrames eher als Panda DF? Die API und Syntax von PySpark DataFrames ähneln der von Pandas, aber es gibt einige Unterschiede. Zum Beispiel können einige Pandas- Operationen, die auf lokale Datenstrukturen abzielen, in PySpark möglicherweise nicht direkt angewendet werden from pyspark.sql import SparkSession import pandas as pd from pyspark.sql.functions import col # Ein Pandas DataFrame erstellen data = {'Name': ['Alice', 'Bob', 'Charlie'], sp = SparkSession.builder.appName(“test").getOrCreate() 'Alter': [25, 30, 35], 'Stadt': [‚JÜLICH', ‚Aachen', 'Köln']} # Erstellen eines PySpark DataFrame df = pd.DataFrame(data) data = [('Alice', 25, ‚Jülich'), ('Bob', 30, ‚Aachen‘), # Filtern von Zeilen ('Charlie', 35, 'Köln')] filtered_df = df[df['Alter'] > 30] columns = ['Name', 'Alter', 'Stadt'] print(filtered_df) df = sp.createDataFrame(data, columns) # Filtern von Zeilen filtered_df = df.filter(col('Age') > 30) filtered_df.show() PySpark DataFrame: Spark bietet Integrationen mit anderen Spark- Ökosystemkomponenten wie Spark SQL, MLlib (Machine Learning Library), GraphX usw. Wenn größere Datensätze oder verteilte Verarbeitung benötigt wird, so sollten die native Spark-Funktionalität und PySpark DataFrames genutzt werden © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 635 Spark Streaming © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 636 Motivation Viele Anwendungen müssen die eingehenden Daten in quasi- Echtzeit verarbeiten – Social network trends – Website statistics – Intrusion detection systems etc. – Require large clusters to handle workloads – Require latencies of few seconds © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 637 Spark Streaming Framework zur Verarbeitung kontinuierlich eingehender Daten Skaliert (100s of nodes) Kann geringe Latenzen erreichen Integriert mit Spark’s batch und interactive processing Besitzt eine einfache batch-like API © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 638 Spark streaming Spark streaming ist eine Erweiterung der core Spark API Skalierbares High-Throughput Stream Procssing of livestreams Designiert um Daten aus diversen Quellen zu verarbeiten: Kafka, Flume, Twitter, ZeroMQ, Kinesis, oder TCP sockets © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 639 Wie arbeitet Spark streaming? Spark Streaming empfängt kontinuierlich live input data streams und unterteilt diese in batches. Jeder batch wird von der Spark Engine verarbeitet und erzeugt eine (Micro-)Batch-Ausgabe © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 640 Discretized Stream Processing Streaming setzt sich demnach aus einem Ablauf multipler sehr kleiner Jobs zusammen © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 641 Beispiel – Hole die aktuellen hashtags von Twitter val tweets = sc.twitterStream() val hashTags = tweets.flatMap (status => getTags(status)) hashTags.saveAsHadoopFiles("hdfs://...") output operation: to push data to external storage batch @ t batch @ t+1 batch @ t+2 tweets DStream flatMap flatMap flatMap hashTags DStream save save save Every batch saved to HDFS, Write to database, update analytics UI, do whatever appropriate © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 642 Das Spark ecosystem © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 643 Spark workflow Spark Anwendungen laufen als eigenständige Prozesse auf einem Cluster und werden durch das SparkContext des treibenden Programms kontrolliert Sie benötigen einen Cluster Manager, der die benötigten Ressourcen auch allokiert und stoppt Sobald man verbunden ist werden Executor auf den Knoten im Cluster angelegt, die als Container für die Berechnungen agieren und die Daten Vorhalten Der Anwendungscode wird dann übertragen (JAR oder Python Dateien via SparkContext) und dann die tasks © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 644 SQL Spark SQL erlaubt die Verarbeitung von strukturierten und semi-strukturierten Daten unter Verwendung von SQL- Ausdrücken. Die Abarbeitung findet parallel statt sqlContext=SQLContext(sc) Da RDDs lediglich unstrukturierte Daten enthalten, benötigt man eine Datenabstraktion, die um ein Schema erweitert wurde. Spark DataFrames bieten hier eine Lösung. Es handelt sich um eine verteilte Datensammlungen mit benannten Spalten (kann auch repartitioniert werden) rdd.toDF(ggf. Angabe der Spalten) Das Konzept passt optimal zu Apache Parquet, ein spaltenbasiertes Format zur Speicherung strukturierter Daten, welches von allen Projekten aus dem Hadoop Ökosystem verwendet werden kann df.write.partitionBy(Keys).parquet(path) parqDF = spark.ead.parquet(path) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 645 SQL Spark SQL erlaubt auch die direkte Verwendung von SQL- Statements Hierzu sollte der DataFrame mittels der registerTempTable oder createOrReplaceTempView Methode über einen Tabellennamen zugänglich gemacht werden result = sqlContext.sql(SELECT_STMT_FROM_REGISTEREDTABLENAME) Hinweis für Python: Diese Methoden können auch auf Pandas Dataframes angewendet werden, allerdings müssen diese vorher in ein Spark Dataframe konvertiert werden (Mittels Apache Arrow) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 646 Spark SQL Manning: Spark in Action © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 647 SQL Erhebliche Vereinfachung des Arbeitens! © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 648 Apache Parquet ist keine klassische spaltenbasierte NoSQL-Datenbank, vielmehr handelt es sich um ein spaltenbasiertes Speicherkonzept (auch auf HDFS) https://community.ptc.com/t5/IoT-Tech-Tips/Parquet-Data-Format-used-in-ThingWorx-Analytics/td-p/535228 © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 649 Arbeiten mit CSV-Dateien Erstellen eines SparcContext Erstellen eines SQLContext Konvertiere CSV-Datei zu einem Spark-DataFrame mittels sqlContext.read.format('com.databricks.spark.csv‘) (ggf. mit eigener Bezeichnung der Spalten!) Optionales anpassen der Datentypen des DataFrames Sichern des angepassten Dataframes im Parquet-Format df.write.format('parquet').save(target) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 650 Arbeiten mit CSV-Dateien © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 651 Der Apache Spark Stack Scala, Java, Python, R, SQL DataFrames ML Pipelines Spark Spark SQL MLlib GraphX Streaming Spark Core Data Sources Apache Parquet, Hadoop HDFS, HBase, Hive, Apache S3, Streaming, JSON, MySQL, and HPC-style (GlusterFS, Lustre) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 652 Der Apache Spark Stack Manning: Spark in Action © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 653 MLlib © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 654 MLlib © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 655 Spark und BI PACKT: Fast Data Processing with Spark, 2nd Edition © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 656 Einführung in © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 657 Übersicht MongoDB positioniert sich zwischen den key-value stores (schnell und hoch-skalierbar) und traditionellen RDBMS-Systemen (mit hoher Funktionalität) MongoDB ist document-oriented, schema-free, scalable, high-performance und open source. Implementiert in C++ Wir haben keine Zeilen, denken Sie in Dokumenten und „Collections“. Wir haben keine Transaktionen, keine referentielle Integrität, keine Joins, kein Schema. Der Name wurde vom Wort Humongous abgeleitet © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 658 Kein Key-Value Store Mongo speichert semi-strukturierte Daten und hier konkret JSON-Dokumente Mongo liefert eine Abfragesprache Elementare Bedeutung bei der Nutzung haben die Indexe Ermöglicht Map/Reduce-Ansätze Automatisches sharding, GridFS zur Unterstützung sehr großer Datein, geospatial indexing (maximal distance), etc. © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 659 Mongo Document Elementare Dateneinheit (quasi ein Tupel/Zeile bei einer relationalen DB) Geordnete Menge an Keys mit entsprechenden Values Darstellende Syntax ist ein JSON-Document Intern wird dies binär gespeichert (BSON) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 660 MongoDB Dokument _id ist der primary key Dieser wird automatisch von MongoDB erzeugt { _id: ObjectId("5099803df3f4948bd2f98391"), name: { first: "Alan", last: "Turing" }, birth: new Date('Jun 23, 1912'), Name ist ein death: new Date('Jun 07, 1954'), eingebettetes contribs: [ "Turing machine", "Turing test", "Turingery”], Dokument views : NumberLong(1250000) } Die contribs werden als Feld von Strings gespeichert © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 661 Warum JSON-Dokumente? Serialisierte Objekte entsprechen dem Modell der Programmiersprachen Möglichkeit zur Nutzung von embedded documents liefert ein Umgehungstatbestand für das Fehlen von Joins JSON passt perfekt zu modernen Web- Architekturen: AJAX und REST!!!! Aber: Der hohe Grad an Denormalisierung bedeutet, dass wir Informationen mehrfach speichern © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 662 MongoDB Data Model © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 663 MongoDB Data Model Collection: – Gruppe von Dokumenten (zumeist mit einer spezifischen semantischen Bedeutung – Kein festes Schema, dies wird von der Anwendung bestimmt © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 664 MongoDB Data Model Eine database speichert einen Satz an Collections Eine collection speichert einen Satz an Dokumente Ein document besteht aus einem Satz an fields Ein field ist key-valuePaar Ein key ist ein (Variablen)Name (string) Eine value ist § Ein Basistyp wie string, integer, float, timestamp, binary, etc., § Ein document § Ein array of values © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 665 MongoDB Indexing Indexe sind in MongoDB der elementare Weg zum Auffinden der Dokumente MongoDB unterstützt indexe auf jedem field und auf jedem embedded sub-field in den Dokumenten MongoDB definiert Indexe pro collection Indexe werden über einen B-Baum realisiert © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 666 Beispielmodell Wir modellieren einen Blog. Ein Post hat einen Autor, einen Text und beliebig viele Kommentare Die Kommentare sind dem Post eindeutig zugeordnet Ein Autor hat viele Posts © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 667 Wie sollte man dies nicht in Mongo abbilden? post = { id: 150, author: 100, text: 'This is a pretty awesome post.', comments: [100, 105, 112] } author = { id: 100, name: 'Michael Arrington' posts: } comment = { id: 105, text: 'Whatever this sux.' } © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 668 Besser: Embedded Documents! post = { author: 'Michael Arrington', text: 'This is a pretty awesome post.', comments: [ 'Whatever this post sux.’, 'I agree, lame!’] } Wieso ist dies besser? © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 669 Vorteile Wir bekommen alle benötigten Information mit nur einer Anfrage wieder Objekte der gleichen Collection werden zusammenhängend auf Platte gespeichert Spatial locality = faster Wir benötigen keine Joins um die Dinge zu finden, die zusammen gehören © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 670 https://docs.mongodb.com/manual/core/sharded-cluster-components/ © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 671 MongoDB: Skalierung https://www.kenwalger.com/blog/nosql/mongodb/mongodb-horizontal-scaling-sharding/ © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 672 MongoDB: Replication https://severalnines.com/database-blog/turning-mongodb-replica-set-sharded-cluster © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 673 Arbeiten mit MongoDB Wir haben eine Shell in der wir mittels Dot- Notation Kommandos eingeben database_name.collection_name ‘embedded_document.key_name’ © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 674 MongoDB Usage MySQL MongoDB START TRANSACTION; db.contacts.save( { INSERT INTO contacts VALUES userName: “joeblow”, (NULL, ‘joeblow’); emailAddresses: [ INSERT INTO contact_emails VALUES “[email protected]”, ( NULL, ”[email protected]”, “[email protected]” ] } ); LAST_INSERT_ID() ), ( NULL, “[email protected]”, LAST_INSERT_ID() ); COMMIT; © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 675 SQL-to-MongoDB-Mapping © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 676 SQL-to-MongoDB-Mapping SQL Statement Mongo Statement © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 677 Map&Reduce und Java API http://www.javacodegeeks.com/2012/06/mapreduce-with-mongodb.html © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 678 Sperren Mongodb verwendet exklusive Sperren für Updates. Dies beeinflusst den Durchsatz! Beginning with version 2.2, MongoDB implements locks on a per-database basis for most read and write operations. Some global operations, typically short lived operations involving multiple databases, still require a global “instance” wide lock. Before 2.2, there is only one “global” lock per mongod instance. © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 679 Sperren For WiredTiger Beginning with version 3.0, MongoDB ships with the WiredTiger storage engine. For most read and write operations, WiredTiger uses optimistic concurrency control. WiredTiger uses only intent locks at the global, database and collection levels. When the storage engine detects conflicts between two operations, one will incur a write conflict causing MongoDB to transparently retry that operation. © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 680 CouchDB Dokumentenbasiert Datenbank Realisiert in der Programmiersprache Erlang Wie in MongoDB werden die Daten in JSON repräsentiert Die Abfrage ist jedoch vollkommen auf REST-abgestimmt und folgt dem Map / Reduce-Konzept © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 681 REST-Verben Method Description PUT PUT requests are used to create new resources where the URI of the request is different to the resource that is to be created. GET GET requests are used to request data from the database. POST POST requests are used to update the existing data, at the same resource the URI is requested from. DELETE DELETE requests to delete databases and documents. COPY Copies one resource to another resource. © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 682 SQL vs. CouchDB-REST SQL REST Insert into… HTTP PUT /db/id Select * from HTTP GET /db/id Update HTTP PUT /db/id Delete HTTP DELETE /db/id DBA HTTP GET /mydb/, HTTP GET/_all_dbs, HTTP PUT /_replicate, HTTP POST /mydb/_bulk_docs © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 683 HTTP Status Codes Status Code Description 200 (OK) The request was successfully processed. 201 (Created) The document was successfully created. 304 (Not Modified) The document has not been modified since the last update. 400 (Bad Request) The syntax of the request was invalid. 404 (Not Found) The request was not found. 405 (Method Not Allowed) The request was made using an incorrect request method. 409 (Conflict) The request failed because of a database conflict. 412 (Precondition Failed) could not create a database- a database with that name already exists. 500 (Internal Server Error) The request was invalid and failed, or an error occurred within the CouchDB server. © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 684 Curl als Schnittstelle zur CouchDB Läuft CouchDB? curl http://127.0.0.1:5984/ Response : {"couchdb":"Welcome","version":"0.10.1"} © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 685 Curl Command – Datenbank-API Anzeigen der verfügbaren Datenbanken: curl -X GET http://127.0.0.1:5984/_all_dbs Anlegen einer neuen Datenbank : curl -X PUT http://127.0.0.1:5984/DB_name Löschen einer Datenbank: curl -X DELETE http://127.0.0.1:5984/DB_name © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 686 Curl Command – Document-API Anlegen eines Dokuments: curl -X PUT http://127.0.0.1:5984/albums/ 6ert2gh45ji6h6tywe324743rtbhgtrg \ -d '{"title":"abc","artist":"xyz"}' Erhalten der UUID-Kennung des Dokuments: curl -X GET http://127.0.0.1:5984/_uuids Lesen eines Dokuments: curl -X GET http://127.0.0.1:5984/albums/ 6ert2gh45ji6h6tywe324743rtbhgtrg © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 687 Curl Command – Document-API Einfügen von Attachments: curl -vX PUT http://127.0.0.1:5984/albums/ 6ert2gh45ji6h6tywe324743rtbhgtrg/ \ artwork.jpg?rev=2-2739352689 --data-binary @artwork.jpg -H "Content-Type: image/jpg" Anzeigen des Attachments im Browser: http://127.0.0.1:5984/albums/6ert2gh45ji6h6 tywe324743rtbhgtrg/artwork.jpg © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 688 Logstash, Elasticsearch und Kibana © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 689 Logstash, Elasticsearch und Kibana https://stackoverflow.com/questions/56997656/elasticsearch-data-insertion-with-python © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 690 Der Elasticsearch ELK Softwarestack Besteht aus drei Open Source Produkten der Firma “Elastic” (ehemals Elasticsearch) – E => Elasticsearch (hochskalierbarer Index-Such-Server, Basis ist Lucene, dokumentenbasiert) – L => Logstash (Tool zum Sammeln, Aufbereiten, Filtern und Weiterleiten von Daten – insbesondere Log Daten) – K => Kibana (Tool zur Datenvisualisierung und - exploration) © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 691 Der Elasticsearch ELK Softwarestack © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 692 Der Elasticsearch ELK Softwarestack © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 693 Elasticsearch und Kibana https://towardsdatascience.com/covid-19-map-using-elk-7b8611e9f2f4 © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 694 Ein komplexes Szenario: Alibaba Cloud https://alibaba-cloud.medium.com/getting-started-with-beats-b88adfb8214a © FH AACHEN UNIVERSITY OF APPLIED SCIENCES Vorlesung Datenbanken II / Datenanalyse Dezember 23| 695