from scapy.all import PcapReader
from scapy.packet import Packet
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.dns import DNS
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


def to_text(value):
    # Scapy record fields come back as bytes, str, int, embedded packets,
    # lists of any of these, or None; normalise them all to a single string
    # (or None) so they fit the StringType columns defined below.
    if value is None:
        return None
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    if isinstance(value, Packet):
        return value.summary()
    if isinstance(value, (list, tuple)):
        return " ".join(to_text(v) or "" for v in value)
    return str(value)
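# Illustrative values for the helper above (to_text is a local helper defined
# in this script, not a scapy API):
#   to_text(b"example.com.")          -> "example.com."
#   to_text(["10.0.0.1", "10.0.0.2"]) -> "10.0.0.1 10.0.0.2"
#   to_text(None)                     -> None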
if __name__ == "__main__":
    dir_path = r'/home/timur/Documents/testing 3/'

    # Collect every .pcap file in the capture directory
    res_pcap = [f for f in os.listdir(dir_path) if f.endswith('.pcap')]
    # Configure the session up front; creating a default session, mutating
    # its private _conf, stopping it and building a second session achieves
    # the same thing with two extra steps.
    spark = (SparkSession.builder
             .appName("Spark Updated Conf")
             .config("spark.executor.memory", "500m")
             .config("spark.executor.cores", "5")
             .config("spark.cores.max", "5")
             .config("spark.executor.instances", "20")
             .config("spark.driver.cores", "2")
             .config("spark.driver.memory", "4g")
             .getOrCreate())
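    # Optional: print the settings the session actually picked up, via the
    # public getConf() accessor rather than the private _conf attribute.
    for key, value in spark.sparkContext.getConf().getAll():
        print(key, "=", value)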
    schema = StructType([
        StructField("source", StringType(), True),
        StructField("destination", StringType(), True),
        StructField("ip_type", StringType(), True),
        StructField("proto", StringType(), True),
        StructField("sport", IntegerType(), True),
        StructField("dport", IntegerType(), True),
        StructField("qname", StringType(), True),
        StructField("qtype", StringType(), True),
        StructField("qclass", StringType(), True),
        StructField("rrname", StringType(), True),
        StructField("rtype", StringType(), True),
        StructField("rclass", StringType(), True),
        StructField("ttl", IntegerType(), True),
        StructField("rdlen", StringType(), True),
        StructField("rdata", StringType(), True),
        StructField("rname", StringType(), True),
    ])
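    # createDataFrame matches tuple values to these columns by position, not
    # by name, so every row appended below must follow exactly this order.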
    # Partial lookup tables for DNS record types and classes and for IP
    # protocol numbers (RFC 1035 assigns 3 to MD and 4 to MF); unknown codes
    # fall back to their numeric value as a string via .get() below.
    types = {0: 'ANY', 255: 'ALL', 1: 'A', 2: 'NS', 3: 'MD', 4: 'MF', 5: 'CNAME',
             6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR', 10: 'NULL', 11: 'WKS', 12: 'PTR',
             13: 'HINFO', 14: 'MINFO', 15: 'MX', 16: 'TXT', 17: 'RP', 18: 'AFSDB',
             28: 'AAAA', 33: 'SRV', 38: 'A6', 39: 'DNAME'}
    protocols = {6: 'TCP', 17: 'UDP'}
    classes = {1: 'IN'}

    def record_row(rr):
        # Map one scapy resource record to the seven record columns of the
        # schema. Record classes differ (DNSRR, DNSRRSOA, DNSRROPT, ...), so
        # getattr covers fields a class may lack: only SOA records carry
        # rname, and OPT records have no ttl.
        rclass = getattr(rr, "rclass", None)
        return (to_text(rr.rrname),
                types.get(rr.type, str(rr.type)),
                classes.get(rclass, str(rclass)) if rclass is not None else None,
                getattr(rr, "ttl", None),
                to_text(getattr(rr, "rdlen", None)),
                to_text(getattr(rr, "rdata", None)),
                to_text(getattr(rr, "rname", None)))

    # Accumulate one tuple per DNS question/record and build the DataFrame
    # once at the end; unioning a new DataFrame per packet grows the logical
    # plan without bound and is extremely slow.
    data = []
    for pcap in res_pcap:
        # Iterate PcapReader directly so large captures are streamed packet
        # by packet instead of read fully into memory with read_all().
        for packet in PcapReader(os.path.join(dir_path, pcap)):
            if not packet.haslayer(DNS):
                continue

            source = destination = ip_type = proto = sport = dport = None
            qname = qtype = qclass = None

            # haslayer() also works for captures whose link layer is not
            # Ethernet, unlike testing packet[Ether].type against 0x0800
            # (IPv4) or 0x86DD (IPv6).
            if packet.haslayer(IP):
                source = packet[IP].src
                destination = packet[IP].dst
                ip_type = "IPv4"
                proto = protocols.get(packet[IP].proto, str(packet[IP].proto))
            elif packet.haslayer(IPv6):
                source = packet[IPv6].src
                destination = packet[IPv6].dst
                ip_type = "IPv6"
                proto = protocols.get(packet[IPv6].nh, str(packet[IPv6].nh))

            if packet.haslayer(UDP):  # DNS over TCP has no UDP layer
                sport = packet[UDP].sport
                dport = packet[UDP].dport

            dns = packet[DNS]
            if dns.qdcount and dns.qd:
                qd = dns.qd[0]
                qname = to_text(qd.qname)
                qtype = types.get(qd.qtype, str(qd.qtype))
                qclass = classes.get(qd.qclass, str(qd.qclass))

            base = (source, destination, ip_type, proto, sport, dport,
                    qname, qtype, qclass)

            if dns.ancount == dns.nscount == dns.arcount == 0:
                # Pure query: one row with the record columns left NULL.
                data.append(base + (None,) * 7)
            else:
                # One row per resource record. Iterating each section's own
                # field (an/ns/ar) keeps answer, authority and additional
                # records apart; indexing packet[DNSRR][i] counts layers
                # across section boundaries and mixes the sections up.
                for section, count in ((dns.an, dns.ancount),
                                       (dns.ns, dns.nscount),
                                       (dns.ar, dns.arcount)):
                    for i in range(count):
                        try:
                            row = record_row(section[i])
                        except (TypeError, IndexError, AttributeError):
                            # Truncated or unparsed record: keep the packet
                            # columns and NULL out the record columns.
                            row = (None,) * 7
                        data.append(base + row)
    # Build the DataFrame once from all collected rows and write it out.
    df = spark.createDataFrame(data=data, schema=schema)
    df.write.parquet("data_test16.parquet")
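    # Optional sanity check (reads back the file written above): count the
    # extracted rows per query type.
    check = spark.read.parquet("data_test16.parquet")
    check.groupBy("qtype").count().show()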