

PySpark with MongoDB

This post is mainly about manipulating MongoDB from PySpark.

My environment is Windows 10 x64, and all the files used below have been uploaded to the github repo.

The required components are:

  1. winutils repo
  2. mongo-spark repo
  3. IntelliJ IDEA (IDE; any other IDE works too)

Installation

A quick installation:

tee /etc/yum.repos.d/mongodb-org-3.4.repo << EOF
[mongodb-org-3.4]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/\$releasever/mongodb-org/3.4/x86_64/
gpgcheck=0
enabled=1
EOF

# install
yum install -y mongodb-org

# pin the MongoDB packages so yum update will not touch them
sed -i '$a exclude=mongodb-org,mongodb-org-server,mongodb-org-shell,mongodb-org-mongos,mongodb-org-tools' /etc/yum.conf

# disable selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/sysconfig/selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
setenforce 0

# kernel settings: disable transparent hugepages, as MongoDB recommends
if [[ -f /sys/kernel/mm/transparent_hugepage/enabled ]]; then
  echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if [[ -f /sys/kernel/mm/transparent_hugepage/defrag ]]; then
  echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi

# make the hugepage settings survive a reboot
tee -a /etc/rc.local << EOF
if [[ -f /sys/kernel/mm/transparent_hugepage/enabled ]]; then
  echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if [[ -f /sys/kernel/mm/transparent_hugepage/defrag ]]; then
  echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi
EOF

# configure: bind to the host's IP instead of localhost only
sed -i "s/bindIp:.*/bindIp: $(hostname -I)/" /etc/mongod.conf

# start service
systemctl enable mongod
systemctl start mongod

# create user ("use admin" is not valid JavaScript in a script file,
# so switch databases with getSiblingDB instead)
tee createUser.js << EOF
db = db.getSiblingDB("admin");
db.createUser({
  user: "admin",
  pwd: "password",
  roles: [{role: "root", db: "admin"}]
});
EOF
mongo 192.168.1.112/admin createUser.js

# enable authorization (note the two-space indent: mongod.conf is YAML)
tee -a /etc/mongod.conf << EOF
security:
  authorization: enabled
EOF

# restart mongod so the security setting takes effect
systemctl restart mongod

# open mongo shell
mongo -u "admin" -p "password" --host 192.168.1.112 --authenticationDatabase "admin"

# open port to connect mongodb
sudo firewall-cmd --zone=public --add-port=27017/tcp --permanent
sudo firewall-cmd --reload
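
Before moving on to Spark, it is worth checking that the server is reachable from Python. Below is a minimal sketch, assuming pymongo is installed and reusing the address and credentials from the script above:

from pymongo import MongoClient

# assumption: address and credentials from the installation script above
client = MongoClient("mongodb://admin:password@192.168.1.112:27017/admin")
print(client.server_info()["version"])  # prints the server version if the connection works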

Spark environment

After downloading winutils, drop the hadoop-2.7.1 folder into C:\hadoop.

Then add an environment variable HADOOP_HOME pointing to C:\hadoop\hadoop-2.7.1.

Then install pyspark with conda install pyspark, and open the pyspark shell to check that it works.
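
If you prefer not to set the variable system-wide, a minimal sketch that sets it inside the script (assuming the path above; it has to run before the SparkSession is created) looks like this:

import os

# assumption: winutils was extracted to C:\hadoop\hadoop-2.7.1 as described above
os.environ["HADOOP_HOME"] = r"C:\hadoop\hadoop-2.7.1"
os.environ["PATH"] += os.pathsep + r"C:\hadoop\hadoop-2.7.1\bin"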

Running PySpark

test_script.py is as follows:

import pyspark
spark = pyspark.sql.SparkSession.builder.appName('test').getOrCreate()
print(spark.range(10).collect())

Run it with the following command:

spark-submit --master local[2] test_script.py

After it runs, you should see the output:

[Row(id=0), Row(id=1), Row(id=2), Row(id=3), Row(id=4), Row(id=5), Row(id=6), Row(id=7), Row(id=8), Row(id=9)]

This means the pyspark environment is ready.

IDEA and building the mongo spark connector

Next, open IDEA and install the sbt and scala plugins.

Then create a Scala sbt project in the mongo-spark folder.

Wait a moment, then simply kill the build job that IDEA starts on its own.

Open the Terminal and run bash sbt check to start compiling (mingw is required).

Then type assembly in the sbt shell, and the fat jar of the mongo spark connector will be built.

You will find mongo-spark-connector-assembly-2.2.2.jar in target/scala-2.11.

Drop mongo-spark-connector-assembly-2.2.2.jar into your project directory.
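
If you would rather skip building the jar yourself, Spark can also download the published connector from Maven Central. A minimal sketch, assuming the released 2.2.2 artifact matches your Spark and Scala versions:

from pyspark.sql import SparkSession

# assumption: Spark 2.2.x on Scala 2.11; adjust the coordinates for other versions
spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.2") \
    .getOrCreate()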

Accessing mongo with PySpark

Let's start with a simple example, mongo.py:

from pyspark.sql import SparkSession

uri = "mongodb://admin:password@192.168.1.112"
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", uri + "/admin.test") \
    .config("spark.mongodb.output.uri", uri + "/admin.test") \
    .getOrCreate()

people = spark.createDataFrame(
    [("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178),
     ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82),
     ("Bombur", None)],
    ["name", "age"]
)

people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

The command to run it is as follows:

spark-submit --master local[8] --jars mongo-spark-connector-assembly-2.2.2.jar --conf spark.driver.extraClassPath=mongo-spark-connector-assembly-2.2.2.jar --conf spark.executor.extraClassPath=mongo-spark-connector-assembly-2.2.2.jar mongo.py

Run the following in the mongo shell and you can see the data has been inserted:

use admin;
db.test.findOne();
// {
//     "_id" : ObjectId("5ae1e04885beb03a349ffbcf"),
//     "name" : "Dwalin",
//     "age" : NumberLong(169)
// }
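
You can also verify the write from PySpark itself. A minimal sketch, reusing the spark session configured in mongo.py above:

# read the same collection back through the connector
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
df.show()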

Now for the main goal of this post: we want to use pyspark to load a csv into documents structured like the following:

(an example that might resemble data from a matchmaking agency)

[{
    "id": 1,
    "name": "Kevin",
    "sex": "man",
    "age": 27,
    "history": [
        {
            "pairId": 2,
            "statusCheck": "Interested",
            "memo": "Rose is following"
        },
        {
            "pairId": 5,
            "statusCheck": "Not interested"
        }
    ]
}, {
    "id": 2,
    "name": "Cindy",
    "sex": "woman",
    "age": 22,
    "history": [
        {
            "pairId": 1,
            "memo": "unchecked, need to be followed!"
        }
    ]
}]

The profiles.csv file:

id,name,sex,age,pairId,statusCheck,memo
1,Kevin,man,27,2,"Interested","Rose is following"
1,Kevin,man,27,5,"Not Interested",
2,Cindy,woman,22,1,,"unchecked, need to be followed!"

The profile_data_builder.py file:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import struct, col, collect_list

df_pd = pd.read_csv("profiles.csv")

spark = SparkSession \
    .builder \
    .appName("dataTransform") \
    .getOrCreate()
schema = StructType(
    [StructField("id", IntegerType(), True),
     StructField("name", StringType(), True),
     StructField("sex", StringType(), True),
     StructField("age", IntegerType(), True),
     StructField("pairId", StringType(), True),
     StructField("statusCheck", StringType(), True),
     StructField("memo", StringType(), True)]
)
df = spark.createDataFrame(df_pd, schema)

# pack the pairing columns into a struct, then collect the structs
# into one "history" array per person
grpByCols = ["id", "name", "sex", "age"]
historyCols = ["pairId", "statusCheck", "memo"]
outDf = df.withColumn("history", struct(*[col(name) for name in historyCols])) \
    .drop(*historyCols).groupBy(*grpByCols) \
    .agg(collect_list(col("history")).alias("history"))
uri = 'mongodb://admin:password@192.168.1.112'
outDf.write.format("com.mongodb.spark.sql.DefaultSource") \
    .option('uri', uri).option('database', 'admin').option('collection', 'profile') \
    .mode("append").save()

Finally, use the mongo shell to inspect the inserted data:

use admin;
db.profile.findOne();
// {
//     "_id" : ObjectId("5ae1ff1885beb047444fc948"),
//     "id" : 1,
//     "name" : "Kevin",
//     "sex" : "man",
//     "age" : 27,
//     "history" : [
//         {
//             "pairId" : "2",
//             "statusCheck" : "Interested",
//             "memo" : "Rose is following"
//         },
//         {
//             "pairId" : "5",
//             "statusCheck" : "Not Interested",
//             "memo" : "NaN"
//         }
//     ]
// }
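
Note the "memo" : "NaN" in the last history entry: pandas fills empty csv cells with the float NaN, and casting it to StringType turns it into the literal string "NaN". A minimal sketch of one way to keep real nulls, assuming the df_pd and schema from profile_data_builder.py:

import pandas as pd

# replace pandas NaN with None so Spark stores a real null instead of the string "NaN"
df_pd = df_pd.where(pd.notnull(df_pd), None)
df = spark.createDataFrame(df_pd, schema)

Alternatively, reading the file directly with spark.read.csv("profiles.csv", header=True, schema=schema) avoids pandas and its NaN handling altogether.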