-
大小: 938KB文件類(lèi)型: .zip金幣: 2下載: 1 次發(fā)布日期: 2021-08-20
- 語(yǔ)言: 數(shù)據(jù)庫(kù)
- 標(biāo)簽: spark??mysql??toDF??rdd??
資源簡(jiǎn)介
記得自己要引入環(huán)境
(1)利用SparkStreaming從文件目錄讀入日志信息,日志內(nèi)容包含:
”日志級(jí)別、函數(shù)名、日志內(nèi)容“ 三個(gè)字段,字段之間以空格拆分。請(qǐng)看數(shù)據(jù)源的文件。
(2)對(duì)讀入都日志信息流進(jìn)行指定篩選出日志級(jí)別為error或warn的,并輸出到外部MySQL中。
需要用到的函數(shù)
(1)輸入采用textFileStream()算子
(2)輸出采用foreachRDD()算子
(3)將RDD轉(zhuǎn)為DataFrame
(4)DataFrame注冊(cè)為臨時(shí)表,使用SQL過(guò)濾
(5)將過(guò)濾后的數(shù)據(jù)保存到MySQL

代碼片段和文件信息
from?pyspark.shell?import?sc
from?pyspark.sql?import?Row
from?pyspark.sql?import?SparkSession
from?pyspark.sql.types?import?*
from?pyspark.streaming?import?StreamingContext
spark?=?SparkSession.builder.appName(“Streaming“).getOrCreate()
sc=spark.sparkContext
#兩個(gè)參數(shù):1、sc參數(shù)?2、采樣時(shí)間間隔(秒)
ssc?=StreamingContext(sc1)
#在ubuntu環(huán)境下數(shù)據(jù)源路勁
ds1?=ssc.textFileStream(“/home/zhuang/138/input/test“)
#把所有數(shù)據(jù)劃分為[[][]]格式
ds3?=?ds1.map(lambda?line:line.split(“\t“))
def?func(rdd):
????if?not?rdd.isEmpty():
????????#記得轉(zhuǎn)碼很重要
????????url?=?“jdbc:mysql://ip地址:3306/pyspark?user=root&password=zhuang&characterEncoding=UTF-8“
????????#構(gòu)建表結(jié)構(gòu)
????????schema?=?StructType([StructField(“日志級(jí)別“?StringType()?True)?StructField(“函數(shù)名“?StringType()?True)
?????????????????????????????StructField(“日志內(nèi)容“?StringType()?True)])
????????#對(duì)[[][][]]數(shù)據(jù)轉(zhuǎn)換成[[[][][]][][][]]因?yàn)閠odf數(shù)據(jù)是數(shù)據(jù)格式傳值
????????rdd.map(lambda?x:tuple(x)).toDF(schema).registerTempTable(“test_person1“)
????????df1?=?spark.sql(“select?*?from?test_person1?where?‘日志級(jí)別‘!=‘[info]‘“)
????????#?df2?=?spark
????????df1.show()
????????#寫(xiě)入mysql
????????df1.write.jdbc(mode=“overwrite“url=urltable=“test_person1“?properties={“driver“:‘com.mysql.jdbc.Driver‘})
????????df1.show()
ds3.pprint()
ds3.foreachRDD(func)
#?print(ds4.foreachRDD(func))
ssc.start();ssc.awaitTermination()
?屬性????????????大小?????日期????時(shí)間???名稱(chēng)
-----------?---------??----------?-----??----
?????文件?????????501??2019-05-04?11:03??20180103.log
?????文件?????????501??2019-05-04?11:03??20180104.log
?????文件?????1007502??2019-03-12?14:38??mysql-connector-java-5.1.47.jar
?????文件????????1514??2019-05-30?08:58??test02.py
評(píng)論
共有 條評(píng)論