Кумулативна трансформация на състоянието в Apache Spark Streaming



Тази публикация в блога обсъжда трансформации със състояние в Spark Streaming. Научете всичко за кумулативно проследяване и повишаване на уменията за кариера в Hadoop Spark.

Принос от Prithviraj Bose

В предишния си блог съм обсъждал трансформации със състояния, използвайки концепцията за прозоречно стрийминг на Apache Spark Streaming. Можете да го прочетете тук .





В тази публикация ще обсъдя кумулативни операции със състояние в Apache Spark Streaming. Ако не сте начинаещи в Spark Streaming, силно ви препоръчвам да прочетете предишния ми блог, за да разберете как работи прозореца.

Видове трансформация с държавно състояние в искрено поточно предаване (Продължение ...)

> Кумулативно проследяване

Бяхме използвали reduceByKeyAndWindow (...) API за проследяване на състоянията на ключовете, но прозорците поставят ограничения за определени случаи на употреба. Ами ако искаме да натрупваме състоянията на ключовете през цялото време, вместо да го ограничаваме до времеви прозорец? В този случай ще трябва да използваме updateStateByKey (...) ПОЖАР.



Този API е въведен в Spark 1.3.0 и е много популярен. Въпреки това този API има някои режийни разходи, неговата производителност се влошава, тъй като размерът на състоянията се увеличава с течение на времето. Написах пример, за да покажа използването на този API. Можете да намерите кода тук .

Spark 1.6.0 представи нов API mapWithState (...) което решава режийните режийни разходи, представени от updateStateByKey (...) . В този блог ще обсъдя този конкретен API, използвайки примерна програма, която съм написал. Можете да намерите кода тук .

Преди да се потопя в разходка с код, нека отделим няколко думи за контролна точка. За всяка трансформация със състояние, проверката е задължителна. Checkpointing е механизъм за възстановяване на състоянието на ключовете в случай, че програмата на драйвера се провали. Когато драйверът се рестартира, състоянието на ключовете се възстановява от контролните файлове. Местоположенията на контролните пунктове обикновено са HDFS или Amazon S3 или всяко надеждно хранилище. Докато тествате кода, можете също да съхранявате в локалната файлова система.



как да направите jframe в java -

В примерната програма слушаме текстов поток на сокет на host = localhost и port = 9999. Той токенизира входящия поток в (думи, брой появявания) и проследява броя на думите, използвайки API 1.6.0 mapWithState (...) . Освен това ключовете без актуализации се премахват с помощта на StateSpec. timeout API. Проверяваме в HDFS и честотата на проверката е на всеки 20 секунди.

Нека първо създадем сесия на Spark Streaming,

Spark-streaming-session

Ние създаваме a checkpointDir в HDFS и след това извикайте обектния метод getOrCreate (...) . The getOrCreate API проверява checkpointDir за да видите дали има някакви предишни състояния за възстановяване, ако това съществува, то пресъздава сесията на Spark Streaming и актуализира състоянията на ключовете от данните, съхранявани във файловете, преди да премине с нови данни. В противен случай създава нова сесия на Spark Streaming.

The getOrCreate взема името на директорията на контролната точка и функция (която сме именували createFunc ) чийто подпис трябва да бъде () => StreamingContext .

какво е задънена улица в Java

Нека разгледаме кода вътре createFunc .

Ред №2: Създаваме контекст за поточно предаване с име на заданието до „TestMapWithStateJob“ и интервал на партида = 5 секунди.

Ред # 5: Задайте директорията на контролната точка.

Ред # 8: Задайте спецификацията на състоянието, като използвате класа org.apache.streaming.StateSpec обект. Първо задаваме функцията, която ще проследява състоянието, след това задаваме броя на дяловете за получените DStreams, които трябва да бъдат генерирани по време на последващи трансформации. Накрая задаваме времето за изчакване (на 30 секунди), където ако не се получи актуализация за ключ за 30 секунди, състоянието на ключа ще бъде премахнато.

Ред 12 #: Настройте потока на сокета, изравнете входящите партидни данни, създайте двойка ключ-стойност, обадете се mapWithState , задайте интервала за проверка на 20s и накрая отпечатайте резултатите.

Рамката Spark призовава th e createFunc за всеки ключ с предишната стойност и текущото състояние. Изчисляваме сумата и актуализираме състоянието с кумулативна сума и накрая връщаме сумата за ключа.

е връзка в Java

Източници на Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Имате въпрос към нас? Моля, споменете го в раздела за коментари и ние ще се свържем с вас.

Подобни публикации:

Започнете с Apache Spark & ​​Scala

Състоятелни трансформации с прозорци в Spark Streaming