forked from extern/httpie-cli
239 lines
6.8 KiB
Python
239 lines
6.8 KiB
Python
"""Persistent, JSON-serialized sessions.
|
|
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import glob
|
|
import errno
|
|
import codecs
|
|
import shutil
|
|
import subprocess
|
|
|
|
from requests.compat import urlparse
|
|
from requests import Session as RSession
|
|
from requests.cookies import RequestsCookieJar, create_cookie
|
|
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
|
|
|
|
from.import __version__
|
|
from.config import CONFIG_DIR
|
|
from.output import PygmentsProcessor
|
|
|
|
|
|
SESSIONS_DIR = os.path.join(CONFIG_DIR, 'sessions')
|
|
|
|
|
|
def get_response(name, request_kwargs):
|
|
host = Host(request_kwargs['headers'].get('Host', None)
|
|
or urlparse(request_kwargs['url']).netloc.split('@')[-1])
|
|
|
|
session = Session(host, name)
|
|
session.load()
|
|
|
|
# Update session headers with the request headers.
|
|
session['headers'].update(request_kwargs.get('headers', {}))
|
|
# Use the merged headers for the request
|
|
request_kwargs['headers'] = session['headers']
|
|
|
|
auth = request_kwargs.get('auth', None)
|
|
if auth:
|
|
session.auth = auth
|
|
elif session.auth:
|
|
request_kwargs['auth'] = session.auth
|
|
|
|
rsession = RSession(cookies=session.cookies)
|
|
try:
|
|
response = rsession.request(**request_kwargs)
|
|
except Exception:
|
|
raise
|
|
else:
|
|
session.cookies = rsession.cookies
|
|
session.save()
|
|
return response
|
|
|
|
|
|
class Host(object):
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
def __iter__(self):
|
|
for fn in sorted(glob.glob1(self.path, '*.json')):
|
|
yield os.path.splitext(fn)[0], os.path.join(self.path, fn)
|
|
|
|
def delete(self):
|
|
shutil.rmtree(self.path)
|
|
|
|
@property
|
|
def path(self):
|
|
# Name will include ':' if a port is specified, which is invalid
|
|
# on windows. DNS does not allow '_' in a domain, or for it to end
|
|
# in a number (I think?)
|
|
path = os.path.join(SESSIONS_DIR, self.name.replace(':', '_'))
|
|
try:
|
|
os.makedirs(path, mode=0o700)
|
|
except OSError as e:
|
|
if e.errno != errno.EEXIST:
|
|
raise
|
|
return path
|
|
|
|
@classmethod
|
|
def all(cls):
|
|
for name in sorted(glob.glob1(SESSIONS_DIR, '*')):
|
|
if os.path.isdir(os.path.join(SESSIONS_DIR, name)):
|
|
yield Host(name)
|
|
|
|
|
|
class Session(dict):
|
|
def __init__(self, host, name, *args, **kwargs):
|
|
super(Session, self).__init__(*args, **kwargs)
|
|
self.host = host
|
|
self.name = name
|
|
self['headers'] = {}
|
|
self['cookies'] = {}
|
|
|
|
@property
|
|
def path(self):
|
|
return os.path.join(self.host.path, self.name + '.json')
|
|
|
|
def load(self):
|
|
try:
|
|
with open(self.path, 'rt') as f:
|
|
try:
|
|
data = json.load(f)
|
|
except ValueError as e:
|
|
raise ValueError('Invalid session: %s [%s]' %
|
|
(e.message, self.path))
|
|
self.update(data)
|
|
except IOError as e:
|
|
if e.errno != errno.ENOENT:
|
|
raise
|
|
|
|
def save(self):
|
|
self['__version__'] = __version__
|
|
with open(self.path, 'w') as f:
|
|
json.dump(self, f, indent=4, sort_keys=True, ensure_ascii=True)
|
|
f.write('\n')
|
|
|
|
def delete(self):
|
|
try:
|
|
os.unlink(self.path)
|
|
except OSError as e:
|
|
if e.errno != errno.ENOENT:
|
|
raise
|
|
|
|
@property
|
|
def cookies(self):
|
|
jar = RequestsCookieJar()
|
|
for name, cookie_dict in self['cookies'].items():
|
|
jar.set_cookie(create_cookie(
|
|
name, cookie_dict.pop('value'), **cookie_dict))
|
|
jar.clear_expired_cookies()
|
|
return jar
|
|
|
|
@cookies.setter
|
|
def cookies(self, jar):
|
|
excluded = [
|
|
'_rest', 'name', 'port_specified',
|
|
'domain_specified', 'domain_initial_dot',
|
|
'path_specified', 'comment', 'comment_url'
|
|
]
|
|
self['cookies'] = {}
|
|
for host in jar._cookies.values():
|
|
for path in host.values():
|
|
for name, cookie in path.items():
|
|
cookie_dict = {}
|
|
for k, v in cookie.__dict__.items():
|
|
if k not in excluded:
|
|
cookie_dict[k] = v
|
|
self['cookies'][name] = cookie_dict
|
|
|
|
@property
|
|
def auth(self):
|
|
auth = self.get('auth', None)
|
|
if not auth:
|
|
return None
|
|
Auth = {'basic': HTTPBasicAuth,
|
|
'digest': HTTPDigestAuth}[auth['type']]
|
|
return Auth(auth['username'], auth['password'])
|
|
|
|
@auth.setter
|
|
def auth(self, cred):
|
|
self['auth'] = {
|
|
'type': {HTTPBasicAuth: 'basic',
|
|
HTTPDigestAuth: 'digest'}[type(cred)],
|
|
'username': cred.username,
|
|
'password': cred.password,
|
|
}
|
|
|
|
|
|
def list_command(args):
|
|
if args.host:
|
|
for name, path in Host(args.host):
|
|
print(name + ' [' + path + ']')
|
|
else:
|
|
for host in Host.all():
|
|
print(host.name)
|
|
for name, path in host:
|
|
print(' ' + name + ' [' + path + ']')
|
|
|
|
|
|
def show_command(args):
|
|
path = Session(Host(args.host), args.name).path
|
|
if not os.path.exists(path):
|
|
sys.stderr.write('Session "%s" does not exist [%s].\n'
|
|
% (args.name, path))
|
|
sys.exit(1)
|
|
|
|
with codecs.open(path, encoding='utf8') as f:
|
|
print(path + ':\n')
|
|
proc = PygmentsProcessor()
|
|
print(proc.process_body(f.read(), 'application/json', 'json'))
|
|
print('')
|
|
|
|
|
|
def delete_command(args):
|
|
host = Host(args.host)
|
|
if not args.name:
|
|
host.delete()
|
|
else:
|
|
Session(host, args.name).delete()
|
|
|
|
|
|
def edit_command(args):
|
|
editor = os.environ.get('EDITOR', None)
|
|
if not editor:
|
|
sys.stderr.write(
|
|
'You need to configure the environment variable EDITOR.\n')
|
|
sys.exit(1)
|
|
command = editor.split()
|
|
command.append(Session(Host(args.host), args.name).path)
|
|
subprocess.call(command)
|
|
|
|
|
|
def add_commands(subparsers):
|
|
# List
|
|
list_ = subparsers.add_parser('session-list', help='list sessions')
|
|
list_.set_defaults(command=list_command)
|
|
list_.add_argument('host', nargs='?')
|
|
|
|
# Show
|
|
show = subparsers.add_parser('session-show', help='list or show sessions')
|
|
show.set_defaults(command=show_command)
|
|
show.add_argument('host')
|
|
show.add_argument('name')
|
|
|
|
# Edit
|
|
edit = subparsers.add_parser(
|
|
'session-edit', help='edit a session in $EDITOR')
|
|
edit.set_defaults(command=edit_command)
|
|
edit.add_argument('host')
|
|
edit.add_argument('name')
|
|
|
|
# Delete
|
|
delete = subparsers.add_parser('session-delete', help='delete a session')
|
|
delete.set_defaults(command=delete_command)
|
|
delete.add_argument('host')
|
|
delete.add_argument('name', nargs='?',
|
|
help='The name of the session to be deleted.'
|
|
' If not specified, all host sessions are deleted.')
|