# Databricks notebook source
# MAGIC %md ## Introduction to Apache Spark™ GraphFrames
# MAGIC
# MAGIC #### Origins
# MAGIC - Started in 2016 as a collaboration between Databricks, UC Berkeley, and MIT
# MAGIC - Intended as a successor to Apache Spark GraphX
# MAGIC
# MAGIC #### Current status
# MAGIC - Published as a [Spark Package](http://spark-packages.org/package/graphframes/graphframes)
# MAGIC - Supported by open-source contributors and committers
# MAGIC - Packaged with the Databricks Runtime for ML
# MAGIC
# MAGIC This notebook demonstrates examples from the [GraphFrames User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).
# COMMAND ----------
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf, lit, when, collect_set, concat, array_distinct, coalesce, sum
from graphframes import *
from pyspark.sql.types import *
from math import comb
import networkx as nx
from warnings import filterwarnings
filterwarnings('ignore', 'DataFrame.sql_ctx is an internal property')
# COMMAND ----------
# A support function for drawing networks
def draw_network(g, directed=False, with_labels=True, node_color='#F39C12'):
    # Use a directed or undirected networkx container depending on the flag.
    if directed:
        nxg = nx.DiGraph()
    else:
        nxg = nx.Graph()
    # Convert the edge DataFrame to pandas and draw it; vertices without edges are not shown.
    nx.draw(nx.from_pandas_edgelist(g.edges.toPandas(), create_using=nxg, source='src', target='dst'),
            arrows=directed, with_labels=with_labels, node_color=node_color,
            connectionstyle='arc3, rad = 0.1')
# COMMAND ----------
# MAGIC %md ## Creating GraphFrames
# MAGIC
# MAGIC Users can create GraphFrames from vertex and edge DataFrames.
# MAGIC
# MAGIC * Vertex DataFrame: A vertex DataFrame should contain a special column named "id" which specifies unique IDs for each vertex in the graph.
# MAGIC * Edge DataFrame: An edge DataFrame should contain two special columns: "src" (source vertex ID of edge) and "dst" (destination vertex ID of edge).
# MAGIC
# MAGIC Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.
# COMMAND ----------
# MAGIC %md Create the vertices first:
# COMMAND ----------
vertices = sqlContext.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
# COMMAND ----------
# MAGIC %md And then some edges:
# COMMAND ----------
edges = sqlContext.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# COMMAND ----------
# MAGIC %md Let's create a graph from these vertices and these edges:
# COMMAND ----------
g = GraphFrame(vertices, edges)
draw_network(g, True)
# COMMAND ----------
# This example graph also comes with the GraphFrames package.
from graphframes.examples import Graphs
same_g = Graphs(sqlContext).friends()
draw_network(same_g, True)
# COMMAND ----------
# MAGIC %md ## Basic graph and DataFrame queries
# MAGIC
# MAGIC GraphFrames provide several simple graph queries, such as node degree.
# MAGIC
# MAGIC Also, since GraphFrames represent graphs as pairs of vertex and edge DataFrames, it is easy to make powerful queries directly on the vertex and edge DataFrames. Those DataFrames are available as the `vertices` and `edges` fields of the GraphFrame.
# COMMAND ----------
display(g.vertices)
# COMMAND ----------
display(g.edges)
# COMMAND ----------
# MAGIC %md ## Neighbors
# MAGIC
# MAGIC Return the list of neighbors for each vertex:
# COMMAND ----------
def neighbors(g: GraphFrame) -> DataFrame:
    # Consider connections in both directions: emit one (id, neighbor) row per edge endpoint.
    out_nbrs = g.edges.select(col('src').alias('id'), col('dst').alias('neighbor'))
    in_nbrs = g.edges.select(col('dst').alias('id'), col('src').alias('neighbor'))
    # Grouping the union avoids null arrays for vertices that only have incoming or only outgoing edges.
    return out_nbrs.union(in_nbrs).groupBy('id').agg(collect_set('neighbor').alias('neighbors'))
display(neighbors(g))
# COMMAND ----------
# MAGIC %md The incoming degree of the vertices:
# COMMAND ----------
display(g.inDegrees)
# COMMAND ----------
# MAGIC %md The outgoing degree of the vertices:
# COMMAND ----------
display(g.outDegrees)
# COMMAND ----------
# MAGIC %md The degree of the vertices:
# COMMAND ----------
display(g.degrees)
# COMMAND ----------
# MAGIC %md You can run queries directly on the vertices DataFrame. For example, we can find the age of the youngest person in the graph:
# COMMAND ----------
youngest = g.vertices.groupBy().min("age")
display(youngest)
# COMMAND ----------
# MAGIC %md Likewise, you can run queries on the edges DataFrame. For example, let's count the number of 'follow' relationships in the graph:
# COMMAND ----------
numFollows = g.edges.filter("relationship = 'follow'").count()
print("The number of follow edges is", numFollows)
# COMMAND ----------
# MAGIC %md ## Motif finding
# MAGIC
# MAGIC Using motifs you can build more complex relationships involving edges and vertices. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame, in which the column names are given by the motif keys.
# MAGIC
# MAGIC Check out the [GraphFrame User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding) for more details on the API.
# COMMAND ----------
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
# COMMAND ----------
# MAGIC %md Since the result is a DataFrame, more complex queries can be built on top of the motif. Let us find all the reciprocal relationships in which at least one person is older than 30:
# COMMAND ----------
filtered = motifs.filter("b.age > 30 or a.age > 30")
display(filtered)
# COMMAND ----------
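# MAGIC %md To make the motif semantics concrete, the cell below is a minimal plain-Python sketch (no Spark required) of what the reciprocal-edge pattern selects. The `EDGES` list is copied from the example graph; the helper name `reciprocal_pairs` is hypothetical and is not part of the GraphFrames API.
# COMMAND ----------
```python
# Plain-Python sketch of the motif "(a)-[e]->(b); (b)-[e2]->(a)".
# EDGES mirrors the example graph; reciprocal_pairs is an illustrative helper, not a GraphFrames call.
EDGES = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

def reciprocal_pairs(edges):
    """Return every ordered pair (a, b) such that both a->b and b->a exist."""
    directed = {(src, dst) for src, dst, _ in edges}
    return sorted((src, dst) for src, dst in directed if (dst, src) in directed)

print(reciprocal_pairs(EDGES))  # [('b', 'c'), ('c', 'b')]
```
# COMMAND ----------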
# MAGIC %md
# MAGIC #### Stateful queries
# MAGIC
# MAGIC Most motif queries are stateless and simple to express, as in the examples above. The next example demonstrates a more complex query that carries state along a path in the motif. Such queries can be expressed by combining GraphFrame motif finding with filters on the result where the filters use sequence operations to operate over DataFrame columns.
# MAGIC
# MAGIC For example, suppose you want to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices `a->b->c->d`, identify the subset of chains matching this complex filter:
# MAGIC
# MAGIC * Initialize state on path.
# MAGIC * Update state based on vertex a.
# MAGIC * Update state based on vertex b.
# MAGIC * Etc. for c and d.
# MAGIC
# MAGIC If the final state matches some condition, then the filter accepts the chain.
# MAGIC The code below demonstrates this process by identifying chains of 4 vertices such that at least 2 of the 3 edges are “friend” relationships. In this example, the state is the current count of “friend” edges; in general, it could be any DataFrame Column.
# COMMAND ----------
# Find chains of 4 vertices.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
# Query on sequence, with state (cnt)
# (a) Define method for updating state given the next element of the motif.
def cumFriends(cnt, edge):
    relationship = col(edge)["relationship"]
    return when(relationship == "friend", cnt + 1).otherwise(cnt)
# (b) Use sequence operation to apply method to sequence of elements in motif.
# In this case, the elements are the 3 edges.
numFriends = reduce(cumFriends, ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.withColumn("num_friends", numFriends).where(numFriends >= 2)
display(chainWith2Friends2)
# COMMAND ----------
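# MAGIC %md The same "carry state along the path" idea can be sketched in plain Python, which may help when reasoning about what the Column expression computes. This is only an illustration under the assumption that motif vertices need not be distinct; `EDGES` is copied from the example graph, and `chains_of_three_edges`, `count_friend`, and `chains_with_min_friends` are hypothetical helper names.
# COMMAND ----------
```python
from functools import reduce
from itertools import product

# Edge list copied from the example graph: (src, dst, relationship).
EDGES = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

def chains_of_three_edges(edges):
    """Yield every (ab, bc, cd) triple in which each edge starts where the previous one ends."""
    for ab, bc, cd in product(edges, repeat=3):
        if ab[1] == bc[0] and bc[1] == cd[0]:
            yield ab, bc, cd

def count_friend(cnt, edge):
    """State update: increment the running count when the edge is a 'friend' edge."""
    return cnt + 1 if edge[2] == "friend" else cnt

def chains_with_min_friends(edges, min_friends=2):
    """Keep the chains whose final state (the friend count) reaches min_friends."""
    return [chain for chain in chains_of_three_edges(edges)
            if reduce(count_friend, chain, 0) >= min_friends]

for chain in chains_with_min_friends(EDGES):
    print(" -> ".join([chain[0][0]] + [edge[1] for edge in chain]))
```
# COMMAND ----------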
# MAGIC %md ## Subgraphs
# MAGIC
# MAGIC GraphFrames provides APIs for building subgraphs by filtering on edges and vertices. These filters can be composed. For example, the following subgraph only includes people who are more than 30 years old and have friends who are more than 30 years old.
# COMMAND ----------
g2 = g.filterEdges("relationship = 'friend'").filterVertices("age > 30").dropIsolatedVertices()
# COMMAND ----------
display(g2.vertices)
# COMMAND ----------
display(g2.edges)
# COMMAND ----------
draw_network(g2, True)
# COMMAND ----------
# MAGIC %md ## Standard graph algorithms
# MAGIC
# MAGIC GraphFrames comes with a number of standard graph algorithms built in:
# MAGIC * Breadth-first search (BFS)
# MAGIC * Connected components
# MAGIC * Strongly connected components
# MAGIC * Label Propagation Algorithm (LPA)
# MAGIC * PageRank (regular and personalized)
# MAGIC * Shortest paths
# MAGIC * Triangle count
# COMMAND ----------
# MAGIC %md ### Breadth-first search (BFS)
# MAGIC
# MAGIC Search from "Esther" for users of age < 32.
# COMMAND ----------
paths = g.bfs("name = 'Esther'", "age < 32")
display(paths)
# COMMAND ----------
# MAGIC %md The search may also be limited by edge filters and maximum path lengths.
# COMMAND ----------
filteredPaths = g.bfs(
    fromExpr="name = 'Esther'",
    toExpr="age < 32",
    edgeFilter="relationship != 'friend'",
    maxPathLength=3)
display(filteredPaths)
# COMMAND ----------
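# MAGIC %md For intuition, the cell below is a small plain-Python sketch of a breadth-first search with an edge filter and a path-length cap. It is not how GraphFrames implements `bfs`; it returns a single shortest path, and `VERTICES`, `EDGES`, and `bfs_path` are illustrative names, with the data copied from the example graph.
# COMMAND ----------
```python
from collections import deque

# Data copied from the example graph: id -> (name, age) and (src, dst, relationship).
VERTICES = {
    "a": ("Alice", 34), "b": ("Bob", 36), "c": ("Charlie", 30), "d": ("David", 29),
    "e": ("Esther", 32), "f": ("Fanny", 36), "g": ("Gabby", 60),
}
EDGES = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

def bfs_path(start, is_target, edge_ok=lambda rel: True, max_path_length=10):
    """Return one shortest directed path (list of vertex ids) from start to a target, or None."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        if is_target(path[-1]):
            return path
        if len(path) - 1 >= max_path_length:
            continue  # do not extend paths that already use the maximum number of edges
        for src, dst, rel in EDGES:
            if src == path[-1] and dst not in visited and edge_ok(rel):
                visited.add(dst)
                queue.append(path + [dst])
    return None

younger_than_32 = lambda v: VERTICES[v][1] < 32
print(bfs_path("e", younger_than_32))
print(bfs_path("e", younger_than_32, edge_ok=lambda rel: rel != "friend", max_path_length=3))
```
# COMMAND ----------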
# MAGIC %md ## Connected components
# MAGIC
# MAGIC Compute the connected component membership of each vertex and return a DataFrame with each vertex assigned a component ID. The GraphFrames connected components implementation can take advantage of checkpointing to improve performance.
# COMMAND ----------
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()
display(result)
# COMMAND ----------
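# MAGIC %md As a point of comparison, connected components on a small graph can be sketched with a union-find structure in plain Python. This is a different technique from the distributed algorithm GraphFrames runs; the function name `connected_components` and the choice of the smallest vertex ID as the component label are assumptions made for this illustration.
# COMMAND ----------
```python
def connected_components(vertices, edges):
    """Label each vertex with the smallest vertex id in its (undirected) component."""
    parent = {v: v for v in vertices}

    def find(v):
        # Walk to the root, halving the path as we go.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_s, root_d = find(src), find(dst)
        if root_s != root_d:
            # Attach the larger root under the smaller one so labels are deterministic.
            low, high = sorted((root_s, root_d))
            parent[high] = low
    return {v: find(v) for v in vertices}

# Vertex ids and edge endpoints copied from the example graph.
VERTICES = ["a", "b", "c", "d", "e", "f", "g"]
EDGES = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

print(connected_components(VERTICES, EDGES))
```
# COMMAND ----------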
# MAGIC %md ## Strongly connected components
# MAGIC
# MAGIC Compute the strongly connected component (SCC) of each vertex and return a DataFrame with each vertex assigned to the SCC containing that vertex.
# COMMAND ----------
result = g.stronglyConnectedComponents(maxIter=10)
display(result.select("id", "component"))
# COMMAND ----------
# MAGIC %md ## Label Propagation
# MAGIC
# MAGIC Run static Label Propagation Algorithm for detecting communities in networks.
# MAGIC
# MAGIC Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the most frequent community affiliation of incoming messages.
# MAGIC
# MAGIC LPA is a standard community detection algorithm for graphs. It is computationally inexpensive, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are assigned to a single community).
# COMMAND ----------
result = g.labelPropagation(maxIter=5)
display(result)
# COMMAND ----------
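# MAGIC %md The superstep described above can be sketched in plain Python. This is a minimal illustration, not the GraphFrames implementation: it assumes edges are treated as undirected, updates all labels synchronously, and breaks ties by taking the smallest label (a tie-breaking rule chosen here only to make the result deterministic). The name `label_propagation` is hypothetical.
# COMMAND ----------
```python
from collections import Counter

def label_propagation(vertices, edges, max_iter=5):
    """Synchronous label propagation; ties are broken by the smallest label (an assumption)."""
    neighbors = {v: set() for v in vertices}
    for src, dst in edges:
        neighbors[src].add(dst)
        neighbors[dst].add(src)

    labels = {v: v for v in vertices}  # every vertex starts in its own community
    for _ in range(max_iter):
        new_labels = {}
        for v in vertices:
            counts = Counter(labels[n] for n in neighbors[v])
            if not counts:
                new_labels[v] = labels[v]  # isolated vertices keep their label
                continue
            best = max(counts.values())
            new_labels[v] = min(lbl for lbl, c in counts.items() if c == best)
        labels = new_labels
    return labels

# Two disjoint triangles: each one settles on a single community label.
TRIANGLES = [(1, 2), (2, 3), (3, 1), (4, 5), (5, 6), (6, 4)]
print(label_propagation([1, 2, 3, 4, 5, 6], TRIANGLES))
```
# COMMAND ----------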
# MAGIC %md ## PageRank
# MAGIC
# MAGIC PageRank is a measure of the importance or centrality of a node in a graph, originally developed by Larry Page and Sergey Brin while they were studying at Stanford University. It is used by the Google search engine to rank web pages in its search results.
# MAGIC
# MAGIC The PageRank of a node in a graph is based on the idea that a node's importance is determined by the number and quality of the incoming links it receives from other nodes in the graph. In other words, the more incoming links a node has from other important nodes, the more important it is considered to be.
# MAGIC
# MAGIC The PageRank algorithm assigns a score to each node in the graph based on this idea. The score of a node is calculated iteratively, by considering the scores of all the nodes that link to it, and the scores of all the nodes that those nodes link to, and so on. The algorithm uses a damping factor, the probability that a random surfer follows a link instead of jumping to a random node; in the GraphFrames API this appears as `resetProbability`, which is one minus the damping factor. The algorithm terminates after a fixed number of iterations or when the scores converge.
# MAGIC
# MAGIC The PageRank score of a node can be used to rank the nodes in the graph by importance or centrality. Nodes with higher PageRank scores are considered to be more important or central to the graph. The PageRank algorithm is widely used in network analysis and information retrieval, and has been extended to many other applications beyond the web.
# MAGIC
# MAGIC <img src="https://github.com/nuwan-db/Graph_Analytics_Telco_Churn_Prediction/blob/dev/_resources/images/pagerank.png?raw=true" width="600" />
# COMMAND ----------
results = g.pageRank(resetProbability=0.15, tol=0.01)
display(results.vertices)
# COMMAND ----------
display(results.edges)
# COMMAND ----------
# Run PageRank for a fixed number of iterations.
g.pageRank(resetProbability=0.15, maxIter=10)
# COMMAND ----------
# Run PageRank personalized for vertex "a"
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
# COMMAND ----------
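# MAGIC %md The iterative scoring described above can be sketched as a plain-Python power iteration. This is a textbook formulation written for illustration, so the scaling of its scores may differ from what GraphFrames returns: ranks here are normalized to sum to 1, and rank held by vertices without outgoing edges is spread evenly (an assumption of this sketch). The name `pagerank` and its parameters are hypothetical.
# COMMAND ----------
```python
def pagerank(vertices, edges, reset_probability=0.15, tol=1e-8, max_iter=200):
    """Power-iteration PageRank with ranks normalized to sum to 1."""
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    ranks = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # Rank sitting on dangling vertices is redistributed evenly (sketch assumption).
        dangling = sum(ranks[v] for v in vertices if not out_links[v])
        base = reset_probability / n + (1 - reset_probability) * dangling / n
        new_ranks = {v: base for v in vertices}
        for v, targets in out_links.items():
            for t in targets:
                new_ranks[t] += (1 - reset_probability) * ranks[v] / len(targets)
        delta = sum(abs(new_ranks[v] - ranks[v]) for v in vertices)
        ranks = new_ranks
        if delta < tol:  # stop once the scores have converged
            break
    return ranks

# Vertex ids and edge endpoints copied from the example graph.
VERTICES = ["a", "b", "c", "d", "e", "f", "g"]
EDGES = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

for vertex, rank in sorted(pagerank(VERTICES, EDGES).items(), key=lambda kv: -kv[1]):
    print(vertex, round(rank, 4))
```
# COMMAND ----------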
# MAGIC %md ## Shortest paths
# MAGIC
# MAGIC Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.
# COMMAND ----------
results = g.shortestPaths(landmarks=["a", "d"])
display(results)
# COMMAND ----------
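# MAGIC %md To illustrate what a landmark distance map contains, the cell below sketches hop counts to landmarks in plain Python. It assumes distances are measured along the edge direction, from each vertex to the landmark, and omits unreachable landmarks from the map; `hops_to_landmarks` is a hypothetical helper, not the GraphFrames implementation.
# COMMAND ----------
```python
from collections import deque

def hops_to_landmarks(vertices, edges, landmarks):
    """For each vertex, map every reachable landmark to the number of directed hops needed."""
    # Searching backwards from a landmark over reversed edges finds every vertex that can reach it.
    predecessors = {v: [] for v in vertices}
    for src, dst in edges:
        predecessors[dst].append(src)

    distances = {v: {} for v in vertices}
    for landmark in landmarks:
        distances[landmark][landmark] = 0
        queue = deque([landmark])
        while queue:
            current = queue.popleft()
            for pred in predecessors[current]:
                if landmark not in distances[pred]:
                    distances[pred][landmark] = distances[current][landmark] + 1
                    queue.append(pred)
    return distances

# Vertex ids and edge endpoints copied from the example graph.
VERTICES = ["a", "b", "c", "d", "e", "f", "g"]
EDGES = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

print(hops_to_landmarks(VERTICES, EDGES, ["a", "d"]))
```
# COMMAND ----------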
# MAGIC %md ### Triangle count
# MAGIC
# MAGIC Computes the number of triangles passing through each vertex.
# COMMAND ----------
trian_count = g.triangleCount()
display(trian_count)
# COMMAND ----------
# DBTITLE 0,Defining custom graph metrics
# MAGIC %md ### Defining custom graph metrics
# MAGIC
# MAGIC User-defined functions (UDFs) can be used to define custom graph metrics.
# COMMAND ----------
# MAGIC %md #### Clustering coefficient
# MAGIC
# MAGIC The clustering coefficient of a node in a graph is a measure of the degree to which the neighbors of the node are connected to each other. It is defined as the ratio of the number of edges between the neighbors of the node to the maximum number of edges that could exist between them.
# MAGIC
# MAGIC Formally, the clustering coefficient of a given node is defined as:
# MAGIC $$ cc(i) = \frac{\text{Number of complete triangles with corner } i}{\text{Number of all possible triangles with corner } i} $$
# MAGIC
# MAGIC Example:
# MAGIC
# MAGIC <img src="https://github.com/nuwan-db/Graph_Analytics_Telco_Churn_Prediction/blob/dev/_resources/images/clustering_coefficient.png?raw=true" width="800" />
# COMMAND ----------
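degree = g.degrees
cluster_df = trian_count.join(degree, on='id', how='inner')
draw_network(g, False)
# COMMAND ----------
# Clustering coefficient: triangles through a vertex divided by the number of possible neighbor pairs.
# Note: `degree` counts reciprocal edges (such as b->c and c->b) separately.
def clusterCoefficient(t, e):
    # Fewer than two incident edges, or no triangles, means no closed neighbor pairs.
    if e < 2 or t == 0:
        return 0.0
    else:
        return t / comb(e, 2)
clusterCoefficientUDF = udf(clusterCoefficient, FloatType())
cluster_df = cluster_df.withColumn("cc", clusterCoefficientUDF(col("count"), col("degree")))
display(cluster_df)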
# COMMAND ----------
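# MAGIC %md As a cross-check of the definition, the local clustering coefficient can also be sketched in plain Python on the undirected, de-duplicated version of the graph. Unlike the UDF above, which divides by pairs of incident edges, this sketch divides by pairs of distinct neighbors, so the two can differ for vertices with reciprocal edges. The name `clustering_coefficients` is hypothetical.
# COMMAND ----------
```python
from itertools import combinations

def clustering_coefficients(vertices, edges):
    """Local clustering coefficient per vertex, ignoring edge direction and duplicate edges."""
    neighbors = {v: set() for v in vertices}
    for src, dst in edges:
        if src != dst:  # self-loops do not contribute to triangles
            neighbors[src].add(dst)
            neighbors[dst].add(src)

    result = {}
    for v, nbrs in neighbors.items():
        k = len(nbrs)
        if k < 2:
            result[v] = 0.0  # no neighbor pairs, so no possible triangles
            continue
        # Count neighbor pairs that are themselves connected (closed triangles at v).
        links = sum(1 for u, w in combinations(nbrs, 2) if w in neighbors[u])
        result[v] = links / (k * (k - 1) / 2)
    return result

# Vertex ids and edge endpoints copied from the example graph.
VERTICES = ["a", "b", "c", "d", "e", "f", "g"]
EDGES = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

print(clustering_coefficients(VERTICES, EDGES))
```
# COMMAND ----------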
# MAGIC %md
# MAGIC ## Pregel-like bulk-synchronous message-passing API based on DataFrame operations
# MAGIC
# MAGIC Below is an implementation of the PageRank algorithm using GraphFrames and Pregel. See [Malewicz et al., Pregel: a system for large-scale graph processing](https://dl.acm.org/doi/10.1145/1807167.1807184) for a detailed description of the Pregel model.
# MAGIC
# MAGIC The alpha value is set to 0.15, which is the reset probability used in the PageRank algorithm (one minus the damping factor). The pregel function is then called on the graph object to perform the PageRank algorithm. This function takes several arguments to configure the algorithm, including the maximum number of iterations, the initial vertex values, the message sending function, the message aggregation function, and whether to cache the graph during computation.
# MAGIC
# MAGIC In this implementation, the algorithm is run for a maximum of 5 iterations. You can control the number of iterations with `setMaxIter()`; see the [API docs](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.lib.html) for advanced controls. The initial vertex values are set to 1.0 divided by the total number of vertices in the graph. The message sending function sends each vertex's current rank divided by its outDegree to the destination vertices of its outgoing edges. The message aggregation function sums up all the incoming messages for each vertex. Finally, the run() function executes the algorithm and returns a DataFrame of vertex IDs and their PageRank values.
# COMMAND ----------
from graphframes.lib import Pregel
numVertices = vertices.count()
vertices_od = g.outDegrees
graph = GraphFrame(vertices_od, edges)
alpha = 0.15
ranks = graph.pregel \
    .setMaxIter(5) \
    .withVertexColumn("rank", lit(1.0 / numVertices),
        coalesce(Pregel.msg(), lit(0.0)) * lit(1.0 - alpha) + lit(alpha / numVertices)) \
    .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
    .aggMsgs(sum(Pregel.msg())) \
    .run()
display(ranks)
# COMMAND ----------
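# MAGIC %md The bulk-synchronous loop behind this API can be sketched in plain Python: in every superstep each edge sends a message to its destination, messages are aggregated per vertex, and every vertex updates its state. The sketch mirrors the update rule used above, with a missing message treated as 0.0, but it is only an illustration; `pregel` and `pagerank_pregel` are hypothetical helpers, not the `graphframes.lib.Pregel` implementation.
# COMMAND ----------
```python
def pregel(vertices, edges, initial, send_to_dst, aggregate, update, max_iter):
    """Run max_iter supersteps of send -> aggregate -> update over plain Python dicts."""
    state = {v: initial(v) for v in vertices}
    for _ in range(max_iter):
        inbox = {}
        for src, dst in edges:
            inbox.setdefault(dst, []).append(send_to_dst(src, state))
        messages = {v: aggregate(msgs) for v, msgs in inbox.items()}
        # Vertices that received nothing get None, like a null message column.
        state = {v: update(state[v], messages.get(v)) for v in vertices}
    return state

def pagerank_pregel(vertices, edges, alpha=0.15, max_iter=5):
    """PageRank expressed through the generic loop, using the same update rule as the cell above."""
    n = len(vertices)
    out_degree = {}
    for src, _ in edges:
        out_degree[src] = out_degree.get(src, 0) + 1
    return pregel(
        vertices, edges,
        initial=lambda v: 1.0 / n,
        send_to_dst=lambda src, state: state[src] / out_degree[src],
        aggregate=sum,
        update=lambda old, msg: (msg if msg is not None else 0.0) * (1.0 - alpha) + alpha / n,
        max_iter=max_iter,
    )

# A two-vertex cycle keeps both ranks at 0.5 in every superstep.
print(pagerank_pregel(["x", "y"], [("x", "y"), ("y", "x")]))
```
# COMMAND ----------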
# MAGIC %md
# MAGIC ## Exploratory Data Analysis
# MAGIC Our first job is to analyze and understand the data.
# MAGIC
# MAGIC
# MAGIC Next: [Exploratory Data Analysis]($./02_exploratory_data_analysis)
# COMMAND ----------