• 系统架构 - 灰盒扫描工具
    • 背景
    • 系统架构
      • agent端
      • 扫描器端
    • 插件开发
      • 获取代码
      • 扫描插件
      • 去重插件

    系统架构 - 灰盒扫描工具

    背景

    IAST(交互式扫描)技术是一种实时动态交互的漏洞检测技术,通过在服务端部署agent程序,收集、监控Web应用程序运行时函数执行、数据传输,并与扫描器端进行实时交互,高效、准确的识别安全缺陷及漏洞。目前OpenRASP项目已实现相当于IAST agent端的OpenRASP agent,在此基础上引入一个扫描端,即可实现一个完整的IAST扫描工具。

    系统架构

    灰盒扫描工具采用Python3实现,数据库采用MySQL,通讯采用HTTP+JSON的方式。整体架构图如下:iast-main工具分为扫描器端和agent端,Agent端用于收集web应用的运行信息,扫描器端用于处理插件产生的请求信息,并完成整个IAST扫描逻辑

    agent端

    由一个单独的OpenRASP插件构成,用于提取http请求中hook点产生的信息,并通过HTTP协议以JSON形式发送至扫描器端。

    扫描器端

    该部分是一个独立运行的扫描工具,借助从OpenRASP插件部分获取的信息完成扫描任务。

    • 功能设计和模块划分

      扫描模块包括三个模块:预处理模块(Preprocessor)扫描模块(Scanner)监控模块(Monitor)

    预处理模块即图中HTTPServer部分,用于接收agent插件的http请求,处理、存储、分发http请求信息

    扫描模块用于运行扫描插件,执行漏洞扫描逻辑

    监控模块用于定期获取其他模块的运行时信息,调整参数,提供控制台的HTTP服务等

    • 运行流程示例

      一个典型扫描流程,以SQL注入的扫描过程为例:

      1、运行扫描器端,初始化所有模块

      2、测试人员发送了一条HTTP请求到web server,产生请求及其对应的HOOK信息被OpenRASP插件获取,发送至http_server

      3、http_server发现请求不是扫描器发出的,对其进行去重后写入数据库

      4、扫描模块从数据库获取一条HOOK信息,下发到所有扫描插件

      5、sql注入扫描插件分析HOOK信息发现用户输入参数拼接进了sql查询,运行对应扫描逻辑

      6、扫描插件生成扫描请求,把原始请求进入query的输入参数替换为单引号

      7、扫描插件在请求头添加用于识别扫描请求的scan_request_id,发送扫描请求给web server

      10、web server处理请求并返回结果,扫描插件获得http response,同时OpenRASP插件获取到请求hook信息,发送至http_server

      11、http_server发现请求是扫描器发出的,将其写入rasp_result_queue队列

      12、扫描模块读取rasp_result_queue队列,将rasp_result传给对应的扫描插件

      13、扫描插件检查收到的hook信息,发现query逻辑被改变,认为存在SQL注入,将漏洞信息写入数据库

    插件开发

    获取代码

    在安装了python3.7的环境中可以直接运行IAST

    1. git clone https://github.com/baidu-security/openrasp-iast.git
    2. cd openrasp-iast
    3. python3 openrasp_iast/main.py start -f

    运行参数与pip安装相同

    扫描插件

    所有扫描插件均位于plugin/scanner目录下,所有扩展名为.py的文件都会在启动扫描器时被加载,ScanPluginBase.py为插件基础类,提供编写插件所需的一系列接口,具体参考接口文档

    这里以编写一个简单的sql注入fuzz插件为例,介绍插件的编写方法:

    首先在plugin/scanner目录下新建一个new_plugin.py文件,粘贴以下内容:

    1. #!/usr/bin/env python3
    2. # -*- coding: UTF-8 -*-
    3. # 导入scanner插件基类
    4. class ScanPlugin(scan_plugin_base.ScanPluginBase):
    5. plugin_info = {
    6. "name": "name",
    7. "show_name": "plugin",
    8. "description": "des"
    9. }
    10. # 用于生成测试向量
    11. def mutant(self, rasp_result_ins):
    12. pass
    13. # 用于检测测试结果
    14. def check(self, request_data_list):
    15. pass

    在plugin_info中填写插件基本信息:

    1. plugin_info = {
    2. "name": "sql_basic", # 插件文件名去除扩展名.py后的部分
    3. "show_name": "SQL注入检测插件", # 插件在后台显示的名称
    4. "description": "基础sql注入漏洞检测插件" # 插件功能描述
    5. }

    实现mutant函数,生成扫描向量

    1. def mutant(self, rasp_result_ins):
    2. # 首先判断是否需要扫描
    3. if not rasp_result_ins.has_hook_type("sql"):
    4. return
    5. payload_list = [("1'openrasp", "1'openrasp"),
    6. ("1\"openrasp", "1\"openrasp"),
    7. ("`a openrasp", "`a openrasp")]
    8. # 获取所有待测试参数
    9. request_data_ins = self.new_request_data(rasp_result_ins)
    10. test_params = self.mutant_helper.get_params_list(request_data_ins, ["get", "post", "json", "headers", "cookies"])
    11. # 生成测试向量的逻辑,返回必须是一个iterable
    12. for param in test_params:
    13. # 只测试包含sql类型hook的请求
    14. if not request_data_ins.is_param_concat_in_hook("sql", param["value"]):
    15. continue
    16. # 每个测试点(参数)生成一个payload_seq,防止重复报警,含有相同payload_seq的测试请求仅保留产生的第一个报警
    17. payload_seq = self.gen_payload_seq()
    18. for payload in payload_list:
    19. # 基于RaspResult类的实例生成测试请求RequestData类的实例
    20. request_data_ins = self.new_request_data(rasp_result_ins, payload_seq, payload[1])
    21. request_data_ins.set_param(param["type"], param["name"], payload[0])
    22. request_data_list = [request_data_ins]
    23. # 每次迭代返回的应该是一个由RequestData类的实例组成的list, 该list中的每个RequestData实例都会被作为测试请求依次发送
    24. yield request_data_list

    每个被发送并获取结果的请求序列都会回调check函数,在check函数中来实现漏洞判定:

    1. def check(self, request_data_list):
    2. # 当前插件每个请求序列仅包含1个请求,取[0]
    3. request_data_ins = request_data_list[0]
    4. # 获取检测特征和请求结果
    5. feature = request_data_ins.get_payload_info()["feature"]
    6. rasp_result_ins = request_data_ins.get_rasp_result()
    7. # 检测是否触发sql注入,直接使用checker检测
    8. if self.checker.check_concat_in_hook(rasp_result_ins, "sql" , feature):
    9. # 存在漏洞,返回漏洞描述字符串
    10. return "sql语句逻辑可被用户输入控制"
    11. else:
    12. # 不存在漏洞,返回None
    13. return None

    如果自行实现检测函数,需要在检测到漏洞时,使用set_vuln_hook标记有漏洞的hook点信息:

    1. for hook_item in rasp_result_ins.get_hook_info():
    2. if has_vuln(hook_item):
    3. rasp_result_ins.set_vuln_hook(hook_item)

    需要注意的是,扫描模块是基于asyncio实现的异步扫描,每个插件会以一个coroutine的形式运行,因此应避免在插件中使用同步的io操作以免影响扫描性能。在进行异常捕获时,如果要捕获全部异常,应单独处理asyncio.CancelledError异常,以免扫描任务无法被终止:

    1. import asyncio
    2. try:
    3. # do something...
    4. except asyncio.CancelledError as e:
    5. raise e
    6. except Exception as e:
    7. # do something...
    去重插件

    所有扫描插件均位于plugin/deduplicate目录下,IAST启动时会加载core/config.py中指定的去重插件,去重插件是为了避免同一个请求被反复扫描,应当根据扫描目标的url结构、参数等特性来编写,DedupPluginBase.py为插件基础类,提供编写插件所需的一系列接口,具体参考接口文档

    • 编写一个去重插件

      在plugin/deduplicate目录下新建一个new_plugin.py文件,写入以下内容:

    1. #!/usr/bin/env python3
    2. # -*- coding: UTF-8 -*-
    3. # 导入去重插件基类
    4. from plugin.deduplicate import DedupPluginBase
    5. # 插件类必须命名为dedupPlugin,并继承自DedupPluginBase.DedupPluginBase
    6. class dedupPlugin(DedupPluginBase.DedupPluginBase):
    7. # 实现get_hash函数,返回hash字符串
    8. def get_hash(self, rasp_result_ins):
    9. return self.get_hash_default(rasp_result_ins)

    去重插件只需要实现一个get_hash方法即可,该方法传入当前待去重请求对应的raspResult类的实例,返回一个hash字符串,返回hash相同的请求会被视为重复请求,这里直接调用了默认的get_hash_default方法,其具体实现如下:

    1. import hashlib
    2. # 计算urlpath、参数、堆栈等数据的hash,连接后生成结果hash
    3. def get_hash_default(self, rasp_result_ins):
    4. path_str = rasp_result_ins.get_path()
    5. stack_hash = rasp_result_ins.get_all_stack_hash()
    6. param_keys = "".join(rasp_result_ins.get_parameters().keys())
    7. query_keys = "".join(rasp_result_ins.get_query_parameters().keys())
    8. json_struct = rasp_result_ins.get_json_struct()
    9. contact_str = "".join([path_str, stack_hash, param_keys, json_struct, query_keys]).encode("utf-8")
    10. return hashlib.md5(contact_str).hexdigest()

    注意:如果插件运行中抛出异常,会自动调用get_hash_default函数生成默认的hash