티스토리 뷰
# spark_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 스파크 인스턴스 정의
spark = SparkSession.builder.appName("stream-word-count").getOrCreate()
# readstream을 지정해줘야 한다. 소켓과 주소, 포트를 정의
lines_df = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load()
# transformation을 이용해 word데이터를 다뤄줄거다.
# expr : sql문을 select 함수 내에서 그대로 사용
words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
counts_df = words_df.groupBy("word").count()
# writestream을 이용해 콘솔로 데이터를 보내준다.
word_count_query = counts_df.writeStream.format("console")\
.outputMode("complete")\
.option("checkpointLocation", ".checkpoint")\
.start()
# awaitTermination으로 계속 열어두고 사용
word_count_query.awaitTermination()
이후 terminal을 열어 다음 명령어를 입력하여 소켓을 열어준다. 1번 터미널이라고 하겠다.
(명령어를 입력하면 완료되었다는 메시지는 뜨지 않고 무엇인가를 입력할 수 있는 상태가 된다.)
nc -lk 9999
그 다음 새로운 terminal을 열어 spark_steaming파일을 실행시켜준다. 2번 터미널이라고 하겠다.
spark-submit spark_streaming.py
소켓을 입력했던 1번 터미널에 단어들을 나열해 입력해본다.
computer ABC DEF explorer
그러면 스트림이 실행되게 되고 2번 터미널에 출력된 결과를 확인 할 수 있다.
+---------+-----+
| word|count|
+---------+-----+
| explorer| 1|
| computer| 1|
| DEF| 1|
| ABC| 1|
| | 1|
+---------+-----+