分布式计算

Posted by QingJun's Blog on July 4, 2021

本文笔记针对的是西安电子科技大学计算机科学与技术专业计算机软件与理论方向或数字媒体方向或大数据技术方向在大三下学期方向限选的课程《分布式计算》

配图参考的是李龙海老师给的 PPT 或 PDF

第一讲 概述

分布式系统的定义

分布式计算:多个通过网络互联的计算节点通过相互协作共同完成计算任务

分布式系统

一个分布式系统由多个通过网络互联的独立自治的计算节点组成,这些计算节点基于消息传递机制进行相互协作,以完成共同的目标。

理解分布式系统定义的要点:

  1. 计算节点:分布式计算的基本计算单元,可以是单个计算机,也可以是单个进程、线程或虚拟机。
  2. 网络互联:节点间通过网络进行消息传递,以实现同步协作。
  3. 独立自治:每个计算节点都是独立的,不同节点的动作是同时进行的。
  4. 相互协作以完成共同目标
  5. 消息传递

分布式系统的优点:

  • 提高计算能力
  • 提高存储能力
  • 提高网络吞吐能力(并发访问能力)
  • 提高可靠性(解决局部失效问题)
  • 提高安全性(解决被局部攻击问题)
  • 提高可扩展性(解决瓶颈问题)
  • 实现资源共享
  • 实现跨越时空的协同服务(发挥不同节点的优势)

两种模型

消息传递模型:节点间没有公共状态,需要通过相互通信进行协作;影响效率的主要因素是通信复杂度

共享内存模型:不同节点间通过共享存储器共享公共状态,通过共享公共状态进行通信

传统并行计算采用共享内存模型,分布式计算采用消息传递模型。

衡量分布式系统优劣的特性

  • 可扩展性/可伸缩性
    • 垂直可扩展性
    • 水平可扩展性
  • 容错性
    • 可用性
    • 可恢复性
  • 透明性
  • 开放性
  • 安全性
  • 可维护性

分布式系统面临的挑战

局部视图

局部视图:每个节点只能看到整个系统的某个局部视图,无法以“上帝视角”进行动作

通过节点染色问题来理解:

image-20210324160041821

如果这些节点都在一台计算机上,问题会简单很多。但每个节点是一个独立的计算节点,则任何一个节点都无法掌控全局,或者说掌控全局的通信复杂度会很高(以 n 个节点为例,两两通信,需要 n(n-1) 次通信,代价非常大)

P3C:3-colouring a path,用来解决染色问题的算法,算法过程如下:

image-20210324160640347

分布式一致性

通过数据库备份问题来理解:

大型数据库系统一般采用一主多备的模型进行数据备份,即一个主节点,多个备份节点,且只有主节点数据库接收写入请求,由主节点向备份节点发送数据更新命令。

这种模式的优点:

  • 解决数据库备份的问题,提升可靠性。即使一个节点出现问题,仍有其他节点可以正常工作。主节点出现问题时,也可以随时切换备份节点为主节点
  • 提高效率。主备节点状态一致时,客户端可以从任意节点读取数据

当然,这种模式也有问题,主要问题就是如何保持主备节点状态一致,即分布式一致性的问题。

解决分布式一致性问题的协议:Paxos、Raft

分布式计算任务

按任务限定完成时间长短(时效性)分类:

  • 实时处理任务(OLTP),要求高并发

    在线购物、在线交易等

  • 准实时处理任务,流处理模型

    Web 搜索时的联想、广告推送、商品推荐等

  • 批处理任务(OLAP),只有单一用户

分布式系统构架

  • 客户端-服务器模式(C-S)

    • 客户端发出服务请求,服务器端根据客户端请求参数完成实际运算,并将运算结果返回给客户端。
    • 客户端运算任务轻,服务器端运算任务重
    • 服务器一般要应对并发问题
    • 客户端一般负责和用户进行交互
    • 瘦客户端/胖客户端

    变种——Client-Cluster模式,服务器端由多个服务器构成,共同分担计算任务。

    三层结构的Client-Cluster模式:

    • 表示层,展现给用户的界面
    • 应用层,将负责业务分解成功能单一的服务,服务由独立的组件实现,应用层由组件组成
    • 数据层,直接操作数据库

    优点:解耦,开发人员只关注某一层,降低层间依赖,利于复用,扩展性强

  • 主-从模式(Master-Slave)

    • 主节点负责根据负载情况分发任务和监视从节点执行情况进行调度
    • 从节点负责执行由主节点分发的任务
  • 总线模式

    • 不同节点之间通过虚拟总线相连,不必知道接收者和发送者
    • 异步方式通信
  • 对等模式(P2P)

    • 每个计算节点在任务分工上是完全对等的
  • 混合模式

