This post is mainly about using PySpark to work with MongoDB.
My environment is Windows 10 x64, and the files used below have all been uploaded to the github repo.
The required components are:
winutils repo
mongo-spark repo
IntelliJ IDEA (the IDE; other tools can be used instead)
Installation
First, a quick install of MongoDB on the server (CentOS/RHEL here):
```bash
# Add the MongoDB 3.4 yum repository
tee /etc/yum.repos.d/mongodb-org-3.4.repo << EOF
[mongodb-org-3.4]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/\$releasever/mongodb-org/3.4/x86_64/
gpgcheck=0
enabled=1
EOF

# Install MongoDB and pin the packages so yum will not upgrade them
yum install -y mongodb-org
sed -i '$a exclude=mongodb-org,mongodb-org-server,mongodb-org-shell,mongodb-org-mongos,mongodb-org-tools' /etc/yum.conf

# Disable SELinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/sysconfig/selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
setenforce 0

# Disable transparent huge pages now, and again on every boot via rc.local
if [[ -f /sys/kernel/mm/transparent_hugepage/enabled ]]; then
    echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if [[ -f /sys/kernel/mm/transparent_hugepage/defrag ]]; then
    echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi
tee -a /etc/rc.local << EOF
if [[ -f /sys/kernel/mm/transparent_hugepage/enabled ]]; then
    echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if [[ -f /sys/kernel/mm/transparent_hugepage/defrag ]]; then
    echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi
EOF

# Bind MongoDB to the host IP and start it
sed -i "s/bindIp:.*/bindIp: $(hostname -I)/" /etc/mongod.conf
systemctl enable mongod
systemctl start mongod

# Create an admin user (the connection string below already selects the admin database)
tee createUser.js << EOF
db.createUser({
    user: "admin",
    pwd: "password",
    roles: [{role: "root", db: "admin"}]
});
EOF
mongo 192.168.1.112/admin createUser.js

# Enable authentication, then restart so it takes effect
tee -a /etc/mongod.conf << EOF
security:
  authorization: enabled
EOF
systemctl restart mongod

# Test the authenticated connection
mongo -u "admin" -p "password" --host 192.168.1.112 --authenticationDatabase "admin"

# Open the MongoDB port in the firewall
sudo firewall-cmd --zone=public --add-port=27017/tcp --permanent
sudo firewall-cmd --reload
```
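If you want to double-check the server from the Windows client before going further, a quick connectivity test can be done with the pymongo driver. This is just a sketch and assumes pymongo is installed (pip install pymongo); it is not used anywhere else in this post.

```python
# Optional connectivity check from the client machine (assumes pymongo is installed).
from pymongo import MongoClient

client = MongoClient("mongodb://admin:password@192.168.1.112/admin")
print(client.admin.command("ping"))  # should print {'ok': 1.0} if the server is reachable
```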
Spark environment
Download winutils, drop hadoop-2.7.1 into C:\hadoop, and add an environment variable HADOOP_HOME pointing to C:\hadoop\hadoop-2.7.1.
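A quick sanity check can save some head-scratching later, since Spark on Windows fails with fairly cryptic errors when winutils.exe is missing. This is only a sketch assuming the paths used above:

```python
# Check that HADOOP_HOME is set and winutils.exe is where Spark expects it.
import os

hadoop_home = os.environ.get("HADOOP_HOME")  # expected: C:\hadoop\hadoop-2.7.1
winutils = os.path.join(hadoop_home or "", "bin", "winutils.exe")

print("HADOOP_HOME =", hadoop_home)
print("winutils.exe found:", os.path.isfile(winutils))
```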
Then install PySpark with conda install pyspark, and launch the pyspark command to check that it starts up.
Next, run a PySpark script, test_script.py, which looks like this:
```python
import pyspark

spark = pyspark.sql.SparkSession.builder.appName('test').getOrCreate()
print(spark.range(10).collect())
```
Run it with the following command:
```
spark-submit --master local[2] test_script.py
```
and you should see the following output:
```
[Row(id=0), Row(id=1), Row(id=2), Row(id=3), Row(id=4), Row(id=5), Row(id=6), Row(id=7), Row(id=8), Row(id=9)]
```
That means the PySpark environment is ready.
IDEA and building the mongo-spark connector
Next, open IDEA and install the sbt and Scala plugins.
Then create a Scala sbt project in the mongo-spark folder.
Wait a moment, then stop IDEA's own build job.
Open the Terminal and run bash sbt check to start the build (MinGW is required).
Then go to the sbt shell and enter assembly to build the mongo-spark connector fat jar.
Under target/scala-2.11 you will find mongo-spark-connector-assembly-2.2.2.jar.
Copy mongo-spark-connector-assembly-2.2.2.jar into your project directory.
Using PySpark to access MongoDB
Let's start with a simple example, mongo.py:
```python
from pyspark.sql import SparkSession

uri = "mongodb://admin:password@192.168.1.112"

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", uri + "/admin.test") \
    .config("spark.mongodb.output.uri", uri + "/admin.test") \
    .getOrCreate()

people = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000),
                                ("Thorin", 195), ("Balin", 178), ("Kili", 77),
                                ("Dwalin", 169), ("Oin", 167), ("Gloin", 158),
                                ("Fili", 82), ("Bombur", None)], ["name", "age"])

people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
```
Run it with the following command:
```
spark-submit --master local[8] --jars mongo-spark-connector-assembly-2.2.2.jar --conf spark.driver.extraClassPath=mongo-spark-connector-assembly-2.2.2.jar --conf spark.executor.extraClassPath=mongo-spark-connector-assembly-2.2.2.jar mongo.py
```
Then check with the mongo shell and you can see the data has been inserted:
```js
use admin;
db.test.findOne();
// {
//     "_id" : ObjectId("5ae1e04885beb03a349ffbcf"),
//     "name" : "Dwalin",
//     "age" : NumberLong(169)
// }
```
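The connector works in the read direction as well. Here is a minimal read-back sketch that reuses the same uri and spark.mongodb.input.uri setting as mongo.py above, and can be submitted with the same spark-submit command:

```python
from pyspark.sql import SparkSession

uri = "mongodb://admin:password@192.168.1.112"

spark = SparkSession \
    .builder \
    .appName("myReadApp") \
    .config("spark.mongodb.input.uri", uri + "/admin.test") \
    .getOrCreate()

# Load the admin.test collection written above back into a DataFrame.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
df.show()
```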
Now for the real goal of this post: we want to use PySpark to take a CSV and insert data structured like the one below (imagine something resembling a matchmaking service's records):
```json
[{
    "id": 1,
    "name": "Kevin",
    "sex": "man",
    "age": 27,
    "history": [
        {
            "pairId": 2,
            "statusCheck": "Interested",
            "memo": "Rose is following"
        },
        {
            "pairId": 5,
            "statusCheck": "Not interested"
        }
    ]
},
{
    "id": 2,
    "name": "Cindy",
    "sex": "woman",
    "age": 22,
    "optional": [
        {
            "pairId": 1,
            "memo": "unchecked, need to be followed!"
        }
    ]
}]
```
The profiles.csv file:
```
id,name,sex,age,pairId,statusCheck,memo
1,Kevin,man,27,2,"Interested","Rose is following"
1,Kevin,man,27,5,"Not Interested",
2,Cindy,woman,22,1,,"unchecked, need to be followed!"
```
The profile_data_builder.py file:
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import struct, col, collect_list

# Read the flat CSV with pandas first.
df_pd = pd.read_csv("profiles.csv")

spark = SparkSession\
    .builder\
    .appName("dataTransform")\
    .getOrCreate()

schema = StructType(
    [StructField("id", IntegerType(), True),
     StructField("name", StringType(), True),
     StructField("sex", StringType(), True),
     StructField("age", IntegerType(), True),
     StructField("pairId", StringType(), True),
     StructField("statusCheck", StringType(), True),
     StructField("memo", StringType(), True)]
)

df = spark.createDataFrame(df_pd, schema)

# Columns that identify a person, and columns that belong to each history entry.
grpByCols = ["id", "name", "sex", "age"]
historyCols = ["pairId", "statusCheck", "memo"]

# Pack the history columns into a struct, then collect one array of structs per person.
outDf = df.withColumn("history", struct(*[col(name) for name in historyCols]))\
    .drop(*historyCols).groupBy(*grpByCols)\
    .agg(collect_list(col("history")).alias("history"))

uri = 'mongodb://admin:password@192.168.1.112'

outDf.write.format("com.mongodb.spark.sql.DefaultSource")\
    .option('uri', uri).option('database', 'admin').option('collection', 'profile')\
    .mode("append").save()
```
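If you want to sanity-check the grouped DataFrame before it is written out, a couple of lines (just a sketch, appended before the write call) are enough; history should show up as an array of structs:

```python
# Optional: inspect the nested structure before writing to MongoDB.
outDf.printSchema()          # history: array<struct<pairId, statusCheck, memo>>
outDf.show(truncate=False)
```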
Finally, check the inserted data with the mongo shell:
```js
use admin;
db.profile.findOne();
// {
//     "_id" : ObjectId("5ae1ff1885beb047444fc948"),
//     "id" : 1,
//     "name" : "Kevin",
//     "sex" : "man",
//     "age" : 27,
//     "history" : [
//         {
//             "pairId" : "2",
//             "statusCheck" : "Interested",
//             "memo" : "Rose is following"
//         },
//         {
//             "pairId" : "5",
//             "statusCheck" : "Not Interested",
//             "memo" : "NaN"
//         }
//     ]
// }
```
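One detail worth noticing in the output above: the missing memo for pairId 5 ended up as the string "NaN", because pandas fills the empty CSV cell with NaN before the data ever reaches Spark. If you would rather store real nulls, one possible tweak (a sketch, not verified against this exact pipeline) is to convert the NaNs to None before building the Spark DataFrame:

```python
# Replace pandas NaN with None so the string columns become real nulls in MongoDB.
df_pd = pd.read_csv("profiles.csv")
df_pd = df_pd.where(pd.notnull(df_pd), None)
df = spark.createDataFrame(df_pd, schema)
```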