Instituto de Computação - UNICAMP

Experimento com Spark e Python

MC855AB - Projeto em Sistemas de Computação

Segundo Semestre de 2017

Islene Calciolari Garcia


Escolha uma aplicação

Explore os usos possíveis do modelo de programação fornecido pelo Spark. Estude ou implemente uma aplicação e apresente para a turma. Registre o seu plano no Moodle.

Ambiente Databricks

Databricks é uma empresa fundada pelo time que criou o Spark. Eles disponibilizam um ambiente para experimentos iniciais (Community Edition) com uso gratuito, limitado a um mini cluster de 6GB.

Instalação simples do Spark

Na primeira parte do experimento, você deve instalar o Spark:
  $ wget http://ftp.unicamp.br/pub/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
  • Descompacte com
      $ tar xzf spark-2.0.0-bin-hadoop2.7.tgz
    
  • Utilize o pyspark em:
      $ bin/pyspark
    
  • Siga as instruções em Quick Start.

  • Veja mais detalhes em Spark Programming Guide

    Observação importante: Closures

    Um programa que gera o resultado esperado quando executa em uma única máquina, pode gerar um resultado diferente quando executar em um cluster. Isto pode ocorrer porque os nós que contêm o cluster estão fazendo acesso a cópias locais de uma variável que o programador considerava ser global. Veja mais detalhes em Local vs cluster modes

    Teste com tcpdump.list

    Para testar o Spark, podemos utilizar arquivos da base de dados DARPA Intrusion Detection Evaluation, disponibilizada pela DARPA para dectecção de intrusões. Os arquivos tipo tcpdump.list têm o seguinte formato:
       Start      Start                     Src   Dest Src         Dest         Attack
       Date       Time     Duration  Serv   Port  Port IP          IP           Score Name
    1  01/27/1998 00:00:01 00:00:23  ftp    1755  21  192.168.1.30 192.168.0.20 0.31 -
    2  01/27/1998 05:04:43 67:59:01  telnet 1042  23  192.168.1.30 192.168.0.20 0.42 -
    3  01/27/1998 06:04:36 00:00:59  smtp   43590 25  192.168.1.30 192.168.0.40 12.0 - 
    4  01/27/1998 08:45:01 00:00:01  finger 1050  79  192.168.0.40 192.168.1.30 2.56 guess
    5  01/27/1998 09:23:45 00:00:01  http   1031  80  192.168.1.30 192.168.0.40 -1.3 -
    7  01/27/1998 15:11:32 00:00:12  sunrpc 2025  111 192.168.1.30 192.168.0.20 3.10 rpc
    8  01/27/1998 21:53:17 00:00:45  exec   2032  512 192.168.1.30 192.168.0.40 2.95 exec
    9  01/27/1998 21:58:21 00:00:01  http   1031  80  192.168.1.30 192.168.0.20 0.45 -
    10 01/27/1998 22:57:53 26:59:00  login  2031  513 192.168.0.40 192.168.1.20 7.00 -
    11 01/27/1998 23:57:28 130:23:08 shell  1022  514 192.168.1.30 192.168.0.20 0.52 guess
    13 01/27/1998 25:38:00 00:00:01  eco/i  -     -   192.168.0.40 192.168.1.30 0.01 -
    
    Podemos transformar o arquivo em um RDD:
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
          /_/
    
    Using Python version 2.7.13 (default, May 10 2017 20:04:28)
    SparkSession available as 'spark'.
    >>> lines = sc.textFile("tcpdump.list");
    
    Verificar as primeiras linhas:
    >>> lines.take(5) 
    [u'1  06/02/1998 00:00:07 00:00:01 http 2127 80 172.016.114.207 152.163.214.011 0 -', u'2
    06/02/1998 00:00:07 00:00:01 http 2139 80 172.016.114.207 152.163.212.172 0 -', u'3  06/02/1998
    00:00:07 00:00:01 http 2128 80 172.016.114.207 152.163.214.011 0 -', u'4  06/02/1998 00:00:07
    00:00:01 http 2129 80 172.016.114.207 152.163.214.011 0 -', u'5  06/02/1998 00:00:07 00:00:01
    http 2130 80 172.016.114.207 152.163.214.011 0 -']
    
    >>> for x in lines.take(5) :
    ...    print x
    ... 
    1  06/02/1998 00:00:07 00:00:01 http 2127 80 172.016.114.207 152.163.214.011 0 -
    2  06/02/1998 00:00:07 00:00:01 http 2139 80 172.016.114.207 152.163.212.172 0 -
    3  06/02/1998 00:00:07 00:00:01 http 2128 80 172.016.114.207 152.163.214.011 0 -
    4  06/02/1998 00:00:07 00:00:01 http 2129 80 172.016.114.207 152.163.214.011 0 -
    5  06/02/1998 00:00:07 00:00:01 http 2130 80 172.016.114.207 152.163.214.011 0 -
    >>> 
    
    Listar o RDD inteiro:
    >>> for x in lines.collect() :
    ...    print x
    ... {\tiny
    1  06/02/1998 00:00:07 00:00:01 http 2127 80 172.016.114.207 152.163.214.011 0 -
    2  06/02/1998 00:00:07 00:00:01 http 2139 80 172.016.114.207 152.163.212.172 0 -
    3  06/02/1998 00:00:07 00:00:01 http 2128 80 172.016.114.207 152.163.214.011 0 -
    4  06/02/1998 00:00:07 00:00:01 http 2129 80 172.016.114.207 152.163.214.011 0 -
    5  06/02/1998 00:00:07 00:00:01 http 2130 80 172.016.114.207 152.163.214.011 0 -
    6  06/02/1998 00:00:07 00:00:01 http 2131 80 172.016.114.207 152.163.214.011 0 -
    7  06/02/1998 00:00:07 00:00:01 http 2132 80 172.016.114.207 152.163.214.011 0 -
    8  06/02/1998 00:00:07 00:00:01 http 2136 80 172.016.114.207 152.163.214.011 0 -
    9  06/02/1998 00:00:07 00:00:01 http 2137 80 172.016.114.207 152.163.212.172 0 -
    10  06/02/1998 00:00:07 00:00:01 http 2138 80 172.016.114.207 152.163.212.172 0 -
    .
    .
    .
    
    Filtrar por serviço telnet:
    >>> telnet = lines.filter(lambda x: "telnet" in x)
    >>> for x in telnet.collect():
    ...    print x
    ...
    75  06/02/1998 00:05:59 00:00:09 telnet 2680 23 172.016.115.087 196.227.033.189 0 -
    410  06/02/1998 01:17:31 00:00:01 telnet 19564 23 197.218.177.069 172.016.113.105 0 -
    412  06/02/1998 01:17:38 00:00:12 telnet 6255 23 172.016.113.084 135.008.060.182 0 -
    415  06/02/1998 01:22:51 00:00:01 telnet 19566 23 197.218.177.069 172.016.114.050 0 -
    471  06/02/1998 02:11:25 00:00:01 telnet 19617 23 197.182.091.233 172.016.116.194 0 -
    475  06/02/1998 02:14:41 00:00:01 telnet 19618 23 194.007.248.153 172.016.117.103 0 -
    581  06/02/1998 02:28:20 00:00:10 telnet 19619 23 192.168.001.010 172.016.114.050 0 -
    .
    .
    .
    
    Contar o número de chamadas telnet:
    >>> telnet.count()
    37
    
    Contar todos os serviços:
    >>> pairs = lines.map(lambda x: (str(x.split()[4]), 1))
    >>> pairs.take(30)
    [('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1),
    ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1),
    ('http', 1), ('ntp/u', 1), ('eco/i', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1),
    ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1), ('http', 1),
    ('http', 1), ('http', 1)]
    
    >>> totalByService = pairs.reduceByKey(lambda a,b: a + b)
    >>> for x in totalByService.collect():
    ...    print x
    ... 
    ('http', 32155)
    ('eco/i', 131)
    ('smtp', 896)
    ('auth', 75)
    ('domain/u', 254)
    ('telnet', 37)
    ('ftp', 118)
    ('ntp/u', 81)
    ('finger', 109)
    
    Ordenar os serviços em ordem decrescente de uso:
    >>> inverted = totalByService.map(lambda (k,v) : (v,k))
    >>> for x in inverted.collect():
    ...    print x
    ... 
    (32155, 'http')
    (131, 'eco/i')
    (896, 'smtp')
    (75, 'auth')
    (254, 'domain/u')
    (37, 'telnet')
    (118, 'ftp')
    (81, 'ntp/u')
    (109, 'finger')
    
    >>> sortedPairs = inverted.sortByKey(False)
    >>> for x in sortedPairs.collect():
    ...    print x
    ...
    (32155, 'http')
    (896, 'smtp')
    (254, 'domain/u')
    (131, 'eco/i')
    (118, 'ftp')
    (109, 'finger')
    (81, 'ntp/u')
    (75, 'auth')
    (37, 'telnet')