# Notebook setup: auto-reload edited local modules on each cell execution.
%load_ext autoreload
%autoreload 2
import sys
# Make the repository root importable so the local `optimus` package is found.
sys.path.append("..")
from optimus.livy import Livy
C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working from collections import Callable You are using PySparkling of version 2.4.10, but your PySpark is of version 2.3.1. Please make sure Spark and PySparkling versions are compatible.
# Livy REST endpoint (8998 is Livy's default port).
HOST = 'http://46.101.172.155:8998'
livy = Livy(HOST)
# Create session
# NOTE(review): start() returns immediately with state 'starting'; the
# session is not ready to accept code until it reaches 'idle'.
livy.start()
{'id': 52, 'name': None, 'appId': None, 'owner': None, 'proxyUser': None, 'state': 'starting', 'kind': 'pyspark', 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None}, 'log': ['stdout: ', '\nstderr: ']}
# Get session info
# NOTE(review): right after start() the state is still 'starting' (see the
# output above); poll this until state == 'idle' before submitting code.
livy.session()
{'id': 52, 'name': None, 'appId': None, 'owner': None, 'proxyUser': None, 'state': 'starting', 'kind': 'pyspark', 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None}, 'log': ['stdout: ', '\nstderr: ']}
# Sanity check: evaluate a trivial expression through the Livy session and
# show the full statement response (state, output, progress).
response = livy.execute({'code': '1 + 1'})
print(response)
{'id': 2, 'code': '1 + 1', 'state': 'available', 'output': {'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '2'}}, 'progress': 1.0}
import textwrap, pprint

# Submit a snippet that builds an Optimus instance on the remote Spark
# session and loads the sample CSV into the remote variable `df`.
remote_snippet = """
from optimus import Optimus
op = Optimus(spark)
df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv")
"""
response = livy.submit(remote_snippet)
print(response)
{'id': 33, 'state': 'running', 'output': None, 'progress': 0.0}
import textwrap, pprint

# Serialize the remote dataframe to JSON text.
# FIX: the original submitted `json.dumps(df.to_json())`, but to_json()
# already yields a JSON string, so dumps() double-encoded it and the
# statement's text/plain output carried an extra layer of quoting — the
# later json.loads(livy.result()) then failed with
# "Expecting value: line 1 column 1 (char 0)" (see the traceback below).
# print() writes the raw JSON to stdout unquoted, so the client can decode
# the result directly.
code = """
print(df.to_json())
"""
response = livy.submit(code)
print(response)
{'id': 34, 'state': 'waiting', 'output': None, 'progress': 0.0}
import json
import ast


def _decode_remote_json(raw):
    """Decode JSON text returned by livy.result().

    The statement's text/plain output may be the *repr* of a Python string
    (outer single quotes, escaped unicode — as seen in the output below),
    which makes json.loads fail with "Expecting value ... (char 0)".
    Unwrap such a repr with ast.literal_eval before decoding.

    :param raw: text payload from the Livy statement output
    :return: the parsed Python object (here, a list of row dicts)
    :raises json.JSONDecodeError: if the payload is not valid JSON
    """
    text = raw.strip()
    # A leading quote means we got a Python string repr, not bare JSON.
    if text[:1] in ("'", '"'):
        text = ast.literal_eval(text)
    return json.loads(text)


a = livy.result()
print(a)
_decode_remote_json(a)
'[{"id": 1, "product": "Cake", "firstName": "Luis", "price": 10, "lastName": "Alvarez$$%!", "billingId": 123, "dummyCol": "never", "birth": "1980/07/07"}, {"id": 2, "product": "piza", "firstName": "Andr\\u00e9", "price": 8, "lastName": "Amp\\u00e8re", "billingId": 423, "dummyCol": "gonna", "birth": "1950/07/08"}, {"id": 3, "product": "pizza", "firstName": "NiELS", "price": 8, "lastName": "B\\u00f6hr//((%%", "billingId": 551, "dummyCol": "give", "birth": "1990/07/09"}, {"id": 4, "product": "pizza", "firstName": "PAUL", "price": 8, "lastName": "dirac$", "billingId": 521, "dummyCol": "you", "birth": "1954/07/10"}, {"id": 5, "product": "pizza", "firstName": "Albert", "price": 8, "lastName": "Einstein", "billingId": 634, "dummyCol": "up", "birth": "1990/07/11"}, {"id": 6, "product": "arepa", "firstName": "Galileo", "price": 5, "lastName": " GALiLEI", "billingId": 672, "dummyCol": "never", "birth": "1930/08/12"}, {"id": 7, "product": "taco", "firstName": "CaRL", "price": 3, "lastName": "Ga%%%uss", "billingId": 323, "dummyCol": "gonna", "birth": "1970/07/13"}, {"id": 8, "product": "taaaccoo", "firstName": "David", "price": 3, "lastName": "H$$$ilbert", "billingId": 624, "dummyCol": "let", "birth": "1950/07/14"}, {"id": 9, "product": "taco", "firstName": "Johannes", "price": 3, "lastName": "KEPLER", "billingId": 735, "dummyCol": "you", "birth": "1920/04/22"}, {"id": 10, "product": "taco", "firstName": "JaMES", "price": 3, "lastName": "M$$ax%%well", "billingId": 875, "dummyCol": "down", "birth": "1923/03/12"}, {"id": 11, "product": "pasta", "firstName": "Isaac", "price": 9, "lastName": "Newton", "billingId": 992, "dummyCol": "never ", "birth": "1999/02/15"}, {"id": 12, "product": "pasta", "firstName": "Emmy%%", "price": 9, "lastName": "N\\u00f6ether$", "billingId": 234, "dummyCol": "gonna", "birth": "1993/12/08"}, {"id": 13, "product": "hamburguer", "firstName": "Max!!!", "price": 4, "lastName": "Planck!!!", "billingId": 111, "dummyCol": "run ", "birth": "1994/01/04"}, 
{"id": 14, "product": "pizzza", "firstName": "Fred", "price": 8, "lastName": "Hoy&&&le", "billingId": 553, "dummyCol": "around", "birth": "1997/06/27"}, {"id": 15, "product": "pizza", "firstName": "((( Heinrich )))))", "price": 8, "lastName": "Hertz", "billingId": 116, "dummyCol": "and", "birth": "1956/11/30"}, {"id": 16, "product": "BEER", "firstName": "William", "price": 2, "lastName": "Gilbert###", "billingId": 886, "dummyCol": "desert", "birth": "1958/03/26"}, {"id": 17, "product": "Rice", "firstName": "Marie", "price": 1, "lastName": "CURIE", "billingId": 912, "dummyCol": "you", "birth": "2000/03/22"}, {"id": 18, "product": "110790", "firstName": "Arthur", "price": 5, "lastName": "COM%%%pton", "billingId": 812, "dummyCol": "#", "birth": "1899/01/01"}, {"id": 19, "product": "null", "firstName": "JAMES", "price": 10, "lastName": "Chadwick", "billingId": 467, "dummyCol": "#", "birth": "1921/05/03"}]'
--------------------------------------------------------------------------- JSONDecodeError Traceback (most recent call last) <ipython-input-96-0785c2deabf7> in <module> 1 import json 2 print(a) ----> 3 json.loads(a) ~\Anaconda3\lib\json\__init__.py in loads(s, encoding, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw) 346 parse_int is None and parse_float is None and 347 parse_constant is None and object_pairs_hook is None and not kw): --> 348 return _default_decoder.decode(s) 349 if cls is None: 350 cls = JSONDecoder ~\Anaconda3\lib\json\decoder.py in decode(self, s, _w) 335 336 """ --> 337 obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 338 end = _w(s, end).end() 339 if end != len(s): ~\Anaconda3\lib\json\decoder.py in raw_decode(self, s, idx) 353 obj, end = self.scan_once(s, idx) 354 except StopIteration as err: --> 355 raise JSONDecodeError("Expecting value", s, err.value) from None 356 return obj, end JSONDecodeError: Expecting value: line 1 column 1 (char 0)
{}
{}
# Delete the Livy session with id 46 (issues a DELETE to /sessions/46).
livy.delete_session(46)
http://46.101.172.155:8998/sessions/46
# List all sessions currently known to the Livy server.
livy.sessions()
[{'id': 52, 'name': None, 'appId': None, 'owner': None, 'proxyUser': None, 'state': 'idle', 'kind': 'pyspark', 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None}, 'log': ['19/06/04 03:52:01 INFO Executor: Fetching spark://10.19.0.5:42304/jars/livy-repl_2.11-0.6.0-incubating.jar with timestamp 1559619584198', '19/06/04 03:52:01 INFO Utils: Fetching spark://10.19.0.5:42304/jars/livy-repl_2.11-0.6.0-incubating.jar to /tmp/spark-e906068a-b29b-4594-aef1-58a2d37bde15/userFiles-d5869831-80e5-4da1-aa9c-37245d12c7f2/fetchFileTemp3930129784218452512.tmp', '19/06/04 03:52:01 INFO Executor: Adding file:/tmp/spark-e906068a-b29b-4594-aef1-58a2d37bde15/userFiles-d5869831-80e5-4da1-aa9c-37245d12c7f2/livy-repl_2.11-0.6.0-incubating.jar to class loader', '19/06/04 03:52:02 INFO PythonRunner: Times: total = 898, boot = 676, init = 55, finish = 167', '19/06/04 03:52:02 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1401 bytes result sent to driver', '19/06/04 03:52:02 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1515 ms on localhost (executor driver) (1/1)', '19/06/04 03:52:02 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool ', '19/06/04 03:52:02 INFO PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 49371', '19/06/04 03:52:02 INFO DAGScheduler: ResultStage 0 (reduce at <stdin>:8) finished in 1.819 s', '19/06/04 03:52:02 INFO DAGScheduler: Job 0 finished: reduce at <stdin>:8, took 1.931369 s']}]