Python의 파일/스트림에서 여러 JSON 값을 쉽게 읽을 수 있는 방법은 무엇입니까?
파일을 읽을 때 사용할 수 있는 방법이 없는 것 같습니다.또한 오브젝트 위에 있는 오브젝트를 천천히 반복할 때 사용할 수 있는 방법은 없는 것 같습니다.
어떻게 할 수 있을까요?표준 라이브러리를 사용하는 것이 이상적이지만, 서드파티 라이브러리가 있다면 그 대신 사용할 것입니다.
을 한 해서 쓰고 요.json.loads(f.readline())
사용 예
import my_json as json
import sys
for o in json.iterload(sys.stdin):
print("Working on a", type(o))
{"foo": ["bar", "baz"]} 1 2 [] 4 5 6
샘플 세션
$ python3.2 < in.txt
Working on a dict
Working on a int
Working on a int
Working on a list
Working on a int
Working on a int
Working on a int
JSON은 일반적으로 이러한 증분 사용에는 그다지 적합하지 않습니다. 전체 로트를 구문 분석하지 않고 한 번에 하나씩 쉽게 로드할 수 있도록 여러 개체를 직렬화하는 표준 방법은 없습니다.
사용하고 있는 오브젝트·퍼·라인·솔루션은, 다른 장소에서도 볼 수 있습니다.스크래피는 이것을 'JSON 라인'이라고 부릅니다.
조금 더 Pythonic하게 할 수 있습니다.
for jsonline in f:
yield json.loads(jsonline) # or do the processing in this loop
이것이 가장 좋은 방법이라고 생각합니다.서드파티 라이브러리에 의존하지 않고 무슨 일이 일어나고 있는지 쉽게 이해할 수 있습니다.내 코드에도 써봤어
조금 늦었을지도 모르지만, 나는 정확히 이 문제를 안고 있었다.이러한 문제에 대한 표준 해결책은 일반적으로 잘 알려진 루트 개체에 대해 regex 분할을 수행하는 것이지만, 제 경우에는 불가능했습니다.이를 일반적으로 실행할 수 있는 유일한 방법은 적절한 토큰라이저를 구현하는 것입니다.
범용적이고 성능이 뛰어난 솔루션을 찾지 못한 후 모듈을 직접 작성했습니다.이것은 JSON과 XML을 이해하고 해석하기 위해 연속 스트림을 여러 청크로 분할하는 사전 토큰라이저입니다(단, 실제 해석은 사용자에게 맡겨집니다).어떤 퍼포먼스를 얻기 위해 C모듈로 표기되어 있습니다.
from splitstream import splitfile
for jsonstr in splitfile(sys.stdin, format="json")):
yield json.loads(jsonstr)
물물 、 할할수있 。 to to to to to to you you you 。raw_decode
에서는, 파일 그 합니다(대부분은 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」, 「」와 같이 동작합니다.json.load
큰 파일을 가지고 있는 경우는, 필요에 따라서 큰 어려움 없이 파일로부터 읽기 전용으로 변경할 수 있습니다.
import json
from json.decoder import WHITESPACE
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
if isinstance(string_or_fp, file):
string =
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
용도: 요청하신 대로 발전기입니다.
이것은 실제로는 매우 어려운 문제입니다.왜냐하면 스트림을 행으로 해야 하는데 패턴은 여러 줄에 걸쳐 대괄호를 기준으로 일치하지만 패턴은 json과도 일치하기 때문입니다.일종의 json-preparse에 이어 json parse가 뒤따릅니다.Json은 다른 포맷에 비해 해석하기 쉬우므로 항상 해석 라이브러리를 찾을 필요는 없습니다만, 이러한 모순되는 문제를 어떻게 해결해야 할까요?
발전기들, 구조하라!
이러한 문제의 발생기의 장점은, 그 위에 쌓아 올려 나태함을 유지하면서, 문제의 어려움을 서서히 추상화해 나가는 것입니다.생성기(send)에 값을 되돌리는 메커니즘도 고려했지만 다행히 사용할 필요가 없었습니다.
첫 번째 문제를 해결하려면 re.finder의 스트리밍 버전과 같은 스트리밍 파인더가 필요합니다.다음 시도에서는 필요에 따라 행을 끌어와(debug 문을 주석 해제하여 표시) 일치 항목을 반환합니다.그런 다음 일치하지 않는 선과 일치하도록 약간 수정했습니다(표시된 튜플의 첫 부분에는 0 또는 1로 표시됨).
import re
def streamingfinditer(pat,stream):
for s in stream:
# print "Read next line: " + s
while 1:
m =,s)
if not m:
yield (0,s)
yield (1,
s = re.split(pat,s,1)[1]
그러면 괄호까지 매칭하여 각 괄호의 밸런스 여부를 계산한 후 필요에 따라 단순 객체 또는 복합 객체 중 하나를 반환할 수 있습니다.
whitespaceesc=' \t'
def simpleorcompoundobjects(stream):
obj = ""
unbalanced = 0
for (c,m) in streamingfinditer(re.compile(untilbracespat),stream):
if (c == 0): # remainder of line returned, nothing interesting
if (unbalanced == 0):
yield (0,m)
obj += m
if (c == 1): # match returned
if (unbalanced == 0):
yield (0,m[:-1])
obj += m[-1]
obj += m
unbalanced += balancemap[m[-1]]
if (unbalanced == 0):
yield (1,obj)
그러면 다음과 같이 튜플이 반환됩니다.
(0,"String of simple non-braced objects easy to parse")
(1,"{ 'Compound' : 'objects' }")
기본적으로 그건 끔찍한 부분이야.이제 마지막으로 적절하다고 생각되는 수준의 파싱만 수행하면 됩니다.예를 들어, Jeremy Roman의 iterload 함수(Thanks!)를 사용하여 한 줄에 대한 해석을 수행할 수 있습니다.
def streamingiterload(stream):
for c,o in simpleorcompoundobjects(stream):
for x in iterload(o):
yield x
of = open("test.json","w")
of.write("""[ "hello" ] { "goodbye" : 1 } 1 2 {
} 2
9 78
4 5 { "animals" : [ "dog" , "lots of mice" ,
"cat" ] }
// open & stream the json
f = open("test.json","r")
for o in streamingiterload(f.readlines()):
print o
다음과 같은 결과가 표시됩니다(디버깅라인을 켜면 필요에 따라 행이 삽입됩니다).
{u'goodbye': 1}
{u'animals': [u'dog', u'lots of mice', u'cat']}
이것은 모든 상황에서 효과가 있는 것은 아닙니다.「 」의 json
라이브러리, 파서를 직접 구현하지 않고 완전히 올바르게 작동하는 것은 불가능합니다.
여기 훨씬 더 간단한 해결책이 있습니다.비결은 예외에 포함된 정보를 올바르게 해석하기 위해 시도하고 실패하며 사용하는 것입니다.유일한 제한은 파일이 검색 가능해야 한다는 것입니다.
def stream_read_json(fn):
import json
start_pos = 0
with open(fn, 'r') as f:
while True:
obj = json.load(f)
yield obj
except json.JSONDecodeError as e:
json_str =
obj = json.loads(json_str)
start_pos += e.pos
yield obj
편집: 방금 Python > = 3.5에서만 작동합니다.이전에는 실패하면 ValueError가 반환되며 문자열에서 위치를 해석해야 합니다.
def stream_read_json(fn):
import json
import re
start_pos = 0
with open(fn, 'r') as f:
while True:
obj = json.load(f)
yield obj
except ValueError as e:
end_pos = int(re.match('Extra data: line \d+ column \d+ .*\(char (\d+).*\)',
json_str =
obj = json.loads(json_str)
start_pos += end_pos
yield obj
나는 그것을 하는 더 좋은 방법은 국영 기계를 사용하는 것이라고 믿는다.다음은 노드를 변환하여 해결한 샘플 코드입니다.아래 JS 코드는 Python 3에 링크됩니다(사용된 로컬 이외의 키워드는 Python 3에서만 사용 가능, Python 2에서는 코드가 작동하지 않습니다).
Edit-1: Python 2와 호환되는 코드 업데이트 및 생성
Edit-2: Python3 전용 버전도 업데이트 및 추가
Python 3 전용 버전
# A streaming byte oriented JSON parser. Feed it a single byte at a time and
# it will emit complete objects as it comes across them. Whitespace within and
# between objects is ignored. This means it can parse newline delimited JSON.
import math
def json_machine(emit, next_func=None):
def _value(byte_data):
if not byte_data:
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _value # Ignore whitespace
if byte_data == 0x22: # "
return string_machine(on_value)
if byte_data == 0x2d or (0x30 <= byte_data < 0x40): # - or 0-9
return number_machine(byte_data, on_number)
if byte_data == 0x7b: #:
return object_machine(on_value)
if byte_data == 0x5b: # [
return array_machine(on_value)
if byte_data == 0x74: # t
return constant_machine(TRUE, True, on_value)
if byte_data == 0x66: # f
return constant_machine(FALSE, False, on_value)
if byte_data == 0x6e: # n
return constant_machine(NULL, None, on_value)
if next_func == _value:
raise Exception("Unexpected 0x" + str(byte_data))
return next_func(byte_data)
def on_value(value):
return next_func
def on_number(number, byte):
return _value(byte)
next_func = next_func or _value
return _value
TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]
def constant_machine(bytes_data, value, emit):
i = 0
length = len(bytes_data)
def _constant(byte_data):
nonlocal i
if byte_data != bytes_data[i]:
i += 1
raise Exception("Unexpected 0x" + str(byte_data))
i += 1
if i < length:
return _constant
return emit(value)
return _constant
def string_machine(emit):
string = ""
def _string(byte_data):
nonlocal string
if byte_data == 0x22: # "
return emit(string)
if byte_data == 0x5c: # \
return _escaped_string
if byte_data & 0x80: # UTF-8 handling
return utf8_machine(byte_data, on_char_code)
if byte_data < 0x20: # ASCII control character
raise Exception("Unexpected control character: 0x" + str(byte_data))
string += chr(byte_data)
return _string
def _escaped_string(byte_data):
nonlocal string
if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f: # " \ /
string += chr(byte_data)
return _string
if byte_data == 0x62: # b
string += "\b"
return _string
if byte_data == 0x66: # f
string += "\f"
return _string
if byte_data == 0x6e: # n
string += "\n"
return _string
if byte_data == 0x72: # r
string += "\r"
return _string
if byte_data == 0x74: # t
string += "\t"
return _string
if byte_data == 0x75: # u
return hex_machine(on_char_code)
def on_char_code(char_code):
nonlocal string
string += chr(char_code)
return _string
return _string
# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
left = 0
num = 0
def _utf8(byte_data):
nonlocal num, left
if (byte_data & 0xc0) != 0x80:
raise Exception("Invalid byte in UTF-8 character: 0x" + byte_data.toString(16))
left = left - 1
num |= (byte_data & 0x3f) << (left * 6)
if left:
return _utf8
return emit(num)
if 0xc0 <= byte_data < 0xe0: # 2-byte UTF-8 Character
left = 1
num = (byte_data & 0x1f) << 6
return _utf8
if 0xe0 <= byte_data < 0xf0: # 3-byte UTF-8 Character
left = 2
num = (byte_data & 0xf) << 12
return _utf8
if 0xf0 <= byte_data < 0xf8: # 4-byte UTF-8 Character
left = 3
num = (byte_data & 0x07) << 18
return _utf8
raise Exception("Invalid byte in UTF-8 string: 0x" + str(byte_data))
# Nestable state machine for hex escaped characters
def hex_machine(emit):
left = 4
num = 0
def _hex(byte_data):
nonlocal num, left
if 0x30 <= byte_data < 0x40:
i = byte_data - 0x30
elif 0x61 <= byte_data <= 0x66:
i = byte_data - 0x57
elif 0x41 <= byte_data <= 0x46:
i = byte_data - 0x37
raise Exception("Expected hex char in string hex escape")
left -= 1
num |= i << (left * 4)
if left:
return _hex
return emit(num)
return _hex
def number_machine(byte_data, emit):
sign = 1
number = 0
decimal = 0
esign = 1
exponent = 0
def _mid(byte_data):
if byte_data == 0x2e: # .
return _decimal
return _later(byte_data)
def _number(byte_data):
nonlocal number
if 0x30 <= byte_data < 0x40:
number = number * 10 + (byte_data - 0x30)
return _number
return _mid(byte_data)
def _start(byte_data):
if byte_data == 0x30:
return _mid
if 0x30 < byte_data < 0x40:
return _number(byte_data)
raise Exception("Invalid number: 0x" + str(byte_data))
if byte_data == 0x2d: # -
sign = -1
return _start
def _decimal(byte_data):
nonlocal decimal
if 0x30 <= byte_data < 0x40:
decimal = (decimal + byte_data - 0x30) / 10
return _decimal
return _later(byte_data)
def _later(byte_data):
if byte_data == 0x45 or byte_data == 0x65: # E e
return _esign
return _done(byte_data)
def _esign(byte_data):
nonlocal esign
if byte_data == 0x2b: # +
return _exponent
if byte_data == 0x2d: # -
esign = -1
return _exponent
return _exponent(byte_data)
def _exponent(byte_data):
nonlocal exponent
if 0x30 <= byte_data < 0x40:
exponent = exponent * 10 + (byte_data - 0x30)
return _exponent
return _done(byte_data)
def _done(byte_data):
value = sign * (number + decimal)
if exponent:
value *= math.pow(10, esign * exponent)
return emit(value, byte_data)
return _start(byte_data)
def array_machine(emit):
array_data = []
def _array(byte_data):
if byte_data == 0x5d: # ]
return emit(array_data)
return json_machine(on_value, _comma)(byte_data)
def on_value(value):
def _comma(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _comma # Ignore whitespace
if byte_data == 0x2c: # ,
return json_machine(on_value, _comma)
if byte_data == 0x5d: # ]
return emit(array_data)
raise Exception("Unexpected byte: 0x" + str(byte_data) + " in array body")
return _array
def object_machine(emit):
object_data = {}
key = None
def _object(byte_data):
if byte_data == 0x7d: #
return emit(object_data)
return _key(byte_data)
def _key(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _object # Ignore whitespace
if byte_data == 0x22:
return string_machine(on_key)
raise Exception("Unexpected byte: 0x" + str(byte_data))
def on_key(result):
nonlocal key
key = result
return _colon
def _colon(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _colon # Ignore whitespace
if byte_data == 0x3a: # :
return json_machine(on_value, _comma)
raise Exception("Unexpected byte: 0x" + str(byte_data))
def on_value(value):
object_data[key] = value
def _comma(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _comma # Ignore whitespace
if byte_data == 0x2c: # ,
return _key
if byte_data == 0x7d: #
return emit(object_data)
raise Exception("Unexpected byte: 0x" + str(byte_data))
return _object
Python 2 호환 버전
# A streaming byte oriented JSON parser. Feed it a single byte at a time and
# it will emit complete objects as it comes across them. Whitespace within and
# between objects is ignored. This means it can parse newline delimited JSON.
import math
def json_machine(emit, next_func=None):
def _value(byte_data):
if not byte_data:
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _value # Ignore whitespace
if byte_data == 0x22: # "
return string_machine(on_value)
if byte_data == 0x2d or (0x30 <= byte_data < 0x40): # - or 0-9
return number_machine(byte_data, on_number)
if byte_data == 0x7b: #:
return object_machine(on_value)
if byte_data == 0x5b: # [
return array_machine(on_value)
if byte_data == 0x74: # t
return constant_machine(TRUE, True, on_value)
if byte_data == 0x66: # f
return constant_machine(FALSE, False, on_value)
if byte_data == 0x6e: # n
return constant_machine(NULL, None, on_value)
if next_func == _value:
raise Exception("Unexpected 0x" + str(byte_data))
return next_func(byte_data)
def on_value(value):
return next_func
def on_number(number, byte):
return _value(byte)
next_func = next_func or _value
return _value
TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]
def constant_machine(bytes_data, value, emit):
local_data = {"i": 0, "length": len(bytes_data)}
def _constant(byte_data):
# nonlocal i, length
if byte_data != bytes_data[local_data["i"]]:
local_data["i"] += 1
raise Exception("Unexpected 0x" + byte_data.toString(16))
local_data["i"] += 1
if local_data["i"] < local_data["length"]:
return _constant
return emit(value)
return _constant
def string_machine(emit):
local_data = {"string": ""}
def _string(byte_data):
# nonlocal string
if byte_data == 0x22: # "
return emit(local_data["string"])
if byte_data == 0x5c: # \
return _escaped_string
if byte_data & 0x80: # UTF-8 handling
return utf8_machine(byte_data, on_char_code)
if byte_data < 0x20: # ASCII control character
raise Exception("Unexpected control character: 0x" + byte_data.toString(16))
local_data["string"] += chr(byte_data)
return _string
def _escaped_string(byte_data):
# nonlocal string
if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f: # " \ /
local_data["string"] += chr(byte_data)
return _string
if byte_data == 0x62: # b
local_data["string"] += "\b"
return _string
if byte_data == 0x66: # f
local_data["string"] += "\f"
return _string
if byte_data == 0x6e: # n
local_data["string"] += "\n"
return _string
if byte_data == 0x72: # r
local_data["string"] += "\r"
return _string
if byte_data == 0x74: # t
local_data["string"] += "\t"
return _string
if byte_data == 0x75: # u
return hex_machine(on_char_code)
def on_char_code(char_code):
# nonlocal string
local_data["string"] += chr(char_code)
return _string
return _string
# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
local_data = {"left": 0, "num": 0}
def _utf8(byte_data):
# nonlocal num, left
if (byte_data & 0xc0) != 0x80:
raise Exception("Invalid byte in UTF-8 character: 0x" + byte_data.toString(16))
local_data["left"] -= 1
local_data["num"] |= (byte_data & 0x3f) << (local_data["left"] * 6)
if local_data["left"]:
return _utf8
return emit(local_data["num"])
if 0xc0 <= byte_data < 0xe0: # 2-byte UTF-8 Character
local_data["left"] = 1
local_data["num"] = (byte_data & 0x1f) << 6
return _utf8
if 0xe0 <= byte_data < 0xf0: # 3-byte UTF-8 Character
local_data["left"] = 2
local_data["num"] = (byte_data & 0xf) << 12
return _utf8
if 0xf0 <= byte_data < 0xf8: # 4-byte UTF-8 Character
local_data["left"] = 3
local_data["num"] = (byte_data & 0x07) << 18
return _utf8
raise Exception("Invalid byte in UTF-8 string: 0x" + str(byte_data))
# Nestable state machine for hex escaped characters
def hex_machine(emit):
local_data = {"left": 4, "num": 0}
def _hex(byte_data):
# nonlocal num, left
i = 0 # Parse the hex byte
if 0x30 <= byte_data < 0x40:
i = byte_data - 0x30
elif 0x61 <= byte_data <= 0x66:
i = byte_data - 0x57
elif 0x41 <= byte_data <= 0x46:
i = byte_data - 0x37
raise Exception("Expected hex char in string hex escape")
local_data["left"] -= 1
local_data["num"] |= i << (local_data["left"] * 4)
if local_data["left"]:
return _hex
return emit(local_data["num"])
return _hex
def number_machine(byte_data, emit):
local_data = {"sign": 1, "number": 0, "decimal": 0, "esign": 1, "exponent": 0}
def _mid(byte_data):
if byte_data == 0x2e: # .
return _decimal
return _later(byte_data)
def _number(byte_data):
# nonlocal number
if 0x30 <= byte_data < 0x40:
local_data["number"] = local_data["number"] * 10 + (byte_data - 0x30)
return _number
return _mid(byte_data)
def _start(byte_data):
if byte_data == 0x30:
return _mid
if 0x30 < byte_data < 0x40:
return _number(byte_data)
raise Exception("Invalid number: 0x" + byte_data.toString(16))
if byte_data == 0x2d: # -
local_data["sign"] = -1
return _start
def _decimal(byte_data):
# nonlocal decimal
if 0x30 <= byte_data < 0x40:
local_data["decimal"] = (local_data["decimal"] + byte_data - 0x30) / 10
return _decimal
return _later(byte_data)
def _later(byte_data):
if byte_data == 0x45 or byte_data == 0x65: # E e
return _esign
return _done(byte_data)
def _esign(byte_data):
# nonlocal esign
if byte_data == 0x2b: # +
return _exponent
if byte_data == 0x2d: # -
local_data["esign"] = -1
return _exponent
return _exponent(byte_data)
def _exponent(byte_data):
# nonlocal exponent
if 0x30 <= byte_data < 0x40:
local_data["exponent"] = local_data["exponent"] * 10 + (byte_data - 0x30)
return _exponent
return _done(byte_data)
def _done(byte_data):
value = local_data["sign"] * (local_data["number"] + local_data["decimal"])
if local_data["exponent"]:
value *= math.pow(10, local_data["esign"] * local_data["exponent"])
return emit(value, byte_data)
return _start(byte_data)
def array_machine(emit):
local_data = {"array_data": []}
def _array(byte_data):
if byte_data == 0x5d: # ]
return emit(local_data["array_data"])
return json_machine(on_value, _comma)(byte_data)
def on_value(value):
# nonlocal array_data
def _comma(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _comma # Ignore whitespace
if byte_data == 0x2c: # ,
return json_machine(on_value, _comma)
if byte_data == 0x5d: # ]
return emit(local_data["array_data"])
raise Exception("Unexpected byte: 0x" + str(byte_data) + " in array body")
return _array
def object_machine(emit):
local_data = {"object_data": {}, "key": ""}
def _object(byte_data):
# nonlocal object_data, key
if byte_data == 0x7d: #
return emit(local_data["object_data"])
return _key(byte_data)
def _key(byte_data):
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _object # Ignore whitespace
if byte_data == 0x22:
return string_machine(on_key)
raise Exception("Unexpected byte: 0x" + byte_data.toString(16))
def on_key(result):
# nonlocal object_data, key
local_data["key"] = result
return _colon
def _colon(byte_data):
# nonlocal object_data, key
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _colon # Ignore whitespace
if byte_data == 0x3a: # :
return json_machine(on_value, _comma)
raise Exception("Unexpected byte: 0x" + str(byte_data))
def on_value(value):
# nonlocal object_data, key
local_data["object_data"][local_data["key"]] = value
def _comma(byte_data):
# nonlocal object_data
if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
return _comma # Ignore whitespace
if byte_data == 0x2c: # ,
return _key
if byte_data == 0x7d: #
return emit(local_data["object_data"])
raise Exception("Unexpected byte: 0x" + str(byte_data))
return _object
테스트 중
if __name__ == "__main__":
test_json = """[1,2,"3"] {"name":
"tarun"} 1 2
3 [{"name":"a",
"data": [1,
def found_json(data):
state = json_machine(found_json)
for char in test_json:
state = state(ord(char))
동일한 출력은 다음과 같습니다.
[1, 2, '3']
{'name': 'tarun'}
[{'name': 'a', 'data': [1, None, 2]}]
해결책을 제시하겠습니다.중요한 생각은 디코딩을 "시도"하는 것입니다. 실패하면 더 많은 피드를 공급하고, 그렇지 않으면 오프셋 정보를 사용하여 다음 디코딩을 준비하는 것입니다.
그러나 현재 json 모듈은 문자열 선두에 있는 SPACE를 디코딩할 수 없기 때문에 떼어내야 합니다.
import sys
import json
def iterload(file):
buffer = ""
dec = json.JSONDecoder()
for line in file:
buffer = buffer.strip(" \n\r\t") + line.strip(" \n\r\t")
r = dec.raw_decode(buffer)
yield r[0]
buffer = buffer[r[1]:].strip(" \n\r\t")
for o in iterload(sys.stdin):
print("Working on a", type(o), o)
=========================여러 txt 파일을 테스트해 보았습니다만, 정상적으로 동작합니다.(in1.txt)
{"foo": ["bar", "baz"]
1 2 [
] 4
{"foo1": ["bar1", {"foo2":{"A":1, "B":3}, "DDD":4}]
5 6
: ["bar",
1 2 [
] 4 5 6
(in.txt, 이니셜)
{"foo": ["bar", "baz"]} 1 2 [] 4 5 6
(베네딕트의 테스트 케이스 출력)
python < in.txt
('Working on a', <type 'list'>, [u'hello'])
('Working on a', <type 'dict'>, {u'goodbye': 1})
('Working on a', <type 'int'>, 1)
('Working on a', <type 'int'>, 2)
('Working on a', <type 'dict'>, {})
('Working on a', <type 'int'>, 2)
('Working on a', <type 'int'>, 9)
('Working on a', <type 'int'>, 78)
('Working on a', <type 'int'>, 4)
('Working on a', <type 'int'>, 5)
('Working on a', <type 'dict'>, {u'animals': [u'dog', u'lots of mice', u'cat']}) 를 사용하면, 이러한 목적을 실현할 수 있습니다.
import sys
from json_stream_parser import load_iter
for obj in load_iter(sys.stdin):
{'foo': ['bar', 'baz']}
여기 제 것이 있습니다.
import simplejson as json
from simplejson import JSONDecodeError
class StreamJsonListLoader():
When you have a big JSON file containint a list, such as
And it's too big to be practically loaded into memory and parsed by json.load,
This class comes to the rescue. It lets you lazy-load the large json list.
def __init__(self, filename_or_stream):
if type(filename_or_stream) == str: = open(filename_or_stream)
else: = filename_or_stream
if not == '[':
raise NotImplementedError('Only JSON-streams of lists (that start with a [) are supported.')
def __iter__(self):
return self
def next(self):
read_buffer =
while True:
json_obj = json.loads(read_buffer)
if not in [',',']']:
raise Exception('JSON seems to be malformed: object is not followed by comma (,) or end of list (]).')
return json_obj
except JSONDecodeError:
next_char =
read_buffer += next_char
while next_char != '}':
next_char =
if next_char == '':
raise StopIteration
read_buffer += next_char
@wuilang의 우아한 솔루션을 사용했습니다.1바이트를 읽고, 해독을 시도하고, 1바이트를 읽고, 해독을 시도하고, 하지만 불행히도 매우 느렸습니다.
제 경우, 파일에서 동일한 객체 유형의 "예쁜 인쇄" JSON 객체를 읽으려고 했습니다.이를 통해 접근 방식을 최적화할 수 있었습니다. 파일을 한 줄씩 읽을 수 있었습니다. 정확히 "}"을 포함하는 행을 찾은 경우에만 디코딩할 수 있었습니다.
def iterload(stream):
buf = ""
dec = json.JSONDecoder()
for line in stream:
line = line.rstrip()
buf = buf + line
if line == "}":
yield dec.raw_decode(buf)
buf = ""
스트링 리터럴의 줄바꿈을 생략한 한 줄당 콤팩트 JSON을 사용하는 경우 이 방법을 더욱 안전하게 단순화할 수 있습니다.
def iterload(stream):
dec = json.JSONDecoder()
for line in stream:
yield dec.raw_decode(line)
물론 이러한 간단한 접근법은 매우 특정 종류의 JSON에만 적용됩니다.그러나 이러한 전제 조건이 충족되면 이러한 솔루션은 정확하고 신속하게 작동합니다.
json 을 .는 JSONDecoder를 사용할 수 .raw_decode
을 반환합니다.JSON 비단뱀이를 통해 나머지 JSON 값을 쉽게 슬라이스(또는 스트림 개체에서 검색)할 수 있습니다.입력의 다른 JSON 값 사이의 공백을 건너뛰는 추가 while 루프가 마음에 들지 않지만, 제 생각에는 작업이 완료됩니다.
import json
def yield_multiple_value(f):
parses multiple JSON values from a file.
vals_str =
decoder = json.JSONDecoder()
nread = 0
while nread < len(vals_str):
val, n = decoder.raw_decode(vals_str[nread:])
nread += n
# Skip over whitespace because of bug, below.
while nread < len(vals_str) and vals_str[nread].isspace():
nread += 1
yield val
except json.JSONDecodeError as e:
다음 버전은 훨씬 짧으며 이미 구문 분석된 문자열 부분을 사용합니다.무슨 이유에선지 제이슨에게 제2의 콜이 온 것 같다.문자열의 첫 번째 문자가 공백일 때 JSONDecoder.raw_decode()가 실패하는 것 같습니다.그 때문에 위의 whileloop에서는 공백 위를 건너뜁니다.
def yield_multiple_value(f):
parses multiple JSON values from a file.
vals_str =
decoder = json.JSONDecoder()
while vals_str:
val, n = decoder.raw_decode(vals_str)
#remove the read characters from the start.
vals_str = vals_str[n:]
# remove leading white space because a second call to decoder.raw_decode()
# fails when the string starts with whitespace, and
# I don't understand why...
vals_str = vals_str.lstrip()
yield val
json에 대한 문서입니다.JSONDecoder 클래스 raw_method에는 다음이 포함됩니다.
이것은 끝에 관련 없는 데이터가 있을 수 있는 문자열에서 JSON 문서를 디코딩하는 데 사용할 수 있습니다.
이와 무관한 데이터는 쉽게 또 다른 JSON 값이 될 수 있습니다.즉, 이 방법은 이 목적을 염두에 두고 작성될 수 있습니다.
입력과 함께.txt는 상위 함수를 사용하여 원래 질문에서 제시된 출력 예를 얻습니다.
데이터 생성 방법을 제어할 수 있는 경우 ndjson과 같은 대체 형식으로 전환할 수 있습니다.ndjson은 Newline Delimited JSON을 나타내며 증분 JSON 형식의 데이터를 스트리밍할 수 있습니다.각 행은 각각 유효한 JSON입니다.사용 가능한 Python 패키지는 ndjson과 jsonlines 두 가지가 있습니다.
JSON을 읽을 때 처리할 수 있는 json-stream도 있으므로 JSON 전체를 사전에 로드할 필요가 없습니다.이를 사용하여 스트림에서 JSON 데이터를 읽을 수 있을 뿐만 아니라 다른 I/O 작업에도 동일한 스트림을 사용하거나 동일한 스트림에서 여러 JSON 개체를 읽을 수 있어야 합니다.