负载均衡

  • 随机,随机选择
  • 轮询,按顺序分发
  • 固定权重值,请求更多地分发到高配置服务器
  • IP 哈希
  • 最少 TCP 连接数,分发到连接数最少的服务器
  • 最小响应时间,分发到相应最快的服务器
  • 基于各服务器实际负载的动态负载均衡算法

中间件

中间件:介于应用程序和操作系统之间的组件,将具体业务和底层逻辑解耦。

作用:

  • 为开发者提供高层的变成抽象,屏蔽分布式系统底层的异构性和复杂性
  • 提高互操作性和可移植性
  • 提供分布式系统的基础设施服务

常用中间件:

  • 远程过程调用中间件
  • 分布式对象中间件
  • 分布式组件中间件
  • 消息队列中间件
  • Web 服务中间件
  • P2P 中间件

抽象理论模型

设计分布式算法时使用的抽象分布式系统模型,从以下三个方面进行定义:

  • 交互模式,定义了在算法运行期间哪些节点之间有消息传递动作
    • 同步模型
    • 异步模型
  • 信道故障模式
  • 节点故障模式
    • 失效停止模式(Fail-Stop),节点失效后停止工作,显然也停止对外发送消息
    • 失效停止恢复模式
    • 拜占庭模式,节点失效后行为模式是任意的,例如可以故意发送恶意消息,以破坏分布式系统的正常运行。
    • 发送者认证拜占庭模式
    • 理性拜占庭模式

image-20210627193630561

第二讲 分布式节点之间的通信技术

该讲的主要内容是研究分布式节点间的通信技术,我们首先从最简单的点对点的通信开始,利用 socket 编程实现,随后考虑多对多的通信,也就是研究如何并发,最后,跳脱出底层实现,直接使用已经完善的上层通信技术。

底层通信技术

TCP/IP 网络体系(这个计网学过了,属于计算机专业学生必修课,在此略过)

socket

socket(套接字):传输层和网络层提供给应用层的标准化编程接口

socket类型:

  • 流式套接字,对应 TCP 协议
  • 数据包套接字,对应 UDP 协议
  • 原始套接字

标识 socket :五元组 (<源 IP="" 地址="">,<源端口号>,<目的 IP="" 地址="">,<目的端口号>,<协议>)

TCP 套接字编程模型

image-20210324164202114

监听 socket,监听来自客户端的请求,用于三次握手建立连接

通信 socket,用于客户端与服务器间的通信

UDP 套接字编程模型

image-20210429200420716

并发服务技术

本节介绍三种并发请求处理技术,即多线程、线程池、事件驱动,同时介绍部分使用 java 需要注意的东西。

线程

线程的定义及特点

  • 不同的线程在逻辑上相互独立地并行地执行
  • 每个的线程都拥有自己独立的运行上下文(Runtime Context),主要包括独立的内存栈(主要存放局部变量和函数参数)和独立的CPU寄存器状态。每个线程可以不受干扰的访问自己的局部变量
  • 属于同一个进程的多个线程共享该进程的内存地址空间
  • 每一个线程都有自己的“主函数”。线程启动后,其主函数开始执行;线程主函数执行完毕,线程的生命周期就结束了

Java 的线程对象

  • Java 线程对象是对操作系统线程对象的一个封装
  • 一个Java线程对象一般都是 Thread 类(JDK实现的基类)的子类。子类需要重载 run 函数,作为自己的线程主函数

基于多线程的并发服务技术

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 多线程
// 1. 重载 Thread 类
public class UDPThread extends Thread {

    DatagramSocket asocket = null;
    DatagramPacket request = null;

