티스토리 뷰

# spark_streaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 스파크 인스턴스 정의
spark = SparkSession.builder.appName("stream-word-count").getOrCreate()

# readstream을 지정해줘야 한다.  소켓과 주소, 포트를 정의
lines_df = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load()

# transformation을 이용해 word데이터를 다뤄줄거다.
# expr : sql문을 select 함수 내에서 그대로 사용
words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
counts_df = words_df.groupBy("word").count()

# writestream을 이용해 콘솔로 데이터를 보내준다.
word_count_query = counts_df.writeStream.format("console")\
                            .outputMode("complete")\
                            .option("checkpointLocation", ".checkpoint")\
                            .start()

# awaitTermination으로 계속 열어두고 사용
word_count_query.awaitTermination()

 

 

이후 terminal을 열어 다음 명령어를 입력하여 소켓을 열어준다. 1번 터미널이라고 하겠다.

(명령어를 입력하면 완료되었다는 메시지는 뜨지 않고 무엇인가를 입력할 수 있는 상태가 된다.)

nc -lk 9999

 

그 다음 새로운 terminal을 열어 spark_steaming파일을 실행시켜준다. 2번 터미널이라고 하겠다.

spark-submit spark_streaming.py

 

소켓을 입력했던 1번 터미널에 단어들을 나열해 입력해본다.

computer ABC DEF explorer

 

그러면 스트림이 실행되게 되고 2번 터미널에 출력된 결과를 확인 할 수 있다.

+---------+-----+
|     word|count|
+---------+-----+
| explorer|    1|
| computer|    1|
|      DEF|    1|
|      ABC|    1|
|         |    1|
+---------+-----+

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함