十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本篇文章为大家展示了如何利用sqlmapapi发起扫描,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
靖江网站制作公司哪家好,找成都创新互联公司!从网页设计、网站建设、微信开发、APP开发、响应式网站等网站项目制作,到程序开发,运营维护。成都创新互联公司从2013年创立到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联公司。
sqlmap可谓是sql注入探测的神器,但是利用sqlmap测试SQL注入的效率很低,每一个url都需要手动测试。sqlmap的开发者新加了sqlmapapi.py,可以直接通过接口调用来操作,简化了sqlmap命令执行方式。
sqlmap api分为服务端和客户端,sqlmap api有两种模式,一种是基于HTTP协议的接口模式,一种是基于命令行的接口模式。
sqlmap源码下载地址:https://github.com/sqlmapproject/sqlmap/
python sqlmapapi.py -h
无论是基于HTTP协议的接口模式还是基于命令行的接口模式,首先都是需要开启api服务端的。通过输入以下命令即可开启api服务端:python sqlmapapi.py -s
命令成功后,在命令行中会返回一些信息。以下命令大概的意思是api服务端在本地8775端口上运行,admin token为c6bbb0c1f86b7d7bc2ed6ce3e3bbdcb5等
但是通过上面的这种方式开启api服务端有一个缺点,当服务端和客户端不是一台主机会连接不上,因此如果要解决这个问题,可以通过输入以下命令来开启api服务端:python sqlmapapi.py -s -H "0.0.0.0" -p 8775
命令成功后,远程客户端就可以通过指定远程主机IP和端口来连接到API服务端。
python sqlmapapi.py -c
如果是客户端和服务端不是同一台计算机的话,输入以下命令:
python sqlmapapi.py -c -H "192.168.1.101" -p 8775
help 显示帮助信息
new ARGS 开启一个新的扫描任务
use TASKID 切换
taskid data 获取当前任务返回的数据
log 获取当前任务的扫描日志
status 获取当前任务的扫描状态
option OPTION 获取当前任务的选项
options 获取当前任务的所有配置信息
stop 停止当前任务
kill 杀死当前任务
list 显示所有任务列表
flush 清空所有任务
exit 退出客户端
3.3.1.new命令
new -u "url"
实例:new -u "http://www.baidu.com"
虽然我们仅仅只指定了-u参数,但是从返回的信息中可以看出,输入new命令后,首先先请求了/task/new,来创建一个新的taskid,后又发起了一个请求去开始任务,因此可以发现该模式实质也是基于HTTP协议的。
3.3.2. status 命令
获取该任务的扫描状态,若返回内容中的status字段为terminated,说明扫描完成,若返回内容中的status字段为run,说明扫描还在进行中。下图是扫描完成的截图:
3.3.3. data 命令
获取扫描完成后注入出来的信息,若返回的内容中data字段不为空就说明存在注入。下图是存在SQL注入返回的内容,可以看到返回的内容有数据库类型、payload、注入的参数等等。
简单介绍下sqlmapapi.py的h基于http接口调用模式的主要函数,进入到lib/utils/api.py的server类,可以发现通过向server提交数据进行与服务的交互。 一共分为3种类型。
Users' methods 用户方法
Admin function 管理函数
sqlmap core interact functions 核心交互函数
可以提交数据的种类如下:
@get("/task/new")
@get("/task/new") def task_new(): """ Create a new task """ taskid = encodeHex(os.urandom(8), binary=False) remote_addr = request.remote_addr DataStore.tasks[taskid] = Task(taskid, remote_addr) logger.debug("Created new task: '%s'" % taskid) return jsonize({"success": True, "taskid": taskid})
@get("/task/delete")
@get("/task//delete") def task_delete(taskid): """ Delete an existing task """ if taskid in DataStore.tasks: DataStore.tasks.pop(taskid) logger.debug("(%s) Deleted task" % taskid) return jsonize({"success": True}) else: response.status = 404 logger.warning("[%s] Non-existing task ID provided to task_delete()" % taskid) return jsonize({"success": False, "message": "Non-existing task ID"})
@get("/option/list")
@post("/option/get")
@post("/option/set")
@post("/option//set") def option_set(taskid): """ Set value of option(s) for a certain task ID """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to option_set()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if request.json is None: logger.warning("[%s] Invalid JSON options provided to option_set()" % taskid) return jsonize({"success": False, "message": "Invalid JSON options"}) for option, value in request.json.items(): DataStore.tasks[taskid].set_option(option, value) logger.debug("(%s) Requested to set options" % taskid) return jsonize({"success": True})
@post("/scan/start")
@post("/scan//start") def scan_start(taskid): """ Launch a scan """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_start()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if request.json is None: logger.warning("[%s] Invalid JSON options provided to scan_start()" % taskid) return jsonize({"success": False, "message": "Invalid JSON options"}) # Initialize sqlmap engine's options with user's provided options, if any for option, value in request.json.items(): DataStore.tasks[taskid].set_option(option, value) # Launch sqlmap engine in a separate process DataStore.tasks[taskid].engine_start() logger.debug("(%s) Started scan" % taskid) return jsonize({"success": True, "engineid": DataStore.tasks[taskid].engine_get_id()})
@get("/scan/stop")
@get("/scan//stop") def scan_stop(taskid): """ Stop a scan """ if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()): logger.warning("[%s] Invalid task ID provided to scan_stop()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) DataStore.tasks[taskid].engine_stop() logger.debug("(%s) Stopped scan" % taskid) return jsonize({"success": True})
@get("/scan/kill")
@get("/scan//kill") def scan_kill(taskid): """ Kill a scan """ if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()): logger.warning("[%s] Invalid task ID provided to scan_kill()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) DataStore.tasks[taskid].engine_kill() logger.debug("(%s) Killed scan" % taskid) return jsonize({"success": True})
@get("/scan/status")
@get("/scan//status") def scan_status(taskid): """ Returns status of a scan """ if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_status()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) if DataStore.tasks[taskid].engine_process() is None: status = "not running" else: status = "terminated" if DataStore.tasks[taskid].engine_has_terminated() is True else "running" logger.debug("(%s) Retrieved scan status" % taskid) return jsonize({ "success": True, "status": status, "returncode": DataStore.tasks[taskid].engine_get_returncode() })
@get("/scan/data")
@get("/scan//data") def scan_data(taskid): """ Retrieve the data of a scan """ json_data_message = list() json_errors_message = list() if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) # Read all data from the IPC database for the taskid for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)): json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)}) # Read all error messages from the IPC database for error in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)): json_errors_message.append(error) logger.debug("(%s) Retrieved scan data and error messages" % taskid) return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})
@get("/scan/log")
@get("/download/")
@get("/admin/list")
@get("/admin/list") @get("/admin//list") def task_list(token=None): """ Pull task list """ tasks = {} for key in DataStore.tasks: if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr: tasks[key] = dejsonize(scan_status(key))["status"] logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr)) return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})
@get("/admin//flush")
@get("/admin/flush") @get("/admin//flush") def task_flush(token=None): """ Flush task spool (delete all tasks) """ for key in list(DataStore.tasks): if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr: DataStore.tasks[key].engine_kill() del DataStore.tasks[key] logger.debug("(%s) Flushed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr)) return jsonize({"success": True})
从sqlmapapi.py文件分析,提取调用关系。不难发现这些操作可以完全满足我们的测试需求,因此利用这些就可以批量了。
sqlmapapi.py很方便的提供了http请求入口,但是使用起来只能得到最终是否注入的结果,每个接口进行注入扫描时具体发起了什么样的请求,多少个请求就难以得到,下面分享一下个人梳理sqlmap源码后记录的流程图。从图中可以定位到具体发起payload级别的请求位置,从而想要获取发起了什么样的请求,多少个请求,只需要在此处增加自定义代码即可。
上述内容就是如何利用sqlmapapi发起扫描,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。