"""Module for receiving data and hosting receiving servers."""
from aiohttp import web
import asyncio
import humanize
from multiprocessing import Process
import os
import pkgutil
import platform
import requests
import socket
import sys
from time import sleep, strftime
from tqdm import tqdm
from zipfile import is_zipfile
from .exception import CodeExistsError, CodeNotFoundError, IsNotSenderError
from .utils import get_local_ip_address, get_service_info, qr_code, \
register_service, unzip_file
__all__ = ["receive", "receive_server", "receive_server_proc"]
# Request handlers
async def _upload_page(request):
"""Renders an upload page. GET handler for route '/'."""
upload = pkgutil.get_data(__name__, "static/upload.html").decode()
return web.Response(text=upload, content_type="text/html")
async def _uploaded_file_receiver(request):
"""Receives an uploaded file. POST handler for '/upload'."""
progress_queue = request.app["progress_queue"]
tqdm_position = await progress_queue.get()
decompress = request.app["decompress"]
compress_header = request.headers.get("airshare-compress") or "false"
if compress_header == "true":
decompress = True
total = 0
reader = await request.multipart()
field = await reader.next()
file_name = field.filename.replace("'", "")
file_path = os.getcwd() + os.path.sep + file_name
if os.path.isfile(file_path):
file_name, file_ext = os.path.splitext(file_name)
file_name = file_name + "-" + strftime("%Y%m%d%H%M%S") + file_ext
file_path = os.getcwd() + os.path.sep + file_name
desc = "Downloading `" + file_name + "`"
bar = tqdm(desc=desc, total=None, unit="B", unit_scale=1,
position=tqdm_position, leave=False)
with open(file_path, "wb") as f:
while True:
chunk = await field.read_chunk()
if not chunk:
break
total += len(chunk)
f.write(chunk)
bar.update(len(chunk))
await progress_queue.put(tqdm_position)
if is_zipfile(file_path) and decompress:
zip_dir = unzip_file(file_path)
tqdm.write("Downloaded and decompressed to `" + zip_dir + "`!")
os.remove(file_path)
else:
tqdm.write("Downloaded `" + file_name + "`!")
file_name = field.filename
file_size = humanize.naturalsize(total)
text = "{} ({}) successfully received!".format(file_name, file_size)
return web.Response(text=text)
async def _is_airshare_upload_receiver(request):
"""Returns 'Upload Receiver'. GET handler for '/airshare'."""
return web.Response(text="Upload Receiver")
# Receiver functions
[docs]def receive(*, code, decompress=False):
r"""Receive file(s) from a sending server.
Parameters
----------
code : str
Identifying code for the Airshare sending server.
decompress : boolean, default=False
Flag to enable or disable decompression (Zip).
Returns
-------
text (or) file_path : str
Returns the text or path of the file received, if successful.
"""
info = get_service_info(code)
if info is None:
raise CodeNotFoundError(code)
ip = socket.inet_ntoa(info.addresses[0])
url = "http://" + ip + ":" + str(info.port)
airshare_type = requests.get(url + "/airshare").text
if "Sender" not in airshare_type:
raise IsNotSenderError(code)
print("Receiving from Airshare `" + code + ".local`...")
sleep(2)
if airshare_type == "Text Sender":
text = requests.get(url + "/text").text
print("Received: " + text)
return text
elif airshare_type == "File Sender":
with requests.get(url + "/download", stream=True) as r:
r.raise_for_status()
header = r.headers["content-disposition"]
compress_header = r.headers.get("airshare-compress") or "false"
if compress_header == "true":
decompress = True
file_name = header.split("; ")[1].split("=")[1] \
.replace("'", "")
file_path = os.getcwd() + os.path.sep + file_name
file_size = int(header.split("=")[-1])
if os.path.isfile(file_path):
file_name, file_ext = os.path.splitext(file_name)
file_name += "-" + strftime("%Y%m%d%H%M%S") + file_ext
file_path = os.getcwd() + os.path.sep + file_name
with open(file_path, "wb") as f:
desc = "Downloading `" + file_name + "`"
bar = tqdm(desc=desc, total=file_size, unit="B",
unit_scale=1, leave=False)
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
bar.update(len(chunk))
file_path = os.path.realpath(file_path)
if is_zipfile(file_path) and decompress:
zip_dir = unzip_file(file_path)
tqdm.write("Downloaded and decompressed to `" + zip_dir + "`!")
os.remove(file_path)
file_path = zip_dir
else:
tqdm.write("Downloaded `" + file_path + "`!")
return file_path
[docs]def receive_server(*, code, decompress=False, port=80):
r"""Serves a file receiver and registers it as a Multicast-DNS service.
Parameters
----------
code : str
Identifying code for the Airshare service and server.
decompress : boolean, default=False
Flag to enable or disable decompression (Zip).
port : int, default=80
Port number at which the server is hosted on the device.
"""
info = get_service_info(code)
if info is not None:
raise CodeExistsError(code)
addresses = [get_local_ip_address()]
register_service(code, addresses, port)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
app = web.Application()
app["decompress"] = decompress
app["progress_queue"] = asyncio.Queue()
for pos in range(5):
app["progress_queue"].put_nowait(pos)
app.router.add_get(path="/", handler=_upload_page)
app.router.add_get(path="/airshare", handler=_is_airshare_upload_receiver)
app.router.add_post(path="/upload", handler=_uploaded_file_receiver)
runner = web.AppRunner(app)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, "0.0.0.0", str(port))
loop.run_until_complete(site.start())
url_port = ""
if port != 80:
url_port = ":" + str(port)
ip = socket.inet_ntoa(addresses[0]) + url_port
quit_msg = "`, press Ctrl+C to stop receiving..."
if platform.system() == "Windows" and sys.version_info < (3, 8):
quit_msg = "`, press Ctrl+Break to stop receiving..."
print("Waiting for uploaded files at " + ip + " and `http://"
+ code + ".local" + url_port + quit_msg)
qr_code("http://" + ip)
if decompress:
print("Note: Any Zip Archives will be decompressed!")
loop.run_forever()
[docs]def receive_server_proc(*, code, decompress=False, port=80):
r"""Creates a process with 'receive_server' as the target.
Parameters
----------
code : str
Identifying code for the Airshare service and server.
decompress : boolean, default=False
Flag to enable or disable decompression (Zip).
port : int, default=80
Port number at which the server is hosted on the device.
Returns
-------
process: multiprocessing.Process
A multiprocessing.Process object with 'receive_server' as target.
"""
kwargs = {"code": code, "decompress": decompress, "port": port}
process = Process(target=receive_server, kwargs=kwargs)
return process