    public UDPThread(DatagramSocket asocket, DatagramPacket request) {
        this.asocket = asocket;
        this.request = request;
    }
	// 重载 run 函数
    public void run() {
        try {
            // 创建数据报
            DatagramPacket reply = new DatagramPacket(request.getData(), request.getLength(), request.getAddress(),
                    request.getPort());
            // 发送数据报
            asocket.send(reply);
        } catch (SocketException e) {
            System.out.println("Socket: " + e.getMessage());
        } catch (IOException e) {
            System.out.println("IO: " + e.getMessage());
        }
    }
}
// 2. 遇到新任务时创建新线程进行处理
public class MultiThreadUDPServer {
    public static void main(String args[]) throws Exception {
        DatagramSocket aSocket = new DatagramSocket(6789);
        int count = 0;
        while (true) {
            byte[] buffer = new byte[1000];
            DatagramPacket request = new DatagramPacket(buffer, buffer.length);
            // 接收数据
            aSocket.receive(request);
            System.out.println("Message from Client: " + new String(request.getData()));
            count++;
            // 创建新线程
            UDPThread serverThread = new UDPThread(aSocket, request);
            // 启动线程
            serverThread.start();
        }
    }
}

基于线程池的并发服务技术

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 线程池
public ThreadPoolExecutor(int corePoolSize, int maximunPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
/*
corePoolSize,核心线程数。
没有任务时,核心线程也一直存活;所需线程数小于核心线程数时,线程池也会创建新线程;设置allowCoreThreadTimeout=true时,核心线程会超时关闭

maximunPoolSize,最大线程数
当线程数=maxPoolSize,且任务队列已满,此时添加任务时会触发RejectedExecutionHandler进行处理

keepAliveTime,线程空闲时间
如果线程数>corePoolSize,且有线程空闲时间达到keepAliveTime时,线程会销毁,直到线程数量=corePoolSize;如果设置allowCoreThreadTimeout=true时,核心线程执行完任务也会销毁直到数量=0

workQueue,任务队列
*/

public class ThreadPoolServer {
    public static void main(String[] args) throws Exception {
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        ServerSocket listenSocket = new ServerSocket(8189); // 创建监听端口
        Socket socket = null; // 记录连接端口
        int count = 0;
        System.out.println("Server listening at 8189");
        while (true) {
            socket = listenSocket.accept(); // 从队列中获取端口
            count++; // 计数
            MyTask myTask = new MyTask(socket,count); // 建立新线程
            executor.execute(myTask); // 执行
        }
    }
}

class MyTask implements Runnable {
    private Socket socket = null;
    private int num = 0;

    public MyTask(Socket socket, int num) {
        this.socket = socket;
        this.num = num;
    }
	// 重载 run 函数
    @Override
    public void run() {
        InputStream is = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        OutputStream os = null;
        PrintWriter pw = null;
        try {
            is = socket.getInputStream();
            isr = new InputStreamReader(is);
            br = new BufferedReader(isr);
            os = socket.getOutputStream();
            pw = new PrintWriter(os);
            String info = null;
            while ((info = br.readLine()) != null) {
                System.out.println("Thread" + num + ",Message from client:" + info);
                pw.println(info);
                pw.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (pw != null)
                    pw.close();
                if (os != null)
                    os.close();
                if (br != null)
                    br.close();
                if (isr != null)
                    isr.close();
                if (is != null)
                    is.close();
                if (socket != null)
                    socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

maven

老师希望教授我们一些理论知识以外的实际应用使用的工具,所以要求我们使用 maven ,虽然并不属于并发服务技术的内容,但 maven 确实很好用,是我以前使用 java 未曾接触过的,所以我还是介绍一下。

maven 的作用:

  • Maven 可以整合多个项目之间的引用关系,我们可以根据业务和分层需要任意拆分一个项目
  • Maven 提供规范的管理各个常用 jar 包及其各个版本,并且可以自动下载和引入项目中
  • Maven 可以根据指定版本自动解决 jar 包版本兼容问题
  • Maven 可以把 jar 包所依赖的其它 jar 包自动下载并引入项目

maven 常用命令:

  • mvn clean,清理,删除以前的编译结果,为重新编译做好准备
  • mvn compile,编译,将Java源程序编译为字节码文件
  • mvn test-compile,测试,针对项目中的关键点进行测试,确保项目在迭代开发过程中关键点的正确性
  • mvn package,打包,将一个包含诸多文件的工程封装为一个压缩文件用于安装或部署。Java 工程对应 jar 包,Web 工程对应war包
  • mvn install,安装,在Maven环境下特指将打包的结果——jar包或war包安装到本地仓库中
  • mvn deploy,部署,将打包的结果部署到远程仓库或将war包部署到服务器上运行

事件驱动技术(多路复用技术)

image-20210627155103577

多线程 线程池 事件驱动技术
线程共享内存和变量,线程切换,线程频繁创建销毁 解决线程创建销毁的问题,适用高并发、任务执行短的业务 解决线程切换的问题,适合大量短事务

上层通信技术

RPC / RMI

远程过程调用(RPC):使应用程序可以像调用本地节点上的过程(子程序) 那样去调用一个远程节点上的子程序。

除了 RPC ,还有远程方法调用(RMI),但两者不作严格区分。

为什么要使用 RPC ?

  • 实现跨进程、跨语言、跨网络、跨平台的过程调用,方便协作分工

  • 强化面向接口编程的编程风格

如何利用 Socket API 实现远程过程调用?

  • 客户端通过 Socket 发送请求,提供过程调用所需数据(如函数参数)
  • 服务器通过 Socket 返回结果

显然我们自己利用 Socket 实现 RPC 并不方便,所以可以采用 RPC/RMI 中间件。

RPC / RMI 中间件的作用:

  • 定义并利用Socket服务接口实现了一套调用者和被调用者之间的通信协议。(远程过程调用协议),例如 Java RMI 的 Java Remote Method Protocol (JRMP)
  • 实现了过程参数的序列化、反序列化;过程运算结果的序列化、反序列化
  • 通信过程中的错误处理
  • 过程服务进程(或远程对象)的集中注册与发现(目录服务)
  • 远程对象的统一标识和生命周期管理
  • 在服务端支持并发访问。(多采用多线程技术)

参考博客:RPC原理解析 - 牧梦者 - 博客园 (cnblogs.com)

RPC 架构

  • 客户端(Client),服务的调用方。
  • 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
  • 服务端(Server),真正的服务提供者。
  • 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。

RPC / RMI 中间件的实现原理:

image-20210406160751566

  • 客户端(client)以本地调用方式(即以接口的方式)调用服务;
  • 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
  • 客户端通过sockets将消息发送到服务端;
  • 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
  • 服务端存根( server stub)根据解码结果调用本地的服务;
  • 本地服务执行并将结果返回给服务端存根( server stub);
  • 服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
  • 服务端(server)通过sockets将消息发送到客户端;
  • 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
  • 客户端(client)得到最终结果。

如果 RPC / RMI 中间件支持跨编程语言调用,需要定义**一套独立于各种编程语言的接口定义语言 IDL **,以实现跨编程语言。

gRPC

这里介绍 gRPC 作为 RPC 的实际例子,通过动手实践更深入理解 RPC 。


什么是 gRPC

gRPC 基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。


protocol buffers

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制。我们使用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。


使用 gRPC

  • 定义服务

    一个 RPC 服务通过参数和返回类型来指定可以远程调用的方法

    我们使用 protocol buffers 接口定义语言来定义服务方法,用 protocol buffer 来定义参数和返回类型。而客户端和服务端使用服务定义生成的接口代码。

  • 生成 gRPC 代码

    定义好服务后,我们使用 protocol buffer 编译器 protoc 生成客户端和服务端的代码,生成的代码包括客户端的存根和服务端要实现的抽象接口。

    可以生成任意 gRPC 支持的语言的代码,注意 PHP 和 Objective-C 仅支持创建客户端代码。


Protocolbuf 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
syntax = "proto3" // 指定使用 proto3 语法,否则默认 proto2

service <服务名> {
	rpc <方法名> (<消息名>) returns (<消息名>) {}
	...
}


message <消息名> { // 定义消息,消息可以理解为 class
	// 消息的每个字段都有唯一的数字标识符,标识符用于在消息的二进制格式中识别各个字段
	string val1 = 1;
	string val2 = 2;
	string val3 = 3;
	// protobuf 数组类型,通过 repeated 关键字实现
	...
}

maven 编译 proto 文件

1
2
3
4
5
mvn clean

mvn protobuf:compile  // 编译消息对象

mvn protobuf:compile-custom  // 依赖消息对象,生成接口服务

Web Service

Web Service:以 HTTP 为传输层协议实现 RPC 的一套协议标准

Web Service 包含的标准协议:

  • 消息编码标准(XML)
  • 传输协议标准(HTTP)
  • 远程对象访问协议(SOAP),SOAP是一种简单的、轻量级的基于XML 的机制,用于在网络应用程序之间进行结构化数据交换
  • Web 服务描述语言(WSDL),WSDL文件是一个XML文档,用于说明一组SOAP消息以及如何交换这些消息
  • 服务目录、服务注册、服务发现:UDDI,提供一种发布和查找服务描述的方法

image-20210627161022598

服务器实现 Web Service ,生成 WSDL ,在 UDDI 上注册。

用户在向 UDDI 发出请求,获取 Web Service 提供者的地址和服务接口信息,使用 SOAP 协议与 Web Service 提供者建立连接进行通信。

Web Services 的技术主要建立在 XML 的规范之上,这保证了这一体系结构的平台无关性、语言无关性和人机交互性能。

基于消息中间件的通信技术

面向消息中间件(MOM):提供分布式消息队列服务,使得节点之间可以实现基于消息的形式灵活的异步通信

异步的含义:

  • 发送方可以在任意时刻发出消息,不必等待接收方上线,更不必等待消息发送成功再做下一步工作
  • 接收方不必以阻塞方式等待消息的到来

两种通信模式:

  • 消息队列通信模式,一个消息只可以被一个消费者接收

    一个队列可以有多个生产者,也可以有多个消费者;消息队列中的消息一旦被某个消费者取走,该消息就从队列中删除

  • 主题订阅通信模式,一个消息可以被多个订阅的消费者接收

    支持向一个特定的消息主题发布消息

    多个订阅同一主题的消费者可以同时接收发布到该消息主题的消息

三种接收方式:

  • 阻塞接收,队列为空时阻塞,不为空时接收消息
  • 轮询接收,轮询队列是否为空,不为空时接收消息
  • 回调接收(通知接受),队列不为空时调用回调函数通知消费者接收

基于MOM实现通信的优点:

  • 异步通信,可以减少系统响应时间,提高吞吐量
  • 分布式节点之间的解耦
  • 保证消息的可靠递交,实现最终一致性
  • 实现广播、组播和多对多通信
  • 流量削峰和流控
  • 支持Push模型和Pull模型
ActiveMQ

这里介绍 ActiveMQ 作为实际例子深入理解 MOM

生产者:

  1. 建立 ConnectionFactory 对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
  2. 通过 ConnectionFactory 对象创建一个 Connection ,并且调用 Connection 的 start 方法开启连接,Connection方法默认是关闭的
  3. 通过 Connection 对象创建 Session ,用于接收消息
  4. 通过 Session 对象创建 Destination 对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象
  5. 通过 Session 对象创建消息的发送和接收对象(生产者和消费者)
  6. 通过 MessageProducer 的 setDeliverMode 方法为其设置持久化或者非持久化特性
  7. 使用 JMS 规范的 TextMessage 形式创建数据(通过Session对象),并用 MessageProducer的 send 方法发送数据。客户端同理。记得关闭

第三讲 分布式存储

概述

分布式存储的目标

  • 提高数据存储容量
  • 提高数据吞吐量
  • 提高可靠性/可用性
  • 降低数据访问延时
  • 提高分布式数据处理系统的运行效率

分布式存储的两种基本手段:复制和分区(切片)

复制

基本思想:在多个不同的节点上保存相同数据的多个副本

优点:

  • 复制提供了冗余,如果一些节点不可用,剩余的节点仍然可以提供数据服务
  • 多个节点上存储副本也可以提高数据吞吐率、改善访问性能

带来的问题:

  • 硬件成本提高
  • 数据变更时,如何保障多数据副本之间的一致性

复制的方法:

主从复制,也称基于领导者的复制。一些副本被指定为领导者,其他副本被指定为追随者。

客户端要向数据存储系统写入数据时,它必须将请求发送给领导者;领导者将新数据写入本地存储,同时也会将数据变更发送给所有的追随者。

根据领导者的数量,分为单主复制,多主复制和无主复制

根据同步/异步划分:

  • 同步复制,领导者写入自己的存储器,并且接收到所有跟随者已经成功更新本地存储的应答后再向用户返回成功写入应答。
  • 异步复制,领导者自己写入成功后立即向向用户返回成功应答,不等待其他跟随者的应答消息。
  • 混合复制,与部分跟随者节点之间采用同步复制,另外的跟随者节点之间采用异步复制。

更新日志

数据副本节点一般分两步处理数据写入(或复制)请求:先将更新操作信息追加到更新日志;再根据操作参数更新本地存储系统(或称为本地状态机)

在传统数据库中,更新日志主要用于回溯;在分布式存储中还可以用于落后的追随者追赶上领导者的最新状态


快照

长时间运行后,更新日志会变得过于冗长,并且占用很多非易失存储器的存储空间。将某个时间点的本地存储系统完整的保存在非易失存储器中,形成系统的一个快照,并删除快照对应时间点之前的更新日志。


多副本分布式存储的一致性问题

出现的问题:

  • 多个客户端“同时”对分布式存储系统进行读写。
  • 分布式节点之间的网络不可靠:丢包、延迟、断裂、乱序
  • 部分存储节点会失效
  • 节点的局部时钟不同步

一致性的分类:

  • 强一致性
  • 顺序一致性
  • 因果一致性
  • 最终一致性

image-20210627193817627


CAP 定理

image-20210627193914117

分区

基本思想:将一个大型数据文件拆分成较小的分区,再将不同的分区指派给不同的节点

优点

  • 提高了吞吐率:访问数据的负载被分散到多个节点上
  • 提高了可靠性:鸡蛋被放到多个篮子里
  • 方便实现数据的并行处理

带来的问题

  • 跨区查询
  • 如何分区
  • 如何实现负载均衡,即分区如何分配给不同节点
  • 分布式事务处理

数据分区

合理数据分区的目标:

  • 将数据和查询负载均匀分布在各个节点上,避免出现偏斜(skew)热点(hot spot)问题;
  • 分区方式要兼顾跨区查询问题

数据分区的基本方法:

  • 根据主键范围进行分区

    • 优点

      按主键进行连续查询很方便

    • 缺点

      在主键范围非均匀分布时必须建立全局索引以记录数据分区和存储节点的对应关系

      一般要专门指定一个节点维护全局索引,该节点是中心节点

  • 根据主键的哈希值进行分区

    • 优点

      可以在一定程度上避免了偏斜和热点问题;

      无须全局索引,因而也无须中心节点

    • 缺点

      基于主键进行连续范围查询效率极低;

      在物理存储节点较少时仍然会出现偏斜和热点问题。(可以采用虚拟节点的方法进行缓解)

      桶的个数改变时会产生大量数据移动


理想哈希函数的特性:

  • 确定性,相同的输入会产生相同的输出
  • 随机性,输入输出的对应完全是随机的
  • 无碰撞性,假定函数输出n比特的整数,任取两个输入,它们的输出值相等的概率为 2^(-n)

一致性哈希算法

参考博客:(14条消息) 负载均衡一致性hash算法_通往花开的路上-CSDN博客

在使用一致哈希算法后,桶数量的改变平均只需要对 K/n 个关键字重新映射,其中 K 是关键字的数量,n 是桶的数量

线性 hash 与环形 hash

  • 平衡性,每个存储节点存储的数据保持平衡(尽可能一样多)
  • 单调性,增加存储节点时,进行的数据迁移只会将旧节点数据迁移到新节点,不涉及旧存储节点间的数据迁移

image-20210627162637698

插入数据:将数据按主键进行 hash ,找到顺时针的下一个存储节点,插入到该节点中

删除数据:

增加节点:新增节点进行 hash ,在环上取点,新增节点与上一节点中间的数据迁移到到新增节点上

删除节点:被删除节点的数据迁移到下一节点

虚拟节点:为满足平衡点,对每个实际节点映射多个虚拟节点

HDFS

分布式文件系统:

将分布式系统中多个节点的存储资源整合在一起,向用户/应用程序呈现统一的存储空间和文件系统目录树。用户无需关心数据存储在哪个节点上。大文件被自动分块并分别存储到不同的节点。

HDFS的定义及特点:

  • 主从结构,一个 NameNode 和多个 DataNode
  • 高容错,采用数据多副本方案
  • 高吞吐量
  • 大文件支持
  • 简单一致性模型

NameNode:负责执行有关文件系统命名空间的操作,例如打开,关闭、重命名文件和目录等。它同时还负责集群元数据的存储,记录着文件中各个数据块的位置信息。

DataNode:负责提供来自文件系统客户端的读写请求,执行块的创建,删除等操作。

HDFS中的每个数据块都有全局唯一的编号,NameNode 维护两张表:文件与数据块的对应表(文件被切分成哪些数据块)和数据块与物理节点对应表(数据块被存储在哪些物理节点)

image-20210627163139257

数据存储的可靠性:

  • 多副本复制
  • DataNode 定期向 NameNode 发送心跳,确保 DataNode 存活
  • DataNode 会存储数据块的校验和,客户端读取时根据校验和验证数据是否有效
  • NameNode 副本,CheckpointNode 和 BackupNode
  • 快照

第四讲 MapReduce模型和分布式计算框架

分布式并行计算

  • 将面向大数据的计算任务划分多个子任务,将输入文件划分成数量相等的多个分片。每个子任务处理一个分片。
  • 不同的子任务在不同的计算节点上运行。
  • 最后将各个子任务的中间结果合并,计算出最终结果。

image-20210627194330474

MapReduce并行计算模型

  • Map阶段
  • 聚集混洗阶段(含二次分区)
  • Reduce阶段

image-20210627163435960

image-20210627194535090

image-20210627205000309

combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作,它主要是在 map 计算出中间文件后做一个简单的合并重复 key 值的操作。

但并非所有场景都适合使用 combiner,使用它的原则是 combiner 的输出不会影响到 reduce 计算的最终输入,例如:求总数,最大值,最小值时都可以使用 combiner,但是做平均值计算则不能使用 combiner

第五讲 Spark平台和基于RDD-DAG的计算模型

Spark

一个快速、通用、可扩展的分布式计算平台(引擎)。相对于 MapReduce 的批处理计算,Spark 可以带来上百倍的性能提升,因此成为继 MapReduce 之后,最为广泛使用的分布式计算平台。

image-20210627163642146

RDD

image-20210627194754238

DAG

image-20210627194808224

RDD 算子

分类:

  • 创建 RDD 的算子
  • Transformation 算子,将一个RDD转换成一个新的RDD
  • Action 算子,在RDD上运行计算后将结果返回到驱动程序本地;或者在RDD上运行 计算后将结果保存到外部存储系统上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Transformation

// map(func)对原 RDD 中每个元素运用 func 函数并生成新的 RDD
val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出结果 10 20 30 这里为了节省篇幅去掉了换行,后文亦同

// filter(func)对原 RDD 中每个元素使用func 函数进行过滤并生成新的 RDD
val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)
// 输出 10 12 21

// flatMap(func) map 类似但是每一个输入的 item 被映射成 0 个或多个输出的 items func 返回类型需要为 Seq )。
val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
// 输出结果  10 20 30 40 50

// union(otherDataset)合并两个 RDD
val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 输出: 1 2 3 4 5 6

// intersection(otherDataset)求两个 RDD 的交集
val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 输出:  4 5

// distinct([numTasks])去重
val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 输出: 4 1 2

// groupByKey([numTasks])按照 key 值进行分区即在一个 (K, V) 对的 dataset 上调用时返回一个 (K, Iterable<V>)
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
//输出
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))

// reduceByKey(func, [numTasks])按照 key 值进行分组并对分组后的数据执行归约操作
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)
//输出
(spark,8)
(hadoop,4)
(storm,6)

// Action

BigData-Notes/Spark_Transformation和Action算子.md at master · heibaiying/BigData-Notes (github.com)