接口安全检测服务的实现

接口安全检测服务的实现

Administrator 22 2024-11-27

一、背景

  在当前团队中,我们测试不同的业务不同的需求之间往往有大量重复的测试用例,比如所有业务所有服务涉及的身份认证、签名校验都是同一套实现逻辑,开发可以直接复用的逻辑,但从测试角度所有功能实现下来都需要校验。为解决这个痛点问题,在这里实现apiSafetyScan接口自动化扫描策略服务并记录下来,后面工作中遇到类似问题可以来回顾下。

二、接口自动化检测流程

  • 流程说明:开发在服务提测前,需要将服务所有的对外接口和内部接口在apiSafetyScan进行策略扫描,只有所有扫描策略通过后,才允许提测,需要在提测单贴上扫描结果图。

1.drawio.png

三、数据库设计

  1. scan_status表
CREATE TABLE `scan_status` (
    `scan_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    `status` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    PRIMARY KEY (`scan_id`) USING BTREE
)
COMMENT='获取扫描状态'
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB
;
  1. scan_result表
CREATE TABLE `scan_results` (
    `scan_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    `result` VARCHAR(50) NOT NULL DEFAULT '0' COMMENT '0-未发现安全问题 1-存在安全问题' COLLATE 'utf8mb4_unicode_ci',
    `status` VARCHAR(50) NOT NULL DEFAULT '' COLLATE 'utf8mb4_unicode_ci',
    `scan_strategy` JSON NOT NULL,
    `fail_data` JSON NOT NULL,
    PRIMARY KEY (`scan_id`) USING BTREE
)
COMMENT='存储扫描结果'
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB
;

四、单接口扫描实现

@app.route('/scan', methods = ['POST'])
def api_scan():
   # 入参协议校验
   try:
       validate(instance = request.json, schema = api_scan_schema)
   except jsonschema.exceptions.ValidationError as err:
       print(f'{request.remote_addr} request json error, err msg: {err}!')
       return jsonify({"scan_id": None, "scan_status": False}), 400
   # 生成scan_id
   create_scan_id = str(int(time.time() * 1000)) + '_id'
   # 基准测试
   if 'params' in request.json.keys():
       data_type = 'params'
       re_data = request.json['params']
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'],
                              params = request.json['params'])
   elif 'json' in request.json.keys():
       data_type = 'json'
       re_data = request.json['json']
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'],
                              json = request.json['json'])
   else:
       data_type = None
       re_data = None
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'])
   if res.status_code != 200:
       return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403
   # 执行扫描策略
   def scan(scan_id, method, url, headers, data, data_type):
       all_scan_res = []
       if strategy_config['sqlInjection']:
           # sql注入扫描
           all_scan_res.append(sql_injection_scan(method = method, url = url,
                                                  headers = headers,
                                                  data = data, data_type = data_type))
       # xss扫描
       if strategy_config['xssCheck']:
           all_scan_res.append(xss_check_scan(method = method, url = url,
                                              headers = headers,
                                              data = data, data_type = data_type))
       # 扫描完成后
       redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex = 3600 * 24)
   t = Thread(target = scan, args = (
       create_scan_id, request.json['method'], request.json['url'], request.json['headers'], re_data, data_type))
   t.start()
   return jsonify({"scan_id": create_scan_id, "scan_status": True}), 200

五、获取扫描结果实现

@app.route('/scan_result', methods = ['GET'])
def scan_result():
   scan_id = request.args['scan_id']
   res = redis_conn.get(f'scan_id:{scan_id}')
   if res is not None:
       res = json.loads(res.decode('utf-8'))
   else:
       return jsonify({"result": False, "status": "no scanning", "fail_data": res}), 200
   if res is []:
       result = True
   else:
       result = False
   return jsonify({"result": result, "status": "scan success", "fail_data": res}), 200

五、sql注入漏洞的扫描

将接口各入参进行sql注入字典集的遍历,如果存在sql注入漏洞,接口返回体可能会包含mysql的报错信息、返回多条数据。

def sql_injection_scan(method, url, headers=None, data=None, data_type=None):
   if data is None:
       return []
   with open(file=DIR + '\\scan_business\\sqlInjectDb.txt', mode='r', encoding='utf-8') as f:
       attack_f = f.readlines()
   with open(file=DIR + '\\scan_business\\api_checkDB.txt', mode='r', encoding='utf-8') as f2:
       check_list = f2.readlines()
   data = deepcopy(data)
   fail_data = []
   # 遍历每一个字段
   for k in data.keys():
       # 遍历每一个攻击语句
       for attack in attack_f:
           data[k] = attack
           # 接口参数篡改好后,发起请求
           if data_type == 'params':
               res = requests.request(method=method, url=url, params=data, headers=headers)
           else:
               res = requests.request(method=method, url=url, json=data, headers=headers)
           # 接口结果分析
           for i in check_list:
               if i in res.text:
                   fail_data.append(
                       {"attack": attack, "scan_key": k, "status_code": res.status_code, "res": res.text})
   return fail_data