Dandanes podatki rastejo in se kopičijo hitreje kot kdaj koli prej. Trenutno je približno 90% vseh podatkov, ustvarjenih v našem svetu, ustvarjenih le v zadnjih dveh letih. Zaradi te neverjetne stopnje rasti so morale platforme za velike podatke sprejeti radikalne rešitve, da bi lahko ohranile tako velike količine podatkov.
Eden glavnih virov podatkov danes so socialna omrežja. Dovolite mi, da predstavim resnični primer: trgovanje, analiziranje in pridobivanje vpogledov iz podatkov socialnih omrežij v realnem času z uporabo ene najpomembnejših rešitev za odmev velikih podatkov - Apache Spark in Python.
V tem članku vas bom naučil, kako zgraditi preprosto aplikacijo, ki s pomočjo Pythona bere spletne tokove iz Twitterja, nato obdeluje tvite s pomočjo Apache Spark Streaming za prepoznavanje hashtagov in nazadnje vrne najbolj priljubljene hashtagove in te podatke predstavi na resničnem -časovna armaturna plošča.
Če želite prejemati tvite iz Twitterja, se morate registrirati TwitterApps s klikom na »Ustvari novo aplikacijo« in nato izpolnite spodnji obrazec kliknite na »Ustvari svojo aplikacijo Twitter«.
Nato pojdite na novo ustvarjeno aplikacijo in odprite zavihek »Ključi in žetoni za dostop«. Nato kliknite na 'Ustvari moj dostopni žeton.'
Novi žetoni za dostop bodo prikazani spodaj.
In zdaj ste pripravljeni na naslednji korak.
V tem koraku vam bom pokazal, kako zgraditi preprostega odjemalca, ki bo s pomočjo Pythona prejemal tvite iz Twitter API-ja in jih posredoval primerku Spark Streaming. Vsakemu strokovnjaku bi moralo biti enostavno slediti Razvijalec Python .
Najprej ustvarimo datoteko twitter_app.py
nato bomo vanj dodali kodo, kot je prikazano spodaj.
Uvozite knjižnice, ki jih bomo uporabili, kot spodaj:
import socket import sys import requests import requests_oauthlib import json
In dodajte spremenljivke, ki bodo uporabljene v OAuth za povezavo s Twitterjem, kot spodaj:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Zdaj bomo ustvarili novo funkcijo, imenovano get_tweets
ki bo poklical URL Twitter API in vrnil odgovor na tok tvitov.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Nato ustvarite funkcijo, ki sprejme odziv zgoraj in izvleče besedilo tvitov iz predmeta JSON celotnega tvita. Po tem pošlje vsak tweet primerku Spark Streaming (o tem bomo razpravljali kasneje) prek povezave TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Zdaj bomo naredili glavni del, ki bo vzpostavil povezave vtičnice gostitelja aplikacije, s katerimi se bo povezala iskra. Tu bomo nastavili naslov IP localhost
saj se bodo vsi izvajali na istem računalniku in vratih 9009
. Potem bomo poklicali get_tweets
metodo, ki smo jo naredili zgoraj, za pridobivanje tweetov s Twitterja in posredovanje njegovega odziva skupaj s povezavo vtičnice na send_tweets_to_spark
za pošiljanje tweetov na Spark.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Izdelajmo našo aplikacijo za pretakanje Spark, ki bo v realnem času obdelala dohodne tvite, iz njih izvlekla hashtagove in izračunala, koliko hashtagov je bilo omenjenih.
Najprej moramo ustvariti primerek konteksta Spark sc
, nato smo ustvarili kontekst pretakanja ssc
od sc
z intervalom serije dve sekundi, ki bo opravil preoblikovanje v vseh tokovih, prejetih vsaki dve sekundi. Upoštevajte, da smo raven dnevnika nastavili na ERROR
da bi onemogočili večino dnevnikov, ki jih piše Spark.
Tu smo določili kontrolno točko, da omogočimo občasno kontrolno točko RDD; to je obvezno za uporabo v naši aplikaciji, saj bomo uporabili transformacije s stanjem (o njih bomo razpravljali kasneje v istem poglavju).
Nato določimo naš glavni DStream dataStream, ki se bo povezal s strežnikom vtičnic, ki smo ga prej ustvarili na vratih 9009
in preberite tvite iz tega pristanišča. Vsak zapis v DStreamu bo tvit.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName('TwitterStreamApp') # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint('checkpoint_TwitterApp') # read data from port 9009 dataStream = ssc.socketTextStream('localhost',9009)
Zdaj bomo opredelili svojo transformacijsko logiko. Najprej bomo vse tvite razdelili na besede in jih zapisali z besedami RDD. Nato bomo iz vseh besed filtrirali samo hashtagove in jih preslikali v par (hashtag, 1)
in jih postavite v oznake RDD.
Nato moramo izračunati, kolikokrat je bil omenjen hashtag. To lahko storimo z uporabo funkcije reduceByKey
. Ta funkcija izračuna, kolikokrat je bil hashtag omenjen za posamezen paket, to pomeni, da bo ponastavila štetje v posameznem paketu.
V našem primeru moramo izračunati štetje v vseh serijah, zato bomo uporabili drugo funkcijo, imenovano updateStateByKey
, saj vam ta funkcija omogoča vzdrževanje stanja RDD, medtem ko ga posodabljate z novimi podatki. Ta način se imenuje Stateful Transformation
.
Če želite uporabiti updateStateByKey
, morate konfigurirati kontrolno točko in to, kar smo storili v prejšnjem koraku.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(' ')) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
The updateStateByKey
vzame funkcijo kot parameter, imenovan update
funkcijo. Poteka na vsakem elementu v RDD in naredi želeno logiko.
V našem primeru smo ustvarili funkcijo posodobitve, imenovano aggregate_tags_count
to bo seštelo vse new_values
za vsak hashtag in jih dodajte v total_sum
to je vsota vseh serij in shranite podatke v tags_totals
RDD.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Nato obdelamo tags_totals
RDD v vsaki seriji, da jo pretvorite v začasno tabelo z uporabo Spark SQL Context in nato izvedete stavek select, da dobite prvih deset hashtagov s štetjem in jih postavite v hashtag_counts_df
podatkovni okvir.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable('hashtags') # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
Zadnji korak v naši aplikaciji Spark je pošiljanje hashtag_counts_df
podatkovni okvir v aplikacijo armaturne plošče. Tako bomo pretvorili podatkovni okvir v dva polja, enega za hashtage in drugega za njihovo štetje. Nato jih bomo prek aplikacije REST API poslali na aplikacijo na nadzorni plošči.
def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Na koncu, tukaj je vzorec izhoda pretočnega iskanja med zagonom in tiskanjem hashtag_counts_df
, opazili boste, da se izhod natisne natanko vsaki dve sekundi v intervalih serije.
Zdaj bomo ustvarili preprosto aplikacijo na armaturni plošči, ki jo bo Spark sproti posodabljal. Zgradili ga bomo z uporabo Python, Flask in Charts.js .
Najprej ustvarimo projekt Python s spodnjo strukturo ter naložimo in dodamo Chart.js datoteko v statični imenik.
Nato v app.py
datoteko, ustvarili bomo funkcijo update_data
, ki jo bo Spark poklical prek URL-ja http://localhost:5001/updateData
za posodobitev globalnih nizov oznak in vrednosti.
kako uporabljati Discord api
Tudi funkcija refresh_graph_data
je ustvarjen tako, da ga pokliče zahteva AJAX za vrnitev novih posodobljenih nizov oznak in vrednosti kot JSON. Funkcija get_chart_page
bo upodobil chart.html
strani, ko je poklican.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
Zdaj pa ustvariva preprost grafikon v chart.html
datoteko, da prikaže podatke hashtaga in jih posodobi v realnem času. Kot je določeno spodaj, moramo uvoziti Chart.js
in jquery.min.js
Knjižnice JavaScript.
V oznaki telesa moramo ustvariti platno in mu dati ID, da se lahko sklicuje nanj, medtem ko grafikon v naslednjem koraku prikazujemo z uporabo JavaScript.
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
Zdaj pa sestavimo tabelo s spodnjo kodo JavaScript. Najprej dobimo element platna, nato pa ustvarimo nov objekt grafikona in mu posredujemo element platna ter definiramo njegov podatkovni objekt kot spodaj.
Upoštevajte, da so oznake in podatki omejeni na spremenljivke oznak in vrednosti, ki se vrnejo med upodabljanjem strani, ko kličete get_chart_page
v app.py
mapa.
Zadnji preostali del je funkcija, ki je konfigurirana tako, da vsako sekundo naredi zahtevo Ajaxa in pokliče URL /refreshData
, ki bo izvedel refresh_graph_data
v app.py
in vrnite nove posodobljene podatke ter nato posodobite znak, ki prikazuje nove podatke.
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
Zaženimo tri aplikacije v spodnjem vrstnem redu: 1. Twitter App Client. 2. Aplikacija Spark. 3. Spletna aplikacija na nadzorni plošči.
Nato lahko dostopate do nadzorne plošče v realnem času z uporabo URL-ja
Zdaj lahko vidite, kako se grafikon posodablja, kot spodaj:
Naučili smo se, kako sproti analizirati podatke v realnem času s pomočjo Spark Streaming in jih integrirati neposredno s preprosto nadzorno ploščo z uporabo spletne storitve RESTful. Iz tega primera lahko razberemo, kako močna je Spark, saj zajema ogromen tok podatkov, jih preoblikuje in pridobi dragocene vpoglede, ki jih je mogoče zlahka uporabiti za hitro odločanje. Obstaja veliko uporabnih primerov uporabe, ki jih je mogoče uporabiti in ki lahko služijo različnim panogam, kot so novice ali trženje.
Primer industrije novic
Najpogosteje omenjenim hashtagom lahko sledimo, da vemo, o katerih temah ljudje najpogosteje govorijo na družbenih omrežjih. Prav tako lahko sledimo določenim hashtagom in njihovim tvitom, da vemo, kaj ljudje govorijo o določenih temah ali dogodkih na svetu.
Primer trženja
Lahko zbiramo tok tvitov in jih z analizo sentimenta kategoriziramo in določimo interese ljudi, da jih ciljno usmerimo s ponudbami, povezanimi z njihovimi interesi.
Obstaja tudi veliko primerov uporabe, ki jih je mogoče uporabiti posebej za analizo velikih podatkov in lahko služijo številnim panogam. Za več primerov uporabe Apache Spark na splošno predlagam, da si ogledate enega izmed naših prejšnje objave .
Priporočam vam, da preberete več o Spark Streaming iz tukaj da bi vedeli več o njegovih zmožnostih in naredili bolj napredno preoblikovanje podatkov za večji vpogled v realnem času.
Opravlja hitro obdelavo podatkov, pretakanje in strojno učenje v zelo velikem obsegu.
Uporablja se lahko pri preoblikovanju podatkov, napovedni analitiki in odkrivanju prevar na velikih podatkovnih platformah.
Twitter vam omogoča pridobivanje njegovih podatkov z uporabo njihovih API-jev; eden od načinov, s katerim dajo na voljo, je sprotno pretakanje tweetov po iskalnih merilih, ki jih določite